Set up the imports

In [None]:
import os
import sys

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
# import mplfinance as mpf
import yfinance as yf
# Add the parent of the current working directory to the import path —
# presumably so the project-local `indicators` package imported below resolves.
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

from talib import abstract
# add base to the modules
from indicators import indicators as ind
from indicators import normalize as nor
from matplotlib import pyplot as plt
from typing import Union, Tuple
import math

# plt.style.use('dark_background')
%matplotlib inline

### Set the symbol

In [None]:
# Ticker symbol used for the download, the plot titles and the exported file names.
symbol = "DIA"

In [None]:
def load_ticker(sym, period="5y", interval="1d"):
    """Download the price history of ``sym`` through yfinance.

    ``period`` and ``interval`` are forwarded positionally to
    ``Ticker.history``. The result is returned with its index reset, so the
    former index becomes an ordinary column.
    """
    ticker = yf.Ticker(sym)
    history = ticker.history(period, interval)
    return history.reset_index()

### Load the ticker data

In [None]:
# Download the history for the selected symbol and print a summary of its columns.
df = load_ticker(symbol)
df.info()

In [None]:
# Show the first rows of the downloaded data.
print(df.head())

## Plot the data to get an idea of the shape

In [None]:
# NOTE(review): DataFrame.plot below is given its own figsize and appears to
# open its own figure, so this 12x12 figure is likely left empty — confirm.
plt.figure(figsize=(12,12))
# plot the closing price against the date
df.plot(x="Date", y='Close', kind="line", title=f"{symbol} Close", figsize=(15,5))
plt.legend(loc="best")

## Price Indicators
Bollinger Bands, Keltner Channels, EMA, and SMA values

In [None]:
# Derive a per-row 'Mid' price from the open and close through the project helper
# (presumably the midpoint of the two — confirm in indicators.mid).
df['Mid'] = ind.mid(df['Open'].values, df['Close'].values)

In [None]:
# Base look-back: the plots below show the last WINDOW (or 2 * WINDOW) rows, and
# it is also passed as the window argument of the ease-of-movement indicator.
WINDOW = 60

### Basic Trend Indicators

EMA, SMA, and linear regression

In [None]:
# Look-back periods passed to the indicator builders below.
price_periods = (3,6,12,20,30,50)

In [None]:
def get_price_indicators(df: pd.DataFrame, periods: tuple):
    """Build EMA, rolling linear-regression and SMA columns of the 'Mid' price.

    Args:
        df: frame containing a 'Mid' column.
        periods: look-back periods; one 'EMA_<p>', 'LREG_<p>' and 'SMA_<p>'
            column is produced per period.

    Returns:
        A new DataFrame whose columns are ordered 'Mid', every 'EMA_<p>',
        every 'LREG_<p>', then every 'SMA_<p>'.
    """
    mid = df['Mid'].values
    # Copy the selection: adding columns to a bare df[['Mid']] slice triggers
    # pandas' chained-assignment warning and may or may not write through.
    f = df[['Mid']].copy()
    for i in periods:
        f[f'EMA_{i}'] = ind.ema(mid, i)
        f[f'LREG_{i}'] = ind.rolling_lin(mid, i)
        f[f'SMA_{i}'] = ind.sma(mid, i)

    # rearrange columns for output: grouped by indicator rather than by period
    cols = ['Mid']
    for prefix in ('EMA', 'LREG', 'SMA'):
        cols.extend(f'{prefix}_{i}' for i in periods)
    return f[cols]


In [None]:
# compute the price indicators (EMA / LREG / SMA of 'Mid') for every period
price_df = get_price_indicators(df, price_periods)

In [None]:
# price_df = get_price_indicators(df, tuple(range(3,20)))
# plot the last 2 * WINDOW rows of every price indicator
price_df.tail(WINDOW*2).plot(kind="line", title=f"{symbol} price indicators", figsize=(16,8))

### SAR

In [None]:
# Acceleration values for which a smoothed SAR column is built below.
sar_accelerations = (.1,.2,.3,.4,.5)

