In [1]:
from pipeline import _cleanup_multiprocessing_resources, run_process_for_cik
from metadata import FileMetadata
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

import pyspark.sql.functions as F

import sys
import os
import tqdm
import hashlib
import datetime
import logging
import atexit
import multiprocessing
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import pyarrow.compute as pc
import re
import json
import time
import asyncio

In [4]:
data_folder = "/Users/apple/PROJECT/Code_4_SECfilings/total_sp500_10k-txt"
save_path = "/Users/apple/PROJECT/hons_project/data/SP500/10K/test"
firms_csv_file_path = '../Code_4_SECfilings/sp500_total_constituents_final.csv'
constituents_metadata_path = "../Code_4_SECfilings/sp500_constituents.csv" # This is for getting the CIKs for the SP500, but only for the year 2006 - 2023

In [13]:
def filter_sp500(save_folder, file_path, total_constituents_path, constituents_metadata_path):
        """
        Filter out the SP500 whose firms are not active each year   
        """
        # hard-coded years where you are interested in
        start = 2006
        end = 2023
        #Temp
        match = re.search(r'batch_(\d+)', file_path)
        batch_number = int(match.group(1))
        save_folder = os.path.join(save_folder, 'filtered')
        os.makedirs(save_folder, exist_ok=True)
        

        # Load the data
        # Assuming sp500_constituents.csv has columns: 'Firm', 'EntryDate', 'ExitDate'
        df = pd.read_csv(constituents_metadata_path)

        # Convert date columns to datetime format
        df['start'] = pd.to_datetime(df['start'], errors='coerce')
        df['ending'] = pd.to_datetime(df['ending'], errors='coerce')
        df['nameendt'] = pd.to_datetime(df['nameendt'], errors='coerce')
        
        # Define the range of years we are interested in
        years = range(start, end + 1)

        # Dictionary to hold the yearly snapshots
        sp500_by_year = {}
        permno_to_ticker = {}
        for year in years:
            # Define the start and end of each year
            start_of_year = datetime.datetime(year, 1, 1)
            end_of_year = datetime.datetime(year, 12, 31)
            
            # Filter firms active during the year, ensuring they only appear once per year by permno
            active_firms = df[
                (df['start'] <= end_of_year) & 
                ((df['ending'].isna()) | (df['ending'] >= start_of_year))
            ]
            
            # Get the last entry(nameendt) for each permno, and remove permno duplicate except the last entry of active_firms
            active_firms = active_firms.sort_values(by=['permno', 'nameendt']).groupby('permno').last().reset_index()

            # Convert the resulting DataFrame of firms to a list of unique permnos
            permno_to_ticker = dict(zip(active_firms['permno'], active_firms['ticker']))

            # Store the list of active firms for the year
            sp500_by_year[year] = permno_to_ticker
            
        # Get CIK constituents for the SP500 from local cik meta data
        sp500_ciks_df = pd.read_csv(total_constituents_path)
        
        # Change pernmo to CIK
        reversed_dict = {}
        for year, firms in sp500_by_year.items():
            reversed_dict[year] = {ticker: pernmo for pernmo, ticker in sp500_by_year[year].items()}
            for _, ticker in sp500_by_year[year].items():
                if ticker in sp500_ciks_df["Symbol"].tolist():
                    cik = sp500_ciks_df[sp500_ciks_df["Symbol"] == ticker]["CIK"].values[0]
                    reversed_dict[year][ticker] = cik
        sp500_by_year = reversed_dict.copy()
        
        # Load the intermediate Parquet file to be filtered 
        sp500_dtm = pd.read_parquet(file_path)
        sp500_dtm["Date"] = pd.to_datetime(sp500_dtm["Date"], errors="coerce") 
        sp500_dtm['Year'] = sp500_dtm["Date"].dt.year

        # Create a set of valid CIK-Year pairs
        valid_pairs = set()
        for year, firms in sp500_by_year.items():
            for cik in firms.values():
                cik = str(cik).zfill(10)
                valid_pairs.add((year, cik))
        valid_years = [year for year, _ in valid_pairs]
        # Filter out the data based on valid CIK-Year pairs
        df_filtered = sp500_dtm[
            (~sp500_dtm["Year"].isin(valid_years)) |  # Keep rows where Year is not in valid_pairs
            (sp500_dtm.apply(lambda row: (row["Year"], row["_cik"]) in valid_pairs, axis=1))  # Filter only if Year exists in valid_pairs
        ]

        df_filtered = df_filtered.drop(columns=["Year"])
        save_folder = os.path.join(save_folder, f"batch_filtered_{batch_number}.parquet")
        df_filtered.to_parquet(save_folder, index=False)
        print(f"Filtered data saved to {save_folder}")
        return save_folder

