In [14]:
'''

@Author: Vighnesh Harish Bilgi
@Date: 2022-11-25
@Last Modified by: Vighnesh Harish Bilgi
@Last Modified time: 2022-11-25
@Title : Ideation Project - 4. Fetching tables from AWS Athena and sending to Kinesis Data Stream

'''

'\n\n@Author: Vighnesh Harish Bilgi\n@Date: 2022-11-25\n@Last Modified by: Vighnesh Harish Bilgi\n@Last Modified time: 2022-11-25\n@Title : Ideation Project - 4. Fetching tables from AWS Athena and sending to Kinesis Data Stream\n\n'

In [15]:
import boto3
import time
import json
import pandas as pd

In [16]:
import os
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

# Copy the project-specific credential variables onto the names boto3 reads.
# os.environ only accepts str values, so a missing source variable must be
# skipped: assigning the None returned by .get() raises TypeError and aborts
# the cell. When skipped, boto3 uses its normal credential lookup instead.
_CREDENTIAL_ENV_MAP = {
    'AWS_ACCESS_KEY_ID': 'test1_access_key',
    'AWS_SECRET_ACCESS_KEY': 'test1_secret_access_key',
}
for _aws_var, _source_var in _CREDENTIAL_ENV_MAP.items():
    _secret = os.environ.get(_source_var)
    if _secret is not None:
        os.environ[_aws_var] = _secret

In [17]:
# Kinesis Data Stream that the exported records are written to (section 2).
KINESIS_DATA_STREAM = 'my-input-stream'

# Athena database whose tables are queried in section 1.
DATABASE_NAME = 'dynamodb_tables'

# S3 location passed to Athena as the query-result OutputLocation.
RESULT_OUTPUT_LOCATION = 's3://athena-dynamo-output/'

### 1. Helper functions to fetch tables from Athena and convert them into pandas DataFrames.

In [18]:

def connect_to_athena():
    """

    Description:
        Create a low-level boto3 client for the AWS Athena service. Region
        and credentials are resolved by boto3's default session.
    Parameter:
        No parameters
    Return:
        boto3 Athena client

    """
    return boto3.client('athena')


def has_query_succeeded(athena, execution_id, max_attempts=5, poll_interval=30):
    """

    Description:
        Poll an Athena query until it reaches a terminal state or the
        attempt budget runs out.
    Parameter:
        athena: boto3 Athena client,
        string execution_id: QueryExecutionId of the query to check,
        int max_attempts: maximum number of status checks (default 5),
        float poll_interval: seconds to wait between checks (default 30)
    Return:
        bool: True if the query reached SUCCEEDED; False if it ended in any
        other state (e.g. FAILED or CANCELLED) or was still QUEUED/RUNNING
        after max_attempts checks.

    """
    for attempt in range(max_attempts):
        response = athena.get_query_execution(QueryExecutionId=execution_id)
        # A response without a State is treated as still running.
        state = (
            response.get("QueryExecution", {})
            .get("Status", {})
            .get("State", "RUNNING")
        )
        if state == "SUCCEEDED":
            return True
        if state not in ("RUNNING", "QUEUED"):
            # Terminal non-success state: waiting longer cannot help.
            return False
        # Only wait if another check will actually follow.
        if attempt < max_attempts - 1:
            time.sleep(poll_interval)
    return False


def get_all_rows(athena, query, output_location=None):
    """

    Description:
        Start executing a query on Athena. This call does not wait for the
        query to finish: poll it with has_query_succeeded() and read the
        rows with get_query_results() afterwards.
    Parameter:
        athena: boto3 Athena client,
        string query: SQL statement to run,
        string output_location: S3 URI where Athena writes the result
            files; defaults to RESULT_OUTPUT_LOCATION
    Return:
        string execution_id: QueryExecutionId of the started query

    """
    if output_location is None:
        output_location = RESULT_OUTPUT_LOCATION

    response = athena.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]

def get_query_results(athena, execution_id):
    """

    Description:
        Fetch every result row of a finished Athena query. Athena returns
        results one page at a time, so keep following NextToken until it
        is absent; otherwise only the first page would be returned.
    Parameter:
        athena: boto3 Athena client,
        string execution_id: QueryExecutionId of a finished query
    Return:
        list results: the ResultSet rows of all pages, in order

    """
    results = []
    next_token = None
    while True:
        request = {"QueryExecutionId": execution_id}
        if next_token:
            request["NextToken"] = next_token
        response = athena.get_query_results(**request)
        results.extend(response['ResultSet']['Rows'])
        next_token = response.get('NextToken')
        if not next_token:
            return results

