In [37]:
'''

@Author: Vighnesh Harish Bilgi
@Date: 2022-11-20
@Last Modified by: Vighnesh Harish Bilgi
@Last Modified time: 2022-11-20
@Title : To print twitter records as dictionaries

'''

'\n\n@Author: Vighnesh Harish Bilgi\n@Date: 2022-11-20\n@Last Modified by: Vighnesh Harish Bilgi\n@Last Modified time: 2022-11-20\n@Title : To print twitter records as dictionaries\n\n'

In [38]:
import boto3
import json
import os
import pandas as pd

In [39]:
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'
os.environ['AWS_ACCESS_KEY_ID'] = os.environ.get('test1_access_key')
os.environ['AWS_SECRET_ACCESS_KEY'] = os.environ.get('test1_secret_access_key')
KINESIS_DATA_STREAM = 'kds-twitter-sda'

### Fucntions to connect Kinesis Stream , get shards and get tweets from stream as pandas dataframe.

In [40]:
# df = pd.DataFrame()
list_of_records = []

def look_end_of_stream(kinesis_client,my_shard_id):
    """

    Description:
        A shard iterator specifies the shard position from which to start reading data records sequentially. 
        The position is specified using the sequence number of a data record in a shard. A sequence number is the identifier associated with every record ingested in the stream, 
        and is assigned when a record is put into the stream. 
        We use ShardIteratorType as LATEST to look
        at the end of the stream for new incoming data.
    Parameter:
        Kinesis kinesis_client,
        string shard_id
    Return:
        string my_shard_iterator

    """

    
    shard_iterator = kinesis_client.get_shard_iterator(StreamName=KINESIS_DATA_STREAM,
                                                    ShardId=my_shard_id,
                                                    ShardIteratorType='LATEST')
    my_shard_iterator = shard_iterator['ShardIterator']
    return my_shard_iterator                                                  

def connect_to_stream(kinesis_client):
    """

    Description:
        To Describe the specified Kinesis data stream. 
        The information returned includes the stream name, Amazon Resource Name (ARN), creation time, enhanced metric configuration, and shard map.
    Parameter:
        Kinesis kinesis_client
    Return:
        string shard_id

    """

    response = kinesis_client.describe_stream(StreamName=KINESIS_DATA_STREAM)
    my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']

    return my_shard_id

def get_twitter_records(kinesis_client,my_shard_iterator):
    """

    Description:
       Gets data records from a Kinesis data stream's shard.
        By speciying shard iterator using the ShardIterator parameter, 
        the shard iterator specifies the position in the shard from which you want to start reading data records sequentially.

        After getting records from the shard we convert it into a list of dictionaries
    Parameter:
        Kinesis kinesis_client,
        string shard_iterator
    Return:
        list records

    """
    # count = 0
    record_response = kinesis_client.get_records(
        ShardIterator=my_shard_iterator,
        Limit=100
    )
    # list_of_records = []
    while 'NextShardIterator' in record_response:
        # read up to 100 records at a time from the shard number
        record_response = kinesis_client.get_records(
        ShardIterator=record_response['NextShardIterator'],
        Limit=100
        )
        # Print only if we have something
        if(record_response['Records']):
            print (record_response)

            list_of_records.append(record_response['Records'][0])

def records_to_dict(list_of_records):
    """

    Description:
        Extracting values of nested dictionary from 'Data' key from list_of_records
    Parameter:
        list_of_records
    Return:
        list records

    """

    records = []

    for d in list_of_records:
        new_rec = {}
        for k in d:
            if k == 'Data':
                data_dict = json.loads(d[k].decode('utf-8'))
                for tweet_info in data_dict:
                    new_rec[tweet_info] = data_dict[tweet_info]
            else:    
                new_rec[k] = d[k]

        records.append(new_rec)

    return records    

def tweets_as_df(records):
    """

    Description:
        Convert list of dictionaries 'records' into a pandas dataframe
    Parameter:
        list records
    Return:
        Dataframe df

    """
    df = pd.DataFrame()
    for r in records:
        # print(record)
        df = df.append(r, ignore_index=True)

    return df    

def word_occurences(text):
    """

    Description:
        to count occurence of each word in a string and return result as a dictionary.
    Parameter:
        string text
    Return:
        dictionary counts

    """
    counts = dict()
    words = text.split()

    for word in words:
        if word in counts:
            counts[word] += 1
        else:
            counts[word] = 1

    return counts

def count_words_of_tweet(df):
    
    """

    Description:
        To add two columns : 'word_count_on_text' which will have count of number of words in a tweet 
        and 'word_occurences_of_text' which will have dictionary of word occurences in the tweet.
    Parameter:
        Dataframe df
    Return:
        Dataframe df

    """
    list_of_word_count = []
    for index,row in df.iterrows():
        list_of_word_count.append(len(row['tweet_text'].strip().split()))

    df['word_count_on_text'] = list_of_word_count

    # list_tweet_text = list(df['tweet_text'])
    word_occurences_on_text = []
    for index,row in df.iterrows():
        word_occurences_on_text.append(word_occurences(row['tweet_text']))
    # for i in range(len(list_tweet_text)):
    #     word_occurences_on_text.append(word_occurences(list_tweet_text[i]))

    df['word_occurences_on_text'] = word_occurences_on_text

    return df



### Functions to add pandas dataframe to DynamoDB Table