In [20]:
# Uncomment them to concatenate files without modification
existing_files_path = os.path.join(save_path, 'intermediate')
existing_files = os.listdir(existing_files_path)
existing_files = [f for f in existing_files if f != '.DS_Store']
intermediate_file_paths = [os.path.join(existing_files_path, f) for f in existing_files]
print(intermediate_file_paths)
file_path = intermediate_file_paths[0]


['/Users/apple/PROJECT/hons_project/data/SP500/10K/test/intermediate/batch_7.parquet', '/Users/apple/PROJECT/hons_project/data/SP500/10K/test/intermediate/batch_12.parquet']


In [21]:
filter_sp500(save_path, file_path, firms_csv_file_path, constituents_metadata_path)

Filtered data saved to /Users/apple/PROJECT/hons_project/data/SP500/10K/test/filtered/batch_filtered_7.parquet


'/Users/apple/PROJECT/hons_project/data/SP500/10K/test/filtered/batch_filtered_7.parquet'

In [22]:
path = file_path
print(path)
df = pd.read_parquet(path)
df["Date"] = pd.to_datetime(df["Date"], errors="coerce")
df["Year"] = df["Date"].dt.year
if 2024 in df["Year"].tolist():
    print("WElldone")
    
df[df["Year"] == 2024]


/Users/apple/PROJECT/hons_project/data/SP500/10K/test/intermediate/batch_7.parquet
WElldone


Unnamed: 0,Date,_cik,_vol,_ret,_vol+1,_ret+1,united,state,security,exchange,...,dixie,stoker,strom,housekeeping,cleanliness,haze,wilderness,bark,millwork,Year
18,2024-02-20,47111,4.4e-05,0.012565,0.00012,0.011929,56.0,70.0,91.0,85.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024
28,2024-02-15,1590955,8e-05,0.02306,0.000281,0.013999,36.0,92.0,184.0,37.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024
54,2024-09-05,858877,0.000125,0.042816,0.000161,0.0076,43.0,56.0,193.0,57.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024
62,2024-02-21,1675149,8e-05,-0.022903,0.00016,0.00072,49.0,88.0,74.0,61.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024
81,2024-02-20,1126328,0.000928,-0.053404,6.7e-05,0.008128,21.0,130.0,459.0,89.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024
100,2024-02-23,1040971,7.5e-05,-0.004591,0.00022,-0.013521,10.0,29.0,176.0,50.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024
119,2024-01-29,1035443,7.8e-05,-0.01373,0.000301,0.026783,7.0,54.0,130.0,50.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024
138,2024-01-17,796343,0.000146,0.000906,0.000166,-0.01152,33.0,44.0,141.0,79.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024
157,2024-02-23,717423,1.2e-05,0.007709,0.000189,-0.014706,84.0,101.0,82.0,51.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024
195,2024-02-09,100885,0.000188,-0.027516,2.5e-05,0.005603,11.0,39.0,94.0,33.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024


In [27]:
df[df["_cik"] == "0000354190"]