In [None]:
# smoothed SAR is separated as it uses a different set of values
def get_sar_frame(df: pd.DataFrame, accelerations: tuple)->pd.DataFrame:
    """Build one EMA-smoothed SAR column per acceleration value.

    Args:
        df: frame with 'High' and 'Low' columns.
        accelerations: acceleration values handed to ``ind.sar``; the EMA
            period used for smoothing is the acceleration times 100.

    Returns:
        DataFrame indexed like ``df`` with one 'SAR_<acceleration>' column each.
    """
    f = pd.DataFrame(index=df.index)
    for i in accelerations:
        # Round before converting: i * 100 can land just below the whole number
        # (0.29 * 100 == 28.999...), which int() alone would truncate to 28.
        smoothing_period = int(round(i * 100))
        f[f'SAR_{i}'] = ind.ema(ind.sar(df['High'], df['Low'], float(i), 1.), smoothing_period)
    return f

In [None]:
# build the smoothed SAR columns and plot their last 2 * WINDOW rows
sar_df = get_sar_frame(df, sar_accelerations)
sar_df.tail(WINDOW*2).plot(kind="line", figsize=(16,4))

In [None]:
def get_bar_df(df: pd.DataFrame, periods: tuple) -> pd.DataFrame:
    """Describe bar geometry as raw spreads plus their simple moving averages.

    Three spreads are computed per row — High minus Mid, Low minus Mid and
    High minus Low — and each is followed by one '<NAME>_SMA_<p>' column per
    period.

    Args:
        df: frame with 'High', 'Low' and 'Mid' columns.
        periods: SMA look-back periods.

    Returns:
        DataFrame indexed like ``df``.
    """
    # (output name, column subtracted from, column subtracted)
    spreads = (
        ('MID_V_HIGH', 'High', 'Mid'),   # top wave
        ('MID_V_LOW', 'Low', 'Mid'),     # bottom wave
        ('HIGH_V_LOW', 'High', 'Low'),   # high vs low
    )
    out = pd.DataFrame(index=df.index)
    for name, left, right in spreads:
        out[name] = df[left] - df[right]
        for period in periods:
            out[f'{name}_SMA_{period}'] = ind.sma(out[name].values, period)
    return out

In [None]:
# bar-geometry features, using all but the two longest periods
bar_df = get_bar_df(df, price_periods[:-2])
# plot the last 30 rows of the bar features
bar_df.tail(30).plot(figsize=(32,16))

### BAR indicators
Day-to-day gaps and bar sizes.

In [None]:
def get_gap_df(df: pd.DataFrame, periods: tuple) -> pd.DataFrame:
    """Build smoothed up-gap and down-gap features.

    The gap of a row is (open - previous close) / previous close. Positive gaps
    feed the 'GAPU_SMA_<p>' columns and negative gaps the 'GAPD_SMA_<p>'
    columns; in each series the other sign is replaced by 0 and the first row,
    which has no previous close, is NaN.

    Args:
        df: frame with 'Open' and 'Close' columns.
        periods: SMA look-back periods.

    Returns:
        DataFrame indexed like ``df``: all GAPU columns, then all GAPD columns.
    """
    opens = np.delete(df['Open'].to_numpy(), 0)
    prev_closes = df['Close'].to_numpy()[:-1]
    gap_pc = (opens - prev_closes) / prev_closes

    def _one_sided(mask):
        # keep the gap where the mask holds, 0 elsewhere; pad the first row with NaN
        return np.concatenate(([np.nan], np.where(mask, gap_pc, 0.)))

    sides = {
        'GAPU': _one_sided(gap_pc > 0),
        'GAPD': _one_sided(gap_pc < 0),
    }
    out = pd.DataFrame(index=df.index)
    for prefix, values in sides.items():
        for period in periods:
            out[f'{prefix}_SMA_{period}'] = ind.sma(values, period)
    return out

In [None]:
# gap features, using all but the two longest periods
gap_df = get_gap_df(df, price_periods[:-2])
# plot the last 30 rows of the gap features
gap_df.tail(30).plot(figsize=(32,16))

### Channel Indicators
Bbands + Keltner channel

