## Setup


In [None]:
# Parameters (the setup cell below expects these to be overridden by parameter injection when not run in Colab)
# "DEFAULT" is a sentinel: the setup cell replaces it with the Colab userdata secret `PG_CONNECTION`.
connection_string = "DEFAULT"
# When True, the setup cell downloads and installs the TA-Lib .deb package.
install_ta_lib_binary = False
# When True, the setup cell pip-installs the Python dependencies.
install_deps = False

In [None]:
# When this notebook runs in Colab, connection_string keeps its "DEFAULT" sentinel and the real value is
# read from Colab userdata; when triggered manually or by papermill it is set through parameter injection.
if connection_string == "DEFAULT":
    print("No connection string provided... Using Collab Userdata")
    # Imported lazily so the notebook does not require google.colab when a connection string is injected.
    from google.colab import userdata

    connection_string = userdata.get('PG_CONNECTION')

# Optional: install the TA-Lib native package (amd64 .deb) via shell magics.
if install_ta_lib_binary:
    print("Installing ta lib binary")
    !wget https://github.com/ta-lib/ta-lib/releases/download/v0.6.3/ta-lib_0.6.3_amd64.deb
    !dpkg -i ta-lib_0.6.3_amd64.deb

# Optional: install the Python dependencies used by the rest of the notebook.
if install_deps:
    print("Install dependencies...")
    !pip install TA-Lib pandas-ta pandas numpy==1.26.4 tables websockets sqlalchemy --quiet

In [None]:
import asyncio
import json
from logging import log
from random import choices
from string import ascii_letters, digits
from typing import Any
from typing import Literal

import numpy as np
import pandas as pd
import pandas_ta as ta
import requests
from websockets import ConnectionClosed
from websockets import Origin
from websockets.asyncio.client import connect, ClientConnection

In [None]:
# Verify that the database connection string works before doing any expensive work.
from sqlalchemy import create_engine,text
# `engine` is created at module level and stays available to later cells.
engine = create_engine(connection_string, echo=False)
with engine.connect() as conn:
    # Trivial round-trip query; an invalid connection raises here.
    result = conn.execute(text("SELECT 1"))
    print("Database connection is successful.")

## Download

In [None]:
# Frame delimiter of the websocket wire format: every frame is written as
# ~m~<payload length>~m~<payload> (see _encode / _decode).
_MESSAGE_PREFIX = "~m~"


def chunk_list(lst: list[str], chunk_size: int):
    """Split ``lst`` into consecutive slices holding at most ``chunk_size`` items each.

    The last slice may be shorter; an empty input yields an empty list.
    """
    chunks = []
    for start in range(0, len(lst), chunk_size):
        chunks.append(lst[start:start + chunk_size])
    return chunks


async def download(tickers: list[str]):
    """Fetch quotes and bars for ``tickers`` and merge every batch into two dicts.

    Returns:
        A ``(quotes, bars)`` tuple of dicts, each updated with the batches
        yielded by ``fetch_bulk`` in the order they arrive.
    """
    merged_quotes, merged_bars = {}, {}
    async for batch in fetch_bulk(tickers):
        batch_quotes, batch_bars = batch
        merged_quotes.update(batch_quotes)
        merged_bars.update(batch_bars)

    return merged_quotes, merged_bars


def to_quote_df(quotes: dict[str, dict]):
    """Build a quote DataFrame with one row per quote dict, indexed by ``ticker``.

    The ``ticker`` index is a copy of each quote's ``pro_name`` field; the
    ``pro_name`` column itself is kept.
    """
    print("Generating Quote DataFrames...")
    frame = pd.DataFrame(quotes.values())
    indexed = frame.assign(ticker=frame['pro_name']).set_index('ticker')
    print("Generated Quote DataFrames...")
    return indexed


def to_bars_df(bars: dict[str, list[list]]):
    """Convert raw bar rows into one OHLCV DataFrame per ticker.

    Each row is read positionally as timestamp, open, high, low, close, volume.
    Columns missing from shorter rows are added as NaN. The timestamp (epoch
    seconds) is floored to the day and becomes the index.
    """
    required_columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']

    def process_bar(bar):
        frame = pd.DataFrame(bar)
        # Name the columns that are present, in positional order.
        frame.columns = required_columns[:frame.shape[1]]
        # Any column the rows did not carry is filled with NaN.
        absent = [name for name in required_columns if name not in frame.columns]
        for name in absent:
            frame[name] = np.nan

        day = pd.to_datetime(frame['timestamp'], unit='s').dt.floor('D')
        frame['timestamp'] = day
        return frame.set_index(['timestamp'])

    print("Generating Bar DataFrames...")
    frames = {}
    for ticker, rows in bars.items():
        frames[ticker] = process_bar(rows)
    print("Generated Bar DataFrames...")
    return frames


async def fetch_bulk(tickers: list[str]):
    """Asynchronously yield ``(quotes, bars)`` results for ``tickers`` in batches.

    Tickers are processed in groups of 500; each group is split into
    sub-chunks of 100 that are fetched concurrently. Sub-chunks whose fetch
    returned ``None`` are reported on stdout and skipped.
    """
    batches = chunk_list(tickers, 500)
    total = len(batches)
    failed_chunks = []
    for position, batch in enumerate(batches, start=1):
        print(f"Started: {position}/{total}")
        groups = chunk_list(batch, 100)
        pending = [asyncio.create_task(fetch_data(group)) for group in groups]
        outcomes = await asyncio.gather(*pending)
        for group, outcome in zip(groups, outcomes):
            if outcome is not None:
                yield outcome
                continue

            print("Failed chunks: ", group)
            failed_chunks.extend(group)

        print(f"Completed: {position}/{total}")


async def fetch_data(ticker: list[str], mode: Literal["quote", "bar", "all"] = "all"):
    """Fetch quotes and/or bars for a list of tickers over one websocket session.

    Args:
        ticker: Symbols to request. An empty list short-circuits without connecting.
        mode: ``"quote"``, ``"bar"`` or ``"all"``; forwarded to ``_init``.

    Returns:
        ``(quotes, bars)`` dicts when every requested item completed, or ``None``
        when the stream ended or the connection closed before completion.
    """
    # Local import: the notebook-level `from logging import log` binds the
    # `logging.log` *function*, which has no `.error` attribute, so the original
    # `log.error(...)` raised AttributeError inside the except handler.
    import logging

    if len(ticker) == 0:
        return {}, {}

    data = {}
    complete = False
    last_message = None
    async with connect_to_server() as socket:
        try:
            await _init(socket, ticker, data, mode)
            async for message in socket:
                last_message = message
                complete = await _process_data(socket, ticker, message, data)
                if complete:
                    break
            await _end(socket)
        except ConnectionClosed as e:
            if not complete:
                print("Failed", last_message)
                # Use a %s placeholder so the exception is actually rendered in the log record.
                logging.getLogger(__name__).error("Connection Closed: %s", e)

    if complete:
        return data.get("quotes", {}), data.get("bars", {})

    return None


def connect_to_server():
    """Return the (not yet entered) websocket connection to the TradingView chart endpoint.

    The connection is created with an explicit Origin header, no message size
    limit and a 60 second ping timeout.
    """
    return connect(
        "wss://data-wdc.tradingview.com/socket.io/websocket?type=chart",
        origin=Origin("https://in.tradingview.com"),
        max_size=None,
        ping_timeout=60,
    )


def _encode(data: dict[str, Any] | list[dict[str, Any]] | str) -> str:
    """Serialize ``data`` into the length-prefixed wire format.

    A string is framed as-is. A dict (or each item of a list) is JSON-encoded
    first; a ``None`` item becomes an empty payload. Frames are concatenated.
    """

    def frame(payload: str) -> str:
        return f"{_MESSAGE_PREFIX}{len(payload)}{_MESSAGE_PREFIX}{payload}"

    if isinstance(data, str):
        return frame(data)

    items = data if isinstance(data, list) else [data]
    return "".join(frame(json.dumps(item) if item is not None else "") for item in items)


async def _decode(socket: ClientConnection, msg: str) -> list[dict[str, Any]]:
    """Split a raw websocket message into frames and return the JSON events.

    Frames starting with ``~h~`` are echoed back on ``socket``; frames starting
    with ``{`` are parsed as JSON and returned. Other frames are ignored.
    """
    prefix_len = len(_MESSAGE_PREFIX)
    payloads = []
    while msg.startswith(_MESSAGE_PREFIX):
        # Drop the leading delimiter, then read "<length>~m~<payload>".
        msg = msg[prefix_len:]
        separator_index = msg.find(_MESSAGE_PREFIX)
        size = int(msg[:separator_index])
        body_start = separator_index + prefix_len
        body_end = body_start + size
        payloads.append(msg[body_start:body_end])
        msg = msg[body_end:]

    events = []
    for payload in payloads:
        if payload.startswith("~h~"):
            await _send(socket, payload)
        if payload.startswith("{"):
            events.append(json.loads(payload))
    return events


async def _init(socket: ClientConnection, tickers: list[str], data: dict[str, Any],
                mode: Literal["quote", "bar", "all"] = "all"):
    """Reset the shared ``data`` state and send the session start-up requests.

    ``data`` receives fresh quote/bar containers, completion counters, the two
    generated session ids and the ``sds_sym_<n>`` -> ticker mapping. Depending on
    ``mode`` a quote session, a chart session, or both are opened; the counter of
    the side that is not requested is pre-set to ``len(tickers)``.
    """
    qs_session = _gen_session_id("qs")
    cs_session = _gen_session_id("cs")
    keys = {
        f"sds_sym_{position}": {"t": symbol, "i": position}
        for position, symbol in enumerate(tickers, start=1)
    }

    # Shared state later read and updated by the event handlers.
    data.update({
        'quotes': {},
        'bars': {},
        'bar_completed': 0,
        'bar_started': [],
        'quote_completed': 0,
        'qs': qs_session,
        'cs': cs_session,
        'keys': keys,
    })

    await _send(socket, {"m": "set_auth_token", "p": ["unauthorized_user_token"]})
    await _send(socket, {"m": "set_locale", "p": ["en", "IN"]})

    # Mark the side that will not be requested as already complete.
    if mode == "quote":
        data['bar_completed'] = len(tickers)
    if mode == "bar":
        data['quote_completed'] = len(tickers)

    wants_quotes = mode == "all" or mode == "quote"
    wants_bars = mode == "all" or mode == "bar"

    if wants_quotes:
        await _send(socket, {"m": "quote_create_session", "p": [qs_session]})
        await _send(socket, {"m": "quote_add_symbols", "p": [qs_session, *tickers]})

    if wants_bars:
        await _send(socket, {"m": "chart_create_session", "p": [cs_session, ""]})
        await _send(socket, {"m": "switch_timezone", "p": [cs_session, "Asia/Kolkata"]})
        resolve_request = []
        for symbol_key, meta in keys.items():
            p = json.dumps({"adjustment": "splits", "currency-id": "INR", "symbol": meta['t']})
            resolve_request.append({"m": "resolve_symbol", "p": [cs_session, symbol_key, f'={p}']})
        # All resolve requests go out in a single multi-frame message.
        await _send(socket, resolve_request)