Unnamed: 0,Date,_cik,_vol,_ret,_vol+1,_ret+1,p,em,fry,author,...,mediterranean,panorama,barbary,ape,retrospective,unfolded,wed,pensive,haired,wander
29670,2016-02-08,354190,0.000329,-0.014796,0.000102,0.001057,3.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
29671,2019-04-08,354190,2.3e-05,-0.007509,1.3e-05,-0.002697,3.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
29672,2019-10-18,354190,6.6e-05,-0.003503,1.7e-05,-0.006219,22.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
29673,2020-11-25,354190,8.7e-05,0.009383,0.000101,0.000949,4.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
29674,2022-02-15,354190,6.1e-05,0.021941,0.00011,-0.008615,4.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
29675,2022-05-25,354190,7.9e-05,-0.032111,0.000154,-0.011681,3.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
29676,2022-10-19,354190,0.000175,0.033882,0.000199,0.018186,4.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
29677,2023-01-05,354190,8.5e-05,-0.039522,0.000279,-0.021211,4.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
29678,2023-03-22,354190,0.000103,0.019934,0.000262,-0.022924,4.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
29679,2023-05-25,354190,3.4e-05,0.022569,3.7e-05,0.003429,3.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [41]:
filter_path = "/Users/apple/PROJECT/hons_project/data/SP500/10K/test/filtered/batch_filtered_7.parquet"
df = pd.read_parquet(filter_path)
df[df["_cik"] == "0001590955"]

Unnamed: 0,Date,_cik,_vol,_ret,_vol+1,_ret+1,united,state,security,exchange,...,softwood,dixie,stoker,strom,housekeeping,cleanliness,haze,wilderness,bark,millwork
19,2020-02-13,1590955,6.3e-05,0.051048,0.000455,-0.031368,22.0,66.0,88.0,34.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
20,2021-02-18,1590955,5e-05,0.013913,0.000342,0.005433,23.0,74.0,100.0,35.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
21,2022-02-17,1590955,0.000169,-0.05534,0.000758,0.034341,23.0,75.0,94.0,39.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
22,2023-02-16,1590955,0.000136,-0.066576,0.00022,0.006069,25.0,83.0,108.0,37.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
23,2024-02-15,1590955,8e-05,0.02306,0.000281,0.013999,36.0,92.0,184.0,37.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [40]:
dtm_path = "/Users/apple/PROJECT/hons_project/data/SP500/10K/dtm/part-00000-bbf3389a-a3ed-4008-babf-f89139cffa93-c000.snappy.parquet"
df = pd.read_parquet(dtm_path)
df[df["_cik"] == "0001590955"]

Unnamed: 0,Date,_cik,_vol,_ret,_vol+1,_ret+1,united,state,security,exchange,...,mollie,chandler,grime,disappearance,soldier,predetermination,pristine,opportune,microgram,persimmon
6571,2020-02-13,1590955,6.3e-05,0.051048,0.000455,-0.031368,22.0,66.0,88.0,34.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6572,2021-02-18,1590955,5e-05,0.013913,0.000342,0.005433,23.0,74.0,100.0,35.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6573,2022-02-17,1590955,0.000169,-0.05534,0.000758,0.034341,23.0,75.0,94.0,39.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6574,2023-02-16,1590955,0.000136,-0.066576,0.00022,0.006069,25.0,83.0,108.0,37.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6575,2024-02-15,1590955,8e-05,0.02306,0.000281,0.013999,36.0,92.0,184.0,37.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [11]:
dtm_path = "/Users/apple/PROJECT/hons_project/data/SP500/analysis_reports/dtm/part-00000-022ba5b1-fd75-4e76-997d-22d526f5a2cf-c000.snappy.parquet"
df_500 = pd.read_parquet(dtm_path)
df_500[df_500["_cik"] == "0001841968"]

Unnamed: 0,Date,_cik,_vol,_ret,_vol+1,_ret+1,p,em,fry,author,...,mediterranean,panorama,barbary,ape,retrospective,unfolded,wed,pensive,haired,wander


In [15]:
df_500[df_500["_cik"] == "0001755672"]

Unnamed: 0,Date,_cik,_vol,_ret,_vol+1,_ret+1,p,em,fry,author,...,mediterranean,panorama,barbary,ape,retrospective,unfolded,wed,pensive,haired,wander
73974,2019-06-07,1755672,2.3e-05,-0.002731,0.001409,0.029586,16.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
73975,2019-08-22,1755672,4.5e-05,0.00607,0.000756,0.02549,13.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
73976,2019-12-06,1755672,1.2e-05,-0.003493,0.000294,-0.01171,5.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
73977,2019-12-09,1755672,1.3e-05,0.002001,0.000116,-0.006982,9.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
73978,2020-03-23,1755672,0.004477,0.13173,0.004266,0.029998,5.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
73979,2020-03-26,1755672,0.002677,-0.194289,0.002949,-0.004318,6.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
73980,2020-07-14,1755672,6.4e-05,0.005803,0.000186,-0.002867,14.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
73981,2020-07-22,1755672,0.000135,-0.009268,0.000156,-0.007,12.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
73982,2021-02-03,1755672,6.4e-05,0.00702,0.000916,-0.01917,10.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
73983,2021-05-06,1755672,7.3e-05,-0.008442,0.000188,-0.011942,2.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [65]:
df_500[df_500["_cik"] == "0000315293"]


