In [1]:
import pandas as pd
import json
from google.cloud import storage
import io
from datetime import datetime, timedelta, timezone
import pandas_gbq

# Initialize the GCS client
client = storage.Client()

# Replace with your actual bucket name
BUCKET_NAME = "stock-data-bucket-d23000ec"
PREFIX = "pubsuboutput/"  # Only list blobs within this folder

bucket = client.bucket(BUCKET_NAME)



In [2]:
time_window_minutes = 60
cutoff_time = datetime.now(timezone.utc) - timedelta(minutes=time_window_minutes)
print(f"Processing files updated after: {cutoff_time.isoformat()}")

# List all blobs within the prefix
blobs = list(bucket.list_blobs(prefix=PREFIX))

# Filter blobs: skip folder placeholders and only include files updated after cutoff_time
new_blobs = [
    blob for blob in blobs
    if not blob.name.endswith("/") and blob.updated and blob.updated > cutoff_time]

if not new_blobs:
  print("No new records to process")
else:
  print("New files to process:")
for blob in new_blobs:
    print(f" - {blob.name} (updated: {blob.updated})")


Processing files updated after: 2025-02-24T16:51:57.036254+00:00
New files to process:
 - pubsuboutput/output2025-02-24T16:50:00.000Z-2025-02-24T16:55:00.000Z-pane-0-last-00-of-10 (updated: 2025-02-24 16:55:48.191000+00:00)
 - pubsuboutput/output2025-02-24T16:50:00.000Z-2025-02-24T16:55:00.000Z-pane-0-last-01-of-10 (updated: 2025-02-24 16:55:48.262000+00:00)
 - pubsuboutput/output2025-02-24T16:50:00.000Z-2025-02-24T16:55:00.000Z-pane-0-last-07-of-10 (updated: 2025-02-24 16:55:48.267000+00:00)
 - pubsuboutput/output2025-02-24T16:50:00.000Z-2025-02-24T16:55:00.000Z-pane-0-last-08-of-10 (updated: 2025-02-24 16:55:48.261000+00:00)
 - pubsuboutput/output2025-02-24T16:50:00.000Z-2025-02-24T16:55:00.000Z-pane-0-last-09-of-10 (updated: 2025-02-24 16:55:48.173000+00:00)
 - pubsuboutput/output2025-02-24T17:00:00.000Z-2025-02-24T17:05:00.000Z-pane-0-last-02-of-10 (updated: 2025-02-24 17:05:37.163000+00:00)
 - pubsuboutput/output2025-02-24T17:00:00.000Z-2025-02-24T17:05:00.000Z-pane-0-last-03-of-1

In [3]:
stock_records = []      # for stock price messages
company_records = []

In [4]:
for blob in new_blobs:
    data_str = blob.download_as_text()
    lines = data_str.strip().split("\n")
    records = [json.loads(line) for line in lines if line.strip()]
    for record in records:
        message_type = record.get("message_type", "").lower()
        if message_type == "stock_price":
            stock_records.append(record)
        else:
            # Assume any message that is not a stock price belongs to company data (info, financials, news)
            company_records.append(record)

In [5]:
expected_stock_quotes_cols =['c', 'd', 'dp', 'h', 'l', 'o', 'pc', 't', 'message_type', 'symbol',
       'fetched_at']

if stock_records:
    df_stock = pd.DataFrame(stock_records)
    print("Stock Price DataFrame:")
    print(df_stock.head())
else:
    print("No stock price records found.")
    df_stock = pd.DataFrame(columns=expected_stock_quotes_cols)

# Create DataFrame for company info/financials/news messages (if any)


Stock Price DataFrame:
          c       d      dp       h       l       o      pc           t  \
0  248.2050  2.6550  1.0812  248.86  243.84  243.84  245.55  1740416014   
1  248.1271  2.5771  1.0495  248.86  243.84  243.84  245.55  1740416073   
2  248.1591  2.6091  1.0626  248.86  243.84  243.84  245.55  1740415831   
3  247.9100  2.3600  0.9611  248.86  243.84  243.84  245.55  1740415892   
4  248.1000  2.5500  1.0385  248.86  243.84  243.84  245.55  1740415951   

  message_type symbol            fetched_at  
0  stock_price   AAPL  2025-02-24T11:53:37Z  
1  stock_price   AAPL  2025-02-24T11:54:37Z  
2  stock_price   AAPL  2025-02-24T11:50:36Z  
3  stock_price   AAPL  2025-02-24T11:51:37Z  
4  stock_price   AAPL  2025-02-24T11:52:37Z  


In [6]:
expected_company_cols= ['country','currency','exchange','finnhubIndustry','ipo','logo','marketCapitalization','name','phone','shareOutstanding','ticker','weburl','message_type','symbol','fetched_at']

