In [1]:
import os
from pathlib import Path
import requests
import sys
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd
# import pandas_datareader as pdr
import altair as alt
# import pyfredapi as pf
import battenkill.visualization.graphing as graph
import battenkill.utils.io_ as io
import battenkill.utils.equity_crypto_reader as equity_crypto_reader

In [4]:
sys.path.insert(0, (Path.cwd().parent/'battenkill').as_posix())
sys.path.insert(1, (Path.cwd().parent/'battenkill/utils').as_posix())
sys.path

# Bitcoin
Alpha Vantage API

Pull most up-to-date or load pre-existing price data, reindex, cast dates to datetime, strip numerals from column names, ensure continuous variables are of dtype `float`, begin creating derived features.  

In [2]:
cols = ['open', 'high', 'low', 'close', 'volume']
btc_json = io.contingent_load(
        path_data=(Path.cwd().parent / "data_tasks/interim/btc.json"),
        entity="crypto",
        ticker="BTC",
        function="DIGITAL_CURRENCY_DAILY",
        market="USD",
        refresh=False,
        serialize_and_save=True,
        serialization_format="json",
        unique_file=None,
    )
btc = equity_crypto_reader.security_json_to_dataframe(
    btc_json, key_="Time Series (Digital Currency Daily)", rename_={"index": "date"}
)
btc.date = pd.to_datetime(btc.date, format="%Y-%m-%d")
btc.columns = btc.columns.str.replace(r'^\d+\.\s*', '', regex=True)
btc[cols] = btc[cols].astype(float)
btc = (
    btc
    .assign(dp1=btc.close - btc.open)
    .assign(dp2=btc.high - btc.low)
    .assign(dp3=btc.close.shift() - btc.close)
    .assign(dp3p = btc.close.shift() / btc.close - 1)
)
btc.head()

Unnamed: 0,date,open,high,low,close,volume,dp1,dp2,dp3,dp3p
0,2025-07-19,118023.06,118125.87,117917.74,118075.0,54.013884,51.94,208.13,,
1,2025-07-18,119273.7,120918.68,116892.42,118023.05,8735.403864,-1250.65,4026.26,51.95,0.00044
2,2025-07-17,118681.67,120998.76,117469.0,119273.69,8360.351309,592.02,3529.76,-1250.64,-0.010485
3,2025-07-16,117781.63,120134.8,117033.23,118681.68,9250.986426,900.05,3101.57,592.01,0.004988
4,2025-07-15,119863.71,119973.3,115697.37,117781.64,23726.313561,-2082.07,4275.93,900.04,0.007642


In [5]:
btc_chart = graph.altair_ts_scatter(
    btc,
    "date",
    "close",
    _title="Bitcoin",
    y_title="Closing price (USD)",
    x_title="Date",
    tooltip_fld=["date", "close"],
    save=True,
    chart_file_stem=f"btc_{btc.date.min()}-to-present",
)
# btc_chart.show()
btc_chart

Add moving averages; considering them as guardrails, create breach indicators.

In [5]:
# function in btc.py
windows = [7, 14, 30, 200]
for wind in windows:
    btc.loc[:, 'ma' + str(wind)] = btc['close'].rolling(window=wind).mean()  # .ewm(com=0.5).mean()
    btc.loc[:, 'ma' + str(wind) + 'b'] = np.where(btc.close > btc['ma' + str(wind)], 1, 0)
    btc.loc[:, 'ma' + str(wind) + 'b_'] = np.where(btc.close < btc['ma' + str(wind)], 1, 0)
btc = (
    btc
    .assign(ma7b=np.where(btc.close > btc['ma7'], 1, 0))
)
btc.head()

Unnamed: 0,date,open,high,low,close,volume,dp1,dp2,dp3,dp3p,...,ma7b_,ma14,ma14b,ma14b_,ma30,ma30b,ma30b_,ma200,ma200b,ma200b_
0,2025-07-19,118023.06,118125.87,117917.74,118075.0,54.013884,51.94,208.13,,,...,0,,0,0,,0,0,,0,0
1,2025-07-18,119273.7,120918.68,116892.42,118023.05,8735.403864,-1250.65,4026.26,51.95,0.00044,...,0,,0,0,,0,0,,0,0
2,2025-07-17,118681.67,120998.76,117469.0,119273.69,8360.351309,592.02,3529.76,-1250.64,-0.010485,...,0,,0,0,,0,0,,0,0
3,2025-07-16,117781.63,120134.8,117033.23,118681.68,9250.986426,900.05,3101.57,592.01,0.004988,...,0,,0,0,,0,0,,0,0
4,2025-07-15,119863.71,119973.3,115697.37,117781.64,23726.313561,-2082.07,4275.93,900.04,0.007642,...,0,,0,0,,0,0,,0,0


