**Purpose**

This Jupyter notebook contains the function process_batch_files_lambda(), which is used for batch processing. When a new file is generated by Batch_Prices.ipynb, the function processes the data and loads it into S3 while making sure that no duplicate records are written.
The actual execution happens in the AWS Lambda service; the code here is only a simulation to make sure that there are no errors or bugs. Once the code runs without errors, it will be deployed to AWS Lambda.

In [8]:
import pandas as pd
from datetime import datetime, timedelta
from random import randrange
from pandasql import sqldf
import awswrangler as wr

In [2]:
# Load one staged batch CSV from the stg_price_by_date prefix for interactive testing.
# The commented-out line reads a different file (test_data.csv) from the same prefix.
#df_new_file = pd.read_csv('s3://stock-market-raw-data-us-east-1/stg_price_by_date/test_data.csv')
df_new_file = pd.read_csv('s3://stock-market-raw-data-us-east-1/stg_price_by_date/20250131175440.csv')

In [5]:
# Read every row of the price_by_date table (database 'stock_market') through Athena.
df_athena = wr.athena.read_sql_query('SELECT * FROM price_by_date', database = 'stock_market')

In [534]:
# Convert the 'close_date' strings (format YYYY-MM-DD) into Python date objects.
# NOTE(review): df_new_records is not defined in any cell visible here; presumably it is
# left over in the kernel from an earlier run -- confirm before re-executing this cell.

df_new_records['close_date'] = df_new_records['close_date'].apply(lambda x: datetime.strptime(x, '%Y-%m-%d').date()) 

# Alternative: let pandas parse the column and take the date part.

#df_new_records['close_date'] = pd.to_datetime(df_new_records['close_date']).dt.date 

In [535]:
# Create the partition column: p_year is the calendar year of close_date.
df_new_records['p_year'] = pd.to_datetime(df_new_records['close_date']).dt.year

In [536]:
# Convert the DataFrame to a PyArrow Table
# NOTE(review): `pa` is not imported in the cells visible here (presumably
# `import pyarrow as pa`) -- confirm before re-executing this cell.
table = pa.Table.from_pandas(df_new_records)

In [537]:
# Write to Parquet with partitioning by year (one partition per p_year value)
# NOTE(review): `pq` is not imported in the cells visible here (presumably
# `import pyarrow.parquet as pq`) -- confirm before re-executing this cell.
s3_path = 's3://stock-market-raw-data-us-east-1/price_by_date/'
pq.write_to_dataset(table, root_path = s3_path, partition_cols=['p_year'])

In [543]:
# Run the batch processing on one staged CSV.
# NOTE(review): process_batch_files is not defined in the cells visible here; presumably an
# earlier, path-based variant of process_batch_files_lambda -- confirm.
path = 's3://stock-market-raw-data-us-east-1/stg_price_by_date/20250131163557.csv'
process_batch_files(path)

  df_athena = pd.read_sql('select * from AwsDataCatalog.stock_market.price_by_date', conn)


In [None]:
s3://stock-market-raw-data-us-east-1/stg_price_by_date/20250131172624.csv

In [54]:
# Dry run of the de-duplication join: load a staged CSV, parse its 'date' column into
# Python date objects, read the stored table from Athena, and left-join the two on
# (company, date) so that rows with no stored counterpart show up with empty right-hand columns.
df_new_file = pd.read_csv(
    's3://stock-market-raw-data-us-east-1/stg_price_by_date/20250131175440.csv'
)
df_new_file['date'] = df_new_file['date'].map(
    lambda raw: datetime.strptime(raw, '%Y-%m-%d').date()
)
df_athena = wr.athena.read_sql_query(
    'SELECT * FROM price_by_date',
    database='stock_market',
)
merged_df = df_new_file.merge(
    df_athena,
    how='left',
    left_on=['company', 'date'],
    right_on=['company', 'close_date'],
)

In [55]:
merged_df

Unnamed: 0,company,date,close_price_x,close_date,close_price_y,p_year
0,NVDA,2024-12-08,151,2024-12-08,151,2024
1,AAPL,2024-12-08,207,2024-12-08,207,2024
2,MSFT,2024-12-08,417,2024-12-08,417,2024
3,AMZN,2024-12-08,213,2024-12-08,213,2024
4,GOOGL,2024-12-08,195,2024-12-08,195,2024
5,META,2024-12-08,696,2024-12-08,696,2024
6,TSLA,2024-12-08,372,2024-12-08,372,2024
7,WMT,2024-12-08,98,2024-12-08,98,2024
8,JPM,2024-12-08,242,2024-12-08,242,2024
9,V,2024-12-08,350,2024-12-08,350,2024