In [None]:
def get_channel_indicators(df: pd.DataFrame, periods: tuple)->tuple:
    """Build Bollinger-band and Keltner-channel columns for every period.

    Args:
        df: frame with 'Mid', 'High', 'Low' and 'Close' columns.
        periods: look-back periods.

    Returns:
        A pair ``(by_period, by_indicator)`` holding the same columns. The
        first keeps insertion order (grouped per period); the second is ordered
        KELU, KELL, BB2U, BB2L, BB3U, BB3L, each across all periods.
    """
    f = pd.DataFrame(index=df.index)
    for i in periods:
        bb2u, _, bb2l = ind.bbands(df['Mid'].values, i, 2.)
        # The BB3 bands previously used 2. as well, which made them exact
        # duplicates of BB2; the third argument is presumably the band-width
        # multiplier, so the wider band uses 3.
        bb3u, _, bb3l = ind.bbands(df['Mid'].values, i, 3.)
        # NOTE(review): the (upper, lower, ignored) unpack order is kept from
        # the original call — confirm against indicators.keltner_channels.
        ku, kl, _ = ind.keltner_channels(df['High'].values, df['Low'].values, df['Close'].values, i)
        f[f'BB3U_{i}'] = bb3u
        f[f'BB2U_{i}'] = bb2u
        f[f'KELU_{i}'] = ku
        f[f'KELL_{i}'] = kl
        f[f'BB2L_{i}'] = bb2l
        f[f'BB3L_{i}'] = bb3l

    # rearrange columns for output: grouped by indicator rather than by period
    cols = []
    for prefix in ('KELU', 'KELL', 'BB2U', 'BB2L', 'BB3U', 'BB3L'):
        cols.extend(f'{prefix}_{i}' for i in periods)
    return f, f[cols]

In [None]:
# channel_df keeps the per-period column order, channel_df_b groups columns by indicator
channel_df, channel_df_b = get_channel_indicators(df, price_periods)
channel_df_b.tail(WINDOW).plot(kind="line", title=f"{symbol} channel indicators", figsize=(32,16))

### BBAND SPREAD

In [None]:
def get_bband_spread_df(df: pd.DataFrame, periods: Tuple[int])->pd.DataFrame:
    """Measure the width of the Bollinger bands of the close for each period.

    Args:
        df: frame with a 'Close' column.
        periods: look-back periods handed to ``ind.bbands``.

    Returns:
        DataFrame indexed like ``df`` with one 'BBAND_DIFF_<p>' column per
        period, holding the first band returned by ``ind.bbands`` minus the
        third.
    """
    spread = pd.DataFrame(index=df.index)
    for period in periods:
        upper, _unused, lower = ind.bbands(df['Close'], period, 2.)
        column = f'BBAND_DIFF_{period}'
        spread[column] = upper - lower
    return spread

In [None]:
bbd_df = get_bband_spread_df(df, price_periods)
# plot the Bollinger band spread for the last WINDOW rows
bbd_df.tail(WINDOW).plot(figsize=(16,4))

### BBAND price convergence

In [None]:
def get_bband_price_conv_df(df: pd.DataFrame, periods: Tuple[int])->tuple:
    """Measure how far each bar's extremes sit from the Bollinger bands of 'Mid'.

    For every period, 'BBANDU_DIFF_<p>' is the first band returned by
    ``ind.bbands`` minus the high, and 'BBANDL_DIFF_<p>' is the low minus the
    third band.

    Args:
        df: frame with 'Mid', 'High' and 'Low' columns.
        periods: look-back periods.

    Returns:
        A pair ``(by_period, by_side)`` holding the same columns: the first in
        insertion order, the second with every BBANDU column before every
        BBANDL column.
    """
    conv = pd.DataFrame(index=df.index)
    for period in periods:
        upper, _unused, lower = ind.bbands(df['Mid'], period, 2.)
        conv[f'BBANDU_DIFF_{period}'] = upper - df['High'].values
        conv[f'BBANDL_DIFF_{period}'] = df['Low'].values - lower

    # rearrange the columns: upper-band distances first, then lower-band distances
    ordered = [f'BBANDU_DIFF_{period}' for period in periods]
    ordered += [f'BBANDL_DIFF_{period}' for period in periods]
    return conv, conv[ordered]