Locate peak-to-troughs, and peak to `delta` points

In [13]:
import scipy.signal as signal

In [37]:
# len(btc)
def peak_finder(
        data: Union[pd.Series, np.ndarray],
        height: Optional[float] = None,
        threshold: Optional[Union[float, Tuple[float, float]]] = None,
        distance: Optional[float] = None,
        width: Optional[float] = None,
        prominence: Optional[float] = None,
        trough: bool = False,
) -> Tuple[np.ndarray, dict]:
    """
    Find array index of sample whose two direct neighbours have a smaller amplitude.
    
    Dynamic constraints can be implemented by passing arrays of equal length as `data_tasks`
     to multiple arguments.  The `width` argument is probably the most effective way
     of reducing the number of peaks detected. `distance` could as well, if the
     sequence is on a consistent grid.
    
    Parameters
    ----------
    data
        Array of floats.
    trough
        Whether to find troughs instead of peaks.
    height
        Minimum required; if a two-element sequence, min required and max allowed. 
    threshold
        Minimum vertical distance from neighboring samples.
    distance
        Minimum horizontal distance in samples between neighboring peaks.
    width
        Required width of peaks in samples.
    prominence
        Minimum vertical distance between the peak and its lowest contour line.
        To recover the prominence values, use scipy.signal.peak_prominences.
         
    Returns
    -------

    """
    if trough:
        data = -1 * data.copy()
    return signal.find_peaks(
        data,
        height=height,
        prominence=prominence,
        threshold=threshold
    )

def peak_to_peak(
        data: Union[pd.Series, np.ndarray], 
        peak_idx: Union[list, tuple, np.ndarray],
) -> tuple:  # List[np.ndarray, float]
    """
    Compute vertical distance between adjacent peaks.
    
    Parameters
    ----------
    data
        Original series from which peaks were identified.
    peak_idx
        Indices of peaks.
        
    Returns
    -------
        Tuple of the vertical distance between peaks, in original units
        and as a fractional change. 
    """
    # ppd: peak-to-peak distance
    ppd = np.diff(data[peak_idx])
    # return list(zip(ppd, ppd / data_tasks[peak_idx[:-1]]))
    return ppd, ppd / data[peak_idx[:-1]]

def peak_to_trough(
        data: Union[pd.Series, np.ndarray], 
        peak_idx: Union[list, tuple, np.ndarray],
        trough_idx: Union[list, tuple, np.ndarray],
) -> tuple:  # List[np.ndarray, float]
    """
    Compute vertical distance between adjacent peaks and troughs.
    
    Should be impossible for the number of (identified) peaks and
    troughs to be equal.
    
    Parameters
    ----------
    data
        Original series from which peaks were identified.
    peak_idx
        Indices of peaks.
    trough_idx
        Indices of troughs.
    Returns
    -------
        Tuple of the vertical distance between peaks, in original units
        and as a fractional change. Trough to peak differences as well.
    """
    n_peaks = len(peak_idx)
    n_troughs = len(trough_idx)
    
    # Ensure both arrays are of equal length (max of two)
    # todo: verify whether NaN should be prepended, not appended; likely depends on whether the first feature is a peak or a trough
    if n_peaks > n_troughs:
        _trough_idx = np.full(n_peaks, np.nan)
        _trough_idx[:trough_idx.shape[0]] = trough_idx
        _peak_idx = peak_idx
    else:
        _peak_idx = np.full(n_troughs, np.nan)
        left_end = peak_idx.shape[0]
        right_end = -1 * left_end
        _peak_idx[:left_end] = peak_idx  # append
        _peak_idx[:right_end] = peak_idx  # prepend
        _trough_idx = trough_idx
    
    # ptd: peak-to-trough distance
    # todo: add a trough-to-peak computation
    # todo: verify that these if statements do not also each need a second criterion:
    # trough_idx[n_troughs] > peak_idx[n_peaks]
    if _peak_idx[0] < _trough_idx[0]:  # peak before trough
        ptd = data[_trough_idx] - data[_peak_idx]
        ptdf = ptd / data[_trough_idx[:-1]]
    else:  # trough before peak
        ptd =  data[_peak_idx] - data[_trough_idx]
        ptdf = ptd / data[_peak_idx[:-1]]
    return ptd, ptdf