def _athena_row_values(row):
    """Return one value per column of an Athena result row (None for empty cells)."""
    # A cell with no value (e.g. a NULL) is mapped to None rather than
    # skipped; skipping it would shift every later value into the wrong column.
    return [next(iter(cell.values()), None) for cell in row.get('Data', [])]


def query_to_df(query_result, dropna=True):
    """

    Description:
        To convert the query result into a Pandas Dataframe. The first row
        supplies the column names and each later row becomes one record.
        The first 5 records are printed.
    Parameter:
        list query_result: rows as returned by get_query_results, each
            shaped like {'Data': [{<type>: value}, ...]},
        bool dropna: drop rows that contain missing values (default True)
    Return:
        Dataframe df (empty if query_result is empty)

    """
    if not query_result:
        df = pd.DataFrame()
    else:
        header = _athena_row_values(query_result[0])
        data = [_athena_row_values(row) for row in query_result[1:]]
        df = pd.DataFrame(data, columns=header)

    if dropna:
        df = df.dropna()
    # print 1st 5 records of dataframe.
    print(df.head(5))

    return df

#### 1a. Fetch the 'parquet_bukcet' table (spelled exactly as queried below) from Athena, convert it into a pandas DataFrame, and then into a list of dictionaries.

In [19]:
athena = connect_to_athena()

# 1. Query Athena table 'parquet_bukcet'
table_name = 'parquet_bukcet'
query = f"SELECT id,userId,title,title_word_count,body,body_word_count from {DATABASE_NAME}.{table_name} order by id"
execution_id = get_all_rows(athena,query)
print(f"Get all Rows execution id: {execution_id}")

query_status = has_query_succeeded(athena,execution_id=execution_id)
print(f"Query state: {query_status}")
# Stop with a clear message rather than requesting results of a query that
# failed or has not finished.
if not query_status:
    raise RuntimeError(f"Athena query {execution_id} on table {table_name} did not succeed")

# 2. Query Results
query_result = get_query_results(athena,execution_id=execution_id)

# 3. Print query result as a pandas dataframe
print("Reading all records of the table:")
dataset_df = query_to_df(query_result)

# 4. convert specific columns into int (the raw cell values are presumably strings)
convert_dict = {'id': int, 'userId': int, 'title_word_count': int, 'body_word_count': int}

dataset_df = dataset_df.astype(convert_dict)
print('Verifying if data types are changed')
print(dataset_df.dtypes)

# 5. Converting pandas dataframe to list of dictionaries
dataset_list_of_dict = dataset_df.to_dict('records')

# 6. Print 1st 5 elements of dataset_list_of_dict
print("Print 1st 5 elements of dataset_list_of_dict: ")
for record in dataset_list_of_dict[:5]:
    print(record)

Get all Rows execution id: 59b8bcc7-4611-475a-8a93-85c0484806dc
Query state: True
Reading all records of the table:
  id userId                                              title  \
0  1      1  sunt aut facere repellat provident occaecati e...   
1  2      1                                       qui est esse   
2  3      1  ea molestias quasi exercitationem repellat qui...   
3  4      1                               eum et est occaecati   
4  5      1                                 nesciunt quas odio   

  title_word_count                                               body  \
0                9  quia et suscipit suscipit recusandae consequun...   
1                3  est rerum tempore vitae sequi sint nihil repre...   
2                9  et iusto sed quo iure voluptatem occaecati omn...   
3                4  ullam et saepe reiciendis voluptatem adipisci ...   
4                3  repudiandae veniam quaerat sunt sed alias aut ...   

  body_word_count  
0              23  
1       

#### 1b. Fetch the 'title_parquet' table from Athena, convert it into a pandas DataFrame, and then into a list of dictionaries.

In [51]:
# 1. Query Athena table 'title_parquet'
table_name = 'title_parquet'
query = f"SELECT title_word,count from {DATABASE_NAME}.{table_name} order by count,title_word"
execution_id = get_all_rows(athena,query)
print(f"Get all Rows execution id: {execution_id}")

query_status = has_query_succeeded(athena,execution_id=execution_id)
print(f"Query state: {query_status}")
# Stop with a clear message rather than requesting results of a query that
# failed or has not finished.
if not query_status:
    raise RuntimeError(f"Athena query {execution_id} on table {table_name} did not succeed")

# 2. Query Results
query_result = get_query_results(athena,execution_id=execution_id)

# 3. Print query result as a pandas dataframe
print("Reading all records of the table:")
title_count_df = query_to_df(query_result)

# 4. convert specific columns into int and rename count column to frequency
convert_dict = {'count': int}

title_count_df = title_count_df.astype(convert_dict)
print('Verifying if data types are changed')
print(title_count_df.dtypes)

# Reassign instead of inplace=True so re-running the cell stays predictable.
title_count_df = title_count_df.rename(columns={'count': 'frequency'})