In [None]:
# bbpd_df keeps the per-period column order, bbpd_df_b groups upper before lower
bbpd_df, bbpd_df_b = get_bband_price_conv_df(df, price_periods[:-2])
# Plot the band/price convergence. The original plotted `bbpd_df_a`, a name that
# is never defined (NameError); the first frame returned above is `bbpd_df`.
bbpd_df.tail(WINDOW*2).plot(figsize=(16,4))

### Linear Regression Slope

In [None]:
def get_lin_slope_df(df: pd.DataFrame, periods: Tuple[int])->pd.DataFrame:
    """Build one 'LREGSL_<p>' column per period from ``ind.rolling_lin_slope``.

    Args:
        df: frame with a 'Mid' column.
        periods: look-back periods.

    Returns:
        DataFrame indexed like ``df``.
    """
    slopes = pd.DataFrame(index=df.index)
    for period in periods:
        column = f'LREGSL_{period}'
        slopes[column] = ind.rolling_lin_slope(df['Mid'].values, period)
    return slopes

In [None]:
lin_slope_df = get_lin_slope_df(df, price_periods)
# plot the rolling linear-regression slopes for the last 2 * WINDOW rows
lin_slope_df.tail(WINDOW*2).plot(figsize=(16,4))

### Volume

In [None]:
def get_volume_df(vol: np.ndarray, periods: Tuple[int], index=None)->pd.DataFrame:
    """Build a frame of raw volume plus one SMA column per period.

    Args:
        vol: one-dimensional array of volumes.
        periods: SMA look-back periods.
        index: optional index for the result. Defaults to ``RangeIndex(len(vol))``,
            which equals the index of a frame that has been ``reset_index()``-ed.
            The original read the index from a global ``df``, so the function
            only worked next to that global.

    Returns:
        DataFrame with a 'Volume' column followed by one 'VOL_<p>' column each.
    """
    vol = np.asarray(vol)
    if index is None:
        index = pd.RangeIndex(len(vol))
    f = pd.DataFrame(index=index)
    f["Volume"] = vol
    # `np.float` was only an alias of the builtin and was removed in NumPy 1.24
    vol_as_float = vol.astype(float)
    for i in periods:
        f[f'VOL_{i}'] = ind.sma(vol_as_float, i)
    return f

In [None]:
vol_df = get_volume_df(df['Volume'].to_numpy(), price_periods)
# NOTE(review): the bare expression below has no effect here; it only displays
# when it is the last statement of a cell.
vol_df['Volume']
# plot the volume and its moving averages for the last 2 * WINDOW rows
vol_df.tail(WINDOW*2).plot(figsize=(16,4))

### Ease of Movement

In [None]:
def get_eom_oscillator_frame(df: pd.DataFrame, window:int, periods: Tuple[int])->pd.DataFrame:
    """Build one 'EOME_<p>' column per period from ``ind.eome``.

    Args:
        df: frame with 'Volume' and 'Close' columns.
        window: forwarded unchanged as the third argument of ``ind.eome``.
        periods: values forwarded one at a time as its fourth argument.

    Returns:
        DataFrame indexed like ``df``.
    """
    eom = pd.DataFrame(index=df.index)
    for period in periods:
        column = f'EOME_{period}'
        eom[column] = ind.eome(df['Volume'].values, df['Close'].values, window, period)
    return eom

In [None]:
# ease-of-movement features, using all but the two longest periods
eom_df = get_eom_oscillator_frame(df, WINDOW, price_periods[:-2])
# plot ease of movement
eom_df.tail(WINDOW*2).plot(figsize=(16,4))

### ADOSC

In [None]:
def get_adosc_oscillator_frame(df: pd.DataFrame, periods: Tuple[int])->pd.DataFrame:
    """Build one ADOSC column for every pair of consecutive periods.

    For periods (p0, p1, p2, ...) the columns are 'ADOSC_p0_p1', 'ADOSC_p1_p2'
    and so on, each computed by ``ind.adosc`` with that pair as its last two
    arguments.

    Args:
        df: frame with 'High', 'Low', 'Close' and 'Volume' columns.
        periods: at least one period; a single period yields no columns.

    Returns:
        DataFrame indexed like ``df``.
    """
    out = pd.DataFrame(index=df.index)
    fast = periods[0]
    for slow in periods[1:]:
        out[f'ADOSC_{fast}_{slow}'] = ind.adosc(df["High"], df["Low"], df["Close"], df["Volume"], fast, slow)
        # the slow period of this pair is the fast period of the next one
        fast = slow
    return out