In [48]:
# Experimenting
source = btc['close'][:10].to_numpy()
peaks, properties = peak_finder(source)
ppd, ppdf = peak_to_peak(source, peaks)
ptd, ptdf = peak_to_trough(source, peaks)

print(f"Peaks: {list(zip(peaks, source[peaks]))}\nchange: {ptd} {ptdf}")

btc.loc[peaks, 'peak'] = 1
btc.loc[peaks[1:], 'ptd'] = ptd 
btc.loc[peaks[1:], 'ptdf'] = ptdf

buy_trigger = 0.03
sell_trigger = buy_trigger

# With the changes implemented above, including a trough-to-peak (tpd) calculation
# the sell signal should use `tpd`, not `ptd`
btc.loc[:, 'buy'] = np.where(btc['ptdf'] >= buy_trigger, 1, 0)
btc.loc[:, 'sell'] = np.where(btc['ptdf'] <= sell_trigger, 1, 0)
btc

Unnamed: 0,date,open,high,low,close,volume,dp1,dp2,dp3,dp3p,peak,ptd,ptdf,buy,sell
0,2024-08-29,59047.94,59258.03,58971.77,58974.77,61.685838,-73.17,286.26,,,,,,0,0
1,2024-08-28,59437.68,60236.98,57851.62,59045.88,10001.057065,-391.80,2385.36,-71.11,-0.001204,,,,0,0
2,2024-08-27,62840.09,63226.26,58025.49,59439.64,14193.249930,-3400.45,5200.77,-393.76,-0.006625,,,,0,0
3,2024-08-26,64250.01,64509.36,62806.80,62840.00,10741.966511,-1410.01,1702.56,-3400.36,-0.054111,,,,0,0
4,2024-08-25,64179.63,65050.08,63793.74,64251.93,4012.307646,72.30,1256.34,-1411.93,-0.021975,1.0,,,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
345,2023-09-19,26764.03,27500.00,26666.93,27216.13,14386.078847,452.10,833.07,-92.19,-0.003387,,,,0,0
346,2023-09-18,26530.96,27427.34,26382.13,26764.49,17493.708204,233.53,1045.21,451.64,0.016875,,,,0,0
347,2023-09-17,26569.69,26626.49,26405.04,26530.95,3252.269761,-38.74,221.45,233.54,0.008803,,,,0,0
348,2023-09-16,26601.71,26777.00,26453.32,26569.69,4132.048176,-32.02,323.68,-38.74,-0.001458,,,,0,0


In [22]:
peaks

array([4, 8])

## Finnhub

In [None]:
import finnhub

In [None]:
finnhub_client = finnhub.Client(api_key=)

In [None]:
print(finnhub_client.quote('SNOW'))

In [None]:
# Basic financials
print(finnhub_client.company_basic_financials('SNOW', 'all'))

# Economic data

In [None]:
extra_parameters = {
        "observation_start": '2019-01-01',
        "observation_end": '2024-07-01'
    }

In [None]:
iorb = pf.get_series(series_id='IORB', **extra_parameters).rename(columns={'value': 'iorb'})
sofr = pf.get_series(series_id='SOFR', **extra_parameters).rename(columns={'value': 'sofr'})

In [None]:
iorb.date.min()
sofr[sofr.date >= '2024-06-01']

In [None]:
sofr_iorb = sofr.merge(iorb, how='left', on='date')

In [None]:
coll = pf.SeriesCollection(series_id=['DFEDTARL', 'DFEDTARU'], **extra_parameters)

In [None]:
type(coll)
coll
coll['DFEDTARL'].df

In [None]:
fed_range = coll.DFEDTARL.df.merge(coll.DFEDTARU.df)
fed_range.date.max()
fed_range[fed_range.date >= '2023-12-01']

In [None]:
cb = fed_range.merge(sofr_iorb[['date', 'sofr', 'iorb']], how='left', on='date')
# cb

In [None]:
cb.loc[:, 'iorb_minus_sofr'] = cb.iorb - cb.sofr
cb.fillna({'iorb_minus_sofr': 0}, inplace=True)
cb.iorb_minus_sofr *= 100
# cb

In [None]:
iorb_minus_sofr_bars = graph.plot_bars(
    cb, 'iorb_minus_sofr', save_chart=True, chart_file_stem='iorb_minus_sofr_bars', h_w=(100, 600))

In [None]:
iorb_minus_sofr_bars