In [None]:
'''

@Author: Vighnesh Harish Bilgi
@Date: 2022-11-07
@Last Modified by: Vighnesh Harish Bilgi
@Last Modified time: 2022-11-07
@Title : DynamnoDB BOTO3 programs

'''

In [8]:
import boto3
import pandas as pd
import json
from boto3.dynamodb.conditions import Key, Attr

In [2]:
import os
os.environ['AWS_DEFAULT_REGION'] = 'ap-south-1'
os.environ['AWS_ACCESS_KEY_ID'] = os.environ.get('test1_access_key')
os.environ['AWS_SECRET_ACCESS_KEY'] = os.environ.get('test1_secret_access_key')

### 1. CRUD operations on DynamoDB tables using BOTO3

#### 1.a. C: CREATE

In [66]:
def connect_to_dynamoDB():
    """

    Description:
        To connect to AWS DynamoDB service.
    Parameter:
        No parameters
    Return:
        ServiceResource dyDB

    """
    dyDB =  boto3.resource('dynamodb')
    return dyDB

def create_items(table,employee_details):
    """

    Description:
        To create items in a table of DynamoDB.
    Parameter:
        dynamodb.table table
    Return:
        No values returned.

    """
    table.put_item(
        Item={
                'EMPID': employee_details[0],
                'DateOfJoin': employee_details[1],
                'FirstName': employee_details[2],
                'LastName': employee_details[3],
                'Role': employee_details[4],
                'Department': employee_details[5],
                'ContactNo': employee_details[6]
            }
    )

