In [1]:
import boto3
import pandas as pd

In [2]:
boto3.setup_default_session(profile_name='ck-pythonist')

In [40]:
# Sample transaction that drives the rest of the notebook: header fields
# (transactionid, customerid, Date) plus one dict per purchased line item.
# NOTE(review): the Apple line has Qty 10 * Rate 25 = 250 but Amount 125,
# while every other line satisfies Amount == Qty * Rate — confirm which
# value is correct before relying on the Revenue figures.
transaction={"transactionid":"tr202303261455","customerid":"kp","Date":"2023-03-26",
             "Items":[{"Item":"Apple","Qty":10,"Rate":25,"Amount":125,"Category":"Fruits&Vegetables","uom":"nos"},
                      {"Item":"Orange","Qty":10,"Rate":50,"Amount":500,"Category":"Fruits&Vegetables","uom":"nos"},
                      {"Item":"Badam","Qty":500,"Rate":1,"Amount":500,"Category":"DryFruits&Nuts","uom":"gms"},
                      {"Item":"Walnut","Qty":500,"Rate":2,"Amount":1000,"Category":"DryFruits&Nuts","uom":"gms"}
                      ]}

In [41]:
def TabletoDynamoDBJson(data, batchSize=20):

    """Convert a DataFrame into batches of DynamoDB-typed items.

    Every row becomes a dict mapping column name -> DynamoDB AttributeValue:

    * missing values (None / NaN / NaT / pd.NA) -> {"NULL": True}
    * real numbers, including numpy scalars     -> {"N": str(value)}
    * anything else (bools included)            -> {"S": str(value)}

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain the partition-key column "TransactionID" and the
        sort-key column "Item".
    batchSize : int, default 20
        Items per batch. BatchWriteItem accepts at most 25 put requests per
        call, so values outside 1..25 are rejected.

    Returns
    -------
    list[list[dict]]
        The converted rows, in row order, split into consecutive batches.

    Raises
    ------
    Exception
        If the partition or sort key column is missing.
    ValueError
        If ``batchSize`` is not between 1 and 25.
    """
    import numbers

    pk="TransactionID"
    sk="Item"

    if not 1<=batchSize<=25:
        raise ValueError("batchSize must be between 1 and 25, got %r"%(batchSize,))

    if any(x not in data.columns for x in (pk,sk)):
        raise Exception ("Partition Key Or Sort Key Missing In Data")

    def toAttribute(val):
        # The null check must come first: NaN is a float, and {"N": "nan"}
        # is rejected by DynamoDB. is_scalar guards against list-like values,
        # for which pd.isnull returns an array.
        if pd.api.types.is_scalar(val) and pd.isnull(val):
            return {"NULL": True}
        # numbers.Real also matches numpy scalars (np.int64 / np.float64),
        # which iterrows yields for all-numeric frames; bools stay strings.
        if isinstance(val,numbers.Real) and not pd.api.types.is_bool(val):
            return {"N":str(val)}
        return {"S":str(val)}

    dynamoDBJson=[
        {key:toAttribute(val) for key,val in row.items()}
        for _,row in data.iterrows()
    ]

    # batchwise split
    return [dynamoDBJson[n:n+batchSize] for n in range(0,len(dynamoDBJson),batchSize)]


In [42]:
def ConvertTransactionToDynamoJson(transaction,tableName="Transactions",dynamoDBClient=None,maxRetries=5):

    """Flatten a transaction and write its line items to a DynamoDB table.

    Each entry of ``transaction["Items"]`` becomes one row, and every other
    top-level field of ``transaction`` is copied onto every row. Column names
    are lower-cased, then "transactionid" / "item" are renamed to the table
    keys "TransactionID" / "Item". Rows are converted with
    ``TabletoDynamoDBJson`` and written batch by batch with
    ``batch_write_item``. Any ``UnprocessedItems`` in the response are
    retried with exponential backoff instead of being silently dropped.

    Parameters
    ----------
    transaction : dict
        Must contain an "Items" list of dicts plus scalar header fields
        (e.g. "transactionid", "customerid", "Date").
    tableName : str, default "Transactions"
        Target DynamoDB table.
    dynamoDBClient : optional
        A boto3 DynamoDB client; one is created for us-east-2 when omitted.
    maxRetries : int, default 5
        Retry attempts per batch while items remain unprocessed.

    Returns
    -------
    dict
        {"Status": "Transaction Posted To Database"} once every item is written.

    Raises
    ------
    RuntimeError
        If items are still unprocessed after ``maxRetries`` retries.
    """
    import time

    if dynamoDBClient is None:
        dynamoDBClient=boto3.client("dynamodb",region_name="us-east-2")

    # Broadcast the header fields (everything except "Items") onto each line item.
    header={key:val for key,val in transaction.items() if key!="Items"}
    transactionTable=pd.DataFrame.from_dict(transaction["Items"]).assign(**header)

    # Columns: lower-case everything, then restore the key attribute names.
    transactionTable.columns=[x.lower() for x in transactionTable.columns]
    transactionTable=transactionTable.rename(columns={"transactionid":"TransactionID","item":"Item"})

    # Batch DynamoDB Json
    batchRecords=TabletoDynamoDBJson(transactionTable)

    # Push To DynamoDB, retrying whatever the service reports as unprocessed.
    for batch in batchRecords:
        pending={tableName:[{"PutRequest":{"Item":x}} for x in batch]}
        attempt=0
        while True:
            resp=dynamoDBClient.batch_write_item(RequestItems=pending)
            pending=resp.get("UnprocessedItems") or {}
            if not pending:
                break
            if attempt>=maxRetries:
                left=sum(len(requests) for requests in pending.values())
                raise RuntimeError("%d item(s) still unprocessed after %d retries"%(left,maxRetries))
            time.sleep(min(0.05*2**attempt,2.0))
            attempt+=1

    return {"Status":"Transaction Posted To Database"}