In [41]:
def connect_to_dynamoDB():
    """

    Description:
        To connect to AWS DynamoDB service.
    Parameter:
        No parameters
    Return:
        ServiceResource dyDB

    """
    dyDB =  boto3.resource('dynamodb')
    return dyDB

def create_items(table,twitter_records):
    """

    Description:
        To create items in a table of DynamoDB.
    Parameter:
        dynamodb.table table
    Return:
        No values returned.

    """
    table.put_item(
        Item={
                'tweet_id': twitter_records[0],
                'ArrivalTimestamp': twitter_records[1],
                'tweet_text': twitter_records[2],
                'user_id': twitter_records[3],
                'display_name': twitter_records[4],
                'user_name': twitter_records[5],
                'word_count_on_text': twitter_records[6],
                'word_occurences_on_text': twitter_records[6]
            }
    )

def create_dynamoDB_Table(dyDB):
    """

    Description:
        To create a dynamoDB table if it doen't exist
    Parameter:
        ServiceResource dyDB
    Return:
        no value returned

    """
    
    dynamodb_client = boto3.client('dynamodb')
    table_name = 'Twitter_Data'
    existing_tables = dynamodb_client.list_tables()['TableNames']

    if table_name not in existing_tables:
        # Create the DynamoDB table.
        dyDB.create_table(
            TableName='Twitter_Data',
            KeySchema=[
                {
                    'AttributeName': 'tweet_id',
                    'KeyType': 'HASH'
                },
                {
                    'AttributeName': 'ArrivalTimestamp',
                    'KeyType': 'RANGE'
                }
            ],
            AttributeDefinitions=[
                {
                    'AttributeName': 'tweet_id',
                    'AttributeType': 'S'
                },
                {
                    'AttributeName': 'ArrivalTimestamp',
                    'AttributeType': 'S'
                },
            ],
            ProvisionedThroughput={
                'ReadCapacityUnits': 5,
                'WriteCapacityUnits': 5
            }
        )

### Caling Kinesis Custom Functions

In [42]:
session = boto3.Session()
kinesis_client = session.client('kinesis')

my_shard_id = connect_to_stream(kinesis_client)

my_shard_iterator = look_end_of_stream(kinesis_client,my_shard_id)

list_of_records = get_twitter_records(kinesis_client,my_shard_iterator)
# print(list_of_records)


{'Records': [{'SequenceNumber': '49635381542013579737529347482481836813256816175007924226', 'ApproximateArrivalTimestamp': datetime.datetime(2022, 11, 21, 19, 17, 6, 804000, tzinfo=tzlocal()), 'Data': b'{"tweet_id": "1594688831961370625", "tweet_text": "#AI\\n\\ud83c\\udfafNovember 19, 2022, #DW News, Why #China\'s plan to save its #economy won\'t work | DW #Business Special\\nhttps://t.co/F3PIYVGC1l\\nhttps://t.co/Wrp9ZiJrHt\\nhttps://t.co/Hi3WlGyw6X\\nhttps://t.co/8MBYgM18mQ\\nhttps://t.co/uAfXOmMCLS\\nhttps://t.co/GbySXxJcKu\\nhttps://t.co/CabC3oxUPQ", "user_id": "815265799002869762", "user_name": "Fernando Machuca PhD", "user_username": "gfernandoamb"}', 'PartitionKey': 'fceb0110-e4d3-4d2d-8717-1ddb999fda7a'}], 'NextShardIterator': 'AAAAAAAAAAEyh4suULQDwELkpMOxKOdCYbo25yNDwE1S13aPz9YUUfQkBPPGOau4dhWuKlqL5SEWo5ubUzFEy1oNTfAy0tlVG4B5sKjO87NLc+V94dwpNrDW04Fxg7kG7LEeIam616M+BgspOe6C5diLRmSqq8DXTH8qyvWz+S6mQ4tsrMMN4BiK1UpQtYJJGQcB1q4LQ9T0CBbG5FPGYYYBEeRj9eKiBU+vGbCIlxuYMS5m14pRmQ==', 'M

KeyboardInterrupt: 

### Calling DynamoDB Custom Functions

In [43]:
records = records_to_dict(list_of_records)

df = tweets_as_df(records)

df = count_words_of_tweet(df)

dyDB = connect_to_dynamoDB()

create_dynamoDB_Table(dyDB)
table = dyDB.Table('Twitter_Data')
table.wait_until_exists()

# Print out some data about the table.
print(f"DateTime creation of Table : {table.creation_date_time}")
print(f"No. of items in the table :{table.item_count}")

# table = dyDB.Table('EmployeeBOTO3')

for index,row in df.iterrows():
    record_as_tuple = row['tweet_id'],str(row['ApproximateArrivalTimestamp']),row['tweet_text'],row['user_id'],row['user_name'],row['user_username'],row['word_count_on_text'],row['word_occurences_on_text']
    record_as_list = list(record_as_tuple)
    create_items(table,record_as_list)


  df = df.append(r, ignore_index=True)
  df = df.append(r, ignore_index=True)
  df = df.append(r, ignore_index=True)
  df = df.append(r, ignore_index=True)
  df = df.append(r, ignore_index=True)
  df = df.append(r, ignore_index=True)
  df = df.append(r, ignore_index=True)
  df = df.append(r, ignore_index=True)
  df = df.append(r, ignore_index=True)
  df = df.append(r, ignore_index=True)
  df = df.append(r, ignore_index=True)


DateTime creation of Table : 2022-11-21 18:49:15.030000+05:30
No. of items in the table :0
