<a href="https://colab.research.google.com/github/coder272377/dvp/blob/feeder/tictop.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Tic Top : Top Five Equal Weight



In [None]:
interval = "1m"
previous_days = 3
tickers = tuple("AAPL TSLA GOOG AMZN MSFT".split())



## Package management



In [None]:
import datetime
import functools
import math
import urllib.parse as up

from typing import Dict, List, Tuple, Union



In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import scipy.stats as ss
import scipy.signal as signal



yfinance Library – A Complete Guide https://algotrading101.com/learn/yfinance-guide/



Charting Candlestick_OHLC one minute bars with Pandas and Matplotlib https://stackoverflow.com/questions/41821916/charting-candlestick-ohlc-one-minute-bars-with-pandas-and-matplotlib

Candlestick Chart in Python (mplfinance, plotly, bokeh, bqplot and cufflinks) https://coderzcolumn.com/tutorials/data-science/candlestick-chart-in-python-mplfinance-plotly-bokeh#1

In [None]:
try:
  import mplfinance as fplt
except ModuleNotFoundError:
  !pip install mplfinance
  import mplfinance as fplt



## Dates



In [None]:
@functools.lru_cache()
def tz_ny() -> str:
  return "America/New_York"



Most recent previous business day in Python https://stackoverflow.com/questions/2224742/most-recent-previous-business-day-in-python and https://stackoverflow.com/a/51528258



In [None]:
from pandas.tseries.holiday import USFederalHolidayCalendar
@functools.lru_cache()
def USBday():
  """
  US Business day unit
  """
  return pd.tseries.offsets.CustomBusinessDay(calendar=USFederalHolidayCalendar())



In [None]:
def now() -> pd.Timestamp:
  return pd.Timestamp.now(tz=tz_ny())



In [None]:
def get_start_date(n_days:int, end_date=now(), unit=USBday()):
  return end_date + unit * (-n_days)



In [None]:
def test_get_start_date__default_argument():
  result = get_start_date(5)
  assert result < now(), (result, now)



In [None]:
test_get_start_date__default_argument()



In [None]:
def get_nyse_open(t:pd.Timestamp, tz=tz_ny()):
  """
  09:30 of given timestamp in NY time zone
  """
  return get_nyse_time(t, hour=9, minute=30, tz=tz)



In [None]:
def get_nyse_close(t:pd.Timestamp, tz=tz_ny()):
  """
  16:00 of given timestamp in NY time zone
  """
  return get_nyse_time(t, hour=16, minute=00, tz=tz)



In [None]:
def get_nyse_time(t:pd.Timestamp, hour:int, minute:int, tz=tz_ny()):
  """
  New timestamp with given hour & minute in NY time zone
  """
  return pd.Timestamp(
    year=t.year, month=t.month, day=t.day, hour=hour, minute=minute, tz=tz,
  )



## Data-getters



In [None]:
@functools.lru_cache()
def header() -> Dict[str, str]:
  return {
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'
  }



In [None]:
def get_yf_timestamp(timestamp:pd.Timestamp) -> int:
  return int(timestamp.timestamp())



In [None]:
def test_get_yf_timestamp__22_Jan_21():
  timestamp = pd.Timestamp("2022-01-21", tz=tz_ny())
  result = get_yf_timestamp(timestamp)

  expected = 1642723200
  expected_timestamp = pd.Timestamp(expected, unit='s', tz="GMT")

  result_timestamp = pd.Timestamp(result, unit='s', tz=tz_ny())

  assert result_timestamp.date() == expected_timestamp.date(), ('\n'
      f"result =\t{result} = {result_timestamp}\n"
      f"expected =\t{expected} = {expected_timestamp}"
  )



In [None]:
test_get_yf_timestamp__22_Jan_21()



In [None]:
def chart_url(ticker:str) -> str:
  """
  Get json URL for the ticker
  """
  return (
      "https://query2.finance.yahoo.com"
      f"/v8/finance/chart/{ticker}"
  )



In [None]:
def test_chart_url__ticker():
  ticker="SPY"

  result = chart_url(ticker)

  parsed = up.urlparse(result)

  assert "https" == parsed.scheme, parsed
  assert parsed.path.split('/')[-1] == ticker, parsed