Unnamed: 0,Date,_cik,_vol,_ret,_vol+1,_ret+1,p,em,fry,author,...,panorama,barbary,ape,retrospective,unfolded,wed,pensive,haired,wander,Year
9695,2011-02-10,315293,7.3e-05,-0.014854,7.8e-05,-0.011694,7.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2011
9696,2012-01-30,315293,5.4e-05,-0.007914,3.7e-05,-0.005383,3.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2012
9697,2012-02-16,315293,7.3e-05,-0.009824,5.9e-05,0.0044,5.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2012
9698,2012-03-05,315293,7.4e-05,0.006422,5.1e-05,-0.002337,5.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2012
9699,2019-07-17,315293,5.7e-05,-0.015184,0.000105,-0.009228,12.0,4.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2019
9700,2020-11-05,315293,0.000407,-0.110536,0.000495,-0.020648,5.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2020
9701,2022-02-14,315293,0.000761,0.049371,0.000228,-0.002005,4.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2022
9702,2022-12-06,315293,7.9e-05,0.019743,0.000131,0.012612,5.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2022
9703,2024-02-05,315293,0.000206,-0.017352,0.000219,-0.018554,3.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,2024
9704,2024-05-28,315293,0.000102,0.018158,6.6e-05,0.008736,3.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024


In [66]:
df_500["Date"] = pd.to_datetime(df_500["Date"], errors="coerce")
df_500["Year"] = df_500["Date"].dt.year
df_500[(df_500["Year"] == 2024) & (df_500["_cik"] == "0000315293")]


Unnamed: 0,Date,_cik,_vol,_ret,_vol+1,_ret+1,p,em,fry,author,...,panorama,barbary,ape,retrospective,unfolded,wed,pensive,haired,wander,Year
9703,2024-02-05,315293,0.000206,-0.017352,0.000219,-0.018554,3.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,2024
9704,2024-05-28,315293,0.000102,0.018158,6.6e-05,0.008736,3.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024


In [16]:
path_parquet = "/Users/apple/PROJECT/hons_project/data/SP500/analysis_reports/company_df/0001755672/part-00000-9c784797-ba40-4abe-8295-f33cfb7500dd-c000.snappy.parquet"
path_parquet_calls = "/Users/apple/PROJECT/hons_project/data/SP500/transcripts/company_df/0000320193/part-00000-b5526bb2-5fef-4cce-8c04-a26427082563-c000.snappy.parquet"
path_10k = "/Users/apple/PROJECT/hons_project/data/SP500/10K/company_df/0000320193/part-00000-b0419882-0b54-4c6e-9f44-61b83c589735-c000.snappy.parquet"
path_10q = "/Users/apple/PROJECT/hons_project/data/SP500/10Q/company_df/0000320193/part-00000-ab04ad3d-2681-4ca4-bfef-6965ec70a9c4-c000.snappy.parquet"
df = pd.read_parquet(path_parquet)
df.head()
df["Date"] = pd.to_datetime(df["Date"], errors="coerce")
df["Year"] = df["Date"].dt.year
df[df["Year"] == 2024]

Unnamed: 0,Name,CIK,Date,Body,Year
34,CTVA,1755672,2024-02-02,"<p><figure class=""getty-figure"" data-type=""get...",2024
35,CTVA,1755672,2024-03-06,"<p><figure class=""getty-figure"" data-type=""get...",2024
36,CTVA,1755672,2024-04-02,"<p><figure class=""getty-figure"" data-type=""get...",2024
37,CTVA,1755672,2024-06-18,"<figure class=""getty-figure"" data-type=""getty-...",2024
38,CTVA,1755672,2024-09-05,"<p><figure class=""getty-figure"" data-type=""get...",2024
39,CTVA,1755672,2024-09-16,"<p><figure class=""getty-figure"" data-type=""get...",2024
40,CTVA,1755672,2024-11-10,"<p><figure class=""getty-figure"" data-type=""get...",2024


