### Initialization

In [50]:
import os

from dash import Dash, html, Output, Input, callback, State, ALL, dash_table
from dash.exceptions import PreventUpdate
import dash.dcc as dcc
import dash_bootstrap_components as dbc
import pandas as pd
import ta
import yfinance as yf

### TA Generator | Dash Preview

In [51]:
# Load the price history for one ticker and derive the labelled frame from it.
# NOTE(review): grab_ticker_data and create_shifted_data are defined in cells
# further down this notebook, so this cell only runs on a kernel where those
# cells have already been executed.
symbol = "MSFT"
time = "max"  # forwarded to grab_ticker_data as the history period
ticker_hist = grab_ticker_data(symbol, time)
data = create_shifted_data(ticker_hist)

In [52]:
# Attribute names of `ta` kept as indicator types: everything except dunder
# names, names starting with "add", and the `wrapper` / `utils` entries.
submodules = [
    name
    for name in dir(ta)
    if not name.startswith(("__", "add")) and name not in ('wrapper', 'utils')
]

In [53]:
# Map every indicator type to the callables it exposes whose names neither
# start with an underscore nor with an upper-case letter.
mod_to_inds = {}

for module in submodules:
    ta_module = getattr(ta, module)
    mod_to_inds[module] = [
        name
        for name in dir(ta_module)
        if callable(getattr(ta_module, name))
        and not name.startswith("_")
        and not name[0].isupper()
    ]

In [54]:
import inspect

def get_function_arguments(func):
    """Return ``(name, default)`` pairs for every parameter of ``func``.

    Pairs follow the order of the signature. A parameter that declares no
    default is reported with ``None`` in the default slot.
    """
    return [
        (name, None if param.default is inspect.Parameter.empty else param.default)
        for name, param in inspect.signature(func).parameters.items()
    ]

# list(filter((lambda x: x[0].lower() not in ["close", "open", "high", "low", "volume"]), get_function_arguments(ta.trend.ema_indicator)))

In [68]:
def get_ind_args(module, ind):
    """List the ``(name, default)`` arguments of ``ta.<module>.<ind>`` that are not price inputs.

    Arguments whose lower-cased name is one of close/open/high/low/volume are
    left out; everything else is returned in signature order.
    """
    price_columns = ["close", "open", "high", "low", "volume"]
    indicator = getattr(getattr(ta, module), ind)
    return [
        pair
        for pair in get_function_arguments(indicator)
        if pair[0].lower() not in price_columns
    ]

In [69]:
def draw_ind_inputs(module, ind):
    """Build one horizontal label + input row per non-price argument of an indicator.

    Every input carries the dict id ``{'type': 'ind-param', 'index': <argument name>}``
    and starts out with the argument's default as its value.
    """
    return [
        dbc.Stack(
            [
                dbc.Label(arg_name),
                dbc.Input(id={'type': 'ind-param', 'index': arg_name}, value=default),
            ],
            direction='horizontal',
            gap=2,
        )
        for arg_name, default in get_ind_args(module, ind)
    ]


In [78]:
def _build_layout():
    """Assemble the page: indicator store, the two dropdowns, the parameter form and the table."""
    # In-memory store for the selected indicators, one dict per indicator type.
    indicator_store = dcc.Store(
        id='ind-store',
        storage_type='memory',
        data={'trend': {}, 'momentum': {}, 'volume': {}, 'volatility': {}, 'others': {}},
    )

    # First row: indicator type on the left, indicator on the right.
    type_picker = dbc.Stack(
        [
            dbc.Label("Indicator Type", style={'width': '20%'}),
            dcc.Dropdown(submodules, id="submod-dd", style={'width': '100%'}),
        ],
        direction='horizontal',
        gap=2,
    )
    indicator_picker = dbc.Stack(
        [
            dbc.Label("Indicator"),
            dcc.Dropdown(id='ind-dd', style={'width': '100%'}),
        ],
        direction='horizontal',
        gap=2,
    )
    picker_row = dbc.Row([dbc.Col([type_picker]), dbc.Col([indicator_picker])])

    # Second row: parameter inputs (filled by a callback), the add button and
    # a div that shows the store contents.
    params_row = dbc.Row([
        dbc.Col([
            html.Div(id='ind-params'),
            html.Br(),
            dbc.Button("Add Indicator", id='add-ind', color='primary'),
            html.Div(id='test-data'),
        ])
    ])

    # Third row: table of added indicators; rows can be deleted by the user.
    table_row = dbc.Row([
        dbc.Col([
            dash_table.DataTable(id='ind-table', row_deletable=True, style_cell={'textAlign': 'left'})
        ])
    ])

    return dbc.Container([indicator_store, picker_row, html.Br(), params_row, table_row])


