## APACHE KAFKA PROJECT 01 - CONSUMER
Creating the consumer that sends data from Apache Kafka to an AWS S3 bucket.
The data is a real-time simulation driven by a stock market file (CSV).

#### Libraries

In [None]:
import pandas as pd
from kafka import KafkaConsumer
from time import sleep
from json import dumps, loads
import json
from dotenv import load_dotenv
import os
import boto3
import io

#### Loading the environment variables

In [None]:
# Load settings from a .env file into the process environment.
load_dotenv()

# Kafka connection settings.
bootstrap_servers = os.environ.get('BOOTSTRAP_SERVERS')
topic_name = os.environ.get('TOPIC_NAME')

# AWS settings. The key pair and region are not validated here; they are
# forwarded unchanged to boto3.client when the S3 client is created.
aws_access_key = os.environ.get('AWS_ACCESS_KEY')
aws_secret_key = os.environ.get('AWS_SECRET_KEY')
s3_region = os.environ.get('S3_REGION')
bucket_name = os.environ.get('BUCKET_NAME')

# Fail fast: os.environ.get returns None for an unset variable, which would
# otherwise only surface later as an opaque error in the Kafka or S3 cells.
_required_settings = {
    'BOOTSTRAP_SERVERS': bootstrap_servers,
    'TOPIC_NAME': topic_name,
    'BUCKET_NAME': bucket_name,
}
_missing_settings = [name for name, value in _required_settings.items() if not value]
if _missing_settings:
    raise RuntimeError(
        'Missing required environment variable(s): ' + ', '.join(_missing_settings)
    )

#### Creating the consumer

In [None]:
# Kafka consumer subscribed to `topic_name`. Each message value is decoded
# from UTF-8 bytes and parsed as JSON.
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=bootstrap_servers,
    value_deserializer=lambda raw_bytes: json.loads(raw_bytes.decode('utf-8')),
)

#### Creating the S3 client to upload the data

In [None]:
# S3 client configured with the region and key pair read from the environment.
s3 = boto3.client(
    's3',
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_key,
    region_name=s3_region,
)

#### Sending the data from Apache Kafka to the AWS S3 bucket

In [None]:
# For each message the consumer yields, serialise its value to JSON and store
# it in the bucket as its own object, keyed by the running message index.
# NOTE(review): the index restarts at 0 on every run and is the only varying
# part of the key, so a re-run reuses keys from earlier runs. Confirm that
# replacing those objects is acceptable.
key_template = 'stock_market_{}.json'
for count, message in enumerate(consumer):
    # default=str stringifies any value json cannot serialise natively.
    payload = json.dumps(message.value, default=str)
    s3.put_object(Bucket=bucket_name, Key=key_template.format(count), Body=payload)