In [43]:
status=ConvertTransactionToDynamoJson(transaction)

In [44]:
status

{'Status': 'Transaction Posted To Database'}

In [45]:
def DynamoDBTable(transactionid,dynamoDBClient=None):

    """Retrieve all items of one transaction from the Transactions table via PartiQL.

    The id is bound as a statement parameter rather than spliced into the
    SQL text, so a quote in it cannot break or alter the statement. Every
    result page is fetched by following ``NextToken``.

    Parameters
    ----------
    transactionid : str
        Partition-key value to look up (converted with ``str``).
    dynamoDBClient : optional
        A boto3 DynamoDB client; one is created for us-east-2 when omitted.

    Returns
    -------
    dict
        The first ``execute_statement`` response, with "Items" replaced by
        the items from all pages and "NextToken" removed.
    """
    if dynamoDBClient is None:
        dynamoDBClient=boto3.client("dynamodb",region_name="us-east-2")

    request={"Statement":"select * from Transactions where TransactionID=?",
             "Parameters":[{"S":str(transactionid)}],
             "Limit":1000}

    results=dynamoDBClient.execute_statement(**request)
    items=list(results.get("Items",[]))
    nextToken=results.get("NextToken")

    # Follow pagination so large transactions are not silently truncated.
    while nextToken:
        page=dynamoDBClient.execute_statement(NextToken=nextToken,**request)
        items.extend(page.get("Items",[]))
        nextToken=page.get("NextToken")

    results=dict(results)
    results["Items"]=items
    results.pop("NextToken",None)
    return results

In [46]:
results=DynamoDBTable(transaction["transactionid"])

In [47]:
results["Items"]