In [None]:
# ADOSC for every pair of consecutive periods, plotted for the last WINDOW rows
adosc_df = get_adosc_oscillator_frame(df, price_periods)
adosc_df.tail(WINDOW).plot(figsize=(16,4))

### EMA C/D

In [None]:
def get_ema_cd_frame(df: pd.DataFrame, periods: Tuple[int])->pd.DataFrame:
    """Build EMA convergence/divergence columns against the first period.

    Every column 'EMA_CD_<p0>_<p>' is the EMA of 'Mid' over the first period
    minus the EMA over a later period.

    NOTE(review): unlike get_adosc_oscillator_frame, the fast period is never
    advanced, so every column is measured against periods[0]. The column names
    match that behaviour, so it is kept — confirm it is intended.

    Args:
        df: frame with a 'Mid' column.
        periods: at least one period; a single period yields no columns.

    Returns:
        DataFrame indexed like ``df``.
    """
    f = pd.DataFrame(index=df.index)
    fast = periods[0]
    slow_periods = periods[1:]
    if len(slow_periods) == 0:
        return f

    mid = df['Mid'].values
    # The fast EMA is the same for every column: compute it once, not per iteration.
    fast_ema = ind.ema(mid, fast)
    for slow in slow_periods:
        f[f'EMA_CD_{fast}_{slow}'] = fast_ema - ind.ema(mid, slow)
    return f

In [None]:
# EMA convergence/divergence columns, plotted for the last WINDOW rows
ema_cd_df = get_ema_cd_frame(df, price_periods)
ema_cd_df.tail(WINDOW).plot(figsize=(16,4))

### ATR

In [None]:
def get_atr_frame(df: pd.DataFrame, periods: tuple)->pd.DataFrame:
    """Build one 'ATR_<p>' column per period from ``ind.atr``.

    Args:
        df: frame with 'High', 'Low' and 'Close' columns.
        periods: look-back periods.

    Returns:
        DataFrame indexed like ``df``.
    """
    atr = pd.DataFrame(index=df.index)
    for period in periods:
        column = f'ATR_{period}'
        atr[column] = ind.atr(df['High'], df['Low'], df['Close'], period)
    return atr

In [None]:
# ATR for every period, plotted for the last WINDOW rows
atr_df = get_atr_frame(df, price_periods)
atr_df.tail(WINDOW).plot(figsize=(16,4))

### ADX

In [None]:
def get_adx_frame(df: pd.DataFrame, periods: tuple)->pd.DataFrame:
    """Build one 'ADX_<p>' column per period from ``ind.adx``.

    Args:
        df: frame with 'High', 'Low' and 'Close' columns.
        periods: look-back periods.

    Returns:
        DataFrame indexed like ``df``.
    """
    adx = pd.DataFrame(index=df.index)
    for period in periods:
        column = f'ADX_{period}'
        adx[column] = ind.adx(df['High'].values, df['Low'].values, df['Close'].values, period)
    return adx

In [None]:
# ADX for all but the two longest periods, plotted for the last WINDOW rows
adx_df = get_adx_frame(df, price_periods[:-2])
adx_df.tail(WINDOW).plot(figsize=(16,4))

### Get Labels

Labels are the average future change, discretized into a fixed set of buckets.