# 5. Converting pandas dataframe to list of dictionaries
title_count_list_of_dict = title_count_df.to_dict('records')

# 6. Print 1st 5 elements of title_count_list_of_dict
print("Print 1st 5 elements of title_count_list_of_dict: ")
for record in title_count_list_of_dict[:5]:
    print(record)

Get all Rows execution id: 09373506-8349-4a6b-a159-b761add4fdb5
Query state: True
Reading all records of the table:
    title_word count
0    accusamus     1
1      aliquam     1
2   architecto     1
3    assumenda     1
4  consectetur     1
Verifying if data types are changed
title_word    object
count          int32
dtype: object
Print 1st 5 elements of title_count_list_of_dict: 
{'title_word': 'accusamus', 'frequency': 1}
{'title_word': 'aliquam', 'frequency': 1}
{'title_word': 'architecto', 'frequency': 1}
{'title_word': 'assumenda', 'frequency': 1}
{'title_word': 'consectetur', 'frequency': 1}


#### 1c. Fetch the 'body_parquet' table from Athena, convert it into a pandas DataFrame, and then into a list of dictionaries.

In [50]:
# 1. Query Athena table 'body_parquet'
table_name = 'body_parquet'
query = f"SELECT body_word,count from {DATABASE_NAME}.{table_name} order by count,body_word"
execution_id = get_all_rows(athena,query)
print(f"Get all Rows execution id: {execution_id}")

query_status = has_query_succeeded(athena,execution_id=execution_id)
print(f"Query state: {query_status}")
# Stop with a clear message rather than requesting results of a query that
# failed or has not finished.
if not query_status:
    raise RuntimeError(f"Athena query {execution_id} on table {table_name} did not succeed")

# 2. Query Results
query_result = get_query_results(athena,execution_id=execution_id)

# 3. Print query result as a pandas dataframe
print("Reading all records of the table:")
body_count_df = query_to_df(query_result)

# 4. convert specific columns into int and rename count column to frequency
convert_dict = {'count': int}

body_count_df = body_count_df.astype(convert_dict)
print('Verifying if data types are changed')
print(body_count_df.dtypes)

# Reassign instead of inplace=True so re-running the cell stays predictable.
body_count_df = body_count_df.rename(columns={'count': 'frequency'})

# 5. Converting pandas dataframe to list of dictionaries
body_count_list_of_dict = body_count_df.to_dict('records')

# 6. Print 1st 5 elements of body_count_list_of_dict
print("Print 1st 5 elements of body_count_list_of_dict: ")
for record in body_count_list_of_dict[:5]:
    print(record)

Get all Rows execution id: b172eb51-e979-4faf-90cf-5b07199e6819
Query state: True
Reading all records of the table:
  body_word count
0      unde     3
1    fugiat     4
2      ipsa     4
3    labore     4
4     alias     5
Verifying if data types are changed
body_word    object
count         int32
dtype: object
Print 1st 5 elements of body_count_list_of_dict: 
{'body_word': 'unde', 'frequency': 3}
{'body_word': 'fugiat', 'frequency': 4}
{'body_word': 'ipsa', 'frequency': 4}
{'body_word': 'labore', 'frequency': 4}
{'body_word': 'alias', 'frequency': 5}


### 2. Transferring the Athena table records (as lists of dictionaries) to Kinesis Data Analytics via the Kinesis Data Stream

#### 2a. Transferring dataset_list_of_dict to KDA

In [80]:
kinesis_client = boto3.client('kinesis')

# sleep() throttles the sends so that as many records as possible (ideally
# all of them) end up in the 'dataset_output' table downstream.
send_delay_seconds = 3.25

for counter, r in enumerate(dataset_list_of_dict, start=1):
    # Send message to Kinesis DataStream
    time.sleep(send_delay_seconds)
    response = kinesis_client.put_record(
        StreamName=KINESIS_DATA_STREAM,
        Data=json.dumps(r),
        # Kinesis hashes the partition key itself, so the id string is enough.
        PartitionKey=str(r['id']),
    )

    # Report success only after checking the response status.
    if response['ResponseMetadata']['HTTPStatusCode'] != 200:
        print('Error!')
        print(response)
    else:
        print('Message sent #' + str(counter))

Message sent #1
Message sent #2
Message sent #3
Message sent #4
Message sent #5
Message sent #6
Message sent #7
Message sent #8
Message sent #9
Message sent #10
Message sent #11
Message sent #12
Message sent #13
Message sent #14
Message sent #15
Message sent #16
Message sent #17
Message sent #18
Message sent #19
Message sent #20
Message sent #21
Message sent #22
Message sent #23
Message sent #24
Message sent #25
Message sent #26
Message sent #27
Message sent #28
Message sent #29
Message sent #30
Message sent #31
Message sent #32
Message sent #33
Message sent #34
Message sent #35
Message sent #36
Message sent #37
Message sent #38
Message sent #39
Message sent #40
Message sent #41
Message sent #42
Message sent #43
Message sent #44
Message sent #45
Message sent #46
Message sent #47
Message sent #48
Message sent #49
Message sent #50
Message sent #51
Message sent #52
Message sent #53
Message sent #54
Message sent #55
Message sent #56
Message sent #57
Message sent #58
Message sent #59
Messag