[{'uom': {'S': 'nos'},
  'date': {'S': '2023-03-26'},
  'rate': {'N': '25'},
  'Item': {'S': 'Apple'},
  'qty': {'N': '10'},
  'category': {'S': 'Fruits&Vegetables'},
  'amount': {'N': '125'},
  'TransactionID': {'S': 'tr202303261455'},
  'customerid': {'S': 'kp'}},
 {'uom': {'S': 'gms'},
  'date': {'S': '2023-03-26'},
  'rate': {'N': '1'},
  'Item': {'S': 'Badam'},
  'qty': {'N': '500'},
  'category': {'S': 'DryFruits&Nuts'},
  'amount': {'N': '500'},
  'TransactionID': {'S': 'tr202303261455'},
  'customerid': {'S': 'kp'}},
 {'uom': {'S': 'nos'},
  'date': {'S': '2023-03-26'},
  'rate': {'N': '50'},
  'Item': {'S': 'Orange'},
  'qty': {'N': '10'},
  'category': {'S': 'Fruits&Vegetables'},
  'amount': {'N': '500'},
  'TransactionID': {'S': 'tr202303261455'},
  'customerid': {'S': 'kp'}},
 {'uom': {'S': 'gms'},
  'date': {'S': '2023-03-26'},
  'rate': {'N': '2'},
  'Item': {'S': 'Walnut'},
  'qty': {'N': '500'},
  'category': {'S': 'DryFruits&Nuts'},
  'amount': {'N': '1000'},
  'Transa

In [48]:
import json

In [49]:
def Djson2HiveJson(response):

    """Convert a DynamoDB ``execute_statement`` response into Hive JSON text.

    Each item in ``response["Items"]`` is un-typed (S -> str, N -> float,
    NULL -> null, BOOL -> bool) and serialised as one JSON object per line,
    with no enclosing array.

    Parameters
    ----------
    response : dict
        Must contain an "Items" list of DynamoDB-typed items.

    Returns
    -------
    str
        Newline-separated JSON objects; "" when there are no items.

    Raises
    ------
    ValueError
        If an attribute uses a DynamoDB type other than S, N, NULL or BOOL.
    """
    def untype(key,val):
        if "S" in val:
            return val["S"]
        if "N" in val:
            # numbers are emitted as floats, as before
            return float(val["N"])
        if "NULL" in val:
            # written by TabletoDynamoDBJson for missing values
            return None
        if "BOOL" in val:
            return bool(val["BOOL"])
        raise ValueError("Unsupported DynamoDB type %s for attribute %r"%(list(val),key))

    hiveRows=[{key:untype(key,val) for key,val in item.items()} for item in response["Items"]]

    # Serialising each row separately and joining with "\n" produces the same
    # text as the old "}, " -> "}\n" rewrite for normal data, but cannot
    # corrupt string values that themselves contain "}, ".
    return "\n".join(json.dumps(row) for row in hiveRows)

In [50]:
jsonH=Djson2HiveJson(results)

In [20]:
s3=boto3.client("s3",region_name="us-east-2")

In [52]:
transaction

{'transactionid': 'tr202303261455',
 'customerid': 'kp',
 'Date': '2023-03-26',
 'Items': [{'Item': 'Apple',
   'Qty': 10,
   'Rate': 25,
   'Amount': 125,
   'Category': 'Fruits&Vegetables',
   'uom': 'nos'},
  {'Item': 'Orange',
   'Qty': 10,
   'Rate': 50,
   'Amount': 500,
   'Category': 'Fruits&Vegetables',
   'uom': 'nos'},
  {'Item': 'Badam',
   'Qty': 500,
   'Rate': 1,
   'Amount': 500,
   'Category': 'DryFruits&Nuts',
   'uom': 'gms'},
  {'Item': 'Walnut',
   'Qty': 500,
   'Rate': 2,
   'Amount': 1000,
   'Category': 'DryFruits&Nuts',
   'uom': 'gms'}]}

In [53]:
# Upload the Hive-JSON text produced above (jsonH) to the data-analytics-ck
# bucket as TransactionTable/<transactionid>.json.
s3.put_object(Body=jsonH,Bucket="data-analytics-ck",
              Key="TransactionTable/%s.json"%transaction["transactionid"])

{'ResponseMetadata': {'RequestId': '86JZFV27E83N9H9W',
  'HostId': 'XN15Aujji0K//kyX4rY/P35IlOOYYeWEK5twSYh/kThcaBNMRd3iNXuqB31kjYjAAz4BbB8K8Hw=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'XN15Aujji0K//kyX4rY/P35IlOOYYeWEK5twSYh/kThcaBNMRd3iNXuqB31kjYjAAz4BbB8K8Hw=',
   'x-amz-request-id': '86JZFV27E83N9H9W',
   'date': 'Sun, 26 Mar 2023 09:27:28 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"a4f8024b953da45a45d3d5ba85ced9a7"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"a4f8024b953da45a45d3d5ba85ced9a7"',
 'ServerSideEncryption': 'AES256'}

# 3. Lambda Function (Run SQL Queries On Top Of Athena Table)

In [54]:
# Exploratory version of the daily sales query (no date filter).
# NOTE(review): grouping by transactionid and customerid in addition to date
# and item makes "Num Of Transactions" and "Num Of Customer ID" always 1 and
# yields one row per transaction instead of one per item; grouping by
# date, item alone is probably what is intended — confirm.
query='''select date as "Transaction Date",
	item as "Item",
	count (distinct transactionid) as "Num Of Transactions",
	count(distinct customerid) as "Num Of Customer ID",
	sum(qty) as "Sold Units",
	sum(amount) as "Revenue"
from transactions
group by date,
	item,
	transactionid,
	customerid'''

In [55]:
print(query)

select date as "Transaction Date",
	item as "Item",
	count (distinct transactionid) as "Num Of Transactions",
	count(distinct customerid) as "Num Of Customer ID",
	sum(qty) as "Sold Units",
	sum(amount) as "Revenue"
from transactions
group by date,
	item,
	transactionid,
	customerid


In [24]:
athena=boto3.client("athena",region_name="us-east-2")

In [25]:
outputLocation=""

In [56]:
import pandas as pd

In [27]:
str(pd.Timestamp.now().value).ljust(35,"0")

'16798414193780220000000000000000000'

In [29]:
# Ad-hoc run of the exploratory `query` against the transactionsystem
# database in the awsdatacatalog catalog; results go under
# s3://data-analytics-ck/DailySalesReport/.
# The request token is the current timestamp in nanoseconds, right-padded
# with zeros to 35 characters.
respSQL=athena.start_query_execution(QueryString=query,
    ClientRequestToken=str(pd.Timestamp.now().value).ljust(35,"0"),
    QueryExecutionContext={
        'Database': 'transactionsystem',
        'Catalog': 'awsdatacatalog'
    },
    ResultConfiguration={
        'OutputLocation': 's3://data-analytics-ck/DailySalesReport/'
    })

In [30]:
respSQL

{'QueryExecutionId': 'a8069a51-ca93-4b0c-92d9-da0371cbce98',
 'ResponseMetadata': {'RequestId': 'b2c84382-2064-48f4-84c8-dc3b88e905ce',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sun, 26 Mar 2023 09:13:59 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '59',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'b2c84382-2064-48f4-84c8-dc3b88e905ce'},
  'RetryAttempts': 0}}

In [31]:
"DailySalesReport/"+respSQL["QueryExecutionId"]+'.csv'

'DailySalesReport/a8069a51-ca93-4b0c-92d9-da0371cbce98.csv'

In [34]:
str(pd.Timestamp.now())[:10]

'2023-03-26'

In [57]:
def DailyReportGeneration(reportDate=None,athenaClient=None,timeoutSeconds=300,pollSeconds=2):

    """Run the daily sales Athena query and wait until the CSV report exists.

    Aggregates the ``transactions`` table per (date, item) for one day:
    distinct transactions, distinct customers, units sold and revenue.
    Athena writes the result to
    s3://data-analytics-ck/DailySalesReport/<QueryExecutionId>.csv.

    Parameters
    ----------
    reportDate : str or date-like, optional
        Day to report on. It is normalised to "YYYY-MM-DD" through
        ``pd.Timestamp``, so arbitrary text cannot reach the SQL. Defaults
        to today per the local clock (NOTE(review): confirm the host's
        timezone matches the intended business day).
    athenaClient : optional
        A boto3 Athena client; one is created for us-east-2 when omitted.
    timeoutSeconds : float, default 300
        Maximum time to wait for the query to finish.
    pollSeconds : float, default 2
        Delay between status checks.

    Returns
    -------
    dict
        {"Status", "Report Date", "File Location"} once the query SUCCEEDED.

    Raises
    ------
    RuntimeError
        If the query FAILED or was CANCELLED, or did not finish within
        ``timeoutSeconds``.
    """
    import time

    if athenaClient is None:
        athenaClient=boto3.client("athena",region_name="us-east-2")

    if reportDate is None:
        currentDate=str(pd.Timestamp.now())[:10]
    else:
        currentDate=pd.Timestamp(reportDate).strftime("%Y-%m-%d")

    outputPrefix='s3://data-analytics-ck/DailySalesReport/'

    # Report Generation Query. Group by date and item only: also grouping by
    # transactionid/customerid would make both count(distinct ...) columns
    # always 1 and split each item's totals across transactions.
    query='''select date as "Transaction Date",
                item as "Item",
                count (distinct transactionid) as "Num Of Transactions",
                count(distinct customerid) as "Num Of Customer ID",
                sum(qty) as "Sold Units",
                sum(amount) as "Revenue"
            from transactions
            where date=date('%s')
            group by date,
                item'''%currentDate

    # Run Query On Athena
    respSQL=athenaClient.start_query_execution(QueryString=query,
            ClientRequestToken=str(pd.Timestamp.now().value).ljust(35,"0"),
            QueryExecutionContext={
                'Database': 'transactionsystem',
                'Catalog': 'awsdatacatalog'
            },
            ResultConfiguration={
                'OutputLocation': outputPrefix
            })
    queryId=respSQL["QueryExecutionId"]

    # start_query_execution only submits the query; poll until it reaches a
    # final state so success is reported only for a report that was written.
    deadline=time.monotonic()+timeoutSeconds
    while True:
        status=athenaClient.get_query_execution(QueryExecutionId=queryId)["QueryExecution"]["Status"]
        state=status["State"]
        if state=="SUCCEEDED":
            break
        if state in ("FAILED","CANCELLED"):
            raise RuntimeError("Athena query %s %s: %s"%(queryId,state,status.get("StateChangeReason","")))
        if time.monotonic()>=deadline:
            raise RuntimeError("Athena query %s still %s after %s seconds"%(queryId,state,timeoutSeconds))
        time.sleep(pollSeconds)

    return {"Status":"Report Generated Successfully",
            "Report Date":currentDate,
            "File Location":outputPrefix+queryId+'.csv'}

    
    


In [58]:
DailyReportGeneration()

{'Status': 'Report Generated Successfully',
 'Report Date': '2023-03-26',
 'File Location': 's3://data-analytics-ck/DailySalesReport/5a02482c-8b33-46ca-bc4a-e18e49da802f.csv'}