In [1]:
# This module helps to select stocks to look at
# It writes raw stock data to disk

import sys
sys.path.append('..')

import os
from os.path import exists

import time
from datetime import date, timedelta
from glob import glob
import json

from etl import download_data_by_dates, delete_data_dir

import yfinance as yf # type: ignore
import yahooquery as yq # type: ignore
import pandas as pd
from pandas import DataFrame


In [2]:
type Prices = list[list[str | int]]

ranges = ["0_50", "50_100", "100_150", "150_200", "200_10000"]

def split_by_range(price_dict: dict[str, Prices], price_list: Prices):
    for range in ranges:
        start, end = range.split("_")
        price_dict[range] = [data for data in price_list if data[1] > float(start) and data[1] < float(end)]


# Bucketed price tables keyed by range label, one per market-cap class.
large: dict[str, Prices] = {}
mid: dict[str, Prices] = {}

for price_file in glob('../prices/*cap*'):
    # Any matched path that does not contain "large" is treated as mid-cap.
    target = large if "large" in price_file else mid
    with open(price_file, "r") as handle:
        split_by_range(target, json.load(handle))

In [3]:
# Show how many stocks landed in each price bucket, per market-cap class.
# The loop variable is deliberately not named `range`: at notebook top level that
# would rebind the builtin `range` for every cell executed afterwards.
print("Large")
for price_range in ranges:
    print(f"{price_range}: {len(large[price_range])}")

print()

print("Mid")
for price_range in ranges:
    print(f"{price_range}: {len(mid[price_range])}")

Large
0_50: 97
50_100: 126
100_150: 87
150_200: 49
200_10000: 142

Mid
0_50: 151
50_100: 126
100_150: 54
150_200: 25
200_10000: 44


In [46]:
# delete_data_dir("mid")
# delete_data_dir("large")

In [8]:
# Get only ticker symbols

def get_tickers(price_dict: dict[str, Prices], range: str, dir_name: str) -> list[str]:
    """Collect the first field (the ticker symbol) of every row in one price bucket.

    Args:
        price_dict: Bucketed price tables keyed by range label.
        range: Label of the bucket to read, e.g. "0_50".
        dir_name: Currently unused; only the commented-out variant below reads it.

    Returns:
        The ticker symbols of `price_dict[range]`, in row order.
    """
    # Alternative that skips tickers already present on disk:
    # [data[0] for data in price_dict[range] if not exists(f"{dir_name}/{data[0]}")]
    symbols: list[str] = []
    for row in price_dict[range]:
        symbols.append(row[0])
    return symbols

# Ticker symbols of the "0_50" price bucket for each market-cap class.
large_0_50: list[str] = get_tickers(price_dict=large, range="0_50", dir_name="large_0_50")
mid_0_50: list[str] = get_tickers(price_dict=mid, range="0_50", dir_name="mid_0_50")

In [14]:
def get_start_end_dates(dir_name: str) -> tuple[str, str]:
    """Get start and end days for next rows of stock data.

    The window is derived from what is already stored under `dir_name`:

    * If `dir_name` has no entries, the window is the last 365 days
      (one year ago through today).
    * Otherwise the first entry returned by `glob` is taken as representative:
      its ``stats.csv`` is read and the window starts the day after the latest
      ``Date`` in that file and ends today.

    Args:
        dir_name: Directory holding one sub-directory per stock.

    Returns:
        ``(start, end)`` as ``YYYY-MM-DD`` strings, start first. Note that when
        the stored data already reaches today, `start` is later than `end`.

    Raises:
        Exception: If the representative ``stats.csv`` is missing or cannot be
            read; the original error is chained as the cause.
    """
    date_format = "%Y-%m-%d"
    try:
        today = date.today()

        stock_dirs = glob(f'{dir_name}/*')

        if not stock_dirs:
            # Nothing stored yet: fetch a full year. The start date must come
            # first, matching the order returned by the branch below.
            year_ago = today - timedelta(days=365)
            return year_ago.strftime(date_format), today.strftime(date_format)

        stock_path = f"{stock_dirs[0]}/stats.csv"
        if not exists(stock_path):
            raise Exception(f"Stock path does not exist: {stock_path}")

        df = pd.read_csv(stock_path, parse_dates=['Date'], index_col=['Date'])

        # Resume on the day after the most recent stored row.
        next_day = df.index.max() + pd.Timedelta(days=1)
        return next_day.strftime(date_format), today.strftime(date_format)

    except Exception as e:
        raise Exception(f"Error getting start and end dates from {dir_name}: {e}") from e

# Date window for the next download, derived from the data stored for large_0_50.
(start, end) = get_start_end_dates(dir_name="large_0_50")
# Manual override for a fixed window:
# start = "2023-01-01"
# end = "2023-12-31"
(start, end)

('2024-01-25', '2024-01-25')

In [12]:
# Get stock data as DataFrames
# `start` and `end` come from the get_start_end_dates("large_0_50") call in the
# previous cell, so the mid-cap download below reuses the window derived from the
# large-cap directory.
# NOTE(review): confirm the data stored for mid_0_50 is always in step with large_0_50.

# Sampling interval handed to the downloader; '1d' presumably means daily bars — confirm in etl.
interval = '1d'

# download_data_by_dates is imported from the project's etl module. Its results are
# later passed to dfs_to_disk, which indexes them by ticker symbol.
large_0_50_dfs = download_data_by_dates(large_0_50, start, end, interval)
mid_0_50_dfs = download_data_by_dates(mid_0_50, start, end, interval)

