# Data Ingestion | Phase 1

We need to set up an ingestion pipeline for streaming data arriving via **Amazon Kinesis**.

The data is processed in a **Lambda function**, and the resulting JSON is eventually stored in a **DynamoDB table**.

Initially, we'll create the Amazon Kinesis data stream, the Lambda function, and the DynamoDB table from the **AWS Console**.

Note: by the end of this notebook, we need a **CloudFormation template** that automates this setup.
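As a possible starting point for that template, here is a minimal sketch that builds the stream and table part as a Python dict and serializes it to JSON. The logical IDs (`FastKinesisStream`, `PeopleTable`) are hypothetical, using `first_name` as the table's partition key is an assumption based on the CRUD calls later in this notebook, and the Lambda function (which also needs an IAM role and code) and its `AWS::Lambda::EventSourceMapping` are deliberately left out.

```python
import json


def build_template(stream_name="FastKinesis", table_name="People"):
    """Return a minimal CloudFormation template (as a dict) for the stream and the table.

    Assumptions: `first_name` is the partition key; the Lambda function and its
    event source mapping are intentionally not included in this sketch.
    """
    return {
        "AWSTemplateFormatVersion": "2010-09-09",
        "Description": "Phase 1 ingestion: Kinesis stream and DynamoDB table (sketch)",
        "Resources": {
            # Hypothetical logical ID; a single shard keeps the stream at its minimum size
            "FastKinesisStream": {
                "Type": "AWS::Kinesis::Stream",
                "Properties": {"Name": stream_name, "ShardCount": 1},
            },
            # Hypothetical logical ID; 1 RCU / 1 WCU is the provisioned-mode minimum
            "PeopleTable": {
                "Type": "AWS::DynamoDB::Table",
                "Properties": {
                    "TableName": table_name,
                    "AttributeDefinitions": [
                        {"AttributeName": "first_name", "AttributeType": "S"}
                    ],
                    "KeySchema": [{"AttributeName": "first_name", "KeyType": "HASH"}],
                    "ProvisionedThroughput": {
                        "ReadCapacityUnits": 1,
                        "WriteCapacityUnits": 1,
                    },
                },
            },
        },
    }


if __name__ == "__main__":
    # Print the template as JSON, ready to be saved to a file or passed to CloudFormation
    print(json.dumps(build_template(), indent=2))
```

The resulting JSON string could then be passed as `TemplateBody` to the boto3 CloudFormation client's `create_stack` call.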


### As a data engineer, I need to set up a Kinesis Stream (named FastKinesis) from the AWS Console

Use `boto3` to verify that the stream was actually created (TDD).

#### Important: use the minimum possible number of shards (1)

In [1]:
import boto3

# AWS_KEY_ID, AWS_SECRET and REGION are assumed to be defined in an earlier credentials cell
datastream = boto3.client('kinesis',
                          aws_access_key_id=AWS_KEY_ID,
                          aws_secret_access_key=AWS_SECRET,
                          region_name=REGION)

# List the stream names in this account/region; the TDD check is that FastKinesis is among them
response = datastream.list_streams()
assert 'FastKinesis' in response['StreamNames']
print(response['StreamNames'])

['FastKinesis']
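`list_streams` only proves that the name exists. A slightly stronger check, sketched below, also confirms that the stream is `ACTIVE` and has exactly one open shard, using `describe_stream_summary`. The helper name `check_stream` is hypothetical; `client` would be the `datastream` client from the cell above.

```python
def check_stream(client, stream_name="FastKinesis", expected_shards=1):
    """Assert that the stream is ACTIVE and has the expected number of open shards.

    `client` is assumed to be a boto3 Kinesis client (or any object whose
    `describe_stream_summary` returns the same response shape).
    """
    summary = client.describe_stream_summary(StreamName=stream_name)["StreamDescriptionSummary"]
    # A newly created stream passes through CREATING before it becomes ACTIVE
    assert summary["StreamStatus"] == "ACTIVE", summary["StreamStatus"]
    # The exercise asks for the minimum shard count
    assert summary["OpenShardCount"] == expected_shards, summary["OpenShardCount"]
    return summary
```

Usage: `check_stream(datastream)`. If the stream was only just created, calling `datastream.get_waiter('stream_exists').wait(StreamName='FastKinesis')` first blocks until it is active.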


### As a data engineer, I need to create a `Kinesis` class with relevant methods to interact with the stream

#### Important: use boto3


#### Example

In [17]:
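class Kinesis:
    STREAM_NAME = 'stream'  # placeholder stream name for this example

    def __init__(self):
        pass  # a real implementation would create the boto3 Kinesis client here

    def send_data(self, stream_name, data):
        # Stub: prints the record instead of sending it to Kinesis
        print(stream_name, data)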

In [22]:
import time

def iam_producer():
    """I'm a producer and I generate data every 500ms"""
    kinesis = Kinesis()
    for _ in range(10):
        time.sleep(0.5)
        # STREAM_NAME is a class attribute, so it must be qualified with the class name
        kinesis.send_data(Kinesis.STREAM_NAME, 'someJSONdata')

In [23]:
iam_producer()

stream someJSONdata
stream someJSONdata
stream someJSONdata
stream someJSONdata
stream someJSONdata
stream someJSONdata
stream someJSONdata
stream someJSONdata
stream someJSONdata
stream someJSONdata
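The `Kinesis` class above is only a stub that prints. A sketch of a producer that actually writes to the stream could batch records with boto3's `put_records`, which accepts up to 500 records per request and reports per-record failures through `FailedRecordCount`. The class name `BatchProducer` and its simple retry loop are assumptions for illustration, not part of the original exercise.

```python
import json
import uuid


class BatchProducer:
    """Hypothetical producer that sends dicts to a Kinesis stream as JSON, in batches.

    `client` is assumed to be a boto3 Kinesis client (or a stand-in with the same
    `put_records` request/response shape).
    """

    MAX_BATCH = 500  # PutRecords limit on records per request

    def __init__(self, client, stream_name):
        self.client = client
        self.stream_name = stream_name

    def send(self, items, max_retries=3):
        """Send every item; resend only the records Kinesis reports as failed."""
        entries = [
            {"Data": json.dumps(item).encode("utf-8"), "PartitionKey": str(uuid.uuid4())}
            for item in items
        ]
        for start in range(0, len(entries), self.MAX_BATCH):
            batch = entries[start:start + self.MAX_BATCH]
            for _ in range(max_retries + 1):
                response = self.client.put_records(StreamName=self.stream_name, Records=batch)
                if response["FailedRecordCount"] == 0:
                    break
                # Results are positional; failed entries carry an ErrorCode
                batch = [entry for entry, result in zip(batch, response["Records"])
                         if "ErrorCode" in result]
            else:
                raise RuntimeError(f"{len(batch)} records still failing after retries")
```

A production version would typically sleep with exponential backoff between retries, since the usual failure is `ProvisionedThroughputExceededException` on a single-shard stream.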


## Test Driven Development

Send data to your stream using the class and its methods.

Verify the data by reading the same records back with methods of the Kinesis class.

Use boto3 for all interactions with AWS (though feel free to look for better libraries for Kinesis).

In [2]:
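import json
import uuid

import boto3


class KinesisStream:
    """Minimal wrapper around a single Kinesis data stream."""

    def __init__(self, stream):
        self.stream = stream
        self._client = None

    def _connected_client(self):
        """Connect to Kinesis Data Streams, reusing the client after the first call."""
        if self._client is None:
            self._client = boto3.client('kinesis',
                                        region_name=REGION,
                                        aws_access_key_id=AWS_KEY_ID,
                                        aws_secret_access_key=AWS_SECRET)
        return self._client

    def send_stream(self, data, partition_key=None):
        """Send `data` as one JSON record; a random UUID is used when no partition key is given."""
        if partition_key is None:
            partition_key = uuid.uuid4()

        client = self._connected_client()
        return client.put_record(
            StreamName=self.stream,
            Data=json.dumps(data),
            PartitionKey=str(partition_key)
        )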

In [4]:
data = {'Name': 'Hamza'}
stream = KinesisStream('FastKinesis')
stream.send_stream(data=data)

{'ShardId': 'shardId-000000000000',
 'SequenceNumber': '49623169493734310862053045190958577595106604881782243330',
 'ResponseMetadata': {'RequestId': 'e62f11c6-f2f3-2d80-bfc5-e0ba309033b3',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e62f11c6-f2f3-2d80-bfc5-e0ba309033b3',
   'x-amz-id-2': '6KcE/4OT6I/Rcxmpe091VkwxhWikcM9ObrakrYfGKdrps1aNAqQirH9ffdobb0RHA+B3fFzXSUjADA+dTq8K0MFhyNpWGCEd',
   'date': 'Thu, 21 Oct 2021 04:40:28 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '110'},
  'RetryAttempts': 0}}
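`KinesisStream` can only write, so the TDD step of getting the same data back is still open. One way to close it, sketched below, is to use the `ShardId` and `SequenceNumber` that `put_record` returned: request an `AT_SEQUENCE_NUMBER` shard iterator and fetch a single record with `get_records`. The helper name `read_back` is hypothetical.

```python
import json


def read_back(client, stream_name, put_response):
    """Fetch the record described by a put_record response and decode it from JSON.

    `client` is assumed to be a boto3 Kinesis client; `put_response` is the dict
    that put_record returned (it contains ShardId and SequenceNumber).
    """
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=put_response["ShardId"],
        ShardIteratorType="AT_SEQUENCE_NUMBER",
        StartingSequenceNumber=put_response["SequenceNumber"],
    )["ShardIterator"]
    records = client.get_records(ShardIterator=iterator, Limit=1)["Records"]
    if not records:
        return None  # get_records may come back empty; a caller could retry
    # boto3 returns the record's Data as bytes, which json.loads accepts directly
    return json.loads(records[0]["Data"])
```

With the objects from the cell above, the check could be `assert read_back(stream._connected_client(), 'FastKinesis', response) == data`, where `response` is the value returned by `stream.send_stream(data=data)`.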

### As a data engineer, I need to create a Lambda function from the console that consumes the Kinesis stream

Create the Lambda function via the AWS Console, and verify it with boto3 by listing the functions.

**TDD: Your function name must be in the list returned by boto3**

Hint: make sure the region is us-east-1 both in your console and in your boto3 configuration.

In [72]:
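lambda_funcs = boto3.client('lambda',
                            aws_access_key_id=AWS_KEY_ID,
                            aws_secret_access_key=AWS_SECRET,
                            region_name=REGION)

response = lambda_funcs.list_functions()
function_names = [f['FunctionName'] for f in response['Functions']]

# TDD: the consumer must be in the returned list (list_functions returns at most 50 functions per call)
assert 'kinesis_consumer' in function_names
print(response['Functions'][0]['FunctionName'])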

kinesis_consumer
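The check above confirms that `kinesis_consumer` exists, but not what it does. Below is a minimal sketch of a handler it could contain once `FastKinesis` is configured as its trigger. Kinesis delivers each record to Lambda base64-encoded under `event['Records'][i]['kinesis']['data']`; the sketch decodes each one as JSON and writes it to the `People` table. It assumes every payload already carries the table's key attribute (`first_name`), which the `{'Name': 'Hamza'}` test record above does not, and the optional `table` parameter exists only so the function can be tested without AWS.

```python
import base64
import json
from decimal import Decimal


def decode_kinesis_event(event):
    """Return the JSON payloads carried by a Kinesis-triggered Lambda event."""
    return [
        # parse_float=Decimal because the DynamoDB resource rejects Python floats
        json.loads(base64.b64decode(record["kinesis"]["data"]), parse_float=Decimal)
        for record in event.get("Records", [])
    ]


def lambda_handler(event, context, table=None):
    """Write every decoded record to DynamoDB (assumed table name: People)."""
    if table is None:
        import boto3  # available in the Lambda Python runtime
        table = boto3.resource("dynamodb").Table("People")
    items = decode_kinesis_event(event)
    for item in items:
        table.put_item(Item=item)
    return {"written": len(items)}
```

For larger batches, the boto3 `Table.batch_writer()` context manager could replace the per-item `put_item` loop.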




### As a data engineer, I need to create a DynamoDB table via AWS Console

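The table "People" will have two attributes: (1) `first_name` and (2) `last_name`. DynamoDB only declares key attributes up front; the CRUD examples below look items up by `first_name` alone, so it acts as the partition key.

Important: use the minimum possible RCUs and WCUs (1 each in provisioned capacity mode).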

## Test Driven Development

Use the boto3 DynamoDB client to verify that the table was created.

This task is done only when boto3 returns the table name.

In [8]:
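dynamo_client = boto3.client('dynamodb',
                             aws_access_key_id=AWS_KEY_ID,
                             aws_secret_access_key=AWS_SECRET,
                             region_name=REGION)
response = dynamo_client.list_tables()

# TDD: the task is done only when 'People' is among the returned table names
assert 'People' in response['TableNames']
print(response['TableNames'])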

['People', 'student']
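Listing the table name does not show whether the capacity requirement was met. A sketch of a stricter check reads `describe_table` and asserts that the table is `ACTIVE` and provisioned at no more than 1 RCU and 1 WCU. An on-demand table should report 0 for both and would also pass. The helper name `check_table` is hypothetical; `client` would be `dynamo_client` from the cell above.

```python
def check_table(client, table_name="People", max_units=1):
    """Assert that the table is ACTIVE and provisioned at or below `max_units` RCU/WCU.

    `client` is assumed to be a boto3 DynamoDB client (or a stand-in with the same
    `describe_table` response shape).
    """
    table = client.describe_table(TableName=table_name)["Table"]
    throughput = table["ProvisionedThroughput"]
    assert table["TableStatus"] == "ACTIVE", table["TableStatus"]
    assert throughput["ReadCapacityUnits"] <= max_units, throughput
    assert throughput["WriteCapacityUnits"] <= max_units, throughput
    return table
```

Usage: `check_table(dynamo_client)`. For a freshly created table, `dynamo_client.get_waiter('table_exists').wait(TableName='People')` can be called first.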


# Task # 6

### As a data engineer, I need to create a DynamoDB class with functions to handle basic CRUD operations

In [43]:
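class DynamoDB_Helper:
    """Basic CRUD operations on a single DynamoDB table."""

    def __init__(self, table, region, aws_key_id, aws_secret):
        self.table_name = table
        self.region = region
        self.aws_key_id = aws_key_id
        self.aws_secret = aws_secret
        self._table = None

    def dynamoDB_table_connection(self):
        """Return a boto3 Table resource, creating it once and reusing it afterwards."""
        if self._table is None:
            dynamodb = boto3.resource('dynamodb',
                                      aws_access_key_id=self.aws_key_id,
                                      aws_secret_access_key=self.aws_secret,
                                      region_name=self.region)
            self._table = dynamodb.Table(self.table_name)
        return self._table

    def dynamoDB_put(self, data):
        """Create an item, or fully replace an existing item with the same key."""
        self.dynamoDB_table_connection().put_item(Item=data)

    def dynamoDB_get(self, key):
        """Return the item for `key`, or None if it does not exist."""
        response = self.dynamoDB_table_connection().get_item(Key=key)
        # get_item omits 'Item' when nothing matches, so avoid a KeyError
        return response.get('Item')

    def dynamoDB_update(self, pk, update_key, val):
        """Set one attribute on the item identified by `pk`."""
        self.dynamoDB_table_connection().update_item(
            Key=pk,
            # A placeholder name avoids clashes with DynamoDB reserved words
            UpdateExpression='SET #attr = :val1',
            ExpressionAttributeNames={'#attr': update_key},
            ExpressionAttributeValues={':val1': val}
        )

    def dynamoDB_delete(self, delete_key):
        """Delete the item identified by `delete_key`."""
        self.dynamoDB_table_connection().delete_item(Key=delete_key)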

In [46]:
data = {'first_name': 'Lionel', 'last_name': 'Messi'}
search_key = {'first_name': 'Lionel'}  # look up the item that was just written
dynamoDB = DynamoDB_Helper('People', 'us-east-1', AWS_KEY_ID, AWS_SECRET)

dynamoDB.dynamoDB_put(data=data)                          # Create
item = dynamoDB.dynamoDB_get(key=search_key)              # Read
print(item)
dynamoDB.dynamoDB_update(search_key, 'last_name', 'new')  # Update
dynamoDB.dynamoDB_delete(search_key)                      # Delete
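To turn the CRUD cell into an actual test rather than a sequence of calls, one option is a round-trip check that asserts after each operation. The sketch below only relies on the boto3 `Table` methods it calls (`put_item`, `get_item`, `update_item`, `delete_item`), so it can run against the real `People` table or an in-memory stand-in. The function name `crud_round_trip` is hypothetical, and reads use `ConsistentRead=True` so that each step sees the previous write.

```python
def crud_round_trip(table, item, key_name, update_attr, new_value):
    """Put, read, update, and delete one item, asserting the table state after each step.

    `table` is assumed to be a boto3 DynamoDB Table resource (or a stand-in with
    the same method signatures).
    """
    key = {key_name: item[key_name]}

    # Create, then read back with a strongly consistent read
    table.put_item(Item=item)
    assert table.get_item(Key=key, ConsistentRead=True).get("Item") == item

    # Update a single attribute through a placeholder name
    table.update_item(
        Key=key,
        UpdateExpression="SET #attr = :val",
        ExpressionAttributeNames={"#attr": update_attr},
        ExpressionAttributeValues={":val": new_value},
    )
    assert table.get_item(Key=key, ConsistentRead=True)["Item"][update_attr] == new_value

    # Delete; get_item omits "Item" when the key is not found
    table.delete_item(Key=key)
    assert "Item" not in table.get_item(Key=key, ConsistentRead=True)
```

Usage with the helper above: `crud_round_trip(dynamoDB.dynamoDB_table_connection(), {'first_name': 'Lionel', 'last_name': 'Messi'}, 'first_name', 'last_name', 'new')`.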