In [17]:
path_parquet_processed_calls = "/Users/apple/PROJECT/hons_project/data/SP500/transcripts/processed/dtm_0000320193.parquet"
path_parquet_processed = "/Users/apple/PROJECT/hons_project/data/SP500/analysis_reports/processed/dtm_0001755672.parquet"
path_10k_processed = "/Users/apple/PROJECT/hons_project/data/SP500/10K/processed/dtm_0000320193.parquet"
path_10q_processed = "/Users/apple/PROJECT/hons_project/data/SP500/10Q/processed/dtm_0000320193.parquet"

df_processed = pd.read_parquet(path_parquet_processed)
df_processed.head()
df_processed["Date"] = pd.to_datetime(df_processed["Date"], errors="coerce")
df_processed["Year"] = df_processed["Date"].dt.year
df_processed[df_processed["Year"] == 2024]

Unnamed: 0,Date,_cik,_vol,_ret,_vol+1,_ret+1,bargain,value,yet,apparent,...,highlight,open,committee,preferred,entity,majority,exception,office,bet,Year
27,2024-02-02,1755672,0.000324,0.023478,9.5e-05,0.013423,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024
28,2024-03-06,1755672,7.3e-05,0.001295,9e-05,-0.001102,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024
29,2024-04-02,1755672,0.0001,0.01096,3.6e-05,-0.00349,0.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024
30,2024-06-18,1755672,0.000272,0.007488,0.000226,-0.01133,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024
31,2024-09-05,1755672,0.000216,0.013517,7.2e-05,-0.007438,0.0,0.0,0.0,0.0,...,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024
32,2024-09-16,1755672,0.000218,0.011134,4.2e-05,-0.006684,0.0,0.0,0.0,0.0,...,0.0,1.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0,2024


: 

In [None]:
def transform_parquet_to_yf_format(parquet_path):
    # Read the Parquet file
    df = pd.read_parquet(parquet_path)
    df.rename(columns={'date': 'Date'}, inplace=True)
    # Ensure the 'date' column is a datetime type
    df['Date'] = pd.to_datetime(df['Date'])
    
    # Set the 'date' column as the index
    df.set_index('Date', inplace=True)
    
    # Rename columns to match yfinance format
    df.rename(columns={
        'symbol': 'Symbol',
        'open': 'Open',
        'high': 'High',
        'low': 'Low',
        'close': 'Close',
        'volume': 'Volume'
    }, inplace=True)
    
    # Add 'Adj Close' column if it doesn't exist
    if 'Adj Close' not in df.columns:
        df['Adj Close'] = df['Close']
    
    return df




Unnamed: 0_level_0,Symbol,Volume,Close,Open,High,Low,Adj Close
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1980-12-12,AAPL,469033600,0.098834,0.098834,0.099264,0.098834,0.098834
1980-12-15,AAPL,175884800,0.093678,0.094108,0.094108,0.093678,0.093678
1980-12-16,AAPL,105728000,0.086802,0.087232,0.087232,0.086802,0.086802
1980-12-17,AAPL,86441600,0.088951,0.088951,0.089381,0.088951,0.088951
1980-12-18,AAPL,73449600,0.091530,0.091530,0.091959,0.091530,0.091530
...,...,...,...,...,...,...,...
2024-12-27,AAPL,42355300,255.589996,257.829987,258.700012,253.059998,255.589996
2024-12-30,AAPL,35557500,252.199997,252.229996,253.500000,250.750000,252.199997
2024-12-31,AAPL,39480700,250.419998,252.440002,253.279999,249.429993,250.419998
2025-01-02,AAPL,55740700,243.850006,248.929993,249.100006,241.820007,243.850006


In [105]:
start_date = "2000-01-01"
end_date = "2026-01-01"
path_price = "/Users/apple/PROJECT/data/stock_price_daily.parquet"
df_price = transform_parquet_to_yf_format(path_price)
time_series = df_price[df_price["Symbol"] == "AAPL"]
time_series = time_series.loc[start_date:end_date]
time_series