if company_records:
    df_company = pd.DataFrame(company_records)
    print("Company Info DataFrame:")
    print(df_company.head())
else:
    print("No company info records found.")
    df_company = pd.DataFrame(columns=expected_company_cols)


No company info records found.


### Transformations for company data

In [7]:
df_company.drop(columns='estimateCurrency',axis = 1,inplace = True,errors='ignore')
df_company.rename(columns={
    "finnhubIndustry": "Industry",
    "marketCapitalization": "Market Capitalization",
    "shareOutstanding": "Share Outstanding",
    "ipo": "Ipo Date"
}, inplace=True)


In [8]:
df_company["Ipo Dat"] = pd.to_datetime(df_company["Ipo Date"]).dt.date

# Ensure numeric columns are floats:
df_company["Market Capitalization"] = df_company["Market Capitalization"].astype(float)
df_company["Share Outstanding"] = df_company["Share Outstanding"].astype(float)

# Optionally, adjust fetched_at if needed; you may want to keep it as a TIMESTAMP.
df_company["fetched_at"] = pd.to_datetime(df_company["fetched_at"])

In [9]:
df_company.head()

Unnamed: 0,country,currency,exchange,Industry,Ipo Date,logo,Market Capitalization,name,phone,Share Outstanding,ticker,weburl,message_type,symbol,fetched_at,Ipo Dat


## Stock Data Transformations

In [10]:

# Convert 't' (Unix timestamp) to datetime (UTC by default)
df_stock['event_time'] = pd.to_datetime(df_stock['t'], unit='s')

# Convert UTC time to US Eastern Time (or any US time zone)
df_stock['event_time'] = df_stock['event_time'].dt.tz_localize('UTC').dt.tz_convert('US/Eastern')
df_stock.drop(columns=['t'], inplace=True)  # optionally remove original 't'


df_stock['fetched_at'] = pd.to_datetime(df_stock['fetched_at'])
df_stock['event_time_sec'] = df_stock['event_time'].dt.floor('s')
df_stock['event_time_sec'] = df_stock['event_time'].dt.strftime('%Y-%m-%d %H:%M:%S')


df_stock.rename(columns={
  'c': 'current_price',
  'o': 'open_price',
  'h': 'high_price',
  'l': 'low_price',
  'pc': 'prev_close_price',
  'd': 'abs_change',
  'dp': 'pct_change'
}, inplace=True)

In [11]:
df_stock

Unnamed: 0,current_price,abs_change,pct_change,high_price,low_price,open_price,prev_close_price,message_type,symbol,fetched_at,event_time,event_time_sec
0,248.205,2.655,1.0812,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 11:53:37+00:00,2025-02-24 11:53:34-05:00,2025-02-24 11:53:34
1,248.1271,2.5771,1.0495,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 11:54:37+00:00,2025-02-24 11:54:33-05:00,2025-02-24 11:54:33
2,248.1591,2.6091,1.0626,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 11:50:36+00:00,2025-02-24 11:50:31-05:00,2025-02-24 11:50:31
3,247.91,2.36,0.9611,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 11:51:37+00:00,2025-02-24 11:51:32-05:00,2025-02-24 11:51:32
4,248.1,2.55,1.0385,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 11:52:37+00:00,2025-02-24 11:52:31-05:00,2025-02-24 11:52:31
5,248.01,2.46,1.0018,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 12:01:01+00:00,2025-02-24 12:00:58-05:00,2025-02-24 12:00:58
6,247.71,2.16,0.8797,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 12:02:01+00:00,2025-02-24 12:01:59-05:00,2025-02-24 12:01:59
7,247.5,1.95,0.7941,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 12:03:01+00:00,2025-02-24 12:02:58-05:00,2025-02-24 12:02:58
8,247.72,2.17,0.8837,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 12:04:02+00:00,2025-02-24 12:03:59-05:00,2025-02-24 12:03:59
9,247.7815,2.2315,0.9088,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 12:05:02+00:00,2025-02-24 12:04:57-05:00,2025-02-24 12:04:57


### Creating difference columns

In [12]:
df_stock['diff_from_open'] = df_stock['current_price'] - df_stock['open_price']
df_stock['diff_from_prev_close'] = df_stock['current_price'] - df_stock['prev_close_price']



### Aggregating the daily changes

In [13]:
df_stock['date'] = df_stock['event_time'].dt.date

daily_agg = df_stock.groupby(['symbol', 'date']).agg({ 'current_price': ['min', 'max', 'mean'], 'pct_change': 'mean'}).reset_index()

daily_agg.columns = ['symbol','date','min_price','max_price','avg_price','avg_pct_change']