async def _end(socket: ClientConnection):
    """Close the given websocket connection."""
    await socket.close()


async def _send(socket: ClientConnection, data: str | dict[str, Any] | list[dict[str, Any]]):
    """Encode ``data`` with ``_encode`` and send it on ``socket``."""
    await socket.send(_encode(data))


def _gen_session_id(prefix: str):
    characters = ascii_letters + digits  # A-Z, a-z, 0-9
    random_string = ''.join(choices(characters, k=12))
    return f"{prefix}_{random_string}"


async def _process_data(socket: ClientConnection, tickers: list[str], message: str | bytes, data: dict[str, Any]):
    """Decode one websocket message, dispatch its events, and report completion.

    Returns:
        ``True`` once both the quote and the bar completion counters in ``data``
        equal ``len(tickers)``.
    """
    for event in await _decode(socket, message):
        event_type = event.get("m")
        if event_type == "qsd":
            data['quotes'] = _on_qsd_event(event, data)
        elif event_type == "quote_completed":
            data["quote_completed"] = data['quote_completed'] + 1
        elif event_type == "symbol_resolved":
            await _on_symbol_resolved(socket, data)
        elif event_type == "timescale_update":
            await _on_timescale_update(event, data)
        elif event_type == "series_completed":
            await _on_series_completed(socket, tickers, data)

    expected = len(tickers)
    quotes_done = data.get("quote_completed", 0) == expected
    return quotes_done and data.get("bar_completed", 0) == expected


def _on_qsd_event(event: dict[str, Any], data: dict[str, Any]) -> dict[str, Any]:
    quotes = data.get("quotes", {})

    q: dict = event.get("p")[1]
    ticker = q.get("n")

    if q.get("v") is None:
        return quotes

    # Update Quote
    ticker_quote = quotes.get(ticker, {})
    q_data: dict = q.get("v")
    quotes[ticker] = ticker_quote | q_data

    return quotes


async def _on_symbol_resolved(socket: ClientConnection, data: dict[str, Any]):
    """Count a resolved symbol and, once all are resolved, request the first series.

    Nothing is sent until the resolve counter equals the number of registered
    symbol keys. Then one not-yet-started key is picked, a ``create_series``
    request is sent for it, and the key is appended to ``data['bar_started']``.
    """
    resolved = data.get("symbol_resolve_count", 0) + 1
    data["symbol_resolve_count"] = resolved

    keys = data['keys']
    if resolved != len(keys.keys()):
        # Some symbols are still unresolved.
        return

    bar_started = data['bar_started']
    # Set difference: which pending key comes first is arbitrary.
    to_start = list(set(keys.keys()) - set(bar_started))
    if not to_start:
        return

    cs = data["cs"]
    symbol_key = to_start[0]
    series_id = f"s{keys[symbol_key]['i']}"

    await _send(socket, {"m": "create_series", "p": [cs, "sds_1", series_id, symbol_key, "1D", 5500]})
    bar_started.append(symbol_key)


async def _on_timescale_update(event: dict[str, Any], data: dict[str, Any]):
    p: dict[str, Any] = event.get("p")[1]
    series = p.get('sds_1')
    if series is None or series.get("s") is None:
        print("Series is missing", event)
        return

    # Day Data
    d = list(map(lambda s: s['v'], series.get("s")))

    keys = data['keys']
    bar_started = data['bar_started']
    bars = data['bars']

    # Mark the bars to loaded
    last_bar_key = bar_started[-1]
    ticker = keys[last_bar_key]['t']

    bar = bars.get(ticker, [])
    bars[ticker] = d + bar


async def _on_series_completed(socket: ClientConnection, ticker: list[str], data: dict[str, Any]):
    """Record a finished series and request bars for the next pending symbol.

    Increments ``data['bar_completed']``. Unless every ticker is done or no
    symbol key is left to start, a ``modify_series`` request is sent for one
    pending key, which is then appended to ``data['bar_started']``.
    """
    cs = data["cs"]
    keys = data['keys']
    bar_started = data['bar_started']

    finished = data['bar_completed'] + 1
    data['bar_completed'] = finished
    if finished == len(ticker):
        return

    # Set difference: which pending key comes first is arbitrary.
    pending = list(set(keys.keys()) - set(bar_started))
    if not pending:
        return

    symbol_key = pending[0]
    series_id = f"s{keys[symbol_key]['i']}"

    await _send(socket, {"m": "modify_series", "p": [cs, "sds_1", series_id, symbol_key, "1D", ""]})
    bar_started.append(symbol_key)

In [5]:
def get_all_symbols(market: str):
    """Return the index symbols configured for ``market`` followed by the scanner symbols.

    Posts a scan request to ``https://scanner.tradingview.com/<market>/scan``
    filtered to primary listings and sorted by market cap (descending), then
    collects the ``s`` field of every returned row.

    Args:
        market: Scanner market segment used in the URL (e.g. ``"india"``).

    Returns:
        List of symbol strings: the hard-coded indexes for the market (none for
        markets without an entry) followed by the scanner results.

    Raises:
        requests.HTTPError: when the scanner answers with an error status.
        requests.Timeout: when the scanner does not answer within the timeout.
    """
    indexes = {
        "india": [
            "NSE:NIFTY", "NSE:NIFTYJR", "NSE:CNX500", "NSE:BANKNIFTY", "NSE:CNXFINANCE", "NSE:CNXIT",
            "NSE:CNXAUTO", "NSE:CNXPHARMA", "NSE:CNXPSUBANK", "NSE:CNXMETAL", "NSE:CNXFMCG", "NSE:CNXREALTY",
            "NSE:CNXMEDIA", "NSE:CNXINFRA", "NSE:NIFTYPVTBANK", "NSE:NIFTY_OIL_AND_GAS", "NSE:NIFTY_HEALTHCARE",
            "NSE:NIFTY_CONSR_DURBL", "NSE:CNX200", "NSE:NIFTY_MID_SELECT",
            "NSE:CNXSMALLCAP", "NSE:CNXMIDCAP", "NSE:CNXENERGY", "NSE:NIFTYMIDCAP50", "NSE:NIFTYSMLCAP250",
            "NSE:CNXPSE", "NSE:NIFTYMIDSML400", "NSE:NIFTYMIDCAP150", "NSE:CNXCONSUMPTION", "NSE:CNXCOMMODITIES",
            "NSE:NIFTY_MICROCAP250", "NSE:CPSE", "NSE:CNXSERVICE", "NSE:CNXMNC", "NSE:CNX100", "NSE:NIFTYALPHA50",
            "NSE:NIFTY_TOTAL_MKT", "NSE:NIFTY_INDIA_MFG",
            "NSE:NIFTY_IND_DIGITAL", "NSE:NIFTY_LARGEMID250", "BSE:SENSEX",
        ]
    }

    url = f"https://scanner.tradingview.com/{market}/scan"
    payload = {
        "columns": [],
        "filter": [
            {
                "left": "is_primary",
                "operation": "equal",
                "right": True
            }
        ],
        "sort": {
            "sortBy": "market_cap_basic",
            "sortOrder": "desc"
        },
    }
    headers = {'Content-Type': 'text/plain'}

    # A timeout keeps a stalled scanner from hanging the notebook run forever.
    r = requests.request("POST", url, headers=headers, data=json.dumps(payload), timeout=30)
    r.raise_for_status()
    data = r.json()['data']  # [{'s': 'NYSE:HKD', 'd': []}, {'s': 'NASDAQ:ALTY', 'd': []}...]
    # Markets without a hard-coded index list previously raised KeyError here,
    # after the request had already succeeded; they now contribute no indexes.
    return indexes.get(market, []) + [row['s'] for row in data]


In [None]:
# Symbol universe for the Indian market: hard-coded indexes plus the scanner results.
t = get_all_symbols("india")
print("Ticker fetched")

In [None]:
# Top-level await: this cell must run in an environment with a running event loop (e.g. a notebook kernel).
quotes, bars = await download(t)

In [8]:
# Sanity check: number of tickers for which quotes were received.
len(quotes.keys())

100

In [9]:
# `q` is the ticker-indexed quote frame read by the Fundamental section below.
q = to_quote_df(quotes)

Generating Quote DataFrames...
Generated Quote DataFrames...


In [10]:
# Mapping of ticker -> daily OHLCV DataFrame.
daily_candles = to_bars_df(bars)

Generating Bar DataFrames...
Generated Bar DataFrames...


## Extract

### Utils

In [None]:
# Function to process each row
def calculate_surprise(value):
    """Add a percentage ``Surprise`` field to a list of estimate/actual records.

    For a list of dicts, ``Surprise = (Actual - Estimate) / Estimate * 100`` is
    computed per record (missing ``Estimate``/``Actual`` keys are added as
    None), missing and infinite values are replaced by None, and the records
    are returned as a list of dicts. A missing scalar becomes ``[]``; any other
    value is returned unchanged.
    """
    if not isinstance(value, list):
        # Missing scalars are normalised to an empty history.
        return [] if pd.isna(value) else value

    frame = pd.DataFrame(value)
    # Make sure both operands exist before computing the surprise.
    for required in ('Estimate', 'Actual'):
        if required not in frame.columns:
            frame[required] = None

    actual, estimate = frame['Actual'], frame['Estimate']
    frame['Surprise'] = (actual - estimate) / estimate * 100

    # Normalise NaN / +-inf to None before converting back to records.
    frame = frame.where(pd.notnull(frame), None)
    frame = frame.replace([np.nan, np.inf, -np.inf], None)
    return frame.to_dict(orient='records')


def get_surprise_df(s: pd.Series, n: int, prefix: str, index: pd.Series):
    """Expand the latest ``n`` reported surprises into ``<prefix>_0 .. <prefix>_{n-1}`` columns.

    Args:
        s: Series whose values are lists of records (dicts) with ``Surprise``
            and ``IsReported`` keys; non-list values produce an all-missing row.
        n: Number of surprise columns to create.
        prefix: Column name prefix.
        index: Index to use for the resulting frame.

    Returns:
        float32 DataFrame where ``<prefix>_0`` holds the last reported surprise
        of each list, ``<prefix>_1`` the one before it, and so on; rows with
        fewer than ``n`` reported values are padded with NaN on the right.
    """

    def extract_latest_surprise(value):
        if not isinstance(value, list):
            return [None] * n

        reported = [d.get('Surprise', None) for d in value if d.get("IsReported", False)]
        # `reported[-0:]` would return the whole list, so n == 0 is handled explicitly.
        latest = reported[-n:] if n > 0 else []
        latest_first = list(reversed(latest))
        # Pad AFTER reversing: padding first and then reversing (the previous
        # behaviour) pushed the latest value away from column 0 whenever fewer
        # than n surprises were reported.
        return latest_first + [None] * (n - len(latest_first))

    surprise_columns = [f"{prefix}_{i}" for i in range(n)]
    surprises_values = s.map(extract_latest_surprise).tolist()
    return pd.DataFrame(surprises_values, index=index, columns=surprise_columns, dtype='float32')


