In [1]:
'''

@Author: Vighnesh Harish Bilgi
@Date: 2022-11-25
@Last Modified by: Vighnesh Harish Bilgi
@Last Modified time: 2022-11-25
@Title : Generate random records and upload them to dynamoDB

'''

'\n\n@Author: Vighnesh Harish Bilgi\n@Date: 2022-11-25\n@Last Modified by: Vighnesh Harish Bilgi\n@Last Modified time: 2022-11-25\n@Title : Generate random records and upload them to dynamoDB\n\n'

In [2]:
import boto3
import pandas as pd
import names
import random
import time
# import 
from io import StringIO

In [3]:
import os


def _require_env(name):
    """Return the value of environment variable `name`; raise if it is unset or empty."""
    value = os.environ.get(name)
    if not value:
        # os.environ only accepts str values, so copying a missing variable
        # would otherwise fail with "TypeError: str expected, not NoneType",
        # which does not say which variable has to be set.
        raise RuntimeError(
            f"Environment variable '{name}' must be set before running this notebook."
        )
    return value


# Region picked up by every boto3 client/resource created in this notebook.
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

# Copy the project-specific credential variables into the names boto3 reads.
os.environ['AWS_ACCESS_KEY_ID'] = _require_env('test1_access_key')
os.environ['AWS_SECRET_ACCESS_KEY'] = _require_env('test1_secret_access_key')

# Names of the DynamoDB table and S3 bucket used by the cells below.
TABLE_NAME = 'Auto_Load_Table'
BUCKET_NAME = 'auto-load-bucket'

### Custom functions to create a DynamoDB table

In [4]:
def connect_to_dynamoDB():
    """

    Description:
        Build a boto3 resource handle for the AWS DynamoDB service.
    Parameter:
        None
    Return:
        The object returned by boto3.resource('dynamodb').

    """
    return boto3.resource('dynamodb')

def create_items(table,dataset_records):
    """

    Description:
        Write a single item to a DynamoDB table through table.put_item.
    Parameter:
        table: object exposing put_item(Item=...)
        dataset_records: indexable record; element 0 is stored under 'id'
                         and element 1 under 'name'
    Return:
        No values returned.

    """
    record_id = dataset_records[0]
    record_name = dataset_records[1]
    item = {'id': record_id, 'name': record_name}
    table.put_item(Item=item)