In [None]:
def get_request_param(
    start:pd.Timestamp, end:pd.Timestamp, interval:str,
    prepost:bool=True,
  ) -> Dict[str, Union[int, str, bool]]:
  return {
      "period1": int(start.timestamp()),
      "period2": int(end.timestamp()),
      "interval": interval.lower(),
      "includePrePost": prepost,
  }



In [None]:
def test_get_request_param__timezone_timestamps():
  start = pd.Timestamp("2022-01-21 00:00", tz=tz_ny())
  end = pd.Timestamp("2022-01-21 23:59", tz=tz_ny())
  interval = "1m"

  query = get_request_param(start, end, interval)

  start_from_url = pd.Timestamp(float(query["period1"]), unit='s', tz=tz_ny())
  assert start_from_url == start, (start_from_url, start)

  end_from_url = pd.Timestamp(float(query["period2"]), unit='s', tz=tz_ny())
  assert end_from_url == end, (end_from_url, end)



In [None]:
test_get_request_param__timezone_timestamps()



pandas.Series.dt.tz_convert https://pandas.pydata.org/docs/reference/api/pandas.Series.dt.tz_convert.html



In [None]:
@functools.lru_cache()
def int_2_timestamp(t:int, tz:str=tz_ny()) -> pd.Timestamp:
  return pd.Timestamp(t, unit='s', tz="GMT").tz_convert(tz_ny())



In [None]:
def get_index_ny(json) -> pd.Series:
  data = []

  for t in json["chart"]["result"][0]["timestamp"]:
    data.append(int_2_timestamp(t))
  
  return pd.Series(data)



In [None]:
@functools.lru_cache()
def get_df_start_end(
    ticker:str,
    start:pd.Timestamp, end:pd.Timestamp=now(), interval:str=interval,
    headers=header(),
  ) -> pd.DataFrame:
  """
  Read data between start and end
  """

  params=get_request_param(start, end, interval)

  r = requests.get(
      chart_url(ticker),
      params=params,
      headers=headers,
  )

  assert r.ok, r.text

  json = r.json()

  df = pd.DataFrame(
      data=json["chart"]["result"][0]["indicators"]["quote"][0],
      index=get_index_ny(json),
  )

  df.columns = df.columns.str.capitalize()

  return df



In [None]:
def get_df(ticker:str, n_days:int, end_date:pd.Timestamp=now(), interval:str=interval):
  return get_df_start_end(ticker, get_start_date(n_days, end_date=end_date), end=end_date, interval=interval)



In [None]:
def get_days_df(ticker:str, n_days:int=previous_days, end_date:pd.Timestamp=now(), interval:str="1D") -> pd.DataFrame:
  return get_df(ticker, n_days, end_date, interval)



In [None]:
def get_minutes_df(ticker:str, n_days:int=2, end_date:pd.Timestamp=now(), interval:str=interval) -> pd.DataFrame:
  return get_df(ticker, n_days, end_date, interval)



Test daily data getter functions



In [None]:
def get_last_sunday(start:pd.Timestamp=now()) -> pd.Timestamp:
  start = get_nyse_time(start, hour=10, minute=50)
  for offset in range(-1, -7-1, -1):
    result = start + pd.Timedelta(offset, 'd')
    if 6 == result.dayofweek:
      break;
  return result



In [None]:
def test__get_last_sunday__2022_Jan_25():
  start = pd.Timestamp("2022-01-25 10:50", tz=tz_ny())
  result = get_last_sunday(start)

  expected = pd.Timestamp("2022-01-23 10:50", tz=tz_ny())
  assert 6 == result.dayofweek
  assert result == expected, (result, expected)




In [None]:
test__get_last_sunday__2022_Jan_25()



In [None]:
def get_last_trading_day(start:pd.Timestamp=now()) -> pd.Timestamp:
  start = get_nyse_time(start, hour=00, minute=00)
  result = start + (-1) * USBday()
  return result



In [None]:
def test__get_last_trading_day__2022_Jan_23():
  start = pd.Timestamp("2022-01-23 10:50", tz=tz_ny())
  result = get_last_trading_day(start)

  expected = pd.Timestamp("2022-01-21 10:50", tz=tz_ny())
  assert result.date() == expected.date(), (result, expected)



In [None]:
test__get_last_trading_day__2022_Jan_23()



In [None]:
def test_get_days_df_sunday():
  tic = "GOOG"
  days = 5
  end_date = get_last_sunday()
  df_days = get_days_df(tic, n_days=days, end_date=end_date, interval="1d")

  expected = get_last_trading_day(end_date)

  assert df_days.shape[0], df_days.shape
  assert df_days.index[-1].date() == expected.date(),(
      '\n'
      f"last index\t= {df_days.index[-1]}\n"
      f"expected\t= {expected}"
  )