def flatten_list(s: pd.Series, n: int, prefix: str):
    """Spread the first ``n`` items of each list in ``s`` over separate float columns.

    Returns a DataFrame with ``<prefix>_h`` holding the list itself (non-list
    values become ``[]``) and ``<prefix>_0 .. <prefix>_{n-1}`` holding the item
    at that position, or NaN when the list is shorter.
    """
    history = s.apply(lambda v: v if isinstance(v, list) else []).astype(object)

    def item_at(position: int) -> pd.Series:
        return history.map(lambda items: items[position] if position < len(items) else None).astype(float)

    columns = {f'{prefix}_h': history}
    columns.update({f'{prefix}_{position}': item_at(position) for position in range(n)})
    return pd.DataFrame(columns)


def growth(d: pd.DataFrame, comp: str, n: int, name: str, period: int = 1):
    """Percentage change between ``<comp>_<i>`` and ``<comp>_<i + period>`` columns.

    Produces ``<name>_<i>`` (float32) for ``i`` in ``range(n - period)``, each
    computed as ``(current - previous) / previous * 100``.
    """

    def change(i: int) -> pd.Series:
        current = d[f"{comp}_{i}"]
        previous = d[f"{comp}_{i + period}"]
        return ((current - previous) / previous * 100).astype('float32')

    return pd.DataFrame({f"{name}_{i}": change(i) for i in range(n - period)})


def forecast_growth(d: pd.DataFrame, l_rep: str, comp: str, n: int, name: str, loopback: int = 1):
    """Percentage growth of each forecast column over the value preceding it.

    ``<comp>_0`` is compared against the ``l_rep`` column, and every later
    ``<comp>_<i>`` against ``<comp>_<i - 1>``. At most four ``<name>_<i>``
    columns (float32) are produced: ``range(min(n - loopback, 4))``.
    """
    horizon = min(n - loopback, 4)
    result = {}
    for step in range(horizon):
        baseline = d[l_rep] if step == 0 else d[f"{comp}_{step - 1}"]
        projected = d[f"{comp}_{step}"]
        result[f"{name}_{step}"] = ((projected - baseline) / baseline * 100).astype('float32')
    return pd.DataFrame(result)


def growth_avg(n: int, comp: str, name: str):
    cols = {}
    for i in range(2, min(4, n)):
        needed_cols = [f"{comp}_{r}" for r in range(i)]
        col = f"{name}_{i}"
        cols[col] = df[needed_cols].mean(axis=1, skipna=True)
    return pd.DataFrame(cols)


def to_datetime(series: pd.Series, unit='s', utc=True):
    """Convert a numeric epoch series to datetimes, keeping missing values as NaT.

    The values are first cast to the nullable ``Int64`` dtype and then parsed
    with ``pd.to_datetime`` using ``unit``.

    NOTE(review): ``utc`` is accepted but not forwarded to ``pd.to_datetime`` —
    confirm whether timezone-aware results are intended.
    """
    nullable_epochs = series.astype('Int64')
    return pd.to_datetime(nullable_epochs, unit=unit)


def to_weekly_candles(d: pd.DataFrame):
    """Aggregate candles into weekly OHLCV candles indexed by the week's start time.

    Rows are grouped by the start of the ``"W"`` period of their index; open is
    the first value, high the max, low the min, close the last and volume the
    sum. The input frame is not modified.
    """
    # TODO: Try with .resample('W-MON', label='left', closed='left')

    rules = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
    }
    # Tag every row with the start of its week, then aggregate per week.
    tagged = d.copy().assign(Week_Start=d.index.to_period("W").start_time)
    weekly: pd.DataFrame = tagged.groupby("Week_Start").agg(rules)
    return weekly.rename_axis('timestamp')


def to_monthly_candles(d: pd.DataFrame):
    """Aggregate candles into monthly OHLCV candles indexed by the month's start time.

    Rows are grouped by the start of the ``"M"`` period of their index; open is
    the first value, high the max, low the min, close the last and volume the
    sum. The input frame is not modified.
    """
    rules = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
    }
    # Tag every row with the start of its month, then aggregate per month.
    tagged = d.copy().assign(Month_Start=d.index.to_period("M").start_time)
    monthly = tagged.groupby("Month_Start").agg(rules)
    return monthly.rename_axis('timestamp')


def to_yearly_candles(m: pd.DataFrame):
    """Resample candles to year-start (``'YS'``) OHLCV candles.

    Open is the first value of the year, high the maximum, low the minimum,
    close the last value and volume the sum.
    """
    yearly_rules = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
    }
    by_year = m.resample('YS')
    return by_year.agg(yearly_rules)

## Fundamental

In [None]:
# Number of history columns expanded per quarterly (fq) series.
fq_count = 12
# Number of history columns expanded per yearly (fy) series.
fy_count = 4
# NOTE(review): `today` is not referenced in the visible part of this cell — confirm it is used later.
today = pd.Timestamp.today()
# Start from an empty frame that shares the quote frame's ticker index.
df = pd.DataFrame()
df['ticker'] = q.index.astype(str)
df = df.set_index(['ticker'])
######################################## ~~~START~~~~ ########################################


########################### ~~~General~~~~ ###########################
# Identity and listing attributes copied from the quote frame `q`.
df['isin'] = q['isin'].astype(str)
df['name'] = q['short_name'].astype(str)
# NOTE(review): astype(bool) turns missing (NaN) values into True — confirm this is intended.
df['is_primary_listing'] = q['is-primary-listing'].astype(bool)
df['logo'] = q['logoid'].astype(str)
df['description'] = q['description'].astype(str)
df['type'] = q['type'].astype('category')
# Exchange: first non-missing of source-id, exchange, exchange-listed.
df['exchange'] = (q['source-id'].fillna(q['exchange']).fillna(q['exchange-listed']).astype('category'))
df['exchange_logo'] = (q['source-logoid'].astype('category'))
# Fill missing exchange logos with the first logo found for the same exchange.
df['exchange_logo'] = df["exchange_logo"].where(
    df["exchange_logo"].notna(),
    df.groupby("exchange", observed=True)["exchange_logo"].transform("first")
)
df['timezone'] = q['timezone'].astype('category')
# Currency: first non-missing value across the alternative currency fields.
df['currency'] = (q['currency-id']
                  .fillna(q['currency_id'])
                  .fillna(q['currency_code'])
                  .fillna(q['currency_fund'])
                  .fillna(q['currency'])
                  .astype('category'))
df['currency_logo'] = q['currency-logoid']
# Fundamental currency falls back to the trading currency computed above.
df['fundamental_currency'] = (q['fundamental_currency_code'].fillna(df['currency']).astype('category'))
df['subsessions'] = q['subsessions']
df['session_holidays'] = q['session_holidays']
########################### ~~~General~~~~ ###########################


########################### ~~~Fiscal~~~~ ###########################
# Current fiscal period labels and their end dates (epoch seconds -> datetime).
df['fiscal_period_fy'] = q['fiscal_period_fy'].astype('category')
df['fiscal_period_end_fy'] = to_datetime(q['fiscal_period_end_fy'])
df['fiscal_period_fq'] = q['fiscal_period_fq'].astype('category')
df['fiscal_period_end_fq'] = to_datetime(q['fiscal_period_end_fq'])
# History (`_h`) columns are copied through unchanged.
df['fiscal_period_fy_h'] = q['fiscal_period_fy_h']
df['fiscal_period_end_fy_h'] = q['fiscal_period_end_fy_h']
df['fiscal_period_fq_h'] = q['fiscal_period_fq_h']
df['fiscal_period_end_fq_h'] = q['fiscal_period_end_fq_h']
########################### ~~~Fiscal~~~~ ###########################


########################### ~~~Company~~~~ ###########################
# Company profile and classification attributes.
df['fundamental_data'] = q['fundamental_data']
df['sector'] = q['sector'].astype('category')
df['group'] = q['group'].astype('category')
df['industry'] = q['industry'].astype('category')
# Placeholder column: no source field is mapped, so it is all-missing.
df['sub_industry'] = None
df['sub_industry'] = df["sub_industry"].astype('category')
# NOTE(review): `logo` is also assigned from the same source in the General section above.
df['logo'] = q['logoid'].astype(str)
df['ceo'] = q['ceo'].astype(str)
df['website'] = q['web_site_url'].astype(str)
df['country'] = q['country'].astype('category')
df['location'] = q['location'].astype(str)
df['country_code'] = q['country_code'].astype('category')
df['employees'] = q['number_of_employees'].astype('Int64')
df['business_description'] = q['business_description'].astype(str)
# The IPO date is taken from the time of the first daily bar.
df['ipo_date'] = to_datetime(q['first_bar_time_1d'])
df['most_recent_split'] = to_datetime(q['split_last_date'])
df['mcap'] = q['market_cap_basic'].astype(float)
df['shares_float'] = q['float_shares_outstanding'].astype(float)
df['total_shares_outstanding'] = q['total_shares_outstanding'].astype(float)
########################### ~~~Company~~~~ ###########################


########################### ~~~Revenue~~~~ ###########################
# Order matters in this section: the growth/average steps read columns created by the steps above them.
df['price_revenue_ttm'] = q['price_revenue_ttm']
# Estimate/actual records with a computed `Surprise` field added to each record.
df['revenue_action_fq_h'] = q['revenues_fq_h'].map(calculate_surprise)
df['revenue_action_fy_h'] = q['revenues_fy_h'].map(calculate_surprise)
# Add Surprise FQ
df = pd.concat([df, get_surprise_df(df['revenue_action_fq_h'], fq_count, prefix="revenue_surprise_fq", index=df.index)],
               axis=1)
# Add Surprise FY
df = pd.concat([df, get_surprise_df(df['revenue_action_fy_h'], fy_count, prefix="revenue_surprise_fy", index=df.index)],
               axis=1)
