In [1]:
import os

import arrow
import finnhub
import polars as pl
from limit import limit

In [2]:
# Directory that holds the daily-bar parquet files (one file per symbol).
data_directory = "./data/daily"
# Read the Finnhub API key from the environment so the secret is never saved
# inside the notebook. Export FINNHUB_API_KEY before starting the kernel;
# an empty string is used when the variable is not set.
finnhub_api_key = os.environ.get("FINNHUB_API_KEY", "")

In [8]:
# Lazily scan every parquet file in the data directory and reduce it to one
# row per symbol holding the earliest and latest "Date" found on disk.
stored_bars = pl.scan_parquet(f"{data_directory}/*.parquet")
date_bounds = [
    pl.col("Date").min().alias("First Date"),
    pl.col("Date").max().alias("Last Date"),
]
df = stored_bars.groupby("Symbol").agg(date_bounds).collect()
print(df)

shape: (3, 3)
┌────────┬─────────────────────┬─────────────────────┐
│ Symbol ┆ First Date          ┆ Last Date           │
│ ---    ┆ ---                 ┆ ---                 │
│ str    ┆ datetime[ns]        ┆ datetime[ns]        │
╞════════╪═════════════════════╪═════════════════════╡
│ SNOW   ┆ 2020-09-16 00:00:00 ┆ 2023-08-10 20:00:00 │
│ TLT    ┆ 2010-01-04 00:00:00 ┆ 2023-08-10 20:00:00 │
│ AMD    ┆ 2010-01-04 00:00:00 ┆ 2023-08-10 20:00:00 │
└────────┴─────────────────────┴─────────────────────┘


In [4]:
@limit(60,60)
def fetch_finnhub(symbol:str, starttime:arrow, endtime:arrow, finnhub_api_key:str, interval:str='D') -> dict:
    """
    Request candle data for one symbol from Finnhub.

    The call goes through the `limit(60, 60)` decorator (presumably a cap of
    60 calls per 60 seconds -- confirm against the `limit` package).

    Parameters
    ----------
    symbol : ticker passed to Finnhub.
    starttime, endtime : arrow objects bounding the request; each is converted
        to whole unix seconds via its `.datetime` attribute.
    finnhub_api_key : key used to build the Finnhub client.
    interval : candle resolution forwarded to Finnhub ('D' by default).

    Returns
    -------
    The response from `stock_candles` as returned by the client, or None when
    any value in the response equals 'no_data'.
    """
    client = finnhub.Client(api_key=finnhub_api_key)
    start_epoch = int(starttime.datetime.timestamp())
    end_epoch = int(endtime.datetime.timestamp())
    candles = client.stock_candles(symbol, interval, start_epoch, end_epoch)
    # Callers receive None instead of a payload that carries no candles.
    return None if 'no_data' in candles.values() else candles

In [5]:

def convert_finnhub_to_polars(symbol:str, content:dict)-> pl.DataFrame:
    """
    Turn a Finnhub candle response into a polars DataFrame.

    `content` is read through its "t", "o", "h", "l", "c" and "v" keys. The
    result has the columns Date, Open, High, Low, Close, Adj Close, Volume
    and Symbol, in that order.
    """
    def as_price(source, target):
        # Every price column is stored as Float64 under its long name.
        return pl.col(source).cast(pl.Float64).alias(target)

    raw = pl.from_dict(content)

    # Finnhub timestamps are unix seconds in GMT: tag them as GMT, shift to
    # US/Eastern wall-clock time, then drop the zone again.
    # The data already on disk uses Datetime[ns], so that unit is kept here
    # (switching to pl.Date is left for later).
    # NOTE(review): the on-disk summary shows "Last Date" values at 20:00:00
    # rather than midnight -- confirm this shift is what is wanted for daily bars.
    date_expr = (
        pl.from_epoch("t")
        .dt.replace_time_zone("GMT")
        .dt.convert_time_zone("US/Eastern")
        .dt.replace_time_zone(None)
        .cast(pl.Datetime)
        .dt.cast_time_unit("ns")
        .alias("Date")
    )

    derived = [
        date_expr,
        # Carry the symbol on every row for easier grouping.
        pl.lit(symbol).alias("Symbol"),
        pl.col("v").cast(pl.Int64).alias("Volume"),
        as_price("c", "Close"),
        # Finnhub does not provide an adjusted close like Yahoo does, so the
        # plain close is used in its place.
        as_price("c", "Adj Close"),
        as_price("h", "High"),
        as_price("l", "Low"),
        as_price("o", "Open"),
    ]
    column_order = [
        'Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'Symbol',
    ]
    return raw.with_columns(derived).select(column_order)

In [6]:
def merge_and_save(df:pl.DataFrame, filename:str) -> None:
    '''
    Merge the dataframe with the historicals and write the result back.

    The parquet file at `filename` is read, `df` is appended to it, rows
    sharing a 'Date' are collapsed to the most recently appended one, and the
    result is written back to `filename` sorted by 'Date'.

    Parameters
    ----------
    df : new rows; must contain every column present in the stored file.
    filename : path of the parquet file to read and overwrite.
    '''
    historical = pl.read_parquet(filename)
    merged = (
        historical
        # vstack expects both frames to share the same column layout, so
        # line the new rows up with the stored column order first
        .vstack(df.select(historical.columns))
        # if the new rows overlap the stored ones, keep the newest per Date
        .unique(subset='Date', keep='last')
        # unique() does not promise any row order unless asked to, so sort
        # explicitly to keep the file chronological
        .sort('Date')
    )
    merged.write_parquet(filename)

In [7]:
# Top up every symbol that already has data on disk, using the per-symbol
# date summary computed earlier.
for item in df.iter_rows(named=True):
    # the symbol is needed for the finnhub request and for the file name
    symbol = item["Symbol"]

    # the last stored date becomes the start of the request window
    last_date = arrow.get(item["Last Date"])
    # fetch the raw content from finnhub, up to the current time
    data = fetch_finnhub(symbol, last_date, arrow.now(), finnhub_api_key)
    # fetch_finnhub returns None when finnhub reports no data; there is
    # nothing to convert or merge in that case, so move on to the next symbol
    if data is None:
        print(f"{symbol}: no new data returned, skipping")
        continue
    # convert the response to a polars dataframe
    df_convert = convert_finnhub_to_polars(symbol, data)
    # merge with the stored history and save
    merge_and_save(df_convert, f"{data_directory}/{symbol}.parquet")