#### 2b. Transferring title_count_list_of_dict to KDA

In [81]:
kinesis_client = boto3.client('kinesis')

# sleep() throttles the sends so that as many records as possible (ideally
# all of them) end up in the 'title_count' table downstream.
send_delay_seconds = 2.75

for counter, r in enumerate(title_count_list_of_dict, start=1):
    # Send message to Kinesis DataStream
    time.sleep(send_delay_seconds)

    # Use the word itself as the partition key. Python's hash() of a str is
    # salted per process (PYTHONHASHSEED), so str(hash(word)) produced
    # different keys after every kernel restart. Kinesis hashes the key
    # itself; it must be 1-256 characters long.
    partition_key = str(r['title_word'])[:256] or '_'
    response = kinesis_client.put_record(
        StreamName=KINESIS_DATA_STREAM,
        Data=json.dumps(r),
        PartitionKey=partition_key,
    )

    # Report success only after checking the response status.
    if response['ResponseMetadata']['HTTPStatusCode'] != 200:
        print('Error!')
        print(response)
    else:
        print('Message sent #' + str(counter))

Message sent #1
Message sent #2
Message sent #3
Message sent #4
Message sent #5
Message sent #6
Message sent #7
Message sent #8
Message sent #9
Message sent #10
Message sent #11
Message sent #12
Message sent #13
Message sent #14
Message sent #15
Message sent #16
Message sent #17
Message sent #18
Message sent #19
Message sent #20
Message sent #21
Message sent #22
Message sent #23
Message sent #24
Message sent #25
Message sent #26
Message sent #27
Message sent #28
Message sent #29
Message sent #30
Message sent #31
Message sent #32
Message sent #33
Message sent #34
Message sent #35
Message sent #36
Message sent #37
Message sent #38
Message sent #39
Message sent #40
Message sent #41
Message sent #42
Message sent #43
Message sent #44
Message sent #45
Message sent #46
Message sent #47
Message sent #48
Message sent #49
Message sent #50
Message sent #51
Message sent #52
Message sent #53
Message sent #54
Message sent #55
Message sent #56
Message sent #57
Message sent #58
Message sent #59
Messag

In [82]:
kinesis_client = boto3.client('kinesis')

# sleep() throttles the sends so that as many records as possible (ideally
# all of them) end up in the 'body_count' table downstream.
send_delay_seconds = 7

for counter, r in enumerate(body_count_list_of_dict, start=1):
    # Send message to Kinesis DataStream
    time.sleep(send_delay_seconds)

    # Use the word itself as the partition key. Python's hash() of a str is
    # salted per process (PYTHONHASHSEED), so str(hash(word)) produced
    # different keys after every kernel restart. Kinesis hashes the key
    # itself; it must be 1-256 characters long.
    partition_key = str(r['body_word'])[:256] or '_'
    response = kinesis_client.put_record(
        StreamName=KINESIS_DATA_STREAM,
        Data=json.dumps(r),
        PartitionKey=partition_key,
    )

    # Report success only after checking the response status.
    if response['ResponseMetadata']['HTTPStatusCode'] != 200:
        print('Error!')
        print(response)
    else:
        print('Message sent #' + str(counter))

Message sent #1
Message sent #2
Message sent #3
Message sent #4
Message sent #5
Message sent #6
Message sent #7
Message sent #8
Message sent #9
Message sent #10
Message sent #11
Message sent #12
Message sent #13
Message sent #14
Message sent #15
Message sent #16
Message sent #17
Message sent #18
Message sent #19
Message sent #20
Message sent #21
Message sent #22
Message sent #23
Message sent #24
Message sent #25
Message sent #26
Message sent #27
Message sent #28
Message sent #29
Message sent #30
Message sent #31
Message sent #32
Message sent #33
Message sent #34
Message sent #35
Message sent #36
Message sent #37
Message sent #38
Message sent #39
Message sent #40
Message sent #41
Message sent #42
Message sent #43
Message sent #44
Message sent #45
Message sent #46
Message sent #47
Message sent #48
Message sent #49
Message sent #50
Message sent #51
Message sent #52
Message sent #53
Message sent #54
Message sent #55
Message sent #56
Message sent #57
Message sent #58
Message sent #59
Messag