# Reported Revenue FQ
df = pd.concat([df, flatten_list(q['revenue_fq_h'], fq_count, prefix="revenue_fq")], axis=1)
# Reported Revenue FY
df = pd.concat([df, flatten_list(q['revenue_fy_h'], fy_count, prefix="revenue_fy")], axis=1)
# Forecasted Revenue FQ
df = pd.concat([df, flatten_list(q['revenue_forecast_fq_h'], fq_count, prefix="revenue_forecast_fq")], axis=1)
# Forecasted Revenue FY
df = pd.concat([df, flatten_list(q['revenue_forecast_fy_h'], fy_count, prefix="revenue_forecast_fy")], axis=1)
# Reported Revenue Growth FQ
df = pd.concat([df, growth(df, comp='revenue_fq', n=fq_count, name='revenue_growth_fq')], axis=1)
# Reported Revenue Growth YOY FQ
df = pd.concat([df, growth(df, comp='revenue_fq', n=fq_count, name='revenue_growth_yoy_fq', period=4)], axis=1)
# Reported Revenue Growth FY
df = pd.concat([df, growth(df, comp='revenue_fy', n=fy_count, name='revenue_growth_fy')], axis=1)
# Reported Average Revenue Growth FQ (growth_avg reads the notebook-level `df`)
df = pd.concat([df, growth_avg(n=fq_count, comp='revenue_growth_fq', name='revenue_avg_growth_fq')], axis=1)
# Reported Average Revenue Growth FY
df = pd.concat([df, growth_avg(n=fy_count, comp='revenue_growth_fy', name='revenue_avg_growth_fy')], axis=1)
# Forecast Revenue Growth FQ
df = pd.concat([
    df,
    forecast_growth(df, n=fq_count, l_rep='revenue_fq_0', comp='revenue_forecast_fq', name='revenue_forecast_growth_fq')
], axis=1)
# Forecast Revenue Growth FY
df = pd.concat([
    df,
    forecast_growth(df, n=fy_count, l_rep='revenue_fy_0', comp='revenue_forecast_fy', name='revenue_forecast_growth_fy')
], axis=1)
# Forecast Average Revenue Growth FQ
df = pd.concat([
    df,
    growth_avg(n=fq_count, comp='revenue_forecast_growth_fq', name='revenue_forecast_growth_avg_fq')
], axis=1)
# Forecast Average Revenue Growth FY
df = pd.concat([
    df,
    growth_avg(n=fy_count, comp='revenue_forecast_growth_fy', name='revenue_forecast_growth_avg_fy')
], axis=1)
########################### ~~~Revenue~~~~ ###########################

########################### ~~~Earning~~~~ ###########################
# Order matters in this section: the growth/average steps read columns created by the steps above them.
# All Earning Release
df['earnings_release_date_fq_h'] = q['earnings_release_date_fq_h']  # ABS Date
df['earnings_release_date_fy_h'] = q['earnings_release_date_fy_h']  # ABS Date
df['earnings_fiscal_period_fq_h'] = q['earnings_fiscal_period_fq_h']  # Period
df['earnings_fiscal_period_fy_h'] = q['earnings_fiscal_period_fy_h']  # Period

# Latest Earning Date
df['earnings_release_date'] = to_datetime(q['earnings_release_date'])
# The *_time columns are booleans: True where the source value equals 1.
df['earnings_release_time'] = q['earnings_release_time'] == 1
# Latest FQ Earning Release Date
df['earnings_release_date_fq'] = to_datetime(q['earnings_release_date_fq'])
df['earnings_release_time_fq'] = q['earnings_release_time_fq'] == 1
# Latest FY Earning Release Date
df['earnings_release_date_fy'] = to_datetime(q['earnings_release_date_fy'])
# Next Latest Earning Date
df['earnings_release_next_date'] = to_datetime(q['earnings_release_next_date'])
df['earnings_release_next_time'] = q['earnings_release_next_time'] == 1
# Next FQ Earning Release Date
df['earnings_release_next_date_fq'] = to_datetime(q['earnings_release_next_date_fq'])
df['earnings_release_next_time_fq'] = q['earnings_release_next_time_fq'] == 1
# Next FY Earning Release Date
df['earnings_release_next_date_fy'] = to_datetime(q['earnings_release_next_date_fy'])
# Latest Earning Release Trading Date (the later of the FQ and FY dates)
df['earnings_release_trading_date_fq'] = to_datetime(q['earnings_release_trading_date_fq'])
df['earnings_release_trading_date_fy'] = to_datetime(q['earnings_release_trading_date_fy'])
df['earnings_release_trading_date'] = df[['earnings_release_trading_date_fq', 'earnings_release_trading_date_fy']].max(
    axis=1)
# Latest Next Earning Release Trading Date (the earlier of the FQ and FY dates)
df['earnings_release_next_trading_date_fq'] = to_datetime(q['earnings_release_next_trading_date_fq'])
df['earnings_release_next_trading_date_fy'] = to_datetime(q['earnings_release_next_trading_date_fy'])
df['earnings_release_next_trading_date'] = df[
    ['earnings_release_next_trading_date_fq', 'earnings_release_next_trading_date_fy']].min(axis=1)

# Estimate/actual records with a computed `Surprise` field added to each record.
df['earning_action_fq_h'] = q['earnings_fq_h'].map(calculate_surprise)
df['earning_action_fy_h'] = q['earnings_fy_h'].map(calculate_surprise)
df['earnings_per_share_ttm'] = q['earnings_per_share_ttm'].astype(float)

# Add Surprise FQ
df = pd.concat([df, get_surprise_df(df['earning_action_fq_h'], fq_count, prefix="earning_surprise_fq", index=df.index)],
               axis=1)
# Add Surprise FY
df = pd.concat([df, get_surprise_df(df['earning_action_fy_h'], fy_count, prefix="earning_surprise_fy", index=df.index)],
               axis=1)
# Reported Earning FQ
df = pd.concat([df, flatten_list(q['earnings_per_share_diluted_fq_h'], fq_count, prefix="eps_fq")], axis=1)
# Reported Earning FY
df = pd.concat([df, flatten_list(q['earnings_per_share_diluted_fy_h'], fy_count, prefix="eps_fy")], axis=1)
# Reported Earning TTM
df = pd.concat([df, flatten_list(q['earnings_per_share_diluted_ttm_h'], fy_count, prefix="eps_ttm")], axis=1)
# Estimated Earning FQ
df = pd.concat([df, flatten_list(q['earnings_per_share_forecast_fq_h'], fq_count, prefix="eps_estimated_fq")], axis=1)
# Estimated Earning FY
df = pd.concat([df, flatten_list(q['earnings_per_share_forecast_fy_h'], fy_count, prefix="eps_estimated_fy")], axis=1)
# Reported Earning Growth FQ
df = pd.concat([df, growth(df, comp='eps_fq', n=fq_count, name='eps_growth_fq')], axis=1)
# Reported Earning Growth YOY FQ
df = pd.concat([df, growth(df, comp='eps_fq', n=fq_count, name='eps_growth_yoy_fq', period=4)], axis=1)
# Reported Earning Growth FY
df = pd.concat([df, growth(df, comp='eps_fy', n=fy_count, name='eps_growth_fy')], axis=1)
# Reported Earning Growth TTM
df = pd.concat([df, growth(df, comp='eps_ttm', n=fy_count, name='eps_growth_ttm')], axis=1)
# Reported Average Earning Growth FQ
df = pd.concat([df, growth_avg(n=fq_count, comp='eps_growth_fq', name='eps_avg_growth_fq')], axis=1)
# Reported Average Earning Growth FY
df = pd.concat([df, growth_avg(n=fy_count, comp='eps_growth_fy', name='eps_avg_growth_fy')], axis=1)
# Estimated Earning Growth FQ
df = pd.concat([df, forecast_growth(df, n=fq_count, l_rep='eps_fq_0', comp='eps_estimated_fq',
                                    name='eps_estimated_growth_fq')], axis=1)
# Estimated Earning Growth FY
df = pd.concat([df, forecast_growth(df, n=fy_count, l_rep='eps_fy_0', comp='eps_estimated_fy',
                                    name='eps_estimated_growth_fy')], axis=1)
# Estimated Earning Growth Average FQ
df = pd.concat([df, growth_avg(n=fq_count, comp='eps_estimated_growth_fq', name='eps_estimated_growth_avg_fq')], axis=1)
# Estimated Earning Growth Average FY
df = pd.concat([df, growth_avg(n=fy_count, comp='eps_estimated_growth_fy', name='eps_estimated_growth_avg_fy')], axis=1)
# Net Income TTM
df = pd.concat([df, flatten_list(q['net_income_ttm_h'], fq_count, prefix="net_income_ttm")], axis=1)
# Net Income FQ
df = pd.concat([df, flatten_list(q['net_income_fq_h'], fq_count, prefix="net_income_fq")], axis=1)
# Net Income FY
# NOTE(review): an FY series expanded with fq_count (12 columns) rather than fy_count — confirm intended.
df = pd.concat([df, flatten_list(q['net_income_fy_h'], fq_count, prefix="net_income_fy")], axis=1)
########################### ~~~Earning~~~~ ###########################

########################### ~~~Balance Sheet~~~~ ###########################
# NOTE(review): every series here, including the FY ones, is expanded with fq_count columns — confirm intended.
# Total Asset FQ
df = pd.concat([df, flatten_list(q['total_assets_fq_h'], fq_count, prefix="total_assets_fq")], axis=1)

# Total Liability FQ
df = pd.concat([df, flatten_list(q['total_liabilities_fq_h'], fq_count, prefix="total_liabilities_fq")], axis=1)

# Total Asset FY
df = pd.concat([df, flatten_list(q['total_assets_fy_h'], fq_count, prefix="total_assets_fy")], axis=1)

# Total Liability FY
df = pd.concat([df, flatten_list(q['total_liabilities_fy_h'], fq_count, prefix="total_liabilities_fy")], axis=1)

########################### ~~~Dividend~~~~ ###########################

# Most recent dividend amount and dates, plus the current yield.
df['dividend_amount'] = q['dividend_amount_recent'].astype(float)
# NOTE(review): the column names below are spelled `divided_*` (not `dividend_*`) — confirm downstream consumers expect this.
df['divided_ex_date'] = to_datetime(q['dividend_ex_date_recent'])
df['divided_payment_date'] = to_datetime(q['dividend_payment_date_recent'])
df['dividend_yield'] = q['dividends_yield_current'].astype('float16')
# Dividend history series, each expanded into fq_count columns.
df = pd.concat([df, flatten_list(q['dividends_yield_fy_h'], fq_count, prefix="dividends_yield_fy")], axis=1)
df = pd.concat([df, flatten_list(q['dividend_payout_ratio_fq_h'], fq_count, prefix="dividend_payout_ratio_fq")], axis=1)
df = pd.concat([df, flatten_list(q['dividend_payout_ratio_fy_h'], fq_count, prefix="dividend_payout_ratio_fy")], axis=1)

########################### ~~~Dividend~~~~ ###########################