def main():

    dyDB = connect_to_dynamoDB()

    # Create the DynamoDB table.
    table = dyDB.create_table(
        TableName='EmployeeBOTO3',
        KeySchema=[
            {
                'AttributeName': 'EMPID',
                'KeyType': 'HASH'
            },
            {
                'AttributeName': 'DateOfJoin',
                'KeyType': 'RANGE'
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'EMPID',
                'AttributeType': 'S'
            },
            {
                'AttributeName': 'DateOfJoin',
                'AttributeType': 'S'
            },
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )

    # Wait until the table exists.
    table.wait_until_exists()

    # Print out some data about the table.
    print(f"No. of items in the table :{table.item_count}")
    print(f"DateTime creation of Table : {table.creation_date_time}")

    # table = dyDB.Table('EmployeeBOTO3')

    # Adding Items to the table
    employee_1 = ["EMP001","20-10-2015","John","Smith","Data Engineer","IT","9876543210"]
    create_items(table,employee_1)
    employee_2 = ["EMP002","19-04-2019","Rahul","Kumar","Backend Developer","IT","9102837465"]
    create_items(table,employee_2)
    employee_3 = ["EMP003","13-07-2011","Rahul","Kumar","Business Development","CEO","8555090324"]
    create_items(table,employee_3)
    employee_4 = ["EMP004","19-04-2019","Vighnesh","Bilgi","Data Analyst","IT","9049480396"]
    create_items(table,employee_4)

if __name__ == "__main__":
    main()

No. of items in the table :0
DateTime creation of Table : 2022-11-07 19:28:29.704000+05:30


#### 1.b. R: READ/RETRIEVE

In [62]:
def connect_to_dynamoDB():
    """

    Description:
        To connect to AWS DynamoDB service.
    Parameter:
        No parameters
    Return:
        ServiceResource dyDB

    """
    dyDB =  boto3.resource('dynamodb')
    return dyDB

def get_items_by_key(table,employee_details):
    """

    Description:
        To get items in a table of DynamoDB by key.
    Parameter:
        dynamodb.table table,
        list employee_details
    Return:
        dynamodb.item item

    """
    emp_key = table.get_item(
        Key={
                'EMPID': employee_details[0],
                'DateOfJoin': employee_details[1]
            }
    )
    item =  emp_key['Item']
    return item     
    
def get_items_by_scan(table,dept):
    """

    Description:
        To get items in a table of DynamoDB by scan.
    Parameter:
        dynamodb.table table,
        string doj
    Return:
        dynamodb.item item

    """
    response = table.scan(
        FilterExpression=Attr('Department').eq(dept)
    )
    items = response['Items']
    return items

def get_items_by_query(table,empID):
    """

    Description:
        To get items in a table of DynamoDB by query.
    Parameter:
        dynamodb.table table,
        string doj
    Return:
        dynamodb.item item

    """
    response = table.query(
        KeyConditionExpression=Key('EMPID').eq(empID)
    )
    items = response['Items']
    return items    


def main():

    dyDB = connect_to_dynamoDB()

    table = dyDB.Table('EmployeeBOTO3')

    table_details = table.scan()
    table_items = table_details['Items']

    # printing all items from the table
    print("Prinitng Every Item in the table:")
    for item in table_items:
        print(item)

    # printing specific item from the table by key
    print("\nPrinitng Item of EMPID 'EMP004' in the table by key:")
    employee_key = ["EMP004","19-04-2019"]
    print(get_items_by_key(table,employee_key))

    # printing items from the table by scan
    print("\nPrinitng Employees in IT department in the table by scan:")
    dept = "IT"
    print(get_items_by_scan(table,dept))

    # printing items from the table by query
    print("\nPrinitng Employee details of EMPID : EMP001 in the table by query:")
    empID = "EMP001"
    print(get_items_by_query(table,empID))



if __name__ == "__main__":
    main()

Prinitng Every Item in the table:
{'Role': 'Data Analyst', 'DateOfJoin': '19-04-2019', 'Department': 'IT', 'EMPID': 'EMP004', 'ContactNo': '9049480396', 'FirstName': 'Vighnesh', 'LastName': 'Bilgi'}
{'Role': 'Backend Developer', 'DateOfJoin': '19-04-2019', 'Department': 'IT', 'EMPID': 'EMP002', 'ContactNo': '9102837465', 'FirstName': 'Rahul', 'LastName': 'Kumar'}
{'Role': 'Data Engineer', 'DateOfJoin': '20-10-2015', 'Department': 'IT', 'EMPID': 'EMP001', 'ContactNo': '9876543210', 'FirstName': 'John', 'LastName': 'Smith'}
{'Role': 'Business Development', 'DateOfJoin': '13-07-2011', 'Department': 'CEO', 'EMPID': 'EMP003', 'ContactNo': '8555090324', 'FirstName': 'Rahul', 'LastName': 'Kumar'}

Prinitng Item of EMPID 'EMP004' in the table by key:
{'Role': 'Data Analyst', 'DateOfJoin': '19-04-2019', 'Department': 'IT', 'EMPID': 'EMP004', 'ContactNo': '9049480396', 'FirstName': 'Vighnesh', 'LastName': 'Bilgi'}

Prinitng Employees in IT department in the table by scan:
[{'Role': 'Data Analyst

#### 1.c. U : UPDATE

In [67]:
def connect_to_dynamoDB():
    """

    Description:
        To connect to AWS DynamoDB service.
    Parameter:
        No parameters
    Return:
        ServiceResource dyDB

    """
    dyDB =  boto3.resource('dynamodb')
    return dyDB

def update_items_by_key(table,employee_details):
    """

    Description:
        To update items in a table of DynamoDB by key.
    Parameter:
        dynamodb.table table,
        list employee_details
    Return:
        No values returned.

    """
    table.update_item(
        Key={
                'EMPID': employee_details[0],
                'DateOfJoin': employee_details[1]
            },

    UpdateExpression='SET #role = :val1, #dept = :val2 ',
    ExpressionAttributeValues={
        ":val1": employee_details[2],
        ":val2": employee_details[3]
    },
    ExpressionAttributeNames={
        "#role": "Role",
        "#dept": "Department"
    }
    )

def main():

    dyDB = connect_to_dynamoDB()

    table = dyDB.Table('EmployeeBOTO3')

    table_details = table.scan()

    #changing role and department of John smith
    new_emp_details = ["EMP001","20-10-2015","Marketing Analyst","Marketing"]
    update_items_by_key(table,new_emp_details)

    table_details = table.scan()
    table_items = table_details['Items']

    # printing all items from the table
    print("Prinitng Every Item in the table:")
    for item in table_items:
        print(item)


if __name__ == "__main__":
    main()

Prinitng Every Item in the table:
{'Role': 'Data Analyst', 'DateOfJoin': '19-04-2019', 'Department': 'IT', 'EMPID': 'EMP004', 'ContactNo': '9049480396', 'FirstName': 'Vighnesh', 'LastName': 'Bilgi'}
{'Role': 'Backend Developer', 'DateOfJoin': '19-04-2019', 'Department': 'IT', 'EMPID': 'EMP002', 'ContactNo': '9102837465', 'FirstName': 'Rahul', 'LastName': 'Kumar'}
{'Role': 'Marketing Analyst', 'DateOfJoin': '20-10-2015', 'Department': 'Marketing', 'EMPID': 'EMP001', 'ContactNo': '9876543210', 'FirstName': 'John', 'LastName': 'Smith'}
{'Role': 'Business Development', 'DateOfJoin': '13-07-2011', 'Department': 'CEO', 'EMPID': 'EMP003', 'ContactNo': '8555090324', 'FirstName': 'Rahul', 'LastName': 'Kumar'}


#### 1.d. D : DELETE

In [68]:
def connect_to_dynamoDB():
    """

    Description:
        To connect to AWS DynamoDB service.
    Parameter:
        No parameters
    Return:
        ServiceResource dyDB

    """
    dyDB =  boto3.resource('dynamodb')
    return dyDB

def delete_items(table,table_items):
    """

    Description:
        To delete items in a table of DynamoDB.
    Parameter:
        dynamodb.table table,
        list table_items
    Return:
        No values returned.

    """
    with table.batch_writer() as batch:
        for item in table_items:
            response = batch.delete_item(Key={
                "EMPID": item["EMPID"], # Change key and value names
                "DateOfJoin": item["DateOfJoin"]
            })

def main():

    dyDB = connect_to_dynamoDB()

    table = dyDB.Table('EmployeeBOTO3')

    table_details = table.scan()
    table_items = table_details['Items']

    # printing all items from the table
    print("Prinitng Every Item in the table BEFORE Deletion:")
    for item in table_items:
        print(item)
    print(f"No. of items in the table :{table.item_count}")
    
    delete_items(table,table_items)
    table_details = table.scan()
    table_items = table_details['Items']

    # printing all items from the table
    print("Prinitng Every Item in the table AFTER Deletion:")
    for item in table_items:
        print(item)
    print(f"No. of items in the table :{table.item_count}")

    #Deleting table itself
    table.delete()

if __name__ == "__main__":
    main()

Prinitng Every Item in the table BEFORE Deletion:
{'Role': 'Data Analyst', 'DateOfJoin': '19-04-2019', 'Department': 'IT', 'EMPID': 'EMP004', 'ContactNo': '9049480396', 'FirstName': 'Vighnesh', 'LastName': 'Bilgi'}
{'Role': 'Backend Developer', 'DateOfJoin': '19-04-2019', 'Department': 'IT', 'EMPID': 'EMP002', 'ContactNo': '9102837465', 'FirstName': 'Rahul', 'LastName': 'Kumar'}
{'Role': 'Marketing Analyst', 'DateOfJoin': '20-10-2015', 'Department': 'Marketing', 'EMPID': 'EMP001', 'ContactNo': '9876543210', 'FirstName': 'John', 'LastName': 'Smith'}
{'Role': 'Business Development', 'DateOfJoin': '13-07-2011', 'Department': 'CEO', 'EMPID': 'EMP003', 'ContactNo': '8555090324', 'FirstName': 'Rahul', 'LastName': 'Kumar'}
No. of items in the table :0
Prinitng Every Item in the table AFTER Deletion:
No. of items in the table :0


### 2. Export and Import DynamoDB From/To S3 Bucket

#### 2a. Export To DynamoDB To S3 Bucket.

In [22]:
def connect_to_dynamoDB():
    """

    Description:
        To connect to AWS DynamoDB service.
    Parameter:
        No parameters
    Return:
        ServiceResource dyDB

    """
    dyDB =  boto3.resource('dynamodb')
    return dyDB

def connect_to_s3():
    """

    Description:
        To connect to AWS S3 service through an IAM user.
    Parameter:
        No parameters
    Return:
        ServiceResource s3
    """
    s3 =  boto3.resource(service_name = 's3')
    return s3

def create_bucket(s3):
    """

    Description:
       Creating Bucket 'crud-bucket-demo' after connecting to s3 service.
    Parameter:
        ServiceResource s3
    Return:
        No values returned.
    """

    s3.create_bucket(Bucket = 'boto3-import-bucket',  CreateBucketConfiguration={'LocationConstraint': 'ap-south-1'})
    print("Printing all bucket names to verify if - boto3-import-bucket is created:")
    for bucket in s3.buckets.all():
        print(bucket.name)

def create_items(table,employee_details):
    """

    Description:
        To create items in a table of DynamoDB.
    Parameter:
        dynamodb.table table
    Return:
        No values returned.

    """
    table.put_item(
        Item={
                'EMPID': employee_details[0],
                'DateOfJoin': employee_details[1],
                'FirstName': employee_details[2],
                'LastName': employee_details[3],
                'Role': employee_details[4],
                'Department': employee_details[5],
                'ContactNo': employee_details[6]
            }
    )

def main():

    s3 = connect_to_s3()
    create_bucket(s3)

    dyDB = connect_to_dynamoDB()

    # Create the DynamoDB table.
    table = dyDB.create_table(
        TableName='BOTO3_Employee_Export',
        KeySchema=[
            {
                'AttributeName': 'EMPID',
                'KeyType': 'HASH'
            },
            {
                'AttributeName': 'DateOfJoin',
                'KeyType': 'RANGE'
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'EMPID',
                'AttributeType': 'S'
            },
            {
                'AttributeName': 'DateOfJoin',
                'AttributeType': 'S'
            },
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )

    # Wait until the table exists.
    table.wait_until_exists()
    
    # Print out some data about the table.
    print(f"No. of items in the table :{table.item_count}")
    print(f"DateTime creation of Table : {table.creation_date_time}")

    # Adding Items to the table
    employee_1 = ["EMP001","20-10-2015","John","Smith","Data Engineer","IT","9876543210"]
    create_items(table,employee_1)
    employee_2 = ["EMP002","19-04-2019","Rahul","Kumar","Backend Developer","IT","9102837465"]
    create_items(table,employee_2)
    employee_3 = ["EMP003","13-07-2011","Rahul","Kumar","Business Development","CEO","8555090324"]
    create_items(table,employee_3)
    employee_4 = ["EMP004","19-04-2019","Vighnesh","Bilgi","Data Analyst","IT","9049480396"]
    create_items(table,employee_4)

    table = dyDB.Table('BOTO3_Employee_Export')

    response = table.scan()
    df = pd.DataFrame(response['Items'])
    df.to_json('BOTO3_Employee_Export.json',orient= 'records')
 
    # Upload temp file to S3
    s3.Bucket('boto3-import-bucket').upload_file('BOTO3_Employee_Export.json', 'BOTO3_Employee_Export.json')

if __name__ == "__main__":
    main()

Printing all bucket names to verify if - boto3-import-bucket is created:
boto3-import-bucket
export-example
import-bucket-example
No. of items in the table :0
DateTime creation of Table : 2022-11-08 12:47:00.103000+05:30


#### 2b. Import From S3 Bucket to DynamoDB

In [23]:
def connect_to_dynamoDB():
    """

    Description:
        To connect to AWS DynamoDB service.
    Parameter:
        No parameters
    Return:
        ServiceResource dyDB

    """
    dyDB =  boto3.resource('dynamodb')
    return dyDB

def connect_to_s3_client():
    """

    Description:
        To connect to AWS S3 service through an IAM user.
    Parameter:
        No parameters
    Return:
        ServiceResource s3_client
    """
    s3_client =  boto3.client(service_name = 's3')
    return s3_client

def main():

    s3_client = connect_to_s3_client()

    dyDB = connect_to_dynamoDB()

    json_obj = s3_client.get_object(Bucket = 'boto3-import-bucket', Key = 'BOTO3_Employee_Export.json')
    json_read = json_obj['Body'].read()
    json_data = json.loads(json_read)
    #print(json_data)

    # Create the DynamoDB table.
    table = dyDB.create_table(
        TableName='BOTO3_Employee_Import',
        KeySchema=[
            {
                'AttributeName': 'EMPID',
                'KeyType': 'HASH'
            },
            {
                'AttributeName': 'DateOfJoin',
                'KeyType': 'RANGE'
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'EMPID',
                'AttributeType': 'S'
            },
            {
                'AttributeName': 'DateOfJoin',
                'AttributeType': 'S'
            },
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )

    table = dyDB.Table('BOTO3_Employee_Import')

    for i in json_data:
        table.put_item(Item=i)


if __name__ == "__main__":
    main()