In [12]:
!pip install pyarrow fastparquet boto3 -q

In [14]:
# AWS params

from google.colab import userdata
AWS_PARAMS ={
    'aws_access_key_id': userdata.get('AWS_ACCESS_KEY_ID'),
    'aws_secret_access_key': userdata.get('AWS_SECRET_ACCESS_KEY'),
    'region_name': userdata.get('AWS_REGION')
}

In [3]:
import pandas as pd
import boto3
import json
from datetime import datetime
from io import BytesIO

In [4]:
def get_s3_client():
  return boto3.client('s3', **AWS_PARAMS)

In [5]:
def read_json_from_s3(s3_client, bucket, prefix):
  response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
  if 'Contents' not in response:
    return None

  last_file_key = response['Contents'][-1]['Key']
  obj = s3_client.get_object(Bucket=bucket, Key=last_file_key)

  print(f'Reading {last_file_key}')
  return json.loads(obj['Body'].read().decode('utf-8'))

In [6]:
# apply some bussiness rules and data cleaning
def transform_to_silver(raw_data):
  mapping = {
      'symbol': 'ticker',
      'longName': 'company_name',
      'currentPrice': 'price',
      'marketCap': 'market_cap',
      'totalRevenue': 'revenue',
      'download_timestamp': 'processed_at'
  }

  df_bronze = pd.DataFrame([raw_data])

  # select existing columns in either mapping and raw data
  existing_cols = [c for c in mapping.keys() if c in df_bronze.columns]
  df_silver = df_bronze[existing_cols].rename(columns=mapping)

  # timestamp to datetime
  if 'processed_at' in df_silver.columns:
    df_silver['processed_at'] = pd.to_datetime(df_silver['processed_at'])

  return df_silver

In [7]:
def upload_parquet_to_s3(s3_client, df, bucket, key):
  parquet_buffer = BytesIO()
  df.to_parquet(parquet_buffer, index=False, compression='snappy')
  s3_client.put_object(Bucket=bucket, Key=key, Body=parquet_buffer.getvalue())

In [8]:
# list of all tickers with data in bronze layer
def list_bronze_tickers(s3_client, bucket, prefix):
  response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, Delimiter='/')
  tickers = []
  for content in response.get('CommonPrefixes', []):
    folder_name = content['Prefix'].split('/')[-2] # example:'ticker=AAPL'
    ticker = folder_name.split('=')[-1] # example:'AAPL'
    tickers.append(ticker)
  return tickers

In [9]:
# Orchestrator Function
def run_silver_pipeline(bucket_name):
  s3 = get_s3_client()
  bronze_base_path = 'bronze/yahoo_finance/'

  tickers = list_bronze_tickers(s3, bucket_name, bronze_base_path)
  print(f'Ticker for processing: {tickers}')

  for ticker in tickers:
    print(f'Processing {ticker} assets')
    # reading from source (bronze -> memory)
    bronze_prefix = f'{bronze_base_path}ticker={ticker}/'
    data = read_json_from_s3(s3, bucket_name, bronze_prefix)

    if data:
      # trasnformation
      df_silver = transform_to_silver(data)

      # Load (memory -> silver)
      date_str = datetime.now().strftime('%Y-%m-%d')
      timestamp = datetime.now().strftime('%H%M%S')
      silver_key = f'silver/yahoo_finance/market_analysis/ticker={ticker}/loading_date={date_str}/data_{timestamp}.parquet'

      upload_parquet_to_s3(s3, df_silver, bucket_name, silver_key)
      print(f'Silver pipeline for {ticker} done.')

    else:
      print(f'No data found for {ticker}.')

In [10]:
if __name__ == '__main__':
  bucket_name = 'stock-market-monitoring'

  print('Initializing silver pipeline ...')
  run_silver_pipeline(bucket_name)
  print('Done.')

Initializing silver pipeline ...
Ticker for processing: ['AAPL', 'AMZN', 'GOOGL', 'META', 'MSFT', 'NFLX', 'NVDA', 'TSLA', 'TSM']
Processing AAPL assets
Reading bronze/yahoo_finance/ticker=AAPL/extraction_date=2026-01-16/202729.json
Silver pipeline for AAPL done.
Processing AMZN assets
Reading bronze/yahoo_finance/ticker=AMZN/extraction_date=2026-01-16/202732.json
Silver pipeline for AMZN done.
Processing GOOGL assets
Reading bronze/yahoo_finance/ticker=GOOGL/extraction_date=2026-01-16/202731.json
Silver pipeline for GOOGL done.
Processing META assets
Reading bronze/yahoo_finance/ticker=META/extraction_date=2026-01-16/202734.json
Silver pipeline for META done.
Processing MSFT assets
Reading bronze/yahoo_finance/ticker=MSFT/extraction_date=2026-01-16/202730.json
Silver pipeline for MSFT done.
Processing NFLX assets
Reading bronze/yahoo_finance/ticker=NFLX/extraction_date=2026-01-16/202738.json
Silver pipeline for NFLX done.
Processing NVDA assets
Reading bronze/yahoo_finance/ticker=NVDA/

In [11]:
# validating final result
def validate_silver_data(ticker, bucket_name):
  s3 = get_s3_client()
  date_str = datetime.now().strftime('%Y-%m-%d')
  silver_path = f'silver/yahoo_finance/market_analysis/ticker={ticker}/loading_date=2026-01-12/data.parquet'

  # reading parquet file from s3 bucket
  obj = s3.get_object(Bucket=bucket_name, Key=silver_path)
  df_validation = pd.read_parquet(BytesIO(obj['Body'].read()))

  print(f'Verifying data from Silver Layer for ({ticker}) ---')
  print(f'Shape: {df_validation.shape}')
  display(df_validation.head())
  print(f'\nData types: \n{df_validation.dtypes}')

validate_silver_data('AAPL', 'stock-market-monitoring')

Verifying data from Silver Layer for (AAPL) ---
Shape: (1, 6)


Unnamed: 0,ticker,company_name,price,market_cap,revenue,processed_at
0,AAPL,Apple Inc.,260.25,3845545787392,416161005568,2026-01-12 22:59:52.615440



Data types: 
ticker                  object
company_name            object
price                  float64
market_cap               int64
revenue                  int64
processed_at    datetime64[ns]
dtype: object
