In [1]:
try:
    import boto3
    from botocore.exceptions import ClientError
    import pandas as pd
    import json
    import _pickle as pickle
    import bz2
    import sys
    from io import BytesIO
    import os

except Exception as e:
    print("Error: {}".format(e))

# S3 bucket that holds datasets whose compressed size is too large to store
# inline in a DynamoDB item (used by insert_data and get_data).
bucketname = "dataset-store-und"

'''
Attributes of each item saved in the DynamoDB 'Datasets' table:
uuid -> ID of the user (partition key)
dataset_id -> ID or name of the dataset (sort key)
pid -> ID of the last completed process
data -> bz2-compressed pickled data if the compressed size is under 400 KB; otherwise the S3 object key (the dataset name)
data_stored_on -> True if the data is stored on S3, False if it is stored in DynamoDB
'''

def create_db_table(dynamodb=None, table_name='Datasets',
                    read_capacity=10, write_capacity=10):
    """Create the DynamoDB table that indexes stored datasets.

    Items are keyed by ``uuid`` (number, partition key) and ``dataset_id``
    (string, sort key).

    Args:
        dynamodb: Optional boto3 DynamoDB resource; one is created if omitted.
        table_name: Name of the table to create.
        read_capacity: Provisioned read capacity units.
        write_capacity: Provisioned write capacity units.

    Returns:
        The boto3 ``Table`` resource returned by ``create_table``. DynamoDB
        creates the table asynchronously, so call ``wait_until_exists()`` on
        it before writing if the table must be usable immediately.

    Raises:
        botocore ``ClientError`` (``ResourceInUseException``) if a table with
        this name already exists.
    """
    if not dynamodb:
        dynamodb = boto3.resource('dynamodb')

    return dynamodb.create_table(
        TableName=table_name,
        KeySchema=[
            {'AttributeName': 'uuid', 'KeyType': 'HASH'},         # partition key
            {'AttributeName': 'dataset_id', 'KeyType': 'RANGE'},  # sort key
        ],
        AttributeDefinitions=[
            {'AttributeName': 'uuid', 'AttributeType': 'N'},
            {'AttributeName': 'dataset_id', 'AttributeType': 'S'},
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': read_capacity,
            'WriteCapacityUnits': write_capacity,
        },
    )

In [2]:
# One-time setup: the table persists in AWS, so on a re-run (e.g. Restart &
# Run All) DynamoDB rejects the request with ResourceInUseException. Treat
# that as "already done" and surface any other error.
try:
    create_db_table()
except ClientError as e:
    if e.response['Error']['Code'] != 'ResourceInUseException':
        raise
    print("Table 'Datasets' already exists; skipping creation.")

In [3]:
def insert_data(uuid, itemname, dynamodb=None, s3=None, bucket=None,
                max_item_bytes=400000):
    """Load a CSV file and save it for ``uuid`` in DynamoDB or S3.

    The CSV is read into a DataFrame, pickled and bz2-compressed. If the
    compressed payload is smaller than ``max_item_bytes`` it is written inline
    into the item's ``data`` attribute. Otherwise it is wrapped in a second bz2
    layer and uploaded to S3 under the key ``itemname``, and the item's
    ``data`` holds that key instead.

    Args:
        uuid: Partition key (user ID, stored as a DynamoDB number).
        itemname: Path of the CSV to load. Also used as the ``dataset_id``
            sort key and as the S3 object key.
        dynamodb: Optional boto3 DynamoDB resource (created if omitted).
        s3: Optional boto3 S3 resource (created only for large payloads).
        bucket: S3 bucket name; defaults to the module-level ``bucketname``.
        max_item_bytes: Inline-storage threshold on the compressed payload.
            DynamoDB limits a whole item (attribute names plus values) to
            400 KB, so this must stay below that limit.

    Returns:
        True if the data and its item were written. False if the inline
        DynamoDB write or the S3 upload failed (the error is printed).
    """
    if not dynamodb:
        dynamodb = boto3.resource('dynamodb')
    db_table = dynamodb.Table('Datasets')

    with open(itemname, "r") as file:
        df = pd.read_csv(file)

    compress_data = bz2.compress(pickle.dumps(df))

    # len() is the payload size; sys.getsizeof() would also count the bytes
    # object's own header and overstate it.
    data_size = len(compress_data)

    if data_size < max_item_bytes:
        # Small enough to live inline in the DynamoDB item.
        try:
            db_table.put_item(
                Item={
                    'uuid': uuid,
                    'dataset_id': itemname,
                    'pid': 1,
                    'data': compress_data,
                    'data_stored_on': False,
                }
            )
        except Exception as e:
            print("Error: {}".format(e))
            return False
        return True

    # Too large: upload to S3 and store only the object key in DynamoDB.
    # The already-compressed payload gets a second bz2 layer because get_data
    # bz2-decompresses the S3 object and then decompresses the payload.
    if not s3:
        s3 = boto3.resource('s3')
    body = bz2.compress(compress_data, compresslevel=9)

    try:
        obj = s3.Object(bucketname if bucket is None else bucket, itemname)
        obj.put(ContentType='text/plain', ContentEncoding='bz2', Body=body)
    except Exception as e:
        print("Error: {}".format(e))
        # Don't record an item pointing at an object that was never uploaded.
        return False

    db_table.put_item(
        Item={
            'uuid': uuid,
            'dataset_id': itemname,
            'pid': 1,
            'data': itemname,
            'data_stored_on': True,
        }
    )
    return True