Unnamed: 0_level_0,Symbol,Volume,Close,Open,High,Low,Adj Close
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2000-01-03,AAPL,535796800,0.843076,0.789884,0.847313,0.765877,0.843076
2000-01-04,AAPL,512377600,0.771997,0.815303,0.833191,0.762111,0.771997
2000-01-05,AAPL,778321600,0.783293,0.781411,0.832720,0.775762,0.783293
2000-01-06,AAPL,767972800,0.715509,0.799299,0.805889,0.715509,0.715509
2000-01-07,AAPL,460734400,0.749401,0.726806,0.760699,0.719275,0.749401
...,...,...,...,...,...,...,...
2024-12-27,AAPL,42355300,255.589996,257.829987,258.700012,253.059998,255.589996
2024-12-30,AAPL,35557500,252.199997,252.229996,253.500000,250.750000,252.199997
2024-12-31,AAPL,39480700,250.419998,252.440002,253.279999,249.429993,250.419998
2025-01-02,AAPL,55740700,243.850006,248.929993,249.100006,241.820007,243.850006


In [106]:
import numpy as np
def vol_reader():
    start_date = "2000-01-01"
    end_date = "2026-01-01"
    path_price = "/Users/apple/PROJECT/data/stock_price_daily.parquet"
    df_price = transform_parquet_to_yf_format(path_price)
    time_series = df_price[df_price["Symbol"] == "AAPL"]
    time_series = time_series.loc[start_date:end_date]
    def vol_proxy(ret, proxy):
        proxies = ['sqaured return', 'realized', 'daily-range', 'return']
        assert proxy in proxies, f'proxy should be in {proxies}'
        if proxy == 'realized':
            raise 'Realized volatility proxy not yet implemented'
        elif proxy == 'daily-range':
            ran = np.log(ret['High']) - np.log(ret['Low'])
            adj_factor = 4 * np.log(2)
            return np.square(ran)/adj_factor
        elif proxy == 'return':
            def ret_fun(xt_1, xt):
                return np.log(xt_1/xt) ### used to xt/xt_1
            return ret_fun(ret['Open'], ret['Close'])
        else:
            assert proxy == 'squared return'
            raise 'Squared return proxy not yet implemented'
        
    vol_list = []
    for p in ['daily-range', 'return']:
        vol = vol_proxy(time_series, p)
        vol_list.append(vol.to_frame())
    
    df_vol = pd.concat(vol_list, axis=1)
    df_vol.columns = ['_vol', '_ret']
    df_vol = df_vol.reset_index()
    df_vol['_vol+1'] = df_vol['_vol'].shift(-1)
    df_vol['_ret+1'] = df_vol['_ret'].shift(-1)
    df_vol = df_vol.dropna()
    df_vol.set_index('Date', inplace=True)
    return df_vol

In [111]:
vol_reader()

Unnamed: 0_level_0,_vol,_ret,_vol+1,_ret+1
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2000-01-03,0.003683,-0.065171,0.002868,0.054580
2000-01-04,0.002868,0.054580,0.001811,-0.002407
2000-01-05,0.001811,-0.002407,0.005103,0.110741
2000-01-06,0.005103,0.110741,0.001131,-0.030615
2000-01-07,0.001131,-0.030615,0.002093,0.042559
...,...,...,...,...
2024-12-26,0.000033,-0.003209,0.000175,0.008726
2024-12-27,0.000175,0.008726,0.000043,0.000119
2024-12-30,0.000043,0.000119,0.000085,0.008034
2024-12-31,0.000085,0.008034,0.000317,0.020618


In [None]:
def compute_file_hash(file_path, chunk_size=65536):
    """Serializable helper function for computing file hash."""
    md5 = hashlib.md5()
    with open(file_path, 'rb') as f:
        while True:
            data = f.read(chunk_size)
            if not data:
                break
            md5.update(data)
    return md5.hexdigest()

def get_file_modified_time(file_path):
    """Serializable helper function for getting file modification time."""
    epoch_time = os.path.getmtime(file_path)
    return datetime.datetime.fromtimestamp(epoch_time)


