In [7]:
import os
import json
import polars as pl
from tqdm import tqdm
import yfinance as yf
from collections import Counter
import pandas as pd
import numpy as np
import pandas_ta_remake as ta
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

In [8]:
#expected to be a directory with submission, companyfacts, ticker.txt and company_tickers.json all available on SEC website

main_dir = "Output_dir"
temp_data_dir= "data"
stock_data_dir ="stock_data"
data_dir = os.path.join(main_dir, "Data")
submissions_dir = os.path.join(main_dir, "submission")
company_facts_dir = os.path.join(main_dir, "companyfacts")

In [9]:
cik_ticker_cvs = os.path.join(main_dir, "CIK.csv")

ticker_df = pl.read_csv(cik_ticker_cvs).with_columns(
    pl.col("cik_str").cast(pl.Utf8).str.zfill(10))

In [5]:
def extract_facts_to_csv(ticker_df, source_dir, output_folder):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    valid_ciks = set(ticker_df["cik_str"].cast(str))
    file_list = [f for f in os.listdir(source_dir) if f.endswith(".json")]
    
    for file_name in tqdm(file_list, desc="Processing files"):
        cik_number = file_name.replace("CIK", "").split(".")[0]
        
        if cik_number not in valid_ciks:
            #print(f"Skipping {file_name} as its CIK is not in the ticker dataframe.")
            continue
        
        file_path = os.path.join(source_dir, file_name)
        try:
            with open(file_path, "r") as file:
                data = json.load(file)

            output_csv = os.path.join(output_folder, f"{os.path.splitext(file_name)[0]}.csv")
            
            facts = data.get("facts", {}).get("us-gaap", {})
            rows = []

            for field, field_data in facts.items():
                units = field_data.get("units", {})
                
                for unit_type, entries in units.items(): 
                    for entry in entries:
                        entry["field"] = field
                        entry["unit_type"] = unit_type
                        rows.append(entry)

            if rows:
                df = pl.DataFrame(rows)
                df.write_csv(output_csv)
        except Exception as e:
            print(f"Error processing file {file_name}: {e}")
    
def count_unique_metrics(source_dir, output_csv="unique_metrics.csv"):
    metric_counts = Counter()

    for file_name in os.listdir(source_dir):
        if file_name.endswith(".csv"):
            file_path = os.path.join(source_dir, file_name)
            
            try:
                df = pl.read_csv(file_path)
                if "field" in df.columns:
                    metrics = df["field"].drop_nulls().to_list()
                    metric_counts.update(metrics) 
            except Exception as e:
                print(f"Error processing {file_name}: {e}")

    output_df = pl.DataFrame({
        "metric": list(metric_counts.keys()),
        "amount": list(metric_counts.values())
    })


    output_df.write_csv(output_csv)

    print(f"Unique metric values and their counts saved to {output_csv}")

def filter_csv_files(metric_counts_csv, directory):
    metric_counts = pl.read_csv(metric_counts_csv).filter(pl.col("amount") >= 380000)
    
    valid_metrics = set(metric_counts["metric"].to_list())

    # Process each CSV file
    for file_name in os.listdir(directory):
        if file_name.endswith(".csv"):
            file_path = os.path.join(directory, file_name)
            
            try:
                df = pl.read_csv(file_path)
                
                if "field" in df.columns:
                    df_filtered = df.filter(pl.col("field").is_in(valid_metrics))
                    
                    if len(df_filtered) < 1000:
                        os.remove(file_path) 
                        print(f"Deleted {file_name} (fewer than 1000 rows after filtering).")
                    else:
                        df_filtered.write_csv(file_path)
                        print(f"Filtered {file_name}: {len(df)} -> {len(df_filtered)} rows")
            except Exception as e:
                print(f"Error processing {file_name}: {e}")
                