In [None]:
def discretize(a: np.array,
               bins: Union[list, tuple],
               labels: Union[None, list, tuple]=None,
               right=False) -> np.array:
    """
    discretize a numpy array into bins
    if labels is given then apply those labels

    NaN entries are treated as 0 before binning. The bin index of each value
    comes from ``np.digitize(a, bins, right=right)`` and selects the label at
    the same position, so values below ``bins[0]`` get ``labels[0]``.

    Values beyond the last bin edge get the last label. The original silently
    dropped them, which made the result shorter than ``a`` and shifted every
    later label.

    Args:
        a: values to discretize.
        bins: monotonic bin edges.
        labels: one label per bin edge; defaults to ``bins`` itself.
        right: forwarded to ``np.digitize``.

    Returns:
        Array with one label per element of ``a`` (float for numeric labels).
        With empty ``bins`` an empty array is returned, as before.

    Raises:
        ValueError: if ``bins`` and ``labels`` differ in length.
    """
    if labels is None:
        labels = bins

    if len(bins) != len(labels):
        raise ValueError(
            f"bins and labels must have the same length, got {len(bins)} and {len(labels)}")

    lookup = np.asarray(labels)
    if lookup.size == 0:
        return np.empty((0))
    if lookup.dtype.kind in 'biu':
        # the original always produced floats for numeric labels
        lookup = lookup.astype(float)

    digitized = np.digitize(np.nan_to_num(a, nan=.0), bins=bins, right=right)
    # np.digitize returns len(bins) for values past the last edge; clamp those
    # onto the last label instead of losing them
    digitized = np.minimum(digitized, lookup.size - 1)
    return lookup[digitized]

In [None]:
# get the average future change percentage from the close via ind.apfc
# (NOTE(review): the meaning of the two 3s is defined in indicators.apfc — the
# original comment says "3 days into the future"; confirm which argument that is)
avg_future_change = ind.apfc(df['Close'], 3, 3)
# bucket the change into seven labels between -0.1 and 0.1
future_change = discretize(avg_future_change, [-.1, -.05, -0.01, .01, .05, .1, np.inf], [-.1, -.05, -.01, 0, .01, .05, .1])

In [None]:
# compare the discrete label with the raw average future change, with the
# 'Mid' price drawn on a secondary y axis
chart = pd.DataFrame()
chart['label'] = future_change
chart['AFC'] = avg_future_change
chart['Mid'] = df['Mid']
chart.tail(WINDOW*2).plot(figsize=(16,4), secondary_y=['Mid'])

### Create images

In [None]:
# set up the create image functions
def create_pixels(a) -> tuple:
    """Turn one normalized feature column into red, green and blue channels.

    The z-score of ``a`` is min-max normalized and mapped through ``* 2 - 1``
    to give a signed value per element. Red is non-zero only where that value
    is negative, green only where it is positive, and blue is ``a`` itself.

    Args:
        a: numpy array of feature values.

    Returns:
        Tuple ``(red, green, blue)`` of arrays shaped like ``a``.
    """
    signed = nor.min_max(nor.zscore(a)) * 2 - 1
    red = np.where(signed < 0, -signed * a + (1. - a), 0.0)
    green = np.where(signed > 0, signed * a + (1. - a), 0.0)
    blue = a
    return red, green, blue

def min_max_all(df: pd.DataFrame) -> pd.DataFrame:
    """Min-max normalize a frame using one global minimum and maximum.

    All columns share the same scale: the smallest value anywhere in the frame
    maps to 0 and the largest to 1. NaN values are ignored when finding the
    extremes. The original folded column results with Python's ``max``/``min``,
    so an all-NaN first column turned the whole result into NaN.

    Args:
        df: numeric frame.

    Returns:
        A new frame of the same shape. If every value is identical the range is
        zero and the result is NaN, as before.
    """
    min_v = df.min().min()
    max_v = df.max().max()
    return (df - min_v) / (max_v - min_v)