app = Dash(__name__, external_stylesheets=[dbc.themes.SLATE])
app.layout = _build_layout()

@app.callback(
    Output('ind-dd', 'options'),
    Input('submod-dd', 'value'),
)
def update_ind_dropdown(submod):
    """Offer the indicators of the selected type in the indicator dropdown.

    While no type is selected the current options are left untouched.
    """
    if submod:
        return mod_to_inds[submod]
    raise PreventUpdate

@app.callback(
    Output('add-ind', 'disabled'),
    Input('ind-dd', 'value'),
    Input('submod-dd', 'value'),
)
def disable_button(ind, mod):
    """Keep the "Add Indicator" button disabled until both dropdowns hold a value."""
    return not (ind and mod)

@app.callback(
    Output('ind-params', 'children'),
    Input('ind-dd', 'value'),
    Input('submod-dd', 'value')
)
def update_ind_adder(ind, mod):
    """Render the parameter inputs for the selected indicator.

    Args:
        ind: Value of the indicator dropdown.
        mod: Value of the indicator-type dropdown.

    Returns:
        The list built by ``draw_ind_inputs``, or ``None`` (empty form) when
        the selection is incomplete or inconsistent.
    """
    if not ind or not mod:
        return None
    # This callback fires when either dropdown changes, so right after the type
    # is switched `ind` may still name an indicator of the previous type.
    # Looking that name up on the new submodule would raise AttributeError.
    if ind not in mod_to_inds.get(mod, []):
        return None
    return draw_ind_inputs(mod, ind)

@app.callback(
    [Output('ind-store', 'data', allow_duplicate=True),
    Output('ind-table', 'data')],
    [
        Input('add-ind', 'n_clicks'),
        State('ind-store', 'data'),
        State({'type': 'ind-param', 'index': ALL}, 'value'),
        State({'type': 'ind-param', 'index': ALL}, 'id'),
        State('ind-dd', 'value'),
        State('submod-dd', 'value'),
        State('ind-table', 'data')
    ],
    prevent_initial_call='initial_duplicate'
)
def add_indicator(clicks, data, vals, ids, ind, mod, tabdata):
    """Store the configured indicator and list it in the table.

    Args:
        clicks: Click count of the "Add Indicator" button.
        data: Current store contents, ``{type: {indicator: kwargs}}`` (may be None).
        vals: Values of all ``ind-param`` inputs.
        ids: Dict ids of the same inputs; ``id['index']`` is the argument name.
        ind: Selected indicator name.
        mod: Selected indicator type (``ta`` submodule name).
        tabdata: Current table rows (may be None).

    Returns:
        ``(store, rows)``: the updated store and the updated table rows.

    Raises:
        PreventUpdate: when nothing was clicked, the selection is incomplete,
            or ``ta.<mod>`` has no attribute ``ind``.
    """
    if not clicks or not ind or not mod:
        raise PreventUpdate

    # The indicator dropdown can briefly hold a name from a previously selected
    # type; skip the update instead of failing on the attribute lookup.
    indicator = getattr(getattr(ta, mod, None), ind, None)
    if indicator is None:
        raise PreventUpdate

    # Tolerate an empty store or a type key that is missing from it.
    store = data if data is not None else {}
    entry = dict(store.setdefault(mod, {}).get(ind) or {})

    # Record which price/volume series the indicator takes. They are stored
    # with the signature default (None when there is none) as placeholders.
    # "volume" is included because the parameter form hides it as well.
    defaults = dict(get_function_arguments(indicator))
    for col in ["open", "high", "low", "close", "volume"]:
        if col in defaults:
            entry[col] = defaults[col]

    # Values typed into the form, keyed by argument name.
    # NOTE(review): values are stored exactly as the inputs report them; edited
    # values may arrive as strings. Confirm before passing them on to ta.
    entry.update({id_['index']: val for id_, val in zip(ids, vals)})
    store[mod][ind] = entry

    # The store holds one entry per (type, indicator), so re-adding an
    # indicator updates its parameters without adding a second table row.
    rows = list(tabdata or [])
    row = {'Indicator Type': [mod], 'Indicator': [ind]}
    if row not in rows:
        rows.append(row)

    return store, rows