def refactor_facts_data(source_dir, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    
    for file in os.listdir(source_dir):
        if file.endswith(".csv"):
            file_path = os.path.join(source_dir, file)
            output_file_path = os.path.join(output_dir, file)
            
            if os.path.exists(output_file_path):
                try:
                    existing_df = pl.read_csv(output_file_path)
                    if 'end' in existing_df.columns and 'field' not in existing_df.columns:
                        print(f"Skipping already processed file: {file}")
                        continue
                except Exception as e:
                    print(f"Error reading existing processed file {file}: {e}. Re-processing...")
            
            try:
                df = pl.read_csv(file_path)
                
                if not {'end', 'field', 'val'}.issubset(df.columns):
                    print(f"Skipping {file}: Missing required columns.")
                    continue
                
                df_pivot = df.pivot(index='end', on='field', values='val', aggregate_function='first')
                
                df_pivot.write_csv(output_file_path)
                print(f"Processed: {file}")
            except Exception as e:
                print(f"Error processing {file}: {e}. Deleting file...")
                if os.path.exists(output_file_path):
                    os.remove(output_file_path)
            
def download_stock_data(source_dir, target_folder, ticker_df, failed_log="failed_tickers.csv"):
    if not os.path.exists(target_folder):
        os.makedirs(target_folder)
    
    failed_tickers = []
    
    for file in os.listdir(source_dir):
        if file.startswith("CIK") and file.endswith(".csv"):
            cik_str = file[3:13]
            output_path = os.path.join(target_folder, file)
            
            if os.path.exists(output_path):
                continue

            row = ticker_df.filter(ticker_df['cik_str'] == cik_str)
            if row.is_empty():
                failed_tickers.append([file, "No matching ticker"])
                continue
            
            ticker = row['ticker'][0]
            
            try:
                stock_data = yf.download(ticker, start="2009-01-01", end="2024-12-31", interval="1wk")
                if stock_data.empty:
                    failed_tickers.append([file, "No data available"])
                    continue
                
                stock_data.reset_index(inplace=True)

                pl_df = pl.from_pandas(stock_data)
                pl_df.write_csv(output_path)
            except Exception as e:
                failed_tickers.append([file, str(e)])
    
    if failed_tickers:
        failed_df = pl.DataFrame(failed_tickers, schema=["file_name", "Reason"])
        failed_df.write_csv(failed_log)
    
    print("Download complete.")
    
def refactor_stock_data(directory_path, max_workers=4):
    file_names = [
        f for f in os.listdir(directory_path)
        if f.endswith(".csv")
    ]

    def process_file(file_name):
        try:
            file_path = os.path.join(directory_path, file_name)
            df = pl.read_csv(file_path)

            if df.height < 120:
                os.remove(file_path)
                print(f"Deleted {file_name} (less than 120 entries)")
                return

            if df.width < 5:
                print(f"Skipped {file_name} (less than 5 columns)")
                return

            df = df.rename({
                df.columns[0]: "date",
                df.columns[1]: "close",
                df.columns[2]: "high",
                df.columns[3]: "low",
                df.columns[4]: "open",
                df.columns[5]: "volume"
            })

            # Clean up date
            df = df.with_columns(df["date"].str.split("T").list.get(0).alias("date"))
            df = df.with_columns(df["date"].str.split(" ").list.get(0).alias("date"))

            df.write_csv(file_path)

        except Exception as e:
            print(f"Error processing {file_name}: {e}")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_file, f): f for f in file_names}
        for _ in tqdm(as_completed(futures), total=len(futures), desc="Refactoring stock data"):
            pass
            # print(f"Processed {file_name}")
            
