In [4]:
# !pip install finta
# https://github.com/kieran-mackle/AutoTrader/blob/main/autotrader/indicators.py

Collecting finta
  Downloading finta-1.3-py3-none-any.whl (29 kB)
Installing collected packages: finta
Successfully installed finta-1.3


In [5]:
import numpy as np
import pandas as pd
from finta import TA
from typing import Union

In [21]:
from  helpers_db import get_engine, run_sql
engine = get_engine()
ticker = "PYPL"
with engine.begin() as conn:
  data = run_sql(conn, f"SELECT * FROM ticks_1w WHERE symbol='{ticker}' AND dt::date >= '2024-01-01' ORDER BY dt")
  df = pd.DataFrame(data)
  df.rename(columns={"open": "Open", "close": "Close", "high": "High", "low": "Low"}, inplace=True)
  df.set_index("dt", inplace = True)
# print(df)


In [25]:
def candles_between_crosses(
    crosses: Union[list, pd.Series], initial_count: int = 0
) -> Union[list, pd.Series]:
    """Returns a rolling sum of candles since the last cross/signal occurred.

    Parameters
    ----------
    crosses : list | pd.Series
        The list or Series containing crossover signals.

    Returns
    -------
    counts : TYPE
        The rolling count of bars since the last crossover signal.

    See Also
    ---------
    indicators.crossover
    """

    count = 0
    counts = []

    for i in range(len(crosses)):
        if crosses[i] == 0:
            # Change in signal - reset count
            count += 1
        else:
            count = initial_count

        counts.append(count)

    if isinstance(crosses, pd.Series):
        # Convert to Series
        counts = pd.Series(data=counts, index=crosses.index, name="counts")

    return counts

def unroll_signal_list(signals: Union[list, pd.Series]) -> np.array:
    """Unrolls a rolled signal list.

    Parameters
    ----------
    signals : Union[list, pd.Series]
        DESCRIPTION.

    Returns
    -------
    unrolled_signals : np.array
        The unrolled signal series.

    See Also
    --------
    This function is the inverse of rolling_signal_list.

    Examples
    --------
    >>> unroll_signal_list([0, 1, 1, 1, 1, -1, -1, -1, 1, 1, 1])
        array([ 0.,  1.,  0.,  0.,  0., -1.,  0.,  0.,  1.,  0.,  0.])

    """
    unrolled_signals = np.zeros(len(signals))
    for i in range(1, len(signals)):
        if signals[i] != signals[i - 1]:
            unrolled_signals[i] = signals[i]

    if isinstance(signals, pd.Series):
        unrolled_signals = pd.Series(data=unrolled_signals, index=signals.index)

    return unrolled_signals