In [4]:
# List the working directory to confirm the dataset file is present.
entries = os.listdir()
if entries:
    print("\n".join(entries))

.git
.gitignore
.ipynb_checkpoints
Creating Table.ipynb
env
flights.csv
Get Items from DB.ipynb
Large_File_data_Storage_on_DynamoDB_S3.ipynb
Use the existing Table.ipynb


In [6]:
# Store flights.csv for user 1; insert_data picks inline DynamoDB storage or
# S3 depending on the compressed size.
insert_data(uuid=1, itemname="flights.csv")

In [105]:
def get_data(uuid, dataset_name=None, dynamodb=None, s3=None, bucket=None):
    """Fetch a dataset saved by ``insert_data``, print it, and return it.

    Looks up the ``(uuid, dataset_name)`` item in the ``Datasets`` table. When
    ``data_stored_on`` is False, the pickled object is decompressed directly
    from the item's ``data`` attribute. Otherwise the S3 object named
    ``dataset_name`` is downloaded and unwrapped from two bz2 layers.

    Args:
        uuid: Partition key (user ID, stored as a DynamoDB number).
        dataset_name: Sort key; also used as the S3 object key.
        dynamodb: Optional boto3 DynamoDB resource (created if omitted).
        s3: Optional boto3 S3 resource (created only if needed).
        bucket: S3 bucket name; defaults to the module-level ``bucketname``.

    Returns:
        The unpickled object (a DataFrame for data written by ``insert_data``).
        Returns None, after printing an error, if the lookup fails, no item
        exists for the key, or the S3 download fails.

    Note:
        The payload is unpickled, which can execute arbitrary code. Only read
        from a table and bucket you trust.
    """
    if not dynamodb:
        dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('Datasets')

    try:
        response = table.get_item(Key={'uuid': uuid, 'dataset_id': dataset_name})
    except ClientError as e:
        print(e.response['Error']['Message'])
        return None

    item = response.get('Item')
    if item is None:
        # get_item omits 'Item' entirely when no row matches the key.
        print("Error: no item for uuid={} dataset_id={}".format(uuid, dataset_name))
        return None

    if item['data_stored_on']:
        # Large dataset: the item only points at an S3 object.
        try:
            if not s3:
                s3 = boto3.resource('s3')
            obj = s3.Object(bucketname if bucket is None else bucket, dataset_name)
            compressed_file = obj.get()['Body']

            # The outer bz2 layer is the S3 object envelope written by insert_data.
            with bz2.open(compressed_file, "rb") as f:
                compressed_data = f.read()
            decompress_data = bz2.decompress(compressed_data)
        except Exception as e:
            print("Error while fetching from s3: {}".format(e))
            return None
    else:
        # Small dataset stored inline. Binary attributes come back wrapped;
        # ``.value`` is the raw bytes.
        decompress_data = bz2.decompress(item['data'].value)

    unpickle_data = pickle.loads(decompress_data)
    print(unpickle_data)
    return unpickle_data

In [106]:
# Read flights.csv back for user 1; get_data prints the unpickled dataset.
data_file = get_data(uuid=1, dataset_name="flights.csv")

     year      month  passengers
0    1949    January         112
1    1949   February         118
2    1949      March         132
3    1949      April         129
4    1949        May         121
..    ...        ...         ...
139  1960     August         606
140  1960  September         508
141  1960    October         461
142  1960   November         390
143  1960   December         432

[144 rows x 3 columns]