def create_table(dyDB,table_name):
    """

    Description:
        To create a DynamoDB table if it doesn't exist.
        The table uses a composite primary key: numeric 'id' as the hash key
        and string 'name' as the range key, with provisioned throughput of
        5 read and 5 write capacity units.
    Parameter:
        ServiceResource dyDB : DynamoDB resource used for both the existence
                               check and the creation
        string table_name    : name of the table to create
    Return:
        no value returned

    """

    # Use the client behind the resource so the existence check runs against
    # the same account/region the table would be created in.
    dynamodb_client = dyDB.meta.client

    # ListTables is paginated (at most 100 names per page); walk every page
    # so an existing table is not missed in accounts with many tables.
    paginator = dynamodb_client.get_paginator('list_tables')
    table_exists = any(
        table_name in page.get('TableNames', [])
        for page in paginator.paginate()
    )
    if table_exists:
        return

    # Create the DynamoDB table.
    dyDB.create_table(
        TableName=table_name,
        KeySchema=[
            {
                'AttributeName': 'id',
                'KeyType': 'HASH'
            },
            {
                'AttributeName': 'name',
                'KeyType': 'RANGE'
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'id',
                'AttributeType': 'N'
            },
            {
                'AttributeName': 'name',
                'AttributeType': 'S'
            },
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )


### Custom functions to connect to the S3 service

In [5]:
def connect_to_s3_client():
    """

    Description:
        Build a low-level boto3 client for the AWS S3 service.
    Parameter:
        None
    Return:
        The object returned by boto3.client("s3").
    """
    return boto3.client("s3")


def connect_to_s3_resource():
    """

    Description:
        Build a boto3 resource handle for the AWS S3 service.
    Parameter:
        None
    Return:
        The object returned by boto3.resource(service_name='s3').
    """
    return boto3.resource(service_name = 's3')


### Custom functions to upload data to DynamoDB and send each item from DynamoDB as a .csv file to an S3 bucket

In [None]:
def generate_and_upload(table,n,delay=5):
    """

    Description:
        Generating 'n' records of 'id' and 'name'
        where random 5 digit integers as id and
        random names from 'names' module and uploading them to dynamoDB Table 'table'
    Parameter:
        dynamodb.table table,
        integer n,
        number delay : seconds to pause between two consecutive uploads
                       (default 5; 0 or a negative value disables the pause)
    Return:
        No values returned

    """
    # Report the name of the table actually being written to rather than a
    # notebook-level constant, so the message stays correct for any table.
    print(f"Entering records into DynamoDB Table '{table.name}' ...")
    for i in range(n):

        record = [random.randint(10000,99999), names.get_full_name()]
        print(f"Record #{i+1} data : {record}")
        create_items(table,record)

        # Pause only between records; there is nothing to wait for after
        # the last one.
        if delay > 0 and i < n - 1:
            time.sleep(delay)


def _iter_table_items(table):
    """Yield every item of `table`, following scan pagination until the last page."""
    scan_kwargs = {}
    while True:
        page = table.scan(**scan_kwargs)
        yield from page.get('Items', [])

        # A scan response carries 'LastEvaluatedKey' while more pages remain;
        # pass it back as 'ExclusiveStartKey' to continue from that point.
        last_key = page.get('LastEvaluatedKey')
        if not last_key:
            break
        scan_kwargs['ExclusiveStartKey'] = last_key


def dynamo_to_s3(table, delay=30):
    """

    Description:
        Fetching each item from dynamoDB Table 'table' as .csv file and uploading them to S3 bucket.
        All scan pages are read, so items beyond the first page are exported too.
        Each item becomes one single-row CSV stored under
        'data-output/record#<k>.csv' in BUCKET_NAME, with k counting from 1.
    Parameter:
        dynamodb.table table,
        number delay : seconds to pause between two consecutive uploads
                       (default 30; 0 or a negative value disables the pause)
    Return:
        No values returned

    """
    s3 = connect_to_s3_resource()

    print(f"Displaying objects in {BUCKET_NAME}:")

    for count, item in enumerate(_iter_table_items(table), start=1):

        # Pause only between uploads; there is nothing to wait for after
        # the last one.
        if count > 1 and delay > 0:
            time.sleep(delay)

        file_name = f'record#{count}.csv'

        # Serialise the item in memory as a one-row CSV (header + values).
        csv_buffer = StringIO()
        df = pd.DataFrame(item, index=[0])
        df.to_csv(csv_buffer, index= False)
        s3.Object(BUCKET_NAME, f'data-output/{file_name}').put(Body=csv_buffer.getvalue())

        print(f"{file_name} is uploaded to S3 Bucket '{BUCKET_NAME}'")



### 1. Create S3 Bucket

In [6]:
# Resource handle (used for listing buckets) and low-level client (used for
# creating the bucket).
s3 = connect_to_s3_resource()
client = connect_to_s3_client()

# creating new bucket
# Deliberately no ACL argument: a canned ACL such as 'public-read-write'
# would let anyone on the internet read and write objects in the bucket,
# and this notebook only accesses the bucket with its own credentials.
client.create_bucket(Bucket = BUCKET_NAME)
print(f"Printing all bucket names to verify if - {BUCKET_NAME} is created:")
for bucket in s3.buckets.all():
    print(bucket.name)

Printing all bucket names to verify if - auto-load-bucket is created:
athena-dynamo-output
auto-load-bucket
aws-glue-assets-949401335332-us-east-1
aws-logs-949401335332-us-east-1
body-parquet
dataset-athena-bucket
dataset-input-bucket
output-stream-bucket
parquet-bukcet
redshift-dataset-input
redshift-twitter-input-bucket
title-parquet
twitter-streaming-output-bucket


### 2. Send data to DynamoDB Table

In [7]:
# Get a DynamoDB resource handle; `dyDB` is reused by later cells.
dyDB = connect_to_dynamoDB()

# Creates the table only when its name is not already listed.
create_table(dyDB,TABLE_NAME)

# wait_until_exists() is called before the table is read from or written to.
table = dyDB.Table(TABLE_NAME)
table.wait_until_exists()
print(f"DateTime creation of Table : {table.creation_date_time}")

# Interactive prompt: int() raises ValueError if the reply is not an integer.
n = int(input("Enter number of times to run loop:"))

# Generate n random (id, name) records and write them to the table.
generate_and_upload(table,n)

DateTime creation of Table : 2022-12-01 10:28:00.795000+05:30
Entering records into DynamoDB Table 'Auto_Load_Table' ...
Record #1 data : [90426, 'Fernando Sloss']
Record #2 data : [95188, 'Patrick Piper']
Record #3 data : [55209, 'Justine Williams']
Record #4 data : [85452, 'Latrice Jamal']
Record #5 data : [76609, 'Dianne Taylor']
Record #6 data : [11601, 'Jesse Presswood']
Record #7 data : [36731, 'Jayson Bosch']
Record #8 data : [72019, 'Timothy Dwyer']
Record #9 data : [52358, 'Anita Schneider']
Record #10 data : [39418, 'Courtney Demma']
Record #11 data : [74844, 'Michael Luff']
Record #12 data : [90338, 'Joyce Kopecky']
Record #13 data : [62148, 'Lynn Watson']
Record #14 data : [76048, 'Flora Akers']
Record #15 data : [22638, 'Betty Berry']
Record #16 data : [90727, 'Paul Conley']
Record #17 data : [23344, 'Daniel Garcia']
Record #18 data : [11069, 'Neal Carlos']
Record #19 data : [13880, 'Jean Janousek']
Record #20 data : [57730, 'Josephine Hampton']


### 3. Send items from DynamoDB Table as .csv file to S3 bucket

In [8]:
# Re-acquire the table handle (relies on `dyDB` from the earlier cell) and
# export its items as individual .csv objects to the S3 bucket.
table = dyDB.Table(TABLE_NAME)
dynamo_to_s3(table)

Displaying objects in auto-load-bucket:
record#1.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#2.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#3.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#4.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#5.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#6.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#7.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#8.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#9.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#10.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#11.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#12.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#13.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#14.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#15.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#16.csv is uploaded to S3 Bucket 'auto-load-bucket'
record#17.csv is uploaded to S3 Bucket 'a