# Implementing the batch layer of lambda architecture using S3, Redshift and Apache Kafka

### Purpose:
- store all tweets produced by the Kafka producer in S3
- load them into Redshift
- aggregate the tweets to obtain the batch-layer output
- achieve this by:
    - every couple of hours, fetching the latest unseen tweets produced by the Kafka producer and storing them in an S3 archive
    - every night, running a SQL query to compute the batch-layer result

### Contents: 
- [Defining the Kafka consumer](#1)
- [Defining an Amazon Web Services S3 storage client](#2)
- [Writing the data into an S3 bucket](#3)
- [Exporting data from S3 bucket to Amazon Redshift using COPY command](#4)
- [Aggregating "raw" tweets in Redshift](#5)
- [Deployment](#6)

### Required libraries

```python
from kafka import KafkaConsumer
from io import StringIO
import boto3
import time
import random
```

<a id="1"></a>
### Defining the Kafka consumer
- set the address of the Kafka broker
- specify the `group_id` and `consumer_timeout_ms`
- subscribe to a topic

```python
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    # Where to start when the group has no committed offset (or it is out of range):
    # 'latest' skips messages produced before the group first connected
    auto_offset_reset='latest',
    # Consumer group whose committed offsets record which tweets were already archived
    group_id='test',
    # Stop iterating once no new message has arrived for 10 seconds; we poll the broker
    # only every couple of hours, so we drain the backlog and then return
    consumer_timeout_ms=10000)

consumer.subscribe(['tweets-lambda1'])
```
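Iterating over `consumer` ends once no message has arrived for `consumer_timeout_ms`. If you prefer explicit polling, a minimal sketch uses `KafkaConsumer.poll()`, which returns a dict mapping each `TopicPartition` to a list of records. The helper `drain_messages` and its `max_empty_polls` parameter are hypothetical, not part of kafka-python.

```python
def drain_messages(consumer, poll_timeout_ms=1000, max_empty_polls=3):
    """Collect decoded message values until several consecutive polls return nothing.

    `consumer` is expected to behave like kafka-python's KafkaConsumer:
    poll(timeout_ms=...) returns {TopicPartition: [ConsumerRecord, ...]}.
    """
    values = []
    empty_polls = 0
    while empty_polls < max_empty_polls:
        batch = consumer.poll(timeout_ms=poll_timeout_ms)
        if not batch:
            # Nothing arrived during this poll; stop after a few empty polls in a row
            empty_polls += 1
            continue
        empty_polls = 0
        for records in batch.values():
            for record in records:
                values.append(record.value.decode())
    return values

# Usage (hypothetical): lines = drain_messages(consumer)
```

Requiring several empty polls in a row, rather than one, gives the consumer time to finish joining its group before the helper concludes there is nothing to read.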

<a id="2"></a>
### Defining an Amazon Web Services S3 storage client
- set up the credentials and the target bucket

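```python
import os

# Read credentials from the environment instead of hard-coding them in the notebook
# (boto3 would also pick up these two variables on its own if no keys were passed)
s3_resource = boto3.resource(
    's3',
    aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
    aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
)

s3_client = s3_resource.meta.client
bucket_name = 'lambda-architecture123'
```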


<a id="3"></a>
### Writing the data into an S3 bucket
- poll the Kafka broker
- concatenate the latest messages into a single object in the bucket

```python
def store_twitter_data(path):
    # S3 is object storage, so the whole batch is uploaded as one large string
    csv_buffer = StringIO()

    # Iterating the consumer yields messages this consumer group has not read yet
    # and stops after consumer_timeout_ms without a new message
    for message in consumer:
        csv_buffer.write(message.value.decode() + '\n')

    s3_resource.Object(bucket_name, path).put(Body=csv_buffer.getvalue())
```
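`store_twitter_data` uploads an object even when no new tweets arrived. A minimal sketch of a pure helper that builds the newline-delimited body and signals when there is nothing to upload; the name `build_csv_body` is hypothetical, and the caller would skip `put()` (and the later `COPY`) when it returns `None`.

```python
from io import StringIO

def build_csv_body(values):
    """Join decoded Kafka message values into one newline-terminated string.

    Returns None when there are no messages, so the caller can skip the S3 upload.
    """
    buffer = StringIO()
    count = 0
    for value in values:
        # Normalise line endings so every record ends with exactly one newline
        buffer.write(value.rstrip("\n") + "\n")
        count += 1
    return buffer.getvalue() if count else None

# Usage (hypothetical):
# body = build_csv_body(message.value.decode() for message in consumer)
# if body is not None:
#     s3_resource.Object(bucket_name, path).put(Body=body)
```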

<a id="4"></a>
### Exporting data from S3 bucket to Amazon Redshift using COPY command
- authenticate and open a connection with the `psycopg2` module
- load the data from S3 into the Redshift "raw" table (`batch_raw`) with the `COPY` command

```python
import os
import psycopg2

config = {
    'dbname': 'lambda',
    'user': 'dorian',
    # REDSHIFT_PASSWORD is an environment variable name chosen for this example
    'pwd': os.environ['REDSHIFT_PASSWORD'],
    'host': 'data-warehouse.c3glymsgdgty.us-east-1.redshift.amazonaws.com',
    'port': '5439',
}
conn = psycopg2.connect(dbname=config['dbname'], host=config['host'],
                        port=config['port'], user=config['user'],
                        password=config['pwd'])
```

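```python
import os

def copy_files(conn, path):
    """Load one S3 object into the batch_raw table with Redshift's COPY command."""
    with conn.cursor() as curs:
        # psycopg2 fills in the %s placeholders as quoted literals
        curs.execute(
            """
            COPY batch_raw
            FROM %s
            ACCESS_KEY_ID %s
            SECRET_ACCESS_KEY %s
            DELIMITER ';'
            REGION 'eu-central-1'  -- region of the S3 bucket, not of the cluster
            """,
            ('s3://lambda-architecture123/' + path,
             os.environ['AWS_ACCESS_KEY_ID'],
             os.environ['AWS_SECRET_ACCESS_KEY']))
    conn.commit()
```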
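`copy_files` passes access keys inside the `COPY` statement. Redshift also accepts an `IAM_ROLE` authorization clause, which keeps keys out of the SQL entirely. A sketch of a statement builder, assuming an IAM role that is attached to the cluster and can read the bucket; `build_copy_sql` and the example ARN are hypothetical. It returns SQL with `%s` placeholders plus a parameter tuple so psycopg2 does the quoting.

```python
def build_copy_sql(table, bucket, key, iam_role_arn, region, delimiter=";"):
    """Return (sql, params) for a Redshift COPY from one S3 object.

    The table name is interpolated directly (identifiers cannot be bound as
    parameters), so it is checked to contain only letters, digits and underscores.
    """
    if not table.replace("_", "").isalnum():
        raise ValueError(f"unexpected table name: {table!r}")
    sql = (
        f"COPY {table} FROM %s "
        "IAM_ROLE %s "
        "DELIMITER %s "
        "REGION %s"
    )
    params = (f"s3://{bucket}/{key}", iam_role_arn, delimiter, region)
    return sql, params

# Usage (hypothetical role ARN):
# with conn.cursor() as curs:
#     curs.execute(*build_copy_sql("batch_raw", bucket_name, path,
#                                  "arn:aws:iam::123456789012:role/example-redshift-copy",
#                                  "eu-central-1"))
# conn.commit()
```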


<a id="5"></a>
### Computing the batch layer output
- query the raw tweets stored in Redshift, drop duplicate rows, and aggregate them per `location` into the `batch_layer` table

```python
def compute_batch_layer(conn):
    """Rebuild the batch_layer table from all raw tweets (drop and recreate)."""
    with conn.cursor() as curs:
        curs.execute("""
            DROP TABLE IF EXISTS batch_layer;

            -- a tweet may be archived more than once, so drop exact duplicates first
            WITH raw_dedup AS (
                SELECT DISTINCT
                    id, created_at, followers_count, location, favorite_count, retweet_count
                FROM batch_raw
            ),
            batch_result AS (
                SELECT
                    location,
                    count(id) AS count_id,
                    sum(followers_count) AS sum_followers_count,
                    sum(favorite_count) AS sum_favorite_count,
                    sum(retweet_count) AS sum_retweet_count
                FROM raw_dedup
                GROUP BY location
            )
            SELECT *
            INTO batch_layer
            FROM batch_result""")
    # DROP and SELECT INTO run in the same transaction and are committed together
    conn.commit()
```
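To make the semantics of the query concrete, here is a small pure-Python equivalent for checking expectations on sample data: it drops exact duplicate rows (as `SELECT DISTINCT` does over the six selected columns) and then groups by `location`. It is an illustration, not a replacement for the Redshift query; the function name and the tuple layout are assumptions, and it assumes no column is NULL.

```python
from collections import defaultdict

def batch_layer_reference(rows):
    """rows: iterable of (id, created_at, followers_count, location,
    favorite_count, retweet_count) tuples, mirroring the columns the query
    selects from batch_raw. Assumes no NULL (None) values.

    Returns {location: (count_id, sum_followers, sum_favorites, sum_retweets)}.
    """
    dedup = set(rows)  # like SELECT DISTINCT over all six columns
    totals = defaultdict(lambda: [0, 0, 0, 0])
    for _id, _created_at, followers, location, favorites, retweets in dedup:
        acc = totals[location]
        acc[0] += 1          # count(id)
        acc[1] += followers  # sum(followers_count)
        acc[2] += favorites  # sum(favorite_count)
        acc[3] += retweets   # sum(retweet_count)
    return {loc: tuple(acc) for loc, acc in totals.items()}
```

Note that deduplication is on whole rows: if the same tweet id is archived twice with different counts, both rows survive and the tweet is counted twice.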

```python
# compute_batch_layer(conn)
```

<a id="6"></a>
### Deployment
- archive new tweets and load them into Redshift every couple of hours, sleeping in between

```python
import uuid

def periodic_work(interval):
    """Archive new tweets to S3 and load them into Redshift every `interval` seconds."""
    while True:
        # uuid4 avoids the key collisions a small random suffix can produce within one hour
        path = 'tweets/' + time.strftime("%Y/%m/%d/%H") + '_tweets_' + uuid.uuid4().hex + '.log'
        store_twitter_data(path)
        copy_files(conn, path)
        time.sleep(interval)
```

```python
# periodic_work(60 * 60)  # every 60 minutes
```
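`periodic_work` covers only the ingestion half of the plan; the nightly aggregation is triggered manually later in this notebook. A sketch of a scheduling decision that covers both, assuming ingestion every `ingest_every` and one batch run per day once the clock passes `batch_hour`; `due_tasks` and its parameters are hypothetical. Keeping the decision pure makes it easy to test, while the loop around it stays trivial.

```python
from datetime import datetime, timedelta

def due_tasks(now, last_ingest, last_batch_date,
              ingest_every=timedelta(hours=2), batch_hour=1):
    """Return the names of the tasks that should run at `now`.

    last_ingest: datetime of the previous ingestion (None if never run).
    last_batch_date: date of the previous batch run (None if never run).
    """
    tasks = []
    if last_ingest is None or now - last_ingest >= ingest_every:
        tasks.append("ingest")
    # At most one batch run per calendar day, and only after batch_hour
    if now.hour >= batch_hour and last_batch_date != now.date():
        tasks.append("batch")
    return tasks

# Hypothetical driver loop (not run here):
# last_ingest, last_batch = None, None
# while True:
#     now = datetime.now()
#     tasks = due_tasks(now, last_ingest, last_batch)
#     if "ingest" in tasks:
#         ...  # store_twitter_data(path); copy_files(conn, path)
#         last_ingest = now
#     if "batch" in tasks:
#         compute_batch_layer(conn)
#         last_batch = now.date()
#     time.sleep(60)
```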

```python
import uuid

# One manual run of the ingestion step
path = 'tweets/' + time.strftime("%Y/%m/%d/%H") + '_tweets_' + uuid.uuid4().hex + '.log'
store_twitter_data(path)
```

```python
copy_files(conn, path)
```

In this run the call failed with `DatabaseError: SSL SYSCALL error: Operation timed out`. The connection had been opened several cells earlier and was most likely dropped while idle, so it has to be re-created (rerun the connection cell) before retrying.
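One way to survive a dropped connection like this one is to reconnect and retry the step a bounded number of times. A minimal sketch, assuming the step is safe to repeat (re-running `COPY` on the same object loads its rows again, which the `DISTINCT` in the batch query absorbs); `run_with_reconnect` is hypothetical. With psycopg2 you might pass a `connect` function wrapping `psycopg2.connect` and `retry_on=(psycopg2.DatabaseError, psycopg2.InterfaceError)`; `DatabaseError` is the class reported above, but it also covers SQL errors, so keep `attempts` small.

```python
import time

def run_with_reconnect(step, connect, conn, attempts=3, retry_on=(Exception,), delay_s=5.0):
    """Run step(conn); on a retryable error, reconnect and try again.

    step:    callable taking a connection, e.g. lambda c: copy_files(c, path)
    connect: callable returning a fresh connection
    Returns (result, conn) so the caller keeps the possibly new connection.
    """
    for attempt in range(1, attempts + 1):
        try:
            return step(conn), conn
        except retry_on:
            if attempt == attempts:
                raise  # give up after the last attempt
            try:
                conn.close()  # the old connection is probably unusable
            except Exception:
                pass
            time.sleep(delay_s)
            conn = connect()
```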


```python
# Run at the end of the day
compute_batch_layer(conn)
```

```python
# Close the Redshift connection when done
# conn.close()
```