In [14]:
daily_agg

Unnamed: 0,symbol,date,min_price,max_price,avg_price,avg_pct_change
0,AAPL,2025-02-24,247.5,248.23,247.906587,0.95971


In [15]:
df_stock.sort_values(by = ['symbol','event_time'],inplace=True)

In [16]:
df_stock

Unnamed: 0,current_price,abs_change,pct_change,high_price,low_price,open_price,prev_close_price,message_type,symbol,fetched_at,event_time,event_time_sec,diff_from_open,diff_from_prev_close,date
2,248.1591,2.6091,1.0626,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 11:50:36+00:00,2025-02-24 11:50:31-05:00,2025-02-24 11:50:31,4.3191,2.6091,2025-02-24
3,247.91,2.36,0.9611,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 11:51:37+00:00,2025-02-24 11:51:32-05:00,2025-02-24 11:51:32,4.07,2.36,2025-02-24
4,248.1,2.55,1.0385,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 11:52:37+00:00,2025-02-24 11:52:31-05:00,2025-02-24 11:52:31,4.26,2.55,2025-02-24
0,248.205,2.655,1.0812,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 11:53:37+00:00,2025-02-24 11:53:34-05:00,2025-02-24 11:53:34,4.365,2.655,2025-02-24
1,248.1271,2.5771,1.0495,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 11:54:37+00:00,2025-02-24 11:54:33-05:00,2025-02-24 11:54:33,4.2871,2.5771,2025-02-24
5,248.01,2.46,1.0018,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 12:01:01+00:00,2025-02-24 12:00:58-05:00,2025-02-24 12:00:58,4.17,2.46,2025-02-24
6,247.71,2.16,0.8797,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 12:02:01+00:00,2025-02-24 12:01:59-05:00,2025-02-24 12:01:59,3.87,2.16,2025-02-24
7,247.5,1.95,0.7941,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 12:03:01+00:00,2025-02-24 12:02:58-05:00,2025-02-24 12:02:58,3.66,1.95,2025-02-24
8,247.72,2.17,0.8837,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 12:04:02+00:00,2025-02-24 12:03:59-05:00,2025-02-24 12:03:59,3.88,2.17,2025-02-24
9,247.7815,2.2315,0.9088,248.86,243.84,243.84,245.55,stock_price,AAPL,2025-02-24 12:05:02+00:00,2025-02-24 12:04:57-05:00,2025-02-24 12:04:57,3.9415,2.2315,2025-02-24


In [17]:
print("df_stock is empty: ",df_stock.empty)
print("df_company is empty: ", df_company.empty)

df_stock is empty:  False
df_company is empty:  True


### Saving the transformed data on the Processed_data bucket

In [18]:
import pandas_gbq
Stock_Quote_table_id = 'stock-data-project-449518.TRansforme_stock_data.Updated_stockQuotes'
Company_data_table_id = 'stock-data-project-449518.TRansforme_stock_data.Updated_Company_data'
project_id = 'stock-data-project-449518'


if df_stock.empty and df_company.empty:
    print("No new records available for adding to the Transformed table.")
elif df_stock.empty and not df_company.empty:
    print("New Company info added.")
    try:
        df_company.to_gbq(destination_table=Company_data_table_id,
                          project_id=project_id,
                          if_exists="append")
    except Exception as e:
        print(f"Error uploading company data to BigQuery: {e}")
elif not df_stock.empty and df_company.empty:
    print("New Stock Quotes added.")
    try:
        df_stock.to_gbq(destination_table=Stock_Quote_table_id,
                        project_id=project_id,
                        if_exists="append")
    except Exception as e:
        print(f"Error uploading stock data to BigQuery: {e}")
else:
    print("New Stock Quotes and company info rows added to transformed Tables.")
    try:
        df_stock.to_gbq(destination_table=Stock_Quote_table_id,
                        project_id=project_id,
                        if_exists="append")
    except Exception as e:
        print(f"Error uploading stock data to BigQuery: {e}")

    try:
        df_company.to_gbq(destination_table=Company_data_table_id,
                          project_id=project_id,
                          if_exists="append")
    except Exception as e:
        print(f"Error uploading company data to BigQuery: {e}")


New Stock Quotes added.


  df_stock.to_gbq(destination_table=Stock_Quote_table_id,
100%|██████████| 1/1 [00:00<00:00, 4144.57it/s]


In [None]:
df_stock

Unnamed: 0,current_price,abs_change,pct_change,high_price,low_price,open_price,prev_close_price,message_type,symbol,fetched_at,event_time,event_time_sec,diff_from_open,diff_from_prev_close,date


In [None]:
df_stock['event_time_sec']

Unnamed: 0,event_time_sec