######################## ~~~Extra~~~~ ########################
# Valuation ratios copied from the quote frame.
df['price_earnings_ttm'] = q['price_earnings_ttm']
# NOTE(review): `price_revenue_ttm` is also assigned at the top of the Revenue section.
df['price_revenue_ttm'] = q['price_revenue_ttm']
df['price_sales_ttm'] = q['price_sales_ttm']
df['price_earnings_growth_ttm'] = q['price_earnings_growth_ttm']
df['current_ratio'] = q['current_ratio']
# NOTE(review): evaluates as (lp / eps_fq) * 4; a price over annualised EPS would be lp / (eps_fq * 4) — confirm intended formula.
df['price_earnings_run_rate'] = q['lp'] / q['earnings_per_share_fq'] * 4
df['forward_price_earnings'] = q['lp'] / q['earnings_per_share_forecast_fy']

# Ratio history series, each expanded into one column per period.
df = pd.concat([df, flatten_list(q['price_earnings_fy_h'], fq_count, prefix="price_earnings_fy")], axis=1)
df = pd.concat([df, flatten_list(q['price_earnings_fq_h'], fq_count, prefix="price_earnings_fq")], axis=1)
df = pd.concat([df, flatten_list(q['current_ratio_fq_h'], fq_count, prefix="current_ratio_fq")], axis=1)
df = pd.concat([df, flatten_list(q['current_ratio_fy_h'], fq_count, prefix="current_ratio_fy")], axis=1)
df = pd.concat([df, flatten_list(q['debt_to_equity_fq_h'], fq_count, prefix="debt_to_equity_fq")], axis=1)
df = pd.concat([df, flatten_list(q['debt_to_equity_fy_h'], fq_count, prefix="debt_to_equity_fy")], axis=1)
df = pd.concat([df, flatten_list(q['price_book_fq_h'], fq_count, prefix="price_book_fq")], axis=1)
df = pd.concat([df, flatten_list(q['price_book_fy_h'], fq_count, prefix="price_book_fy")], axis=1)
# TODO Fix: Causing Parquet file serialization error
# df = pd.concat([df, flatten_list(q['price_sales_fq_h'], fq_count, prefix="price_sales_fq")], axis=1)
df = pd.concat([df, flatten_list(q['price_sales_fy_h'], fq_count, prefix="price_sales_fy")], axis=1)
df = pd.concat([df, flatten_list(q['ebit_fq_h'], fq_count, prefix="ebit_fq")], axis=1)
df = pd.concat([df, flatten_list(q['ebit_fy_h'], fy_count, prefix="ebit_fy")], axis=1)
df = pd.concat([df, flatten_list(q['ebitda_fq_h'], fq_count, prefix="ebitda_fq")], axis=1)
df = pd.concat([df, flatten_list(q['ebitda_fy_h'], fy_count, prefix="ebitda_fy")], axis=1)

## Technical

In [None]:
def get_latest(value, index: int = -1):
    """Reduce a computed column to a single value.

    Args:
        value: ``None``, a ``pd.Series``, or any other object.
        index: Positional index to read from a Series (default: last element).

    Returns:
        ``np.nan`` for ``None`` or when ``index`` is out of range for the
        Series, the element at ``index`` for a Series, and ``value`` itself
        for anything else.
    """
    if value is None:
        return np.nan
    if not isinstance(value, pd.Series):
        # Scalars (booleans, floats, strings, ...) are passed through untouched.
        return value

    size = len(value)
    # Valid positions are -size .. size - 1; anything else yields NaN.
    within_bounds = -size <= index < size
    return value.iloc[index] if within_bounds else np.nan


def ohlcv(candle: pd.DataFrame, name: str, shift: int = 0) -> dict[str, pd.Series]:
    """Expose a candle frame's open/high/low/close/volume columns under prefixed keys.

    Args:
        candle: Frame with ``open``, ``high``, ``low``, ``close`` and ``volume`` columns.
        name: Prefix for the output keys (``{name}_open``, ``{name}_high``, ...).
        shift: Number of rows to shift each column by; ``0`` returns the columns as-is.

    Returns:
        Mapping of ``{name}_{field}`` to the (optionally shifted) column.
    """

    def pick(field: str) -> pd.Series:
        column = getattr(candle, field)
        if shift == 0:
            return column
        return column.shift(shift)

    return {f"{name}_{field}": pick(field) for field in ("open", "high", "low", "close", "volume")}


def vwap(candle: pd.DataFrame, name: str) -> dict[str, pd.Series]:
    """Per-bar price level and the close's position relative to it.

    The level is computed as ``(high + low + close) / 3`` for each bar; volume
    is not used in the calculation.

    Args:
        candle: Frame with ``high``, ``low`` and ``close`` columns.
        name: Timeframe label embedded in the output keys.

    Returns:
        ``{name}_vwap``, ``away_from_{name}_vwap_pct`` (percent distance of the
        close from the level) and ``price_above_{name}_vwap`` (boolean).
    """
    level = (candle.high + candle.low + candle.close) / 3
    distance_pct = (candle.close - level) / level * 100
    is_above = candle.close > level

    keys = (f'{name}_vwap', f'away_from_{name}_vwap_pct', f'price_above_{name}_vwap')
    return dict(zip(keys, (level, distance_pct, is_above)))


def price_change_close(candle: pd.DataFrame, periods: list[int] | range, name: str) -> dict[str, pd.Series]:
    return {
        f"price_change_pct_{i}{name}": (candle.close.pct_change(periods=i, fill_method=None) * 100) for i in periods
    }


def sma(series: pd.Series, periods: list[int] | range, name: str, freq: str, compare=False, relative=False,
        run_rate=False) -> dict[
    str, pd.Series]:
    def to_key(i: int):
        return f"{name}_sma_{i}{freq}"

    def to_compare_key(i: int):
        return f"{name}_vs_{name}_sma_{i}{freq}"

    def to_relative_key(i: int):
        return f"relative_{name}_{i}{freq}"

    def to_run_rate(i: int):
        return f"run_rate_{name}_{i}{freq}"

    cols = {
        to_key(i): series.rolling(i).mean() for i in periods
    }

    if compare:
        cols = cols | {
            to_compare_key(i): (series - cols[to_key(i)]) / cols[to_key(i)] * 100 for i in periods
        }

    if relative:
        cols = cols | {
            to_relative_key(i): series / cols[to_key(i)] for i in periods
        }

    if run_rate:
        cols = cols | {
            to_run_rate(i): series / cols[to_key(i)] * 100 for i in periods
        }

    return cols


def ema(series: pd.Series, periods: list[int] | range, name: str, freq: str, compare=False) -> dict[str, pd.Series]:
    return {
        f"{name}_ema_{i}{freq}": series.ewm(i).mean() for i in periods
    }


def up_down(candle: pd.DataFrame, freq: str, period: list[int]) -> dict[str, pd.Series]:
    """Ratio of volume on up bars to volume on down bars over the last N rows.

    A bar counts as "up" when ``close > open`` and "down" when ``close < open``.

    Args:
        candle: Frame with ``open``, ``close`` and ``volume`` columns.
        freq: Timeframe suffix used in the output keys.
        period: Look-back lengths (number of trailing rows).

    Returns:
        Mapping of ``up_down_day_{n}{freq}`` to a scalar ratio, or ``np.nan``
        when the down-bar volume sums to zero.
    """
    cols: dict[str, pd.Series] = {}
    for lookback in period:
        recent = candle.tail(lookback)
        body = recent.close - recent.open
        up_volume = recent.volume[body > 0].sum()
        down_volume = recent.volume[body < 0].sum()

        if down_volume != 0:
            ratio = up_volume / down_volume
        else:
            ratio = np.nan
        cols[f"up_down_day_{lookback}{freq}"] = ratio

    return cols


def gap(candle: pd.DataFrame, freq: str) -> dict[str, pd.Series]:
    """Opening gaps versus the previous close, and the part left unfilled.

    Args:
        candle: Frame with ``open``, ``high``, ``low`` and ``close`` columns.
        freq: Timeframe suffix used in the output keys.

    Returns:
        ``gap_dollar_{freq}`` / ``gap_pct_{freq}``: open minus previous close,
        absolute and as a percentage of the previous close.
        ``unfilled_gap_{freq}`` / ``unfilled_gap_pct_{freq}``:
        ``low - prev_close`` when the whole bar stayed above the previous
        close, ``high - prev_close`` when the whole bar stayed below it, and
        NaN when the bar traded through the previous close.
    """
    prev_close = candle.close.shift(1)
    gap_dollar = candle.open - prev_close
    gap_pct = gap_dollar / prev_close * 100

    stayed_above = candle.low > prev_close   # gap up that was never filled
    stayed_below = candle.high < prev_close  # gap down that was never filled
    # Start with the gap-up remainder (NaN elsewhere), then overwrite only the
    # gap-down bars. Using .where() with the gap-down condition here would do
    # the opposite and replace every *other* bar with high - prev_close.
    unfilled_gap_dollar = (
        (candle.low - prev_close)
        .where(stayed_above)
        .mask(stayed_below, candle.high - prev_close)
    )

    unfilled_gap_pct = unfilled_gap_dollar / prev_close * 100
    return {
        f"gap_dollar_{freq}": gap_dollar,
        f"unfilled_gap_{freq}": unfilled_gap_dollar,
        f"gap_pct_{freq}": gap_pct,
        f"unfilled_gap_pct_{freq}": unfilled_gap_pct
    }


