# Umsetzung der Batch-Schicht (Lambda-Architektur) mit S3, Redshift and Apache Kafka

### Ziele:
- Tweets-Datenstrom vom KafkaProducer in S3 übertragen
- Batch-sicht berechnen (jede 24 Stunden

### Nötige Bibliotheken

In [2]:
from kafka import KafkaConsumer
from io import StringIO
import boto3
import time
import random

<a id="1"></a>
### KafkaConsumer-Konfiguration
- Einen KafkaConsumer erzeugen, der Tweets vom KafkaProducer aufnimmt
- Zu einem Kafka-Stream subscriben

In [98]:
consumer = KafkaConsumer(
                        bootstrap_servers='localhost:9092',
                        auto_offset_reset='latest',  # Reset partition offsets upon OffsetOutOfRangeError
                        group_id='test',   # must have a unique consumer group id 
                        consumer_timeout_ms=1000)  
                                # How long to listen for messages - we do it for 10 seconds 
                                # because we poll the kafka broker only each couple of hours

consumer.subscribe('tweets-lambda1')

<a id="2"></a>
### Defining a Amazon Web Services S3 storage client
- Authorisierungsinformation

In [99]:
s3_resource = boto3.resource(
    's3',
    aws_access_key_id='AKIAINL6QJLHMBXXFG7A',
    aws_secret_access_key='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'
)

s3_client = s3_resource.meta.client
bucket_name = 'tweets-lambda-architecture'


<a id="3"></a>
### Rohdaten in S3 bucket speichern
- Rohdaten jede Stunde in S3 speichern



In [6]:
def store_twitter_data(path):
    csv_buffer = StringIO() # Buffer als String in S3 speichern

    for message in consumer: # iteriert über die letzte Daten, die wir noch nicht gesehen haben"
        csv_buffer.write(message.value.decode() + '\n') 
    s3_resource.Object(bucket_name,path).put(Body=csv_buffer.getvalue())

<a id="4"></a>
### Daten in Redshift exportieren und Batch-Sicht berechnen
- mit Psycog in Redshift authentifizieren
- letzte Daten aus S3 in Redshift für die Sichtberechnung exportieren

In [3]:
import psycopg2
config = { 'dbname': 'lambda', 
           'user':'nelson',
           'pwd':'LambdaApp1',
           'host':'batch-view.ctgixhn76zcs.eu-central-1.redshift.amazonaws.com',
           'port':'5439'
         }
conn =  psycopg2.connect(dbname=config['dbname'], host=config['host'], 
                              port=config['port'], user=config['user'], 
                              password=config['pwd'])

In [4]:
def copy_files(conn, path):
    curs = conn.cursor()
    curs.execute("""
        copy 
            batch_raw(id,created_at,location)
        from 
            's3://tweets-lambda-architecture/""" + path + """'  
            access_key_id 'AKIAINL6QJLHMBXXFG7A'
            secret_access_key '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'
            delimiter ';'
            region 'eu-central-1'
    """)
    curs.close()
    conn.commit()


### Batch-Sicht berechnen
- Rohdaten abgfragen und aggregieren

In [5]:
def compute_batch_layer(conn):
    curs = conn.cursor()
    curs.execute(""" 
        drop table if exists batch_layer;

        with raw_dedup as (
        SELECT
            distinct id,created_at,location
        FROM
            batch_raw
        ),
        batch_result as (
            SELECT
                location,
                count(id) as count_id
            FROM
                raw_dedup
            group by 
                location
        )
        select 
            *
        INTO
            batch_layer
        FROM
            batch_result""")
    curs.close()
    conn.commit()

<a id="5"></a>
### Deployment 
- perform the task every couple of hours and wait in between

In [42]:
def periodic_work(interval):
    while True:
        path = 'tweets/'+ time.strftime("%Y/%m/%d/%H") + '_tweets_' + str(random.randint(1,1000)) + '.log'
        store_twitter_data(path)
        copy_files(conn, path)
        #interval should be an integer, the number of seconds to wait
        time.sleep(interval)

In [30]:
periodic_work(60 * 60) ## 60 minutes !

In [106]:
# Ende des Tages ausführen
compute_batch_layer(conn)