@app.callback(
    Output('ind-store', 'data', allow_duplicate=True),
    Input('ind-table', 'data'),
    State('ind-table', 'data_previous'),
    State('ind-store', 'data'),
    prevent_initial_call='initial_duplicate'
)
def remove_data(tdata, old, data):
    """Drop indicators from the store when their table row is deleted.

    Args:
        tdata: Current table rows (may be None).
        old: Table rows before the change (may be None).
        data: Current store contents, ``{type: {indicator: kwargs}}`` (may be None).

    Returns:
        The store without the indicators whose rows disappeared.

    Raises:
        PreventUpdate: when the table did not shrink.
    """
    def row_keys(rows):
        # Each cell holds a one-element list, hence the [0].
        return [(row["Indicator Type"][0], row["Indicator"][0]) for row in rows or []]

    previous = row_keys(old)
    current = row_keys(tdata)

    # Only a shrinking table means a row was deleted.
    if len(previous) <= len(current):
        raise PreventUpdate

    store = data if data is not None else {}
    for mod, ind in previous:
        if (mod, ind) not in current:
            # pop() with a default: an entry that is already gone is not an error.
            store.get(mod, {}).pop(ind, None)
    return store

@app.callback(
    Output('test-data', 'children'),
    Input('ind-store', 'data'),
)
def update_test_data(data):
    """Show the store contents as plain text; an empty or missing store shows as ``{}``."""
    return str(data or {})

# Start the Dash server, using Dash's "inline" Jupyter mode.
app.run(jupyter_mode="inline")

{'trend': {}, 'momentum': {'awesome_oscillator': {'high': None, 'low': None}}, 'volume': {}, 'volatility': {}, 'others': {}}
{'window1': 5, 'window2': 34, 'fillna': False}
{'trend': {}, 'momentum': {'awesome_oscillator': {'high': None, 'low': None}}, 'volume': {}, 'volatility': {}, 'others': {}}
{'window1': 5, 'window2': 34, 'fillna': False}
{'trend': {}, 'momentum': {'awesome_oscillator': {'high': None, 'low': None, 'window1': 5, 'window2': 34, 'fillna': False}, 'kama': {'close': None}}, 'volume': {}, 'volatility': {}, 'others': {}}
{'window': 10, 'pow1': 2, 'pow2': 30, 'fillna': False}
{'trend': {}, 'momentum': {'awesome_oscillator': {'high': None, 'low': None, 'window1': 5, 'window2': 34, 'fillna': False}, 'ppo_hist': {'close': None}}, 'volume': {}, 'volatility': {}, 'others': {}}
{'window_slow': 26, 'window_fast': 12, 'window_sign': 9, 'fillna': False}


### TA Generation | DF