def _upsert_metadata(session, meta_dict, cik):
    """Helper to insert or update the FileMetadata row."""
    file_path = meta_dict["file_path"]
    record = session.query(FileMetadata).filter_by(file_path=file_path).first()
    if record:
        record.file_hash = meta_dict["file_hash"]
        record.last_modified = meta_dict["last_modified"]
        record.is_deleted = False
    else:
        new_record = FileMetadata(
            file_path=file_path,
            last_modified=meta_dict["last_modified"],
            file_hash=meta_dict["file_hash"],
            is_deleted=False,
            cik = cik
        )
        session.add(new_record)

In [129]:
import os
import pandas as pd
import logging
from hons_project.annual_report_reader import reader
from hons_project.vol_reader_fun import vol_reader

def run_process_for_cik(cik, save_folder, folder_path, start_date, end_date, firms_csv_file_path):
    """
    Simplified worker function for debugging:
    1) Loads firms data.
    2) Processes parquet files for the CIK.
    3) Merges with volatility data and writes output.
    No metadata or database interactions.
    """
    # Load firms data
    firms_df = pd.read_csv(firms_csv_file_path)
    firms_df['CIK'] = firms_df['CIK'].apply(lambda x: str(x).zfill(10))
    firms_dict = firms_df.set_index('Symbol')['CIK'].to_dict()
    firms_dict = {cik: symbol for symbol, cik in firms_dict.items()}

    # Build paths
    cik_folder = os.path.join(folder_path, cik)
    if not os.path.exists(cik_folder):
        logging.info(f"[{cik}] No folder found at {cik_folder}")
        return {"cik": cik, "output_file": None}

    try:
        # Identify parquet files
        all_files = [
            os.path.join(cik_folder, f)
            for f in os.listdir(cik_folder)
            if f.endswith('.parquet')
        ]

        # Process new files (no metadata check, just run all)
        processed_dataframes = [
            reader(os.path.basename(file_path), file_loc=cik_folder)
            for file_path in all_files
            if reader(os.path.basename(file_path), file_loc=cik_folder) is not None
        ]

        if not processed_dataframes:
            logging.info(f"[{cik}] No valid dataframes from reader")
            return {"cik": cik, "output_file": None}

        # Combine and process
        combined = pd.concat(processed_dataframes)
        logging.info(f"[{cik}] Post-concat rows: {len(combined)}, Dates: {combined[combined.columns[0]].min()} to {combined[combined.columns[0]].max()}")

        # Add volatility data
        vol_data = vol_reader(cik, firms_dict, start_date=start_date, end_date=end_date)
        logging.info(f"[{cik}] Vol data rows: {len(vol_data)}, Dates: {vol_data.index.min()} to {vol_data.index.max()}")

        # Merge
        combined.reset_index(inplace=True)
        first_column_name = combined.columns[0]
        combined = pd.merge(combined.rename(columns={first_column_name: "Date"}), 
                            vol_data.reset_index(), how="inner", on="Date")
        logging.info(f"[{cik}] Post-merge rows: {len(combined)}, Dates: {combined['Date'].min()} to {combined['Date'].max()}")

        # Add CIK and reorder columns
        combined["_cik"] = cik
        columns_to_move = ['Date', '_cik', '_vol', '_ret', '_vol+1', '_ret+1']
        remaining_cols = [col for col in combined.columns if col not in columns_to_move]
        combined = combined[columns_to_move + remaining_cols]

        # Filter invalid rows
        combined = combined[combined["_ret"].notna()]
        logging.info(f"[{cik}] Post-_ret filter rows: {len(combined)}, Dates: {combined['Date'].min()} to {combined['Date'].max()}")

        # Handle existing data
        save_path = os.path.join(save_folder, 'test/processed')
        os.makedirs(save_path, exist_ok=True)
        out_file_path = os.path.join(save_path, f"dtm_{cik}.parquet")
        
        if os.path.exists(out_file_path):
            existing_files_df = pd.read_parquet(out_file_path, engine='pyarrow')
            combined = pd.concat([existing_files_df, combined])
            combined.drop_duplicates(inplace=True)
            combined = combined.fillna(0.0)
            logging.info(f"[{cik}] Post-concat+dedupe rows: {len(combined)}, Dates: {combined['Date'].min()} to {combined['Date'].max()}")

        # Write output
        combined.to_parquet(out_file_path, index=False)
        logging.info(f"[{cik}] Written to {out_file_path}")

        return {"cik": cik, "output_file": out_file_path}

    except Exception as e:
        logging.error(f"[{cik}] Error: {e}")
        return {"cik": cik, "output_file": None}