def price_action(
        d: pd.DataFrame,
        d_w52: pd.DataFrame,
        w: pd.DataFrame,
        m: pd.DataFrame,
        y: pd.DataFrame,
        days_since_earning: int,
        last_trading_day: pd.Timestamp,
        daily_periods: list[int],
        weekly_periods: list[int],
) -> dict[str, pd.Series]:
    """Assemble the price-derived columns for one symbol.

    Args:
        d: Daily candles (``open``/``high``/``low``/``close``/``volume``).
        d_w52: Slice of ``d`` used for the 52-week high/low metrics.
        w: Weekly candles.
        m: Monthly candles.
        y: Yearly candles.
        days_since_earning: Number of rows ``d.open`` is shifted by to obtain
            the reference open for ``price_change_since_earning_pct``.
        last_trading_day: Index label of the most recent daily candle; extremes
            are flagged as "today" when they fall on this label.
        daily_periods: Window lengths for the daily close SMAs.
        weekly_periods: Window lengths for the weekly close SMAs.

    Returns:
        Mapping of column name to a series or scalar covering VWAP, price
        changes, SMAs, high/low extremes, closing ranges, gaps and the up/down
        volume ratio.
    """
    high_52_week = d_w52.high.max()
    low_52_week = d_w52.low.min()
    all_time_high = d.high.max()
    all_time_low = d.low.min()
    # NOTE(review): if days_since_earning counts the earnings bar itself, this
    # shift points at the bar *before* it — confirm that is the intended anchor.
    earning_open = d.open.shift(days_since_earning)

    # VWAP
    cols = vwap(d, 'daily') | vwap(w, 'weekly') | vwap(m, 'monthly') | vwap(y, 'yearly')

    # Price Change
    cols = cols | price_change_close(d, [1, 2, 3, 4], 'D') | price_change_close(w, range(1, 4), 'W')
    cols = cols | price_change_close(m, range(1, 12), 'M') | price_change_close(y, range(1, 5), 'Y')

    # Price Change Comparison (aliases of the 1-day and 1-week changes)
    cols = cols | {"price_change_today_pct": cols['price_change_pct_1D']}
    cols = cols | {"price_change_prev_week_close_pct": cols['price_change_pct_1W'], }

    # SMA
    cols = cols | sma(d.close, daily_periods, 'price', 'D', compare=True)
    cols = cols | sma(w.close, weekly_periods, 'price', 'W', compare=True)

    # High Low
    cols = cols | {
        "high_52_week": high_52_week,
        "low_52_week": low_52_week,
        "high_52_week_today": d_w52.high.idxmax() == last_trading_day,
        # The low flags must locate the *minimum* low (idxmin), not the maximum.
        "low_52_week_today": d_w52.low.idxmin() == last_trading_day,
        "away_from_52_week_high_pct": (d_w52.close - high_52_week) / high_52_week * 100,
        "away_from_52_week_low_pct": (d_w52.close - low_52_week) / low_52_week * 100,
        "all_time_high": all_time_high,
        "all_time_low": all_time_low,
        "all_time_high_today": d.high.idxmax() == last_trading_day,
        "all_time_low_today": d.low.idxmin() == last_trading_day,
        "away_from_all_time_high_pct": (d.close - all_time_high) / all_time_high * 100,
        "away_from_all_time_low_pct": (d.close - all_time_low) / all_time_low * 100,
    }

    # Recent Price Change Comparison abs
    cols = cols | {
        "price_change_today_abs": d.close - d.close.shift(1),
        "price_change_from_open_abs": d.close - d.open,
        "price_change_from_high_abs": d.close - d.high,
        "price_change_from_low_abs": d.close - d.low,
    }

    # Recent Price Change Comparison PCT
    cols = cols | {
        "price_change_from_open_pct": cols['price_change_from_open_abs'] / d.open * 100,
        "price_change_from_high_pct": cols['price_change_from_high_abs'] / d.high * 100,
        "price_change_from_low_pct": cols['price_change_from_low_abs'] / d.low * 100,
        "price_change_curr_week_open_pct": (w.close - w.open) / w.open * 100,
        "price_change_since_earning_pct": (d.close - earning_open) / earning_open * 100,
    }

    # Closing Range: where the close sits inside the bar's range, 0 (low) to 100 (high)
    cols = cols | {
        "dcr": ((d.close - d.low) / (d.high - d.low)) * 100,
        "wcr": ((w.close - w.low) / (w.high - w.low)) * 100,
        "mcr": ((m.close - m.low) / (m.high - m.low)) * 100,
    }

    # Gaps
    cols = cols | gap(d, "D") | gap(w, "W") | gap(m, "M")

    # Up/Down
    cols = cols | up_down(d, "D", [20, 50])

    return cols


def volume_action(
        d: pd.DataFrame,
        d_w52: pd.DataFrame,
        d_since_earning: pd.DataFrame,
        w: pd.DataFrame,
        row,
        daily_periods: list[int],
        weekly_periods: list[int],
        last_trading_day
):
    """Assemble the volume-derived columns for one symbol.

    Args:
        d: Daily candles.
        d_w52: Slice of ``d`` used for the one-year volume record.
        d_since_earning: Slice of ``d`` used for the since-earnings volume record.
        w: Weekly candles.
        row: Symbol row; only ``row.shares_float`` is read.
        daily_periods: Window lengths for the daily SMAs.
        weekly_periods: Window lengths for the weekly SMAs.
        last_trading_day: Index label of the most recent daily candle.

    Returns:
        Mapping of column name to a series or scalar: record-volume flags,
        volume change versus the prior bar, volume SMAs (with compare, relative
        and run-rate variants), price x volume and float turnover.
    """

    def peaks_on_last_day(volume: pd.Series):
        # An empty slice cannot hold a record; otherwise the record is "today"
        # when the largest volume sits on the last trading day.
        if len(volume) == 0:
            return False
        return volume.idxmax() == last_trading_day

    cols = {
        "highest_vol_since_earning": peaks_on_last_day(d_since_earning.volume),
        "highest_vol_in_1_year": peaks_on_last_day(d_w52.volume),
        "highest_vol_ever": peaks_on_last_day(d.volume),
        "vol_vs_yesterday_vol": d.volume.pct_change(periods=1, fill_method=None) * 100,
        "week_vol_vs_prev_week_vol": w.volume.pct_change(periods=1, fill_method=None) * 100,
    }

    # Volume moving averages, daily then weekly
    cols.update(sma(d.volume, daily_periods, 'vol', 'D', compare=True, relative=True, run_rate=True))
    cols.update(sma(w.volume, weekly_periods, 'vol', 'W', compare=True, relative=True, run_rate=True))

    # Traded value per day (close x volume) and its moving averages
    traded_value = d.close * d.volume
    cols["price_volume"] = traded_value
    cols.update(sma(traded_value, daily_periods, 'price_volume', 'D'))

    # Daily volume as a percentage of the float, and its moving averages
    turnover_pct = d.volume / row.shares_float * 100
    cols["float_turnover"] = turnover_pct
    cols.update(sma(turnover_pct, daily_periods, 'float_turnover', 'D', compare=True))

    return cols


def price_compare(d: pd.DataFrame):
    """Compare each bar's OHLC values with the previous bar's.

    Args:
        d: Frame with ``open``, ``high``, ``low`` and ``close`` columns.

    Returns:
        Boolean series keyed ``day_{field}_gt_prev_{field}`` and
        ``day_{field}_lt_prev_{field}`` for high/low/open/close, plus
        ``day_open_eq_high`` and ``day_open_eq_low``.
    """
    previous = d.shift(1)
    fields = ("high", "low", "open", "close")

    cols = {f"day_{field}_gt_prev_{field}": d[field] > previous[field] for field in fields}
    cols.update({f"day_{field}_lt_prev_{field}": d[field] < previous[field] for field in fields})

    # Open printed at the bar's extreme
    cols["day_open_eq_high"] = d.open == d.high
    cols["day_open_eq_low"] = d.open == d.low
    return cols


def pocket_pivot(d: pd.DataFrame, prev: pd.DataFrame):
    """Flag up bars whose volume exceeds the heaviest recent down-bar volume.

    Args:
        d: Frame with ``close`` and ``volume`` columns.
        prev: Frame whose ``close`` is compared against ``d.close`` row by row.

    Returns:
        Boolean series, True where ``d.close > prev.close`` and ``d.volume`` is
        above the largest down-bar volume in the trailing 10-row window.
    """
    is_down_bar = d.close < prev.close
    # Volume counts only on down bars; everything else contributes 0.
    down_volume = d.volume.where(is_down_bar, 0)
    heaviest_down_volume = down_volume.rolling(window=10, min_periods=1).max()

    closed_higher = d.close > prev.close
    return closed_higher & (d.volume > heaviest_down_volume)


def minicoil(d: pd.DataFrame, prev: pd.DataFrame, prev_2: pd.DataFrame):
    """Flag bars where both the current and the previous bar sit inside the bar two back.

    Args:
        d: Frame with ``high`` and ``low`` columns.
        prev: Frame compared as the previous bar.
        prev_2: Frame compared as the bar two back.

    Returns:
        Boolean series, True where the highs of ``prev`` and ``d`` are below
        ``prev_2.high`` and their lows are above ``prev_2.low``.
    """
    # An inside bar is bounded by the reference bar's high and low; comparing
    # the close against the reference close does not test containment.
    day2_inside = (prev.high < prev_2.high) & (prev.low > prev_2.low)
    day1_inside = (d.high < prev_2.high) & (d.low > prev_2.low)
    return day2_inside & day1_inside


def three_week_tight(w: pd.DataFrame):
    """Flag rows where the last three closes fall within a 1.5% band.

    Args:
        w: Frame with a ``close`` column.

    Returns:
        Boolean series, True where (max - min) of the trailing three closes,
        divided by their mean, is at most 0.015.
    """
    last_three = w.close.rolling(window=3)
    spread = last_three.max() - last_three.min()
    relative_spread = spread / last_three.mean()
    return relative_spread <= 0.015


def five_week_up(w: pd.DataFrame):
    """Flag rows that complete five consecutive higher closes.

    Args:
        w: Frame with a ``close`` column.

    Returns:
        Boolean series, True where each of the last five closes is above the
        close before it (six closes involved in total).
    """
    closes = w.close
    # Start with "this close beat the previous one", then require the same of
    # each of the four preceding bars.
    streak = closes > closes.shift(1)
    for lag in range(1, 5):
        streak = streak & (closes.shift(lag) > closes.shift(lag + 1))
    return streak


def high_tight_flag(d: pd.DataFrame):
    """Flag a sharp advance followed by a tight consolidation.

    Args:
        d: Frame with ``high``, ``low`` and ``close`` columns.

    Returns:
        Boolean series, True where the max/min ratio of the trailing 8 closes
        is at least 1.90 and the trailing 3-bar high-low range is at most 25%
        of the trailing 3-bar mean close.
    """
    # NOTE(review): the windows are counted in rows of `d`; confirm that 8 and 3
    # bars of this frame are the intended look-backs for the pattern.
    rolling8 = d.close.rolling(window=8)
    rolling3_high = d.high.rolling(window=3)
    rolling3_low = d.low.rolling(window=3)
    rolling_3_close = d.close.rolling(window=3)
    # 90% sharp move
    sharp_move = rolling8.max() / rolling8.min() >= 1.90
    # Tight (<= 25%) consolidation; 25% is 0.25 as a fraction (0.025 would be 2.5%)
    consolidation = (rolling3_high.max() - rolling3_low.min()) / rolling_3_close.mean() <= 0.25
    return sharp_move & consolidation


def ants(d: pd.DataFrame, prev: pd.DataFrame):
    """Flag persistent buying: mostly higher closes on above-average volume.

    Args:
        d: Frame with ``close`` and ``volume`` columns.
        prev: Frame whose ``close`` is compared against ``d.close`` row by row.

    Returns:
        Boolean series, True where at least 12 of the trailing 15 rows closed
        above ``prev.close`` and the current volume exceeds its 15-row mean.
    """
    higher_close_count = (d.close > prev.close).rolling(window=15).sum()
    mean_volume = d.volume.rolling(window=15).mean()

    mostly_up = higher_close_count >= 12  # 12/15 days up
    volume_above_mean = d.volume > mean_volume
    return mostly_up & volume_above_mean


