In [2]:
import boto3
import base64
import json
from kafka import KafkaConsumer
from io import BytesIO

In [1]:
import configparser
config = configparser.ConfigParser()
# Open the config file in a context manager so the handle is closed after
# parsing (passing a bare open() to read_file leaks it). open() raises
# FileNotFoundError on a missing file, unlike ConfigParser.read, which would
# silently skip it and leave `config` empty.
with open('../configurations/ec2_access.config') as config_file:
    config.read_file(config_file)

In [None]:
# AWS credentials, region, and the destination bucket for labeled images.
# All four values come from the [AWS] section of `config`; a missing option
# raises configparser.NoOptionError.
(
    AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY,
    AWS_REGION_NAME,
    LABELED_BUCKET_NAME,
) = (
    config.get('AWS', option_name)
    for option_name in ('ACCESS_KEY', 'SECRET_KEY', 'REGION', 'LABELED_IMAGE_BUCKET_NAME')
)

In [None]:
# Kafka: broker address and the topic that carries labeled-image messages.
bootstrap_servers, consumer_topic = 'localhost:9092', 'labeled_image_topic'

# S3: destination bucket, plus a client authenticated with the credentials
# and region read from the config file.
s3_bucket_name = LABELED_BUCKET_NAME
s3 = boto3.client(
    's3',
    region_name=AWS_REGION_NAME,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

def upload_image_to_s3(image_data, label, bucket=None, client=None):
    """Decode a base64-encoded image and upload it to S3 under ``<label>.png``.

    Args:
        image_data: Base64-encoded image (``str`` or ``bytes``), as accepted by
            ``base64.b64decode``.
        label: Label used to build the object key ``f"{label}.png"``.
        bucket: Destination bucket name. Defaults to the module-level
            ``s3_bucket_name``.
        client: S3 client (any object with ``upload_fileobj``). Defaults to
            the module-level ``s3`` client.

    Returns:
        The S3 object key that was written.

    Raises:
        binascii.Error: if ``image_data`` is not valid base64 (e.g. bad padding).
    """
    if bucket is None:
        bucket = s3_bucket_name
    if client is None:
        client = s3

    # upload_fileobj wants a file-like object, so wrap the decoded bytes.
    image_stream = BytesIO(base64.b64decode(image_data))

    # NOTE(review): the key depends only on the label, so a later image with the
    # same label overwrites the earlier one in the bucket. If labels are class
    # names rather than per-image IDs, add a unique component (e.g. uuid4) here.
    key = f"{label}.png"

    # Set ContentType so the object is stored as an image rather than a generic
    # binary object. NOTE: assumes the payload really is PNG, matching the key.
    client.upload_fileobj(
        image_stream, bucket, key, ExtraArgs={'ContentType': 'image/png'}
    )
    print(f"Uploaded {key} to {bucket}")
    return key

def consume_and_upload():
    """Consume labeled-image messages from Kafka and upload each image to S3.

    Each message value is UTF-8 decoded and parsed as JSON. It is expected to be
    an object with ``'image'`` (base64 image data) and ``'label'`` fields, which
    are forwarded to ``upload_image_to_s3``. Messages that are not such an
    object are skipped with a printed notice instead of aborting the loop.

    The loop has no exit condition of its own. It runs until the consumer
    iterator stops or an exception (e.g. KeyboardInterrupt) is raised. The
    consumer is closed in every case.
    """
    consumer = KafkaConsumer(
        consumer_topic,
        bootstrap_servers=bootstrap_servers,
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    try:
        for message in consumer:
            payload = message.value
            # Skip malformed payloads rather than letting one bad message raise
            # KeyError/TypeError and kill the whole consumer loop.
            if not isinstance(payload, dict) or 'image' not in payload or 'label' not in payload:
                print(
                    f"Skipping malformed message "
                    f"(partition {message.partition}, offset {message.offset})"
                )
                continue
            upload_image_to_s3(payload['image'], payload['label'])
    finally:
        # Release the consumer's connections even when the loop is interrupted
        # from the notebook or an upload fails.
        consumer.close()

# Entry point. In a Jupyter kernel __name__ is normally '__main__' too, so
# running this cell starts the consumer loop, which blocks the cell until it
# is interrupted.
if '__main__' == __name__:
    consume_and_upload()