Getting 97 stocks
[*********************100%%**********************]  97 of 97 completed
[*********************100%%**********************]  151 of 151 completed


In [13]:
def dfs_to_disk(dfs: dict[str, DataFrame], tickers: list[str], dir_name: str):
    """Writes dataframes to disk as csv.

    Each ticker's frame goes to ``{dir_name}/{ticker}/stats.csv``. A new file is
    written with a header row; an existing file is appended to without one.

    Tickers with nothing to write are skipped instead of aborting the run:

    * a ticker absent from `dfs` is reported and skipped (previously a
      ``KeyError`` stopped all remaining writes);
    * an empty frame is skipped so that no header-only ``stats.csv`` is created.

    Appending does not de-duplicate: writing the same rows twice stores them twice.

    Args:
        dfs: Frames keyed by ticker symbol.
        tickers: Symbols to write, in order.
        dir_name: Root directory that holds one sub-directory per ticker.
    """
    for ticker in tickers:
        if ticker not in dfs:
            print(f"No downloaded data for {ticker}; skipping")
            continue

        frame = dfs[ticker]
        if frame.empty:
            continue

        stock_dir = f"{dir_name}/{ticker}"
        os.makedirs(stock_dir, exist_ok=True)

        stats_path = f"{stock_dir}/stats.csv"
        # Only the first write to a file carries the header row.
        appending = exists(stats_path)
        frame.to_csv(stats_path, mode='a' if appending else 'w', header=not appending)

# Persist the downloaded frames, one stats.csv per ticker under each bucket directory.
dfs_to_disk(dfs=large_0_50_dfs, tickers=large_0_50, dir_name="large_0_50")
dfs_to_disk(dfs=mid_0_50_dfs, tickers=mid_0_50, dir_name="mid_0_50")

In [57]:
def get_ticker_info(tickers: list[str], dir_name: str, delay: float = 3, log_path: str = "log"):
    """Gets ticker info from yfinance and recs from yahooquery and writes them to disk.

    For every ticker, up to three files are created under ``{dir_name}/{ticker}``,
    each only if it does not exist yet:

    * ``recs.csv`` from ``yq.Ticker(ticker).recommendation_trend``
    * ``info.json`` from ``yf.Ticker(ticker).info``
    * ``earnings_dates.csv`` from ``yf.Ticker(ticker).earnings_dates``

    Failures are best-effort: the error is printed, appended to `log_path`, and
    processing continues with the next ticker.

    Args:
        tickers: Symbols to process, in order.
        dir_name: Root directory that holds one sub-directory per ticker.
        delay: Seconds to pause after a ticker for which at least one request was
            attempted. Tickers whose files are all present cause no pause.
        log_path: File that error messages are appended to, one per line.
    """
    for ticker in tickers:
        # Set just before any remote fetch so fully cached tickers are not throttled.
        requested = False
        try:
            stock_dir = f"{dir_name}/{ticker}"
            os.makedirs(stock_dir, exist_ok=True)

            recs_path = f"{stock_dir}/recs.csv"
            if not exists(recs_path):
                requested = True
                # NOTE(review): assumed to be a DataFrame; anything without
                # .to_csv ends up in the except branch below.
                recs: DataFrame = yq.Ticker(ticker).recommendation_trend
                recs.to_csv(recs_path)

            ticker_obj = yf.Ticker(ticker)

            json_path = f"{stock_dir}/info.json"
            if not exists(json_path):
                requested = True
                # Fetch and serialise BEFORE opening the file: a failed request
                # must not leave an empty info.json behind, because its mere
                # existence would stop every later run from retrying.
                payload = json.dumps(ticker_obj.info)
                with open(json_path, "w") as f:
                    f.write(payload)

            earnings_dates_path = f"{stock_dir}/earnings_dates.csv"
            if not exists(earnings_dates_path):
                requested = True
                ticker_obj.earnings_dates.to_csv(earnings_dates_path)

        except Exception as e:
            err = f"Error getting info for {ticker}: {e}"
            print(err)
            # Append so earlier errors survive; one message per line.
            with open(log_path, "a") as f:
                f.write(err + "\n")
        finally:
            # Pause after failures too, so errors do not turn into rapid-fire requests.
            if requested:
                time.sleep(delay)

# Large-cap run is currently disabled:
# get_ticker_info(large_0_50, "large_0_50")
get_ticker_info(tickers=mid_0_50, dir_name="mid_0_50")

  dates.loc[dates[cn] == '-', cn] = "NaN"
  dates.loc[dates[cn] == '-', cn] = "NaN"
  dates.loc[dates[cn] == '-', cn] = "NaN"
  dates.loc[dates[cn] == '-', cn] = "NaN"
  dates.loc[dates[cn] == '-', cn] = "NaN"


Error getting info for FNF: 500 Server Error: Internal Server Error for url: https://query2.finance.yahoo.com/v10/finance/quoteSummary/FNF?modules=financialData%2CquoteType%2CdefaultKeyStatistics%2CassetProfile%2CsummaryDetail&ssl=true&crumb=zqOtFS0oeRi


  dates.loc[dates[cn] == '-', cn] = "NaN"
  dates.loc[dates[cn] == '-', cn] = "NaN"
  dates.loc[dates[cn] == '-', cn] = "NaN"
  dates.loc[dates[cn] == '-', cn] = "NaN"
  dates.loc[dates[cn] == '-', cn] = "NaN"