def power_trend(d: pd.DataFrame, ema_21: pd.Series, sma_50: pd.Series):
    """Flag rows where close, 21 EMA and 50 SMA are stacked in bullish order.

    Args:
        d: Frame with a ``close`` column.
        ema_21: Series compared as the 21 EMA.
        sma_50: Series compared as the 50 SMA.

    Returns:
        Boolean series, True where ``close > ema_21``, ``ema_21 > sma_50`` and
        ``close > sma_50`` all hold.
    """
    close = d.close
    above_fast_average = close > ema_21
    averages_aligned = ema_21 > sma_50
    above_slow_average = close > sma_50
    return above_fast_average & averages_aligned & above_slow_average


def power_of_three(d: pd.DataFrame, ema_10: pd.Series, ema_21: pd.Series, sma_50: pd.Series):
    """Combine the close's ``ta.cross`` signals against three moving averages.

    Args:
        d: Frame with a ``close`` column.
        ema_10: Series used as the 10 EMA.
        ema_21: Series used as the 21 EMA.
        sma_50: Series used as the 50 SMA.

    Returns:
        The ``&`` combination of ``ta.cross(d.close, average)`` for the three
        averages, in the order 10 EMA, 21 EMA, 50 SMA.
    """
    # NOTE(review): presumably ta.cross marks the bar where close crosses above
    # the average (not merely "close is above") — confirm against pandas_ta.
    crossed_ema_10, crossed_ema_21, crossed_sma_50 = (
        ta.cross(d.close, average) for average in (ema_10, ema_21, sma_50)
    )
    return crossed_ema_10 & crossed_ema_21 & crossed_sma_50


def launchpad_daily(ema_21: pd.Series, sma_50: pd.Series):
    """Flag rows where the 21 EMA is within 2% of the 50 SMA.

    Returns:
        Boolean series, True where ``|ema_21 / sma_50 - 1| < 0.02``.
    """
    deviation = ema_21 / sma_50 - 1
    return deviation.abs() < 0.02


def launchpad_weekly(ema_5: pd.Series, sma_10: pd.Series):
    """Flag rows where the 5 EMA is within 2% of the 10 SMA.

    Returns:
        Boolean series, True where ``|ema_5 / sma_10 - 1| < 0.02``.
    """
    deviation = ema_5 / sma_10 - 1
    return deviation.abs() < 0.02


def sigma_spike(d: pd.DataFrame):
    """Express each bar's percent return in units of its trailing 20-row standard deviation.

    Args:
        d: Frame with a ``close`` column.

    Returns:
        ``ta.percent_return(d.close) * 100`` divided by its own 20-row rolling
        standard deviation (the window includes the current row).
    """
    pct_move = ta.percent_return(d.close) * 100
    rolling_volatility = pct_move.rolling(window=20).std()
    return pct_move / rolling_volatility


def sma_comparison(d: pd.DataFrame, sma_200: pd.Series):
    """Compare the 200 SMA with its own value 0 to 6 months earlier.

    A month is counted as 21 rows. The earlier value is ``sma_200`` shifted by
    ``21 * i`` rows, so both sides of the comparison are the same 200-period
    average. Calling ``ta.sma`` on the shifted close without a length would use
    that function's default window instead.

    Args:
        d: Daily candles. Kept for signature compatibility; not read here.
        sma_200: 200-period simple moving average of the daily close.

    Returns:
        Mapping of ``sma_200_vs_sma_200_{i}M_ago`` for ``i`` in 0..6 to boolean
        series. The ``0M`` entry compares the series with itself and is
        therefore always False; it is kept so the set of columns is unchanged.
    """
    return {
        f"sma_200_vs_sma_200_{i}M_ago": sma_200 > sma_200.shift(21 * i) for i in range(7)
    }


def stan_weinstein_stage_analysis(d: pd.DataFrame):
    """Placeholder for the Stan Weinstein stage classification.

    TODO: implement the stage detection (stages 1A, 1, 2A, 2, 3A, 3, 4, 4B-).

    Args:
        d: Daily candles; not read yet.

    Returns:
        The literal string ``"-"`` until the analysis is implemented.
    """
    not_implemented_marker = "-"
    return not_implemented_marker


def sma_vs_ema_slope(d: pd.DataFrame, freq: str, period: list[int]):
    """Moving-average slope, in percent and relative to the 10-row ADR%.

    Args:
        d: Frame with ``high``, ``low`` and ``close`` columns.
        freq: Timeframe suffix used in the output keys.
        period: Rolling window lengths for the simple moving average.

    Returns:
        For each window ``i``: ``sma_{i}{freq}`` (the moving average),
        ``{i}{freq}sma_vs_ema_slope_pct`` (its one-row percent change) and
        ``{i}{freq}sma_vs_ema_slope_adr`` (that change divided by the 10-row
        average range expressed as a percentage of the close).
    """
    # The ADR% does not depend on the window, so compute it once up front.
    adr_10 = (d.high - d.low).rolling(10).mean() / d.close * 100

    cols = {}
    for i in period:
        ma = d.close.rolling(i).mean()
        ma_shifted = ma.shift(1)
        ma_slope = (ma - ma_shifted) / ma_shifted * 100

        cols[f"sma_{i}{freq}"] = ma
        cols[f"{i}{freq}sma_vs_ema_slope_pct"] = ma_slope
        cols[f"{i}{freq}sma_vs_ema_slope_adr"] = ma_slope / adr_10
    return cols


def relative_strength(d: pd.DataFrame, market_close: pd.Series):
    """Relative-strength counts and new-high flags versus the market.

    Args:
        d: Daily candles with a ``close`` column.
        market_close: Market benchmark closes.

    Returns:
        ``RS_{n}D`` / ``RS_{n}D_pct``: number (and percentage) of the trailing
        ``n`` rows on which the symbol's return beat the market's.
        ``RSNH_{m}M``: True where the RS line equals its maximum over the
        trailing ``m * 21`` rows.
        ``RSNHBP_{m}M``: True where ``RSNH_{m}M`` holds while the close is not
        at its maximum over the same window.
    """
    rs_day_periods = [5, 10, 15, 20, 25, 30, 60, 90]

    symbol_return = ta.percent_return(d.close)
    market_return = ta.percent_return(market_close)
    rs_day = symbol_return > market_return

    # Relative Strength
    cols = {}
    for i in rs_day_periods:
        days = rs_day.rolling(i).sum()
        cols[f"RS_{i}D"] = days
        cols[f"RS_{i}D_pct"] = days / i * 100

    rsnh_period_month = [1, 3, 6, 9, 12]
    # NOTE(review): the RS line is built from the ratio of per-bar returns, not
    # from the ratio of price levels — confirm this is the intended definition.
    rs_line = symbol_return / market_return
    for i in rsnh_period_month:
        # Relative Strength New High
        window = i * 21  # months -> rows, 21 rows per month
        rsnh = rs_line == rs_line.rolling(window).max()
        cols[f"RSNH_{i}M"] = rsnh

        # Relative Strength New High Before Price.
        # The price high must use the same window as the RS high; a window of
        # `i` rows would make the 1-month flag impossible to trigger, because
        # a 1-row rolling max always equals the close itself.
        stock_high = d.close == d.close.rolling(window).max()
        rsnhbp = rsnh & ~stock_high
        cols[f"RSNHBP_{i}M"] = rsnhbp

    return cols


def adr(d: pd.DataFrame):
    """Average high-low range over several windows, absolute and as a percent of close.

    Args:
        d: Frame with ``high``, ``low`` and ``close`` columns.

    Returns:
        For each window ``n`` in 1, 2, 5, 10, 14, 20: ``ADR_{n}D`` (rolling
        mean of ``high - low``) and ``ADR_pct_{n}D`` (that mean divided by the
        close, times 100).
    """
    bar_range = d.high - d.low
    cols = {}
    for window in (1, 2, 5, 10, 14, 20):
        mean_range = bar_range.rolling(window).mean()
        cols.update({
            f"ADR_{window}D": mean_range,
            f"ADR_pct_{window}D": mean_range / d.close * 100,
        })
    return cols


def atr(d: pd.DataFrame):
    """``ta.atr`` of the candles for several lengths.

    Args:
        d: Frame with ``high``, ``low`` and ``close`` columns.

    Returns:
        Mapping of ``ATR_{n}D`` to ``ta.atr(d.high, d.low, d.close, n)`` for
        ``n`` in 2, 5, 10, 14, 20.
    """
    return {
        f"ATR_{length}D": ta.atr(d.high, d.low, d.close, length)
        for length in (2, 5, 10, 14, 20)
    }


def alpha(d: pd.DataFrame, market_close: pd.Series):
    """Six-month return of the symbol in excess of the market's, in percent.

    Six months is taken as ``6 * 21`` rows.

    Args:
        d: Daily candles with a ``close`` column.
        market_close: Market benchmark closes.

    Returns:
        ``{"alpha_6M": ...}`` holding ``(symbol_return - market_return) * 100``,
        or ``np.nan`` when either ``ta.percent_return`` call returns ``None``.
    """
    lookback = 6 * 21
    market_return = ta.percent_return(market_close, lookback)
    symbol_return = ta.percent_return(d.close, lookback)

    if market_return is None or symbol_return is None:
        excess_return = np.nan
    else:
        excess_return = (symbol_return - market_return) * 100

    return {"alpha_6M": excess_return}


def safe_call_cdl_pattern(d: pd.DataFrame, name: str, bearish=False) -> pd.Series | bool:
    # noinspection PyBroadException
    try:
        result = ta.cdl(d.open, d.high, d.low, d.close, name=name)
        if len(result.columns) == 0:
            result = None
    except:
        result = None

    if result is None:
        return False

    series = result[result.columns[0]]
    if bearish:
        return series < 0
    return series > 0