In [58]:
def grab_ticker_data(symbol, time):
    """Return the price history for ``symbol``, cached on disk as JSON.

    Args:
        symbol: Ticker symbol, e.g. ``"MSFT"``.
        time: History period, forwarded to ``Ticker.history(period=...)``.

    Returns:
        The DataFrame read from the cache file when it exists, otherwise the
        frame returned by ``yf.Ticker(symbol).history(period=time)``.
    """
    data_path = f"../../data/{symbol}_data_{time}.json"

    # Read from file if we've already downloaded the data.
    # NOTE(review): a frame read back from JSON may differ from the freshly
    # downloaded one (e.g. index dtype); confirm this does not matter downstream.
    if os.path.exists(data_path):
        return pd.read_json(data_path)

    ticker_hist = yf.Ticker(symbol).history(period=time)

    # Save to JSON so later calls do not have to download again. The cache
    # directory is created first because to_json fails when it is missing.
    os.makedirs(os.path.dirname(data_path), exist_ok=True)
    ticker_hist.to_json(data_path)
    return ticker_hist

In [59]:
def create_shifted_data(ticker_hist, days=1):
    """Return a copy of the price history with a binary ``Target`` column.

    Args:
        ticker_hist: DataFrame with at least a ``Close`` column. It is not modified.
        days: How many rows ahead the close is compared against.

    Returns:
        A new DataFrame without the ``Stock Splits`` / ``Dividends`` columns
        (when present) plus ``Target``: 1 where the close ``days`` rows later
        is higher than the current close, else 0.
    """
    data = ticker_hist.copy()
    # The two columns are optional: a cached or pre-trimmed frame may not have them.
    data = data.drop(columns=["Stock Splits", "Dividends"], errors="ignore")

    # Set up the target: did the price go up `days` rows later?
    # The last `days` rows have no future close; comparing against the resulting
    # NaN is False, so they are labelled 0.
    data["Target"] = (data["Close"].shift(-days) > data["Close"]).astype(int)
    return data

In [62]:
# Indicator spec, keyed by indicator type first:
#   {indicator type (ta submodule): {indicator name: keyword arguments}}
# A keyword named after a price/volume column (here `close`) is a placeholder.

inds = {
    'trend': {
        'ema_indicator': dict(close=None, window=12, fillna=False),
    },
}

def gen_indicators(data, indicators):
    for typ, ind in indicators.items():
        for name, params in ind.items():
            
            for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
                if col.lower() in params:
                    params[col.lower()] = data[col]
            
            data[name] = getattr(getattr(ta, typ), name)(**params)
    return data

In [63]:
# Take the OHLCV columns of the MSFT history and add the indicators listed in
# `inds` (for the EMA entry this corresponds to a direct
# ta.trend.ema_indicator(data["Close"], window=12, fillna=False) call).
data = gen_indicators(
    grab_ticker_data('MSFT', 'max')[['Open', 'High', 'Low', 'Close', 'Volume']],
    inds,
)
data

Unnamed: 0,Open,High,Low,Close,Volume,ema_indicator
1986-03-13 05:00:00,0.054893,0.062965,0.054893,0.060274,1031788800,
1986-03-14 05:00:00,0.060274,0.063504,0.060274,0.062427,308160000,
1986-03-17 05:00:00,0.062427,0.064042,0.062427,0.063503,133171200,
1986-03-18 05:00:00,0.063504,0.064042,0.061350,0.061889,67766400,
1986-03-19 05:00:00,0.061889,0.062427,0.060274,0.060812,47894400,
...,...,...,...,...,...,...
2023-11-22 05:00:00,378.000000,379.790009,374.970001,377.850006,23345300,367.898970
2023-11-24 05:00:00,377.329987,377.970001,375.140015,377.429993,10176600,369.365281
2023-11-27 05:00:00,376.779999,380.640015,376.200012,378.609985,22179200,370.787543
2023-11-28 05:00:00,378.350006,383.000000,378.160004,382.700012,20453100,372.620231