In [None]:
test_get_days_df_sunday()



Test minute data getter functions



In [None]:
def test_get_minutes_df__today():
  tic = "GOOG"
  days = 2
  end_date = now()
  df_minutes = get_minutes_df(tic, n_days=days, end_date=end_date, interval="5m")

  assert pd.Timedelta(20, 'h') < (df_minutes.index[-1] - df_minutes.index[0]), df_minutes



In [None]:
test_get_minutes_df__today()



In [None]:
def get_prev_index(df:pd.DataFrame):
  """
  Get indices before the second last day market close
  """

  fin = df.index[-1]
  fin_begin = get_nyse_time(fin, hour=0, minute=0)

  prev = df.index < fin_begin

  return df.index[prev]



In [None]:
def get_last_day_df(df:pd.DataFrame):
  index_prev = get_prev_index(df)
  return df.drop(index_prev)



## Percentage-calculator



In [None]:
def calc_pct(tic:str):
  df_minutes = get_minutes_df(tic)
  assert df_minutes.shape[0], df_minutes.shape

  index_prev = get_prev_index(df_minutes)

  previous_close = df_minutes.at[index_prev[-1], "Close"]

  df_current_date = get_last_day_df(df_minutes)

  ser_current_date = df_current_date["Close"]
  assert index_prev[-1] < ser_current_date.index[0]

  return ser_current_date.mul(100.0 / previous_close) - 100.0



In [None]:
def ave_pct(tickers:Tuple[str]=tickers):

  df_pct = pd.DataFrame(
      {tic:calc_pct(tic) for tic in tickers},
      columns=tickers
  )

  result = df_pct.mean(axis=1)

  return result, df_pct



## Chart-plotter



In [None]:
def get_close_upper_lower(close:pd.Series) -> Tuple[float]:
  """
  Estimate lower & upper limits from close prices
  """
  close_max = close.max()
  close_min = close.min()

  delta_close = close_max - close_min

  close_lower = close_min - delta_close * 0.1
  close_upper = close_max + delta_close * 0.1

  return close_lower, close_upper


In [None]:
def get_milder_ylim(ax:plt.Axes, ser_close:pd.Series,) -> Tuple[float]:
  """
  Find milder y limits
  """
  close_lower, close_upper = get_close_upper_lower(ser_close)

  ylim = ax.get_ylim()

  return (
      max(ylim[0], close_lower),
      min(ylim[1], close_upper),
  )



In [None]:
def plot_candlestick(df:pd.DataFrame, ax=None):
  """
  Candle stick chart of a given dataframe
  """
  if ax is None:
    fig, ax = plt.subplots(figsize=(16, 9))

  fplt.plot(
      df,
      type='candle',
      style='charles',
      ylabel='Price ($)',
      ax=ax,
      datetime_format="%y/%m/%d %H:%M"
  );

  ax.set_ylim(get_milder_ylim(ax, df["Close"]))

  ax.grid(True)
  
  return ax



In [None]:
def plot_candle_pct(tic, figsize:Tuple[int]=(24,10)):
  """
  Plot candlestick and TopTic indicator
  """
  fig, ax = plt.subplots(2, 1, figsize=figsize)
  # fig.subplots_adjust(hspace=0)

  df_minutes = get_minutes_df(tic, )

  fig.suptitle(f"{tic} {df_minutes.index[-1].date()}")

  df_today = get_last_day_df(df_minutes)

  plot_candlestick(
      df_today,
      ax=ax[0]
  )

  xlim = ax[0].get_xlim()
  xticks = ax[0].get_xticks()
  xticklabels = ax[0].get_xticklabels()
  # xmajorticklabels = ax[0].get_xmajorticklabels()

  s_ave, df_pct = ave_pct()
  s_ave.plot.bar(
      ax=ax[1], grid=True, ylabel="%",
      xticks=xticks,
  )
  ax[1].set_xticklabels(xticklabels)
  ax[1].set_xlim(xlim);

  last_date = df_minutes.index[-1].date()

  print(df_pct.tail())

  return ax



## Runing the code



In [None]:
plot_candle_pct("SPY")
plt.savefig("tictop.png", dpi=300)