def create_image(frames: list, label, mirror: int = 0) -> tuple:
    """Render a list of feature frames as one RGB image array.

    Each frame is min-max normalized as a whole (so its columns share a scale)
    and every column becomes one pixel column whose channels come from
    ``create_pixels``. Rows containing NaN in any channel are dropped.

    Args:
        frames: DataFrames covering the same rows; their columns are laid out
            left to right in the order given.
        label: passed through unchanged as the second return value.
        mirror: number of trailing rows to append again in reverse order at the
            bottom of the image. Defaults to 0 (the original default was the
            ``int`` type itself, which failed on ``mirror > 0``). Values larger
            than the image height are clamped to the height.

    Returns:
        Tuple ``(image, label)`` where ``image`` has shape
        (rows, feature columns, 3) with the first row removed.

    Raises:
        ValueError: if ``frames`` is empty.
    """
    if not frames:
        raise ValueError("frames must contain at least one DataFrame")

    row_index = None
    columns = []
    channels = {}
    for frame in frames:
        # normalize the price-relative data across the whole frame
        norm_df = min_max_all(frame)
        if row_index is None:
            row_index = norm_df.index

        for col in norm_df.columns:
            columns.append(col)
            red, green, blue = create_pixels(norm_df[col].to_numpy())
            channels[col + '_r'] = red
            channels[col + '_g'] = green
            channels[col + '_b'] = blue

    # todo: handle oscillator values
    # Build the frame in one go (avoids inserting hundreds of columns one by
    # one), then drop incomplete rows and renumber from 0.
    idf = pd.DataFrame(channels, index=row_index).dropna().reset_index(drop=True)

    # image h x w = window size x number of features. 3 = rgb values
    ordered = [f'{col}_{channel}' for col in columns for channel in 'rgb']
    img1 = idf[ordered].to_numpy(dtype=float).reshape(len(idf), len(columns), 3)

    # mirror: repeat the last `mirror` rows in reverse order below the image
    if mirror > 0:
        img1 = np.concatenate((img1, img1[::-1][:mirror]))

    # Return everything but the first row.
    # NOTE(review): the original comment says the first row has no RGB data,
    # but NaN rows were already removed by dropna above — confirm this slice
    # is still wanted.
    return img1[1:], label

In [None]:
# first row used for the sample images
# NOTE(review): 100+192 is unexplained — presumably it skips indicator warm-up rows; confirm
start = 100+192
# total number of feature columns across all frames = image width
col_len = (
    len(price_df.columns)
    + len(channel_df.columns)
    + len(sar_df.columns)
    + len(bar_df.columns)
    + len(bbd_df.columns)
    + len(gap_df.columns)
    + len(adx_df.columns)
    + len(atr_df.columns)
    + len(lin_slope_df.columns)
    + len(eom_df.columns)
    + len(vol_df.columns))
# number of mirrored rows appended to each image: a quarter of the width
mirror = int(math.floor(col_len/4))
# number of data rows per image: three quarters of the width
window = mirror * 3

In [None]:
# report the image width (x) and how the height (y) splits into mirrored and data rows
print(f'x = {col_len}, y={mirror} + {window} = {mirror+window}')

In [None]:
# Layout for the reduced ("b") feature set, which leaves out the band-spread
# and linear-regression-slope frames.
start_b = 100+192
# total number of feature columns in the reduced set = image width
col_len_b = (
    len(price_df.columns)
    + len(channel_df_b.columns)
    + len(sar_df.columns)
    + len(bar_df.columns)
    + len(gap_df.columns)
    + len(adx_df.columns)
    + len(atr_df.columns)
    + len(eom_df.columns)
    + len(vol_df.columns))
# Derive both values from the reduced width. The original reused `col_len` and
# `mirror` from the full feature set, so the "b" layout silently matched "a".
mirror_b = col_len_b // 4
window_b = mirror_b * 3

In [None]:
# plot the images
from matplotlib import image

# figure and grid dimensions (rows x columns) for the sample-image previews
fig = plt.figure(figsize=(16,16))
columns = 4
rows = 4
# NOTE(review): `cnt` is not referenced anywhere in the visible notebook
cnt = 0


In [None]:
# Preview rows x columns consecutive sample images built from the full feature set.
# The figure is created in this cell and each image is drawn on the Axes that
# add_subplot returns, so the grid does not depend on a figure from an earlier
# cell or on pyplot's notion of the "current" axes.
fig = plt.figure(figsize=(16,16))
feature_frames = [
    price_df,
    channel_df,
    sar_df,
    bar_df,
    bbd_df,
    gap_df,
    adx_df,
    atr_df,
    lin_slope_df,
    eom_df,
    vol_df,
]
for i in range(0, columns * rows):
    s = i + start
    e = s + window
    # the label is the discretized future change at the row right after the window
    im_array, label = create_image([frame[s:e] for frame in feature_frames], future_change[e], mirror)
    ax = fig.add_subplot(rows, columns, i+1, title=str(label))
    ax.imshow(im_array)
    # image.imsave(os.path.join(os.getcwd(), "..", "data", f"{label}_{symbol}.png"), im_array)