In [59]:
def process_batch_files_lambda(event, context):
    """Append the not-yet-stored prices from a newly staged CSV to price_by_date.

    Written as an AWS Lambda handler for an S3 notification. The staged CSV is
    read, compared against the rows already visible through the Athena table
    ``price_by_date`` (database ``stock_market``), and only the
    (company, date) pairs that are not stored yet are appended to the
    Parquet dataset, partitioned by ``p_year``.

    Args:
        event: S3 notification payload. Only ``event['Records'][0]`` is read,
            specifically ``s3.bucket.name`` and ``s3.object.key``. The bucket
            name may be the bare name (as sent by S3) or an ``s3://name/``
            prefix (as used by the simulated event in this notebook).
        context: Lambda context object; not used.

    Returns:
        None. Nothing is written when the staged file contains no new rows.

    Raises:
        KeyError / IndexError: if the event lacks the expected S3 record
            fields, or the CSV lacks the ``company``, ``date`` or
            ``close_price`` columns.
    """
    # Imports are kept local so the function can be pasted into Lambda as-is.
    from datetime import datetime
    from urllib.parse import unquote_plus

    import awswrangler as wr
    import pandas as pd

    s3_info = event['Records'][0]['s3']

    # Normalise the bucket to its bare name so both real notifications
    # ('my-bucket') and the simulated event ('s3://my-bucket/') work.
    bucket = s3_info['bucket']['name']
    if bucket.startswith('s3://'):
        bucket = bucket[len('s3://'):]
    bucket = bucket.strip('/')

    # S3 URL-encodes object keys in notifications (e.g. spaces become '+').
    key = unquote_plus(s3_info['object']['key'])
    path = f's3://{bucket}/{key}'

    df_new_file = wr.s3.read_csv(path)
    df_new_file['date'] = df_new_file['date'].apply(
        lambda x: datetime.strptime(x, '%Y-%m-%d').date()
    )
    # A staged file may repeat a (company, date) pair; keep the first
    # occurrence so the file cannot introduce duplicates by itself.
    df_new_file = df_new_file.drop_duplicates(subset=['company', 'date'])

    df_athena = wr.athena.read_sql_query('SELECT * FROM price_by_date', database='stock_market')

    # Only the key columns are needed to decide whether a row already exists.
    existing_keys = df_athena[['company', 'close_date']].drop_duplicates()

    # Left anti-join: keep rows of the new file with no matching stored key.
    # The merge indicator is used instead of testing a price column for null,
    # so a stored row whose price is null is still recognised as existing.
    merged_df = pd.merge(
        df_new_file,
        existing_keys,
        how='left',
        left_on=['company', 'date'],
        right_on=['company', 'close_date'],
        indicator=True,
    )
    is_new = merged_df['_merge'] == 'left_only'
    df_new_records = (
        merged_df.loc[is_new, ['company', 'date', 'close_price']]
        .rename(columns={'date': 'close_date'})
    )

    # wr.s3.to_parquet rejects an empty DataFrame; nothing new means nothing to do.
    if df_new_records.empty:
        return

    # Partition column: calendar year of the closing date.
    df_new_records = df_new_records.assign(
        p_year=pd.to_datetime(df_new_records['close_date']).dt.year
    )

    dest_path = 's3://stock-market-raw-data-us-east-1/price_by_date/'

    wr.s3.to_parquet(
        df=df_new_records,
        path=dest_path,
        index=False,
        dataset=True,
        partition_cols=['p_year'],
    )


In [60]:
# Simulated S3 notification payload used to call process_batch_files_lambda locally.
# Most values are descriptive placeholder strings; the fields the handler reads are
# Records[0].s3.bucket.name and Records[0].s3.object.key.
# NOTE(review): the bucket name below is written as a full 's3://.../' prefix, whereas
# its ARN shows the bare name 'stock-market-raw-data-us-east-1'. Real S3 notifications
# presumably carry only the bare bucket name -- confirm the handler copes with that form.
event = \
{  
   "Records":[  
      {  
         "eventVersion":"2.2",
         "eventSource":"aws:s3",
         "awsRegion":"us-west-2",
         "eventTime":"The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when Amazon S3 finished processing the request",
         "eventName":"event-type",
         "userIdentity":{  
            "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
         },
         "requestParameters":{  
            "sourceIPAddress":"ip-address-where-request-came-from"
         },
         "responseElements":{  
            "x-amz-request-id":"Amazon S3 generated request ID",
            "x-amz-id-2":"Amazon S3 host that processed the request"
         },
         "s3":{  
            "s3SchemaVersion":"1.0",
            "configurationId":"ID found in the bucket notification configuration",
            "bucket":{  
               "name":"s3://stock-market-raw-data-us-east-1/",
               "ownerIdentity":{  
                  "principalId":"Amazon-customer-ID-of-the-bucket-owner"
               },
               "arn":"arn:aws:s3:::stock-market-raw-data-us-east-1"
            },
            "object":{  
               "key":"stg_price_by_date/20250131192045.csv",
               "size":"object-size in bytes",
               "eTag":"object eTag",
               "versionId":"object version if bucket is versioning-enabled, otherwise null",
               "sequencer": "a string representation of a hexadecimal value used to determine event sequence, only used with PUTs and DELETEs"
            }
         },
         "glacierEventData": {
            "restoreEventData": {
               "lifecycleRestorationExpiryTime": "The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, of Restore Expiry",
               "lifecycleRestoreStorageClass": "Source storage class for restore"
            }
         }
      }
   ]
}

In [61]:
# Invoke the handler locally with the simulated event; an empty string stands in
# for the Lambda context argument.
context = ''
process_batch_files_lambda(event, context)