def classify_swings(swing_df: pd.DataFrame, tol: int = 0) -> pd.DataFrame:
    """Classifies a dataframe of swings (from find_swings) into higher-highs,
    lower-highs, higher-lows and lower-lows.


    Parameters
    ----------
    swing_df : pd.DataFrame
        The dataframe returned by find_swings.

    tol : int, optional
        The classification tolerance. The default is 0.

    Returns
    -------
    swing_df : pd.DataFrame
        A dataframe containing the classified swings.
    """
    # Create copy of swing dataframe
    swing_df = swing_df.copy()

    new_level = np.where(swing_df.Last != swing_df.Last.shift(), 1, 0)

    candles_since_last = candles_between_crosses(new_level, initial_count=1)

    # Add column 'candles since last swing' CSLS
    swing_df["CSLS"] = candles_since_last

    # Find strong Support and Resistance zones
    swing_df["Support"] = (swing_df.CSLS > tol) & (swing_df.Trend == 1)
    swing_df["Resistance"] = (swing_df.CSLS > tol) & (swing_df.Trend == -1)

    # Find higher highs and lower lows
    swing_df["Strong_lows"] = (
        swing_df["Support"] * swing_df["Lows"]
    )  # Returns high values when there is a strong support
    swing_df["Strong_highs"] = (
        swing_df["Resistance"] * swing_df["Highs"]
    )  # Returns high values when there is a strong support

    # Remove duplicates to preserve indexes of new levels
    swing_df["FSL"] = unroll_signal_list(
        swing_df["Strong_lows"]
    )  # First of new strong lows
    swing_df["FSH"] = unroll_signal_list(
        swing_df["Strong_highs"]
    )  # First of new strong highs

    # Now compare each non-zero value to the previous non-zero value.
    low_change = np.sign(swing_df.FSL) * (
        swing_df.FSL
        - swing_df.Strong_lows.replace(to_replace=0, method="ffill").shift()
    )
    high_change = np.sign(swing_df.FSH) * (
        swing_df.FSH
        - swing_df.Strong_highs.replace(to_replace=0, method="ffill").shift()
    )

    # the first low_change > 0.0 is not a HL
    r_hl = []
    first_valid_idx = -1
    for i in low_change.index:
        v = low_change[i]
        if first_valid_idx == -1 and not np.isnan(v) and v != 0.0:
            first_valid_idx = i
        if first_valid_idx != -1 and i > first_valid_idx and v > 0.0:
            hl = True
        else:
            hl = False
        r_hl.append(hl)

    # the first high_change < 0.0 is not a LH
    r_lh = []
    first_valid_idx = -1
    for i in high_change.index:
        v = high_change[i]
        if first_valid_idx == -1 and not np.isnan(v) and v != 0.0:
            first_valid_idx = i
        if first_valid_idx != -1 and i > first_valid_idx and v < 0.0:
            lh = True
        else:
            lh = False
        r_lh.append(lh)

    swing_df["LL"] = np.where(low_change < 0, True, False)
    # swing_df["HL"] = np.where(low_change > 0, True, False)
    swing_df["HL"] = r_hl
    swing_df["HH"] = np.where(high_change > 0, True, False)
    # swing_df["LH"] = np.where(high_change < 0, True, False)
    swing_df["LH"] = r_lh

    return swing_df

def find_swings(data: pd.DataFrame, n: int = 2) -> pd.DataFrame:
    """Locates swings in the inputted data using a moving average gradient
    method.

    Parameters
    ----------
    data : pd.DataFrame | pd.Series | list | np.array
        An OHLC dataframe of price, or an array/list/Series of data from an
        indicator (eg. RSI).

    n : int, optional
        The moving average period. The default is 2.

    Returns
    -------
    swing_df : pd.DataFrame
        A dataframe containing the swing levels detected.

    pd.Series(hl2, name="hl2"),
    """
    # Prepare data
    if isinstance(data, pd.DataFrame):
        # OHLC data
        hl2 = (data["High"].values + data["Low"].values) / 2
        swing_data = pd.Series(ema(hl2, n), index=data.index)
        low_data = data["Low"].values
        high_data = data["High"].values

    elif isinstance(data, pd.Series):
        # Pandas series data
        swing_data = pd.Series(ema(data.fillna(0), n), index=data.index)
        low_data = data
        high_data = data

    else:
        # Find swings in alternative data source
        data = pd.Series(data)

        # Define swing data
        swing_data = pd.Series(ema(data, n), index=data.index)
        low_data = data
        high_data = data

    signed_grad = np.sign((swing_data - swing_data.shift(1)).bfill())
    swings = (signed_grad != signed_grad.shift(1).bfill()) * -signed_grad

    # Calculate swing extrema
    lows = []
    highs = []
    for i, swing in enumerate(swings):
        if swing < 0:
            # Down swing, find low price
            highs.append(0)
            lows.append(min(low_data[i - n + 1 : i + 1]))
        elif swing > 0:
            # Up swing, find high price
            highs.append(max(high_data[i - n + 1 : i + 1]))
            lows.append(0)
        else:
            # Price movement
            highs.append(0)
            lows.append(0)

    # Determine last swing
    trend = rolling_signal_list(-swings)
    swings_list = merge_signals(lows, highs)
    last_swing = rolling_signal_list(swings_list)

    # Need to return both a last swing low and last swing high list
    last_low = rolling_signal_list(lows)
    last_high = rolling_signal_list(highs)

    swing_df = pd.DataFrame(
        data={"Highs": last_high, "Lows": last_low, "Last": last_swing, "Trend": trend},
        index=swing_data.index,
    )

    return swing_df