def combine_files(temp_dir, stock_dir, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    temp_files = {f for f in os.listdir(temp_dir) if f.endswith(".csv")}
    stock_files = {f for f in os.listdir(stock_dir) if f.endswith(".csv")}
    common_files = temp_files.intersection(stock_files)
    def process_pair(file):
        temp_path = os.path.join(temp_dir, file)
        stock_path = os.path.join(stock_dir, file)

        df_temp = pl.read_csv(temp_path, try_parse_dates=True)
        df_stock = pl.read_csv(stock_path, try_parse_dates=True)

        df_merged = df_temp.join(df_stock, left_on="end", right_on="date", how="outer")

        df_merged = df_merged.with_columns(
            pl.when(df_merged["date"].is_null())
            .then(df_merged["end"])
            .otherwise(df_merged["date"])
            .alias("date")
        )

        df_merged = df_merged.drop("end")

        output_path = os.path.join(output_dir, file)
        df_merged.write_csv(output_path)
        # print(f"Processed: {file}")

    with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        futures = {executor.submit(process_pair, file): file for file in common_files}
        for _ in tqdm(as_completed(futures), total=len(futures), desc="Combining files"):
            pass

def fill_nulls(source_dir):
    def process_file(file_path):
        df = pl.read_csv(file_path)

        df = df.with_columns(
            pl.col("date").str.strptime(pl.Date, "%Y-%m-%d").alias("date")
        )

        df = df.sort(by="date")

        df = df.with_columns(
            pl.col("volume").fill_null(0).alias("volume")
        )

        df = df.with_columns(
            pl.col("close").fill_null(strategy="forward").alias("close"),
            pl.col("high").fill_null(strategy="forward").alias("high"),
            pl.col("low").fill_null(strategy="forward").alias("low"),
            pl.col("open").fill_null(strategy="forward").alias("open"),
        )
        df = df.with_columns(
            df[:, :27].with_columns(pl.all().fill_null(strategy="forward"))
        )
        df = df.drop_nulls()

        df.write_csv(file_path)
        print(f"Processed: {file_name}")

    csv_files = [
        os.path.join(source_dir, file_name)
        for file_name in os.listdir(source_dir)
        if file_name.endswith(".csv")
    ]
    with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        executor.map(process_file, csv_files)
            
def add_company_cik(source_dir):    
    for file_name in tqdm(os.listdir(source_dir), desc="Processing files"):
        if file_name.endswith(".csv"):
            cik = file_name[3:13]
            
            file_path = os.path.join(source_dir, file_name)
            
            df = pl.read_csv(file_path)
            
            df = df.with_columns(pl.lit(cik).alias("CIK"))
            
            df.write_csv(file_path)
            
            # print(f"Updated {file_name} with CIK {cik}")

def add_indicators(source_dir):  # you can tweak `max_workers` if needed
    def process_file(file_path):
        df = pl.read_csv(file_path)
        if df.width > 6:
            return
        df = df.to_pandas()

        numeric_cols = df.select_dtypes(include=[np.number]).columns
        df[numeric_cols] = df[numeric_cols].astype(np.float64)

        df['date'] = pd.to_datetime(df['date'])

        # df['stoch_k'] = ta.stoch(df['high'], df['low'], df['close'], window=14)
        # df['parabolic_sar'] = ta.psar(df['high'], df['low'], df['close'])
        # ichimoku = ta.ichimoku(df['high'], df['low'], df['close'])
        # df['ichimoku_a'] = ichimoku['ichimoku_a']
        # df['ichimoku_b'] = ichimoku['ichimoku_b']
        # df['ichimoku_base_line'] = ichimoku['ichimoku_base_line']

        # SMAs & EMAs
        df['SMA_5'] = df['close'].rolling(window=10).mean()  # 2 weeks
        df['SMA_15'] = df['close'].rolling(window=15).mean()  # 3 weeks
        df["EMA_15"] = ta.ema(df["close"], window=15)  # 3 weeks
        df["EMA_30"] = ta.ema(df["close"], window=30)  # 6 weeks

        # MACD & ADX
        df = df.join(ta.macd(df["close"], window_slow=30, window_fast=15, window_sign=9))
        # df = df.join(ta.adx(df["high"], df["low"], df["close"], window=30))  # 6 weeks
        # df = df.join(ta.adx(df["high"], df["low"], df["close"], window=50))  # 10 weeks

        # RSI
        df["RSI_14"] = ta.rsi(df["close"], window=14)  # Standard (2 weeks)
        df["RSI_21"] = ta.rsi(df["close"], window=21)  # About 3 weeks

        # ROC & CCI
        df["ROC_21"] = ta.roc(df["close"], window=21)  # 4 weeks
        df["ROC_30"] = ta.roc(df["close"], window=30)  # 6 weeks
        df["CCI_30"] = ta.cci(df["high"], df["low"], df["close"], window=30)  # 6 weeks
        df["CCI_60"] = ta.cci(df["high"], df["low"], df["close"], window=60)  # 12 weeks

        # ATR
        df["ATR_14"] = ta.atr(df["high"], df["low"], df["close"], window=14)

        # OBV & MFI
        df["OBV"] = ta.obv(df["close"], df["volume"])
        df["MFI_30"] = ta.mfi(df["high"], df["low"], df["close"], df["volume"], window=30)  # 6 weeks
        df["MFI_60"] = ta.mfi(df["high"], df["low"], df["close"], df["volume"], window=60)  # 12 weeks

        df['bb_upper'] = df['SMA_15'] + 2 * df['close'].rolling(window=20).std()
        df['bb_lower'] = df['SMA_15'] - 2 * df['close'].rolling(window=20).std()
        df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['SMA_15']

        # Target (weekly return)
        df["target"] = (df["close"].shift(-7) / df["close"]) - 1

        # Daily return and Rolling statistics
        df["daily_return"] = df["close"].pct_change()
        df["rolling_mean_5"] = df["close"].rolling(5).mean()
        df["rolling_std_5"] = df["close"].rolling(5).std()

        # Lagged values
        df["lag_1"] = df["close"].shift(1)
        df["lag_2"] = df["close"].shift(2)
        df["lag_return_1"] = df["daily_return"].shift(1)

        # Weekly trend
        df["weekly_trend"] = df["close"].shift(0) / df["close"].shift(7) - 1
        df = pl.from_pandas(df)

        #week/weekday
        df = df.sort("date")

        df = df.with_columns([
            df["date"].dt.week().alias("week"),
            df["date"].dt.weekday().alias("weekday")
        ])
        df.write_csv(file_path)

    csv_files = [
        os.path.join(source_dir, file_name)
        for file_name in os.listdir(source_dir)
        if file_name.endswith(".csv")
    ]

    with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        executor.map(process_file, csv_files)

def change_to_weekly(source_dir):
    for file_name in os.listdir(source_dir):
        if file_name.endswith(".csv"):
            file_path = os.path.join(source_dir, file_name)
            df = pl.read_csv(file_path)
            

            
            weekly_stock = df.group_by(["year", "week"]).agg([
                pl.col("close").mean().alias("close"),
                pl.col("high").mean().alias("high"),
                pl.col("low").mean().alias("low"),
                pl.col("open").mean().alias("open"),
                pl.col("volume").sum().alias("volume")
            ])

            df_friday_indicator = df.filter(df["weekday"] == 5).drop(["weekday", "low", "high", "close", "open", "volume"])
            weekly_df = df_friday_indicator.join(weekly_stock, on=["year", "week"], how="left")
            print(f"Chnaged {file_name}")
            weekly_df.write_csv(file_path)

In [None]:
extract_facts_to_csv(ticker_df, company_facts_dir, temp_data_dir)
count_unique_metrics(temp_data_dir, ".\\unique_metrics.csv")
filter_csv_files(".\\unique_metrics.csv", temp_data_dir)

In [None]:
count_unique_metrics(temp_data_dir, ".\\unique_metrics.csv")
filter_csv_files(".\\unique_metrics.csv", temp_data_dir)

In [None]:
refactor_facts_data(temp_data_dir, temp_data_dir)

In [None]:
download_stock_data(temp_data_dir, stock_data_dir, ticker_df)
refactor_stock_data(stock_data_dir)

In [None]:
add_indicators(stock_data_dir)
refactor_stock_data(stock_data_dir)

In [None]:
# change_to_weekly(stock_data_dir)

In [6]:
combine_files(temp_data_dir, stock_data_dir, data_dir)

  df_merged = df_temp.join(df_stock, left_on="end", right_on="date", how="outer")
Combining files: 100%|██████████| 4192/4192 [00:17<00:00, 241.14it/s]


In [None]:
add_company_cik(data_dir)

In [22]:
fill_nulls(data_dir)

In [10]:
import glob
print(os.getcwd())
os.chdir(os.path.join(main_dir, "Data"))

csv_files = glob.glob("*.csv")

dataframes = []
for file in csv_files:
    df = pd.read_csv(file)

    dataframes.append(df)

merged_df = pd.concat(dataframes, ignore_index=True, sort=False)
os.chdir("..")
os.chdir("..")
print(os.getcwd())
print(os.path.join(main_dir,"all_data.csv"))
merged_df.to_csv(os.path.join(main_dir,"all_data.csv"), index=False)

/home/daniel/PycharmProjects/TFT for market prediction


  merged_df = pd.concat(dataframes, ignore_index=True, sort=False)


/home/daniel/PycharmProjects/TFT for market prediction
Output_dir/all_data.csv


In [11]:
merged_df = pl.read_csv(os.path.join(main_dir, "all_data.csv"))
print(merged_df.head())
print(merged_df.columns)
print(merged_df.shape)

shape: (5, 70)
┌───────────┬─────────────┬─────────────┬─────────────┬───┬──────┬─────────┬─────────┬─────────────┐
│ Assets    ┆ AssetsCurre ┆ CashAndCash ┆ CommonStock ┆ … ┆ week ┆ weekday ┆ CIK     ┆ GeneralAndA │
│ ---       ┆ nt          ┆ Equivalents ┆ ParOrStated ┆   ┆ ---  ┆ ---     ┆ ---     ┆ dministrati │
│ f64       ┆ ---         ┆ AtCarryi…   ┆ ValuePer…   ┆   ┆ i64  ┆ i64     ┆ i64     ┆ veExpens…   │
│           ┆ f64         ┆ ---         ┆ ---         ┆   ┆      ┆         ┆         ┆ ---         │
│           ┆             ┆ f64         ┆ f64         ┆   ┆      ┆         ┆         ┆ f64         │
╞═══════════╪═════════════╪═════════════╪═════════════╪═══╪══════╪═════════╪═════════╪═════════════╡
│ 7.5475e9  ┆ 2.9988e9    ┆ 1.4260e9    ┆ 0.0         ┆ … ┆ 40   ┆ 3       ┆ 1318605 ┆ null        │
│ 8.0925e9  ┆ 2.7916e9    ┆ 1.1969e9    ┆ 0.0         ┆ … ┆ 53   ┆ 4       ┆ 1318605 ┆ null        │
│ 1.2592e10 ┆ 5.1724e9    ┆ 3.0843e9    ┆ 0.0         ┆ … ┆ 39   ┆ 5       ┆

In [38]:
data = {
    "close": [100, 102, 101, 105, 107]
}
df = pd.DataFrame(data)

# Add the daily_diff column
df["daily_diff"] = df["close"].shift(-1) / df["close"]

print(df)

   close  daily_diff  daily_return_pct
0    100    1.020000          2.000000
1    102    0.990196         -0.980392
2    101    1.039604          3.960396
3    105    1.019048          1.904762
4    107         NaN               NaN