plt.show()

In [None]:
# Preview rows x columns consecutive sample images built from the reduced ("b")
# feature set. Fixes relative to the original cell:
#   * `price_df_b` was never defined (NameError) — get_price_indicators returns
#     a single, already grouped frame, so `price_df` is used;
#   * the mirror height now comes from `mirror_b`, not the full set's `mirror`;
#   * the images are drawn on a figure created here, on the Axes add_subplot
#     returns, instead of being added to a figure from an earlier cell.
fig = plt.figure(figsize=(16,16))
feature_frames_b = [
    price_df,
    channel_df_b,
    sar_df,
    bar_df,
    gap_df,
    adx_df,
    atr_df,
    eom_df,
    vol_df,
]
for i in range(0, columns * rows):
    s = i + start_b
    e = s + window_b
    # the label is the discretized future change at the row right after the window
    im_array, label = create_image([frame[s:e] for frame in feature_frames_b], future_change[e], mirror_b)
    ax = fig.add_subplot(rows, columns, i+1, title=str(label))
    ax.imshow(im_array)
    # image.imsave(os.path.join(os.getcwd(), "..", "data", f"{label}_{symbol}.png"), im_array)
plt.show()

### Create images for the entire dataset

Use multiprocessing to speed up the export.

In [None]:
from pathlib import Path
from time import time
from multiprocessing import Pool, Queue

In [None]:
def worker(queue: Queue):
    """Consume image jobs from ``queue`` until the ``-1`` sentinel arrives.

    Each job is a sequence whose first six entries are: the feature frames, the
    label, the mirror row count, a timestamp string, the output directory and
    the symbol. The image is rendered with ``create_image`` and saved as
    ``<label>_<symbol>_<timestamp>.png`` inside the output directory.
    """
    print(f'{os.getpid()} worker started')
    # iter(callable, sentinel) keeps calling the blocking get() and stops as
    # soon as an item compares equal to -1
    for item in iter(lambda: queue.get(True), -1):
        frames, label, mirror_rows, stamp, out_dir, sym = item[:6]
        img, _ = create_image(frames, label, mirror_rows)
        fname = out_dir / f"{label}_{sym}_{stamp}.png"
        image.imsave(fname, img)
        print(f'saved {fname}')
    # the sentinel was received: no more work will be queued for this worker
    print(f'{os.getpid()} worker finished')
In [None]:
workers = 8
tasks = {}
start = 100+192

# set the data dir
data_dir = Path(os.getcwd()) / '..' / 'data' / 'img'

# create the dir if it does not exist
if not data_dir.is_dir():
    data_dir.mkdir(parents=True, exist_ok=True)
    print(f'created dir: {data_dir}')

# Bounded queue: put() blocks once `workers` jobs are waiting, so the producer
# never runs far ahead of the consumers.
q = Queue(workers)

# create worker pool
# The pool "initializer" is used as the worker loop: every process runs
# worker(q) until it reads the -1 sentinel.
p = Pool(workers, worker, (q,))

export_frames = [
    price_df,
    channel_df,
    sar_df,
    bar_df,
    bbd_df,
    gap_df,
    adx_df,
    atr_df,
    lin_slope_df,
    eom_df,
    vol_df,
]

try:
    for s in range(start, len(df)-window-4):
        e = s + window
        # put and block
        # NOTE(review): the ISO timestamp ends up in the file name and contains
        # ':' characters, which some file systems reject — confirm the target platform.
        q.put(([frame[s:e] for frame in export_frames],
               future_change[e], mirror, df['Date'].at[e].isoformat(), data_dir, symbol), True)
finally:
    # one sentinel per worker so that every worker loop terminates
    for _ in range(workers):
        q.put(-1, True)
    # Wait for the processes to drain the queue and exit. Without close/join
    # 'Done' was printed while images were still being written, and the pool
    # processes were never released.
    p.close()
    p.join()

print('Done')