def indicators(d: pd.DataFrame, w: pd.DataFrame, market_close: pd.Series):
    """Build the candlestick-pattern and setup flags for one symbol.

    Args:
        d: Daily candles (``open``/``high``/``low``/``close``/``volume``).
        w: Weekly candles with the same columns.
        market_close: Kept for signature compatibility; not read here.

    Returns:
        Mapping of indicator name to a boolean/numeric series, or to a plain
        ``False`` where a candlestick pattern could not be evaluated.
    """
    # Link: https://deepvue.com/knowledge-base/technical/
    prev = d.shift(1)
    prev_2 = d.shift(2)
    prev_w = w.shift(1)
    ema_4_high = d.high.ewm(span=4).mean()
    ema_10_close = d.close.ewm(span=10).mean()
    ema_21_close = d.close.ewm(span=21).mean()
    sma_50_close = d.close.rolling(50).mean()
    w_ema_5_close = w.close.ewm(span=5).mean()
    w_sma_10_close = w.close.rolling(10).mean()

    inside = safe_call_cdl_pattern(d, name='inside')
    inside_week = safe_call_cdl_pattern(w, name='inside')
    cols = {
        "inside_day": inside,
        "double_inside_day": inside & safe_call_cdl_pattern(prev, name='inside'),
        "inside_week": inside_week,
        "double_inside_week": inside_week & safe_call_cdl_pattern(prev_w, name='inside'),
        "outside_day": safe_call_cdl_pattern(d, name='engulfing'),
        "outside_week": safe_call_cdl_pattern(w, name='engulfing'),
        "outside_bullish_day": (d.open < prev.low) & (d.close > prev.high),
        "outside_bearish_day": (d.open > prev.high) & (d.close < prev.low),
        "outside_bullish_week": (w.open < prev_w.low) & (w.close > prev_w.high),
        "outside_bearish_week": (w.open > prev_w.high) & (w.close < prev_w.low),
        "wick_play": ((d.low > prev.open) | (d.low > prev.close)) & (d.high < prev.high),
        "in_the_wick": (d.open < prev.high) & ((d.low > prev.low) | (d.open > prev.high)),
        "3_line_strike_bullish": safe_call_cdl_pattern(d, name='3linestrike'),
        "3_line_strike_bearish": safe_call_cdl_pattern(d, name='3linestrike', bearish=True),
        "3_bar_break": d.close > prev.high.rolling(3).max(),
        # Both comparisons need their own parentheses: `&` binds tighter than `>`.
        "bullish_reversal": (d.low < prev.low) & (d.close > prev.close),
        "upside_reversal": (d.low < prev.low) & (d.close > (d.high + d.low) / 2),
        "oops_reversal": (d.open < prev.low) & (d.close > prev.low),
        # NOTE(review): this accepts any close below the prior high after an open
        # below the prior low — confirm the second comparison's direction.
        "key_reversal": (d.open < prev.low) & (d.close < prev.high),
        "pocket_pivot": pocket_pivot(d, prev),
        "volume_dry_up": d.volume == d.volume.rolling(window=10, min_periods=1).min(),
        "slingshot": (d.close > ema_4_high) & (d.close <= ema_4_high.shift(1)),
        "minicoil": minicoil(d, prev, prev_2),
        "3_week_tight": three_week_tight(w),
        "5_week_up": five_week_up(w),
        "high_tight_flag": high_tight_flag(d),
        "ants": ants(d, prev),
        "power_trend": power_trend(d, ema_21_close, sma_50_close),
        "power_of_three": power_of_three(d, ema_10_close, ema_21_close, sma_50_close),
        "launchpad": launchpad_daily(ema_21_close, sma_50_close),
        "launchpad_weekly": launchpad_weekly(w_ema_5_close, w_sma_10_close),
        # TODO: Green Line Breakout
        "doji": safe_call_cdl_pattern(d, name='doji'),
        "morning_star": safe_call_cdl_pattern(d, name='morningstar'),
        "evening_star": safe_call_cdl_pattern(d, name='eveningstar'),
        "shooting_star": safe_call_cdl_pattern(d, name='shootingstar'),
        "hammer": safe_call_cdl_pattern(d, name='hammer'),
        "inverted_hammer": safe_call_cdl_pattern(d, name='invertedhammer'),
        "bullish_harami": safe_call_cdl_pattern(d, name='harami'),
        # The bearish variant reads the negative side of the same pattern.
        "bearish_harami": safe_call_cdl_pattern(d, name='harami', bearish=True),
        # TODO: Bullish engulfing and bearish engulfing
        # TODO: Bullish kicker and bearish engulfing,
        "piercing_line": safe_call_cdl_pattern(d, name='piercing'),
        "hanging_man": safe_call_cdl_pattern(d, name='hangingman'),
        "dark_cloud_cover": safe_call_cdl_pattern(d, name='darkcloudcover'),
        "gravestone_doji": safe_call_cdl_pattern(d, name='gravestonedoji'),
        "3_back_crows": safe_call_cdl_pattern(d, name='3blackcrows'),
        "dragonfly_doji": safe_call_cdl_pattern(d, name='dragonflydoji'),
        "3_white_soldiers": safe_call_cdl_pattern(d, name='3whitesoldiers'),
        "sigma_spike": sigma_spike(d),
        "stan_weinstein_stage": stan_weinstein_stage_analysis(d),
    }
    return cols


def technical(d: pd.DataFrame, w: pd.DataFrame, market_close: pd.Series, row):
    """Collect every technical column for one symbol.

    Args:
        d: Daily candles.
        w: Weekly candles.
        market_close: Market benchmark closes.
        row: Symbol row; only ``row.name`` is read, for the error message.

    Returns:
        The ``price_compare`` columns merged with indicators, alpha, SMA
        comparison, SMA slope, relative strength, ADR and ATR, in that order.

    Raises:
        Exception: Whatever a sub-computation raised, re-raised after printing
            the symbol and the head of both inputs.
    """
    cols = price_compare(d)
    sma_200_close = d.close.rolling(200).mean()
    try:
        cols.update(indicators(d, w, market_close))
        # Alpha
        cols.update(alpha(d, market_close))
        # 200 SMA now versus N months back
        cols.update(sma_comparison(d, sma_200_close))
        # SMA slope, in percent and in ADR units
        cols.update(sma_vs_ema_slope(d, "D", [10, 20, 30, 40, 50, 100, 200]))
        # Relative Strength
        cols.update(relative_strength(d, market_close))
        # ADR
        cols.update(adr(d))
        # ATR
        cols.update(atr(d))
    except Exception as error:
        # Show which symbol failed and what its inputs looked like, then re-raise.
        print("Error in technical indicators: ", row.name)
        print(d.head())
        print(market_close.head())
        raise error
    return cols

In [None]:
def process_candles(row):
    """Compute the latest value of every candle-derived metric for one symbol row.

    Reads the module-level ``daily_candles``, ``weekly_candles``,
    ``monthly_candles``, ``yearly_candle`` and ``market_ticker`` mappings.

    Args:
        row: Symbol row; ``row.name`` is the ticker, and ``row.exchange`` and
            ``row.earnings_release_trading_date`` are read here.

    Returns:
        Mapping of column name to the most recent value of each metric
        (via ``get_latest``).
    """
    symbol = row.name
    d = daily_candles[symbol]
    w = weekly_candles[symbol]
    m = monthly_candles[symbol]
    y = yearly_candle[symbol]

    # Benchmark closes aligned to this symbol's dates; missing dates become 0.
    benchmark = daily_candles[market_ticker[row.exchange]]
    market_close = benchmark.reindex(d.index).close.fillna(0)
    if d.volume.empty:
        d['volume'] = np.nan

    last_trading_day = d.index[-1]
    one_year_ago = last_trading_day - pd.DateOffset(years=1)
    d_w52 = d[d.index > one_year_ago]

    d_since_earning = d[d.index >= row.earnings_release_trading_date]

    # Meta
    days_since_earning = len(d_since_earning)
    cols = {"days_since_latest_earning": pd.Series([days_since_earning])}

    # OHLCV: current and previous bar for every timeframe
    for candle, label in ((d, 'day'), (w, 'week'), (m, 'month'), (y, 'year')):
        cols.update(ohlcv(candle, name=label))
        cols.update(ohlcv(candle, name=f'prev_{label}', shift=1))

    daily_periods = [5, 10, 20, 30, 40, 50, 100, 200]
    weekly_periods = [10, 20, 30, 40, 50]

    # Price Action
    cols.update(
        price_action(d, d_w52, w, m, y, days_since_earning, last_trading_day, daily_periods, weekly_periods))

    # Volume Action
    cols.update(
        volume_action(d, d_w52, d_since_earning, w, row, daily_periods, weekly_periods, last_trading_day))

    # Technical
    cols.update(technical(d, w, market_close, row))

    # Keep only the most recent value of each column.
    return {key: get_latest(column) for key, column in cols.items()}


In [None]:
######################## ~~~Technical~~~~ ########################
# The no-volume flag and the betas are copied from the quote frame unchanged.
df['no_volume'] = q['has-no-volume']
df['beta_1_year'] = q['beta_1_year']
df['beta_3_year'] = q['beta_3_year']
df['beta_5_year'] = q['beta_5_year']

# Benchmark ticker per exchange; both exchanges use the same benchmark.
market_ticker = dict.fromkeys(("NSE", "BSE"), "NSE:CNX500")

# Resample once up front so process_candles can look candles up by ticker.
weekly_candles = {symbol: to_weekly_candles(candles) for symbol, candles in daily_candles.items()}
monthly_candles = {symbol: to_monthly_candles(candles) for symbol, candles in daily_candles.items()}
# Yearly candles are built from the monthly candles, not from the daily ones.
yearly_candle = {symbol: to_yearly_candles(candles) for symbol, candles in monthly_candles.items()}

# One dict of latest values per row, expanded into columns and joined onto df.
ta_metrics_data = df.apply(process_candles, axis=1)
ta_df: pd.DataFrame = ta_metrics_data.apply(pd.Series)
df = df.join(ta_df)
######################## ~~~Technical~~~~ ########################

## Save

In [None]:
# Row count of the frame that is exported below.
len(df)

In [None]:
from sqlalchemy import create_engine, text
from sqlalchemy.dialects.postgresql import JSONB

# Engine used by the export cells below. The Setup section already builds an
# engine from the same connection_string; this rebinds the name.
engine = create_engine(connection_string)

In [None]:
def detect_and_transform_json_columns(df: pd.DataFrame):
    """Prepare a copy of ``df`` for export and find the columns that need JSONB.

    Args:
        df: Frame to export; it is copied, not modified.

    Returns:
        A tuple ``(export_frame, json_columns)``. ``export_frame`` is the copy
        after the null and ``'nan'`` replacements; ``json_columns`` maps each
        object-dtype column containing at least one ``dict`` or ``list`` to
        ``JSONB``.
    """
    export: pd.DataFrame = df.copy()
    json_columns = {}

    for name in export.columns:
        column = export[name]
        # Only object-dtype columns can hold dicts/lists; scan those for JSON-like values.
        holds_json = column.dtype == 'object' and any(
            isinstance(item, (dict, list)) for item in column
        )
        if holds_json:
            json_columns[name] = JSONB
        # Replace NaN with None for PostgreSQL compatibility
        export[name] = column.where(pd.notnull(column), None)

    # The literal string 'nan' is replaced as well.
    return export.replace('nan', None), json_columns

In [None]:
# Export-ready copy of df plus the column -> JSONB dtype mapping passed to to_sql below.
export_df, json_columns = detect_and_transform_json_columns(df)

In [None]:
# Rebuild the final table from the export frame.
# to_sql(if_exists='replace') drops and recreates the table, so the primary key
# is added again afterwards; both steps are committed together.
final_table = "symbols"

with engine.connect() as conn:
    export_df.to_sql(final_table, conn, if_exists='replace', index=True, dtype=json_columns)
    # Assumes the frame's index is written out as a column named "ticker" — TODO confirm
    conn.execute(text(f"ALTER TABLE {final_table} ADD PRIMARY KEY (ticker)"))
    conn.commit()
    print("Table reset to symbols")