def ema(data: pd.DataFrame, period: int = 14, smoothing: int = 2) -> list:
    """Exponential Moving Average."""
    ema = [sum(data[:period]) / period]
    for price in data[period:]:
        ema.append(
            (price * (smoothing / (1 + period)))
            + ema[-1] * (1 - (smoothing / (1 + period)))
        )
    for i in range(period - 1):
        ema.insert(0, np.nan)
    return ema

def rolling_signal_list(signals: Union[list, pd.Series]) -> list:
    """Returns a list which repeats the previous signal, until a new
    signal is given.

    Parameters
    ----------
    signals : list | pd.Series
        A series of signals. Zero values are treated as 'no signal'.

    Returns
    -------
    list
        A list of rolled signals.

    Examples
    --------
    >>> rolling_signal_list([0,1,0,0,0,-1,0,0,1,0,0])
        [0, 1, 1, 1, 1, -1, -1, -1, 1, 1, 1]

    """
    rolling_signals = [0]
    last_signal = rolling_signals[0]

    if isinstance(signals, list):
        for i in range(1, len(signals)):
            if signals[i] != 0:
                last_signal = signals[i]
            rolling_signals.append(last_signal)
    else:
        for i in range(1, len(signals)):
            if signals.iloc[i] != 0:
                last_signal = signals.iloc[i]
            rolling_signals.append(last_signal)

    if isinstance(signals, pd.Series):
        rolling_signals = pd.Series(data=rolling_signals, index=signals.index)

    return rolling_signals

def merge_signals(signal_1: list, signal_2: list) -> list:
    """Returns a single signal list which has merged two signal lists.

    Parameters
    ----------
    signal_1 : list
        The first signal list.

    signal_2 : list
        The second signal list.

    Returns
    -------
    merged_signal_list : list
        The merged result of the two inputted signal series.

    Examples
    --------
    >>> s1 = [1,0,0,0,1,0]
    >>> s2 = [0,0,-1,0,0,-1]
    >>> merge_signals(s1, s2)
        [1, 0, -1, 0, 1, -1]

    """
    merged_signal_list = signal_1.copy()
    for i in range(len(signal_1)):
        if signal_2[i] != 0:
            merged_signal_list[i] = signal_2[i]

    return merged_signal_list

In [27]:
swings_df = find_swings(df, 11)
df2 = classify_swings(swings_df)
df2.sort_values(by=["dt"], ascending=False).head(20)

  if signals[i] != signals[i - 1]:
  if signals[i] != signals[i - 1]:
  unrolled_signals[i] = signals[i]
  - swing_df.Strong_lows.replace(to_replace=0, method="ffill").shift()
  - swing_df.Strong_highs.replace(to_replace=0, method="ffill").shift()


Unnamed: 0_level_0,Highs,Lows,Last,Trend,CSLS,Support,Resistance,Strong_lows,Strong_highs,FSL,FSH,LL,HL,HH,LH
dt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
2024-04-22 18:30:00+00:00,68.21,0,68.21,-1.0,1,False,True,0,68.21,0.0,68.21,False,False,True,False
2024-04-15 18:30:00+00:00,0.0,0,0.0,0.0,16,False,False,0,0.0,0.0,0.0,False,False,False,False
2024-04-08 18:30:00+00:00,0.0,0,0.0,0.0,15,False,False,0,0.0,0.0,0.0,False,False,False,False
2024-04-01 18:30:00+00:00,0.0,0,0.0,0.0,14,False,False,0,0.0,0.0,0.0,False,False,False,False
2024-03-25 17:30:00+00:00,0.0,0,0.0,0.0,13,False,False,0,0.0,0.0,0.0,False,False,False,False
2024-03-18 17:30:00+00:00,0.0,0,0.0,0.0,12,False,False,0,0.0,0.0,0.0,False,False,False,False
2024-03-11 17:30:00+00:00,0.0,0,0.0,0.0,11,False,False,0,0.0,0.0,0.0,False,False,False,False
2024-03-04 19:30:00+00:00,0.0,0,0.0,0.0,10,False,False,0,0.0,0.0,0.0,False,False,False,False
2024-02-26 19:30:00+00:00,0.0,0,0.0,0.0,9,False,False,0,0.0,0.0,0.0,False,False,False,False
2024-02-20 19:30:00+00:00,0.0,0,0.0,0.0,8,False,False,0,0.0,0.0,0.0,False,False,False,False