In [131]:
cik = "0000320193"
save_folder = "/Users/apple/PROJECT/hons_project/data/SP500/analysis_reports"
start_date = '2000-01-01'
end_date = '2026-01-01'
folder = 'company_df'
folder_path = os.path.join(save_folder, folder)
firms_csv_file_path = '../Code_4_SECfilings/sp500_total_constituents_final.csv'

run_process_for_cik(cik, save_folder, folder_path, start_date, end_date, firms_csv_file_path)

preprocessing...


100%|██████████| 1578/1578 [00:03<00:00, 523.64it/s]
100%|██████████| 1578/1578 [00:53<00:00, 29.27it/s]


preprocessing...


100%|██████████| 1578/1578 [00:02<00:00, 625.89it/s]
100%|██████████| 1578/1578 [00:53<00:00, 29.31it/s] 


{'cik': '0000320193',
 'output_file': '/Users/apple/PROJECT/hons_project/data/SP500/analysis_reports/processed/dtm_0000320193.parquet'}

In [110]:
from hons_project.annual_report_reader import reader
file_path = "/Users/apple/PROJECT/hons_project/data/SP500/analysis_reports/company_df/0000320193/part-00000-678a8694-90d2-45f3-8ad9-0e833bfe8448-c000.snappy.parquet"
df = reader(os.path.basename(file_path), file_loc="/Users/apple/PROJECT/hons_project/data/SP500/analysis_reports/company_df/0000320193")
print(df)

Downloading NLTK data: wordnet


[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/apple/PROJECT/package/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     /Users/apple/PROJECT/package/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /Users/apple/PROJECT/package/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


Downloading NLTK data: omw-1.4
Downloading NLTK data: averaged_perceptron_tagger
preprocessing...


100%|██████████| 1578/1578 [00:02<00:00, 618.86it/s]
100%|██████████| 1578/1578 [00:52<00:00, 30.30it/s] 


              p  according  sale  new  despite  shockingly  approval  medium  \
2005-09-15  6.0        2.0   2.0  1.0      2.0         1.0       1.0     3.0   
2005-10-11  3.0        0.0   0.0  0.0      0.0         0.0       0.0     0.0   
2005-10-12  5.0        0.0   1.0  4.0      0.0         0.0       0.0     0.0   
2005-10-12  7.0        0.0   2.0  1.0      0.0         0.0       0.0     0.0   
2005-10-13  1.0        0.0   0.0  0.0      0.0         0.0       0.0     0.0   
...         ...        ...   ...  ...      ...         ...       ...     ...   
2025-01-30  3.0        1.0   0.0  1.0      0.0         0.0       0.0     0.0   
2025-01-31  8.0        0.0   0.0  0.0      0.0         0.0       0.0     0.0   
2025-01-31  4.0        0.0   0.0  0.0      0.0         0.0       0.0     0.0   
2025-01-31  4.0        0.0   0.0  0.0      0.0         0.0       0.0     0.0   
2025-02-01  4.0        0.0   0.0  0.0      0.0         0.0       0.0     0.0   

            lot  stalwart  ...  precipi

In [116]:
df.head(1545)

Unnamed: 0,p,according,sale,new,despite,shockingly,approval,medium,lot,stalwart,...,precipice,precondition,mag,inauguration,brock,titanium,chine,daughter,joyfully,ake
2005-09-15,6.0,2.0,2.0,1.0,2.0,1.0,1.0,3.0,1.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2005-10-11,3.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2005-10-12,5.0,0.0,1.0,4.0,0.0,0.0,0.0,0.0,0.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2005-10-12,7.0,0.0,2.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2005-10-13,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2024-12-20,5.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2024-12-28,5.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2024-12-31,3.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2025-01-02,4.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
