In [2]:
import pandas as pd
from scipy.interpolate import UnivariateSpline

In [3]:
# calculates simple moving average based on a certain period
# incorporates lagging information, provides a degree of smoothness and generalizability

# can be used on price, volume or any numerical indicator data
def SimpleMA(df: pd.DataFrame, period: int, ma_columns: list) -> None:
    """
    Add a Simple Moving Average (SMA) column for each requested column, in place.

    For every name in ``ma_columns`` a column ``"<name>_sma (<period>)"`` is
    written to ``df``. A rolling mean has no value for the first
    ``period - 1`` rows; those warm-up rows are filled with a smoothing spline
    fitted to the raw values of the same rows, so the new column has no gap.

    Args:
        df (pd.DataFrame): The time-series DataFrame; modified in place.
        period (int): Number of rows in the rolling window.
        ma_columns (list): Names of the columns to compute the SMA for.

    Returns:
        None: ``df`` is modified in place.

    Raises:
        AssertionError: If ``df`` is not a DataFrame, or a requested column is
            missing or contains NaN.
    """
    assert isinstance(df, pd.DataFrame), "Input must be a Pandas DataFrame"

    for column in ma_columns:
        assert column in df.columns, f"{column} column is not found in the Table"
        assert not df[column].isna().any(), f'{column} column has missing values (NaN)'

        sma_column_name = column + f"_sma ({period})"
        sma_series = df[column].rolling(window=period).mean()

        # The input has no NaN, so every NaN in the rolling mean is a leading
        # warm-up row. Counting them (instead of slicing by index label) keeps
        # the logic positional and therefore independent of the index type.
        warmup = int(sma_series.isna().sum())

        if warmup == 1:
            # A spline needs at least two points; a single row keeps its raw value.
            sma_series.iloc[0] = df[column].iloc[0]
        elif warmup > 1:
            # UnivariateSpline needs more points than its degree, so fall back
            # to a lower degree when the warm-up region is shorter than 4 rows.
            degree = min(3, warmup - 1)
            positions = list(range(warmup))
            raw_values = df[column].iloc[:warmup].to_numpy(dtype=float)
            spline = UnivariateSpline(positions, raw_values, k=degree)
            sma_series.iloc[:warmup] = spline(positions)

        df[sma_column_name] = sma_series

    return None












In [4]:
# can be used on price, volume or any numerical indicator data to incorporate lagging information
# provides smoothness and generalizability

def ExpMA(df: pd.DataFrame, period: int, ma_columns) -> None:
    """
    Add an Exponential Moving Average (EMA) column for each requested column, in place.

    For every name in ``ma_columns`` a column ``"<name>_ema_<period>"`` holding
    ``ewm(span=period, adjust=True).mean()`` is written to ``df``. With pandas'
    default ``min_periods`` the EMA is defined from the very first row, so on
    NaN-free input there is no warm-up gap and nothing needs interpolating.

    Args:
        df (pd.DataFrame): The time-series DataFrame; modified in place.
        period (int): The span to use in the EMA calculation.
        ma_columns (list): Column names for which the EMA should be calculated.

    Returns:
        None: ``df`` is modified in place.

    Raises:
        AssertionError: If ``df`` is not a DataFrame, or a requested column is
            missing or contains NaN.
    """
    assert isinstance(df, pd.DataFrame), "Input must be a Pandas DataFrame"

    for column in ma_columns:
        assert column in df.columns, f"{column} column is not found in the Table"
        assert not df[column].isna().any(), f'{column} column has missing values (NaN)'

        ema_column_name = column + f"_ema_{period}"
        df[ema_column_name] = df[column].ewm(span=period, adjust=True).mean()

    # Return only after every requested column has been processed.
    return None


In [5]:
#can be used for trend-following as well as mean-reversion strategies
#measures by how much the instantaneous difference between the short EMA and the long EMA differs from the EMA of that difference

def MACD(df:pd.DataFrame, column_name, short_period: int=12, long_period: int=26, signal_period: int=9, inplace:bool=False)->pd.DataFrame:
    """
    Add the MACD line, its signal line and the MACD histogram to a DataFrame.

    Args:
        df (pd.DataFrame): DataFrame holding the series to analyse.
        column_name: Name of the column the EMAs are computed on.
        short_period (int): Span of the fast EMA (default: 12).
        long_period (int): Span of the slow EMA (default: 26).
        signal_period (int): Span of the EMA applied to the MACD line (default: 9).
        inplace (bool): If True, write the columns into ``df``; otherwise work on a copy.

    Returns:
        pd.DataFrame or None: A copy with 'MACD', 'Signal Line' and
        'MACD Histogram' added, or None when ``inplace`` is True.
    """
    assert isinstance(df, pd.DataFrame), "Input must be a pandas DataFrame."
    assert column_name in df.columns, f"Column {column_name} provided for MACD calculation wasn't found in the DataFrame."
    assert not df[column_name].isna().any(), f"Column {column_name} has missin values (NaN). Remove or impute them."

    def _ema(series, span):
        # Recursive (adjust=False) exponential moving average.
        return series.ewm(span=span, adjust=False).mean()

    target = df if inplace else df.copy()
    prices = target[column_name]

    # MACD line: fast EMA minus slow EMA.
    macd_line = _ema(prices, short_period) - _ema(prices, long_period)
    # Signal line: EMA of the MACD line itself.
    signal = _ema(macd_line, signal_period)

    target['MACD'] = macd_line
    target['Signal Line'] = signal
    target['MACD Histogram'] = macd_line - signal

    if inplace:
        return None
    return target

In [6]:
#ADX (average directional index): Measures the strength of a trend, regardless of direction

# useful for trend-following strategies, helps to identify strong trends and filter out weak ones

# the scale is on the range 0-100
# above 25 (ex.) is considered a strong enough trend for trend following strategies

# lagging indicator (reacts to price changes rather than predicting them)

# useful for price data (Is the price trend strong enough or not to deploy a trend following strategy?)

def ADX(df: pd.DataFrame, high: str='High', low:str='Low', close: str='Close', period: int = 14, inplace: bool = False) -> pd.DataFrame:
    """
    Calculates the Average Directional Index (ADX) together with +DI and -DI.

    Directional movement follows the usual rule: the up-move counts as +DM
    only when it is positive and larger than the down-move, and vice versa for
    -DM. The true range is the largest of (high - low), |high - previous close|
    and |low - previous close|. All smoothing uses ``ewm(span=period,
    adjust=False)``. Intermediate series are kept in local variables, so only
    the three result columns are added to the frame.

    Args:
        df (pd.DataFrame): DataFrame containing price data.
        high, low, close (str): Names of the high, low and close columns.
        period (int): Span used for every smoothing step (default: 14).
        inplace (bool): If True, modify the DataFrame in-place; otherwise work on a copy.

    Returns:
        pd.DataFrame or None: A copy with '+DI', '-DI' and 'ADX' columns added,
        or None when ``inplace`` is True.

    Raises:
        AssertionError: If ``df`` is not a DataFrame or a required column is missing.
    """
    # Input validation:
    assert isinstance(df, pd.DataFrame), "Input must be a Pandas DataFrame"

    assert high in df.columns, f"Column '{high}' not found in DataFrame."
    assert low in df.columns, f"Column '{low}' not found in DataFrame."
    assert close in df.columns, f"Column '{close}' not found in DataFrame."

    result = df if inplace else df.copy()

    # Keep the column-name parameters intact; use separate names for the data.
    high_now = result[high]
    low_now = result[low]
    prev_high = high_now.shift(1)
    prev_low = low_now.shift(1)
    prev_close = result[close].shift(1)

    # Directional movement. Comparisons against the NaN of the first row are
    # False, so the first row gets 0 for both +DM and -DM.
    up_move = high_now - prev_high
    down_move = prev_low - low_now
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    # True range; max() skips the NaNs of the first row, leaving high - low.
    true_range = pd.concat(
        [
            high_now - low_now,
            (high_now - prev_close).abs(),
            (low_now - prev_close).abs(),
        ],
        axis=1,
    ).max(axis=1)

    # Smoothed directional indicators; a zero smoothed range means no movement
    # at all, which is reported as 0 rather than 0/0 = NaN.
    tr_smooth = true_range.ewm(span=period, adjust=False).mean()
    plus_di = (100 * plus_dm.ewm(span=period, adjust=False).mean() / tr_smooth).where(tr_smooth != 0, 0.0)
    minus_di = (100 * minus_dm.ewm(span=period, adjust=False).mean() / tr_smooth).where(tr_smooth != 0, 0.0)

    # Directional index, again guarding the 0/0 case.
    di_sum = plus_di + minus_di
    dx = (100 * (plus_di - minus_di).abs() / di_sum).where(di_sum != 0, 0.0)

    result["+DI"] = plus_di
    result["-DI"] = minus_di
    result["ADX"] = dx.ewm(span=period, adjust=False).mean()

    return None if inplace else result


In [7]:
def RSI(df: pd.DataFrame, column_name = 'Close', period = 14, inplace=False)-> pd.DataFrame:
    """
    Calculates the Relative Strength Index (RSI) and adds it as a new column to the DataFrame.

    Gains and losses are both taken as non-negative magnitudes and averaged
    with a simple rolling mean over ``period`` rows. The first ``period`` rows
    have no complete window of price changes and are left as NaN. A window
    without any losses yields RSI = 100; a completely flat window yields NaN.

    Args:
        df (pd.DataFrame): The DataFrame containing the price data.
        column_name (str, optional): The name of the column to calculate RSI on. Defaults to 'Close'.
        period (int, optional): The lookback period for RSI calculation. Defaults to 14.
        inplace (bool, optional): If True, modify the DataFrame in place. Defaults to False.

    Returns:
        pd.DataFrame or None: If inplace is False, returns a copy of the DataFrame with the RSI column added.
                              Otherwise, returns None (modifies the original DataFrame in place).

    Raises:
        AssertionError: If ``df`` is not a DataFrame or ``column_name`` is missing.
    """
    # Input validation:
    assert isinstance(df, pd.DataFrame), "Input must be a Pandas DataFrame"
    assert column_name in df.columns, f"Column '{column_name}' not found in DataFrame."

    result = df if inplace else df.copy()

    delta = result[column_name].diff()

    # clip() keeps the leading NaN of diff(), so the undefined first change is
    # not counted as a real zero move inside the first window.
    gain = delta.clip(lower=0)
    loss = (-delta).clip(lower=0)  # magnitude of the down moves (non-negative)

    avg_gain = gain.rolling(window=period).mean()
    avg_loss = loss.rolling(window=period).mean()

    rs = avg_gain / avg_loss
    result['RSI'] = 100 - (100 / (1 + rs))

    return None if inplace else result

In [8]:
# mostly used on price data to understand the price's recent momentum

# helps predict trend reversals, and entry and exit points

# if momentum slows and price was going down, potential for an upswing
# if momentum slows and price was going up, potential for a downswing

# on a scale of 0-100
#above 80 considered overbought
#below 20 considered oversold

#Crossovers:
#if %K (fast stoch) crosses above the %D (slow stoch), bullish entry signal
#if %K crosses below the %D, bearish entry signal

# Divergences:
# if the price makes lower lows while %K makes higher lows, indicates bullish reversal
# if the price makes higher highs while %K makes lower highs, it could indicate a bearish reversal

#Crossovers are more reliable in overbought/oversold regions

def stochastic_oscillator(df: pd.DataFrame, column_name: str = 'Close', period: int = 14, smoothing_period: int = 3, inplace: bool = False) -> pd.DataFrame:
    """Calculates the Stochastic Oscillator (%K and %D) and adds them as new columns to the DataFrame.

    The rolling highest/lowest values are taken from ``column_name`` itself.
    Rows where '%K', '%D' or their difference is NaN are removed from the
    result; this covers the warm-up rows and any window whose highest and
    lowest values coincide (0/0). With ``inplace=True`` those rows are removed
    from the caller's DataFrame.

    Args:
        df (pd.DataFrame): The DataFrame containing the price data.
        column_name (str, optional): The name of the column to calculate the Stochastic Oscillator on. Defaults to 'Close'.
        period (int, optional): The lookback period for calculating the highest high and lowest low. Defaults to 14.
        smoothing_period (int, optional): The period for smoothing the %K line to get %D. Defaults to 3.
        inplace (bool, optional): If True, modify the DataFrame in place. Defaults to False.

    Returns:
        pd.DataFrame or None: If inplace is False, returns a copy of the DataFrame with the '%K', '%D' and
                              'Crossover (%K-%D)' columns added. Otherwise, returns None (modifies the
                              original DataFrame in place).

    Raises:
        AssertionError: If any of the input checks below fails.
    """
    # Input validation (type is checked before the comparison so that a
    # non-numeric period fails the assertion instead of raising TypeError):
    assert isinstance(df, pd.DataFrame), "Input must be a Pandas DataFrame"
    assert column_name in df.columns, f"Column '{column_name}' not found in DataFrame."
    assert isinstance(period, int) and period > 0, "Period must be a positive integer."
    assert isinstance(smoothing_period, int) and smoothing_period > 0, "Smoothing period must be a positive integer."
    assert pd.api.types.is_numeric_dtype(df[column_name]), f"Column '{column_name}' must contain numeric values."

    result = df if inplace else df.copy()
    prices = result[column_name]

    # Highest high and lowest low over the lookback period. They stay local so
    # no temporary columns are ever written to the frame.
    highest_high = prices.rolling(window=period).max()
    lowest_low = prices.rolling(window=period).min()

    # Calculate %K
    result['%K'] = 100 * (prices - lowest_low) / (highest_high - lowest_low)

    # Calculate %D (smoothed %K)
    result['%D'] = result['%K'].rolling(window=smoothing_period).mean()

    # Calculate Crossovers between %K and %D
    result['Crossover (%K-%D)'] = result['%K'] - result['%D']

    # Remove the NaN rows that resulted from lagged calculations. inplace=True
    # is required for the removal to take effect in both modes.
    result.dropna(subset=['%K', '%D', 'Crossover (%K-%D)'], inplace=True)

    return None if inplace else result

In [9]:
def ATR(df: pd.DataFrame, high_col: str = "High", low_col: str = "Low", close_col: str = "Close", period: int = 14, inplace: bool = False) -> pd.DataFrame:
    """Calculates the Average True Range (ATR) and adds it as a new column to the DataFrame.

    The true range of a row is the largest of (high - low),
    |high - previous close| and |low - previous close|. The ATR is its simple
    rolling mean over ``period`` rows; the leading rows without a full window
    are back-filled from the first available ATR value. Intermediate series
    stay local, so only the 'ATR' column is added.

    Args:
        df (pd.DataFrame): The DataFrame containing OHLC (Open, High, Low, Close) data.
        high_col (str): The name of the column containing the high prices. Defaults to 'High'.
        low_col (str): The name of the column containing the low prices. Defaults to 'Low'.
        close_col (str): The name of the column containing the close prices. Defaults to 'Close'.
        period (int, optional): The lookback period for ATR calculation. Defaults to 14.
        inplace (bool, optional): If True, modify the DataFrame in place. Defaults to False.

    Returns:
        pd.DataFrame or None: If inplace is False, returns a copy of the DataFrame with the ATR column added.
                              Otherwise, returns None (modifies the original DataFrame in place).

    Raises:
        TypeError: If df is not a Pandas DataFrame.
        KeyError: If one of the required columns is not in the DataFrame.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a Pandas DataFrame.")
    required_columns = [high_col, low_col, close_col]
    for col in required_columns:
        if col not in df.columns:
            raise KeyError(f"Column '{col}' not found in DataFrame.")

    result = df if inplace else df.copy()

    high = result[high_col]
    low = result[low_col]
    prev_close = result[close_col].shift(1)

    # True range; max() skips the NaNs of the first row, leaving high - low.
    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    # ATR as a simple moving average, with the warm-up rows back-filled.
    result['ATR'] = true_range.rolling(window=period).mean().bfill()

    return None if inplace else result



In [10]:
def Boilinger_Bands(df:pd.DataFrame, column_name:str, window:int, num_of_std: float=1.96, inplace: bool=False):
    """
    Calculates Bollinger Bands (using SMA based on some lookback window) based on the given number of standard deviations from the mean and adds them to the DataFrame.

    Four columns are added, in this order:
    ``boilinger_SMA (<window>)``, ``boilinger_STD (<num_of_std>)``,
    ``Upper Boilinger Band (+<num_of_std>)`` and
    ``Lower Boilinger Band (-<num_of_std>)``. The leading rows that have no
    full window are back-filled from the first available value.

    Args:
        df (pd.DataFrame): DataFrame containing price data.
        column_name (str): Name of the column to calculate Bollinger Bands on.
        window (int): Lookback period for the SMA and standard deviation calculation.
        num_of_std (float, optional): Number of standard deviations for the bands (default: 1.96).
        inplace (bool, optional): If True, modify the DataFrame in place. Defaults to False.

    Returns:
        pd.DataFrame or None: A copy of the DataFrame with the four columns added,
                              or None when inplace=True.

    Raises:
        TypeError: If df is not a Pandas DataFrame.
        KeyError: If column_name is not in the DataFrame.
        ValueError: If the column has missing values, window is not a positive
            integer, or num_of_std is not positive.
    """
    # input validation
    if not isinstance(df, pd.DataFrame):
        raise TypeError('Inpute must be a Pandas DataFrame')

    if column_name not in df.columns:
        raise KeyError(f"Column {column_name} not found in the DataFrame.")

    if df[column_name].isnull().any():
        raise ValueError(f'Column {column_name} has missing values.')

    if not isinstance(window, int) or window<=0:
        raise ValueError('Window must be a positive integer.')

    if num_of_std <= 0:
        raise ValueError("Number of standard deviations (num_of_std) must be positive.")

    result = df if inplace else df.copy()

    # Build every column name once so that writes and reads cannot drift apart.
    sma_col = f'boilinger_SMA ({window})'
    std_col = f'boilinger_STD ({num_of_std})'
    upper_col = f'Upper Boilinger Band (+{num_of_std})'
    lower_col = f'Lower Boilinger Band (-{num_of_std})'

    rolling = result[column_name].rolling(window=window)
    sma = rolling.mean()
    std = rolling.std()

    # Bands are computed from the raw rolling statistics, then all four series
    # are back-filled over the warm-up rows.
    result[sma_col] = sma.bfill()
    result[std_col] = std.bfill()
    result[upper_col] = (sma + num_of_std * std).bfill()
    result[lower_col] = (sma - num_of_std * std).bfill()

    return None if inplace else result




In [11]:
# On-Balance Volume
# used to gauge buying or selling pressure based on volume change over time
# start at 0, positive volume change (if price increase) get added, negative volume change (if price decrease) gets subtracted

def OBV(df: pd.DataFrame, column: str = 'Volume', close_column: str = 'Close', inplace: bool = False) -> pd.DataFrame:
    """
    Calculates the On-Balance Volume (OBV) indicator and adds it to the DataFrame.

    Each row contributes +volume when the close rose versus the previous row,
    -volume when it fell and 0 otherwise (including the first row, which has
    no previous close). OBV is the running total of these contributions.

    Args:
        df (pd.DataFrame): DataFrame containing price and volume data.
        column (str, optional): Name of the volume column. Defaults to 'Volume'.
        close_column (str, optional): Name of the closing price column. Defaults to 'Close'.
        inplace (bool, optional): If True, modify the DataFrame in place. Defaults to False.

    Returns:
        pd.DataFrame or None: A copy of the DataFrame with the 'OBV' column added,
                              or None when inplace=True.

    Raises:
        TypeError: If df is not a Pandas DataFrame.
        KeyError: If either column or close_column is not in the DataFrame.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a Pandas DataFrame.")

    if column not in df.columns:
        raise KeyError(f"Column '{column}' not found in DataFrame.")

    if close_column not in df.columns:
        raise KeyError(f"Column '{close_column}' not found in DataFrame.")

    result = df if inplace else df.copy()

    # Determine daily price change
    price_change = result[close_column].diff()
    volume = result[column]

    # Build the signed volume as one Series. This avoids creating an integer
    # column of zeros and then writing (possibly float) volumes into it, which
    # is a deprecated incompatible-dtype assignment in recent pandas.
    signed_volume = volume.where(price_change > 0, 0) - volume.where(price_change < 0, 0)

    result['OBV'] = signed_volume.cumsum()

    return None if inplace else result


In [12]:
# Chaikin Money Flow

#individual volumes over some lookback period get scaled by their particular Money Flow Multipliers, added up (weighted sum) and divided by the total volume over the lookback period
#measures buying and selling pressures over a specific lookback period for a security
#gauges the amount of money flowing into or out of an asset
#provides insights into the strength of the price move

#fluctuates around the 0 line.
# positive CMF shows more buying pressure than selling pressure
# negative CMF shows more selling pressure than buying pressure
# 0-line crossover--> above (buying signal); below (selling signal)
#Divergence of price and CMF indicates potential reversals (weakening buying or selling pressure, and strengthening of the opposite)

def CMF(df:pd.DataFrame, high: str = 'High', low: str='Low', close: str='Close', volume:str='Volume', window: int=21, inplace: bool = False):
    """
    Calculates the Chaikin Money Flow (CMF) and adds it as a 'CMF' column.

    The Money Flow Multiplier of a row is
    ((close - low) - (high - close)) / (high - low), set to 0 where that
    expression is NaN (e.g. high == low == close). CMF is the rolling sum of
    multiplier * volume divided by the rolling sum of volume over ``window``
    rows; the leading rows without a full window are back-filled.

    Args:
        df (pd.DataFrame): DataFrame containing price and volume data.
        high, low, close, volume (str): Names of the corresponding columns.
        window (int): Lookback period (default: 21).
        inplace (bool): If True, write the column into ``df``; otherwise work on a copy.

    Returns:
        pd.DataFrame: The DataFrame carrying the 'CMF' column. This is ``df``
        itself when ``inplace`` is True and a copy otherwise.

    Raises:
        AssertionError: If any of the input checks below fails.
    """
    # input validation (columns are looked up by the names passed in)
    assert isinstance(df, pd.DataFrame), 'Input must be a pandas DataFrame.'
    assert high in df.columns, f'{high} column not found in DataFrame'
    assert low in df.columns, f'{low} column not found in DataFrame'
    assert close in df.columns, f'{close} column not found in DataFrame'
    assert volume in df.columns, f'{volume} column not found in DataFrame'
    assert isinstance(window, int) and window > 0, "Period must be a positive integer."
    assert not df[high].isna().any(), f'{high} column has missing values.'
    assert not df[low].isna().any(), f'{low} column has missing values.'
    assert not df[close].isna().any(), f'{close} column has missing values.'

    result = df if inplace else df.copy()

    high_prices = result[high]
    low_prices = result[low]
    close_prices = result[close]
    volumes = result[volume]

    # Money Flow Multiplier; a zero range gives 0/0 = NaN, treated as neutral.
    money_flow_multiplier = (
        (close_prices - low_prices) - (high_prices - close_prices)
    ) / (high_prices - low_prices)
    money_flow_multiplier = money_flow_multiplier.fillna(0)

    # Money Flow Volume
    money_flow_volume = money_flow_multiplier * volumes

    # Chaikin Money Flow, with the warm-up rows back-filled
    cmf = money_flow_volume.rolling(window=window).sum() / volumes.rolling(window=window).sum()
    result['CMF'] = cmf.bfill()

    # Both modes hand back the frame that carries the new column.
    return result

In [13]:
# Williams %R Indicator (Inverse of Stochastic Oscillator)

# momentum indicator that measures overbought and oversold levels.
# scale: -100 - 0

# Williams %R = -100 * (HH - Close) / (HH - LL)

# -20 to 0 region: price closed close to high, overbought region, reversal down
# -100 to -80 region: price closed close to low, oversold region, reversal up

def Williams_R(df: pd.DataFrame, column_name: str = "Close", period: int = 14, inplace: bool = False) -> pd.DataFrame:
    """
    Calculates the Williams %R indicator and adds it as a new column to the DataFrame.

    The rolling highest and lowest values are taken from ``column_name``
    itself and kept in local variables, so only the "Williams %R" column is
    added. The first ``period - 1`` rows, and windows whose highest and lowest
    values coincide, are NaN.

    Args:
        df (pd.DataFrame): The DataFrame containing price data.
        column_name (str, optional): The name of the column with closing prices (default: "Close").
        period (int, optional): Time period for Williams %R calculation (default: 14).
        inplace (bool, optional): If True, modify the DataFrame in-place; otherwise, return a copy.

    Returns:
        pd.DataFrame or None: A copy of the DataFrame with the Williams %R column added,
                              or None when inplace=True.

    Raises:
        TypeError: If df is not a Pandas DataFrame.
        KeyError: If column_name is not in the DataFrame.
        ValueError: If period is not a positive integer.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a Pandas DataFrame.")

    if column_name not in df.columns:
        raise KeyError(f"Column '{column_name}' not found in DataFrame.")

    if not isinstance(period, int) or period <= 0:
        raise ValueError("Period must be a positive integer.")

    result = df if inplace else df.copy()
    prices = result[column_name]

    # Highest high and lowest low over the lookback period
    highest_high = prices.rolling(window=period).max()
    lowest_low = prices.rolling(window=period).min()

    # Williams %R = -100 * (HH - price) / (HH - LL)
    result["Williams %R"] = -100 * (highest_high - prices) / (highest_high - lowest_low)

    return None if inplace else result



In [14]:
def ROC(df: pd.DataFrame, column_name: str = "Close", period: int = 12, inplace: bool = False) -> pd.DataFrame:
    """
    Calculates the Rate of Change (ROC) indicator and adds it as a new column to the DataFrame.

    ROC is the percentage change of ``column_name`` relative to its value
    ``period`` rows earlier. The first ``period`` rows have no reference value
    and are back-filled from the first available ROC.

    Args:
        df (pd.DataFrame): DataFrame containing price data.
        column_name (str, optional): Name of the column to calculate ROC on. Defaults to 'Close'.
        period (int, optional): The lookback period for ROC calculation. Defaults to 12.
        inplace (bool, optional): If True, modify the DataFrame in place. Defaults to False.

    Returns:
        pd.DataFrame or None: If inplace is False, returns a copy of the DataFrame with the ROC column added.
                              Otherwise, returns None (modifies the original DataFrame in place).

    Raises:
        TypeError: If df is not a Pandas DataFrame.
        KeyError: If column_name is not in the DataFrame.
        ValueError: If period is not a positive integer.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a Pandas DataFrame.")

    if column_name not in df.columns:
        raise KeyError(f"Column '{column_name}' not found in DataFrame.")

    if not isinstance(period, int) or period <= 0:
        raise ValueError("Period must be a positive integer.")

    result = df if inplace else df.copy()

    current = result[column_name]
    reference = current.shift(period)  # value `period` rows earlier

    # Percentage change versus the reference; .bfill() replaces the deprecated
    # fillna(method='bfill') spelling.
    result["ROC"] = ((current - reference) / reference * 100).bfill()

    return None if inplace else result

In [15]:
# Commodity Channel Index (momentum oscillator)

#above 100-> overbought
#below -100-> oversold
# between 100 and -100-> normal range

#70-80% of the CCI values will be inside the 100 to -100 range
#the further the CCI moves from 0 the stronger the momentum of the price

#crossover above 0-> bullish signal
#crossover below 0-> bearish signal

# divergence: price makes lower lows, but CCI makes higher lows -> bullish reversal
# divergence: price makes higher highs, but CCI makes lower highs -> bearish reversal

def CCI(df: pd.DataFrame, column_name: str = "Close", period: int = 20, constant: float = 0.015, inplace: bool = False) -> pd.DataFrame:
    """
    Calculates the Commodity Channel Index (CCI) and adds it as a new column to the DataFrame.

    The "typical price" used here is the mean of ``column_name`` and its own
    rolling maximum and minimum over ``period`` rows. CCI is the distance of
    that typical price from its rolling mean, divided by ``constant`` times
    its rolling mean absolute deviation. All intermediate series stay local:
    existing columns (including any named 'High' or 'Low') are left untouched
    and only 'CCI' is added. Rows without two full windows are NaN.

    Args:
        df (pd.DataFrame): DataFrame containing price data.
        column_name (str, optional): Name of the column to calculate CCI on (default: "Close").
        period (int, optional): The lookback period for CCI calculation (default: 20).
        constant (float, optional): The constant used in the CCI formula (default: 0.015).
        inplace (bool, optional): If True, modify the DataFrame in place. Defaults to False.

    Returns:
        pd.DataFrame or None: If inplace is False, returns a copy of the DataFrame with the CCI column added.
                              Otherwise, returns None (modifies the original DataFrame in place).

    Raises:
        TypeError: If df is not a Pandas DataFrame.
        KeyError: If column_name is not in the DataFrame.
        ValueError: If period is not a positive integer.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a Pandas DataFrame.")
    if column_name not in df.columns:
        raise KeyError(f"Column '{column_name}' not found in DataFrame.")
    if not isinstance(period, int) or period <= 0:
        raise ValueError("Period must be a positive integer.")

    result = df if inplace else df.copy()
    prices = result[column_name]

    # Rolling extremes of the chosen column
    rolling_high = prices.rolling(window=period).max()
    rolling_low = prices.rolling(window=period).min()

    # Typical Price (TP)
    typical_price = (prices + rolling_high + rolling_low) / 3

    # Simple Moving Average of TP
    tp_window = typical_price.rolling(window=period)
    tp_sma = tp_window.mean()

    # Mean absolute deviation of TP around the window mean. Computed
    # explicitly because Series.mad() no longer exists in pandas >= 2.0.
    tp_mad = tp_window.apply(lambda window: (window - window.mean()).abs().mean())

    # Calculate CCI
    result["CCI"] = (typical_price - tp_sma) / (constant * tp_mad)

    return None if inplace else result

In [16]:
# above 80-> overbought (maybe bearish reversal)
# below 20-> oversold (potentially bullish reversal)

# if price makes a higher high but MFI makes a lower high -> bearish divergence
# if price makes a lower low but MFI makes a higher low -> bullish divergence

def MFI(df: pd.DataFrame, high_col: str = "High", low_col: str = "Low", close_col: str = "Close", volume_col: str = "Volume", period: int = 14, inplace: bool = False) -> pd.DataFrame:
    """
    Calculates the Money Flow Index (MFI) and adds it as a new column to the DataFrame.

    The typical price is (high + low + close) / 3 and the raw money flow is
    typical price * volume. A row's flow counts as positive when the typical
    price rose versus the previous row and as negative when it fell. MFI is
    100 - 100 / (1 + ratio), where ratio is the rolling sum of positive flow
    over the rolling sum of negative flow; a window with no negative flow
    therefore yields 100. Leading rows without a full window are back-filled.
    Intermediate series stay local, so only 'MFI' is added.

    Args:
        df (pd.DataFrame): DataFrame containing OHLCV (Open, High, Low, Close, Volume) data.
        high_col (str): Name of the column containing the high prices (default: "High").
        low_col (str): Name of the column containing the low prices (default: "Low").
        close_col (str): Name of the column containing the close prices (default: "Close").
        volume_col (str): Name of the column containing the volume data (default: "Volume").
        period (int, optional): The lookback period for MFI calculation (default: 14).
        inplace (bool, optional): If True, modify the DataFrame in place. Defaults to False.

    Returns:
        pd.DataFrame or None: If inplace is False, returns a copy of the DataFrame with the MFI column added.
                              Otherwise, returns None (modifies the original DataFrame in place).

    Raises:
        TypeError: If df is not a Pandas DataFrame.
        KeyError: If a required column is not in the DataFrame.
        ValueError: If a required column has missing values or period is not
            a positive integer.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a Pandas DataFrame.")

    required_columns = [high_col, low_col, close_col, volume_col]
    for col in required_columns:
        if col not in df.columns:
            raise KeyError(f"Column '{col}' not found in DataFrame.")

        # Look the column up by its name, not as a literal attribute "col".
        if df[col].isnull().any():
            raise ValueError(f'Column {col} has missing values (NaN).')

    if not isinstance(period, int) or period <= 0:
        raise ValueError("Period must be a positive integer.")

    result = df if inplace else df.copy()

    # Typical Price (TP) and Raw Money Flow (RMF)
    typical_price = (result[high_col] + result[low_col] + result[close_col]) / 3
    raw_money_flow = typical_price * result[volume_col]

    # Positive and negative money flow, split by the direction of the TP move
    delta = typical_price.diff(1)
    positive_flow = raw_money_flow.where(delta > 0, 0)
    negative_flow = raw_money_flow.where(delta < 0, 0)

    # Money Flow Ratio over the lookback window
    money_ratio = (
        positive_flow.rolling(window=period).sum()
        / negative_flow.rolling(window=period).sum()
    )

    # MFI, with the warm-up rows back-filled
    result['MFI'] = (100 - (100 / (1 + money_ratio))).bfill()

    return None if inplace else result

In [17]:
# Accumulation/Distribution Line (lagging indicator)

#rising line -> increasing buying pressure (accumulation of the asset)
#falling line -> increasing selling pressure (distribution of the asset)

#rising line confirms an uptrend
#falling line confirms a downtrend

#Divergence: (1) price makes lower lows but A/D line makes higher lows -> potential for upside reversal
#             (2) price makes higher highs but A/D line makes lower highs -> potential for downside reversal

#evaluates strength of breakouts: if after breakout above resistance the A/D line is rising ->  sustainable breakout (not a false breakout)

def AD_Line(df: pd.DataFrame, high_col: str = "High", low_col: str = "Low", close_col: str = "Close", volume_col: str = "Volume", inplace: bool = False) -> pd.DataFrame:
    """Compute the Accumulation/Distribution Line and store it in an 'A/D Line' column.

    For every row the Close Location Value is
    ``CLV = ((close - low) - (high - close)) / (high - low)``; it is multiplied
    by the volume to obtain the Money Flow Volume, and the A/D Line is the
    running (cumulative) sum of the Money Flow Volume.

    Args:
        df (pd.DataFrame): DataFrame holding the high, low, close and volume columns.
        high_col (str): Name of the column containing the high prices (default: "High").
        low_col (str): Name of the column containing the low prices (default: "Low").
        close_col (str): Name of the column containing the close prices (default: "Close").
        volume_col (str): Name of the column containing the volume data (default: "Volume").
        inplace (bool, optional): If True, add the column to ``df`` itself. Defaults to False.

    Returns:
        pd.DataFrame or None: A copy of ``df`` with the 'A/D Line' column added when
        ``inplace`` is False; None when ``inplace`` is True (``df`` is modified).

    Raises:
        TypeError: If ``df`` is not a Pandas DataFrame.
        KeyError: If one of the required columns is missing.
        ValueError: If one of the required columns contains NaN.
    """

    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a Pandas DataFrame.")

    for col in (high_col, low_col, close_col, volume_col):
        if col not in df.columns:
            raise KeyError(f"Column '{col}' not found in DataFrame.")

        if df[col].isna().any():
            raise ValueError(f'Column {col} has missing values (NaN).')

    # Intermediate values are kept in local Series rather than temporary
    # DataFrame columns, so nothing but 'A/D Line' is ever written to the
    # caller's frame (also when inplace=True) and a caller's own 'CLV'/'MFV'
    # columns are left untouched.
    price_range = df[high_col] - df[low_col]
    clv = ((df[close_col] - df[low_col]) - (df[high_col] - df[close_col])) / price_range

    # A bar with high == low has no range: the division yields NaN (or inf),
    # so such bars contribute a CLV of 0.
    clv = clv.where(price_range != 0, 0.0)

    money_flow_volume = clv * df[volume_col]

    result = df if inplace else df.copy()
    result['A/D Line'] = money_flow_volume.cumsum()

    return None if inplace else result

In [18]:
# Donchian Channels
# similar to Bollinger Bands (uses highest high and lowest low over a specified period)

# helps in trend identification
# price consistently closing above the upper channel, strong uptrend
# price consistently closing below the lower channel, strong downtrend

def Donchian_Channels(df:pd.DataFrame, close: str='Close', window: int=14, inplace: bool=False):
    """
    Calculates Donchian Channels on one column and adds them to the DataFrame.

    The lower channel is the rolling minimum and the upper channel the rolling
    maximum of ``close`` over ``window`` rows. The first ``window - 1`` rows
    have no complete window; they are backward-filled with the first
    available channel value.

    Args:
        df (pd.DataFrame): DataFrame containing price data.
        close (str): Column the channels are computed on (default: 'Close').
        window (int, optional): Lookback period (default: 14).
        inplace (bool, optional): If True, modify the DataFrame in place. Defaults to False.

    Returns:
        pd.DataFrame or None: A copy of ``df`` with '<close> lower Donch' and
                              '<close> upper Donch' columns added, or None if
                              inplace=True (``df`` itself is modified).

    Raises:
        AssertionError: If df is not a Pandas DataFrame, if window is not a
                        positive integer, or if ``close`` is not a column of df.
    """

    # Raised explicitly (instead of `assert`) so validation still runs under
    # `python -O`; the exception type is unchanged for existing callers.
    if not isinstance(df, pd.DataFrame):
        raise AssertionError('Input must be a pandas DataFrame.')
    if not (isinstance(window, int) and window > 0):
        raise AssertionError('Lookback window must be a positive integer.')
    if close not in df.columns:
        raise AssertionError(f'Column {close} not found in DataFrame.')

    result = df if inplace else df.copy()

    rolling_close = result[close].rolling(window=window)

    # .bfill() replaces the deprecated fillna(method='bfill') spelling.
    result[f'{close} lower Donch'] = rolling_close.min().bfill()
    result[f'{close} upper Donch'] = rolling_close.max().bfill()

    return None if inplace else result




In [19]:

def Keltner_Channels(df: pd.DataFrame, close: str = "Close", high: str='High', low: str='Low', period: int = 20, atr_period: int = 10, multiplier: float = 2.0, inplace: bool = False) -> pd.DataFrame:
    """
    Calculates Keltner Channels (EMA of typical price +/- multiplier * ATR) and adds them to the DataFrame.

    The typical price is ``(close + high + low) / 3``; its exponential moving
    average (``span=period``, ``adjust=False``) is the centre line. The
    channels are the centre line plus/minus ``multiplier`` times the ATR.
    Leading NaNs in the channels are backward-filled.

    Args:
        df (pd.DataFrame): DataFrame containing price data.
        close (str, optional): Name of the close price column (default: "Close").
        high (str, optional): Name of the high price column (default: "High").
        low (str, optional): Name of the low price column (default: "Low").
        period (int, optional): Lookback period for the EMA of typical price (default: 20).
        atr_period (int, optional): Lookback period for the ATR calculation (default: 10).
        multiplier (float, optional): Multiplier for ATR (default: 2.0).
        inplace (bool, optional): If True, modify the DataFrame in place. Defaults to False.

    Returns:
        pd.DataFrame or None: A copy of ``df`` with 'Upper Keltner Channel' and
                              'Lower Keltner Channel' columns added, or None if
                              inplace=True (``df`` itself is modified).

    Raises:
        TypeError: If df is not a Pandas DataFrame.
        KeyError: If the high, low or close column is not in the DataFrame.
        ValueError: If period or atr_period are not positive integers.
    """

    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a Pandas DataFrame.")

    for col in [high, low, close]:
        if col not in df.columns:
            raise KeyError(f"Column '{col}' not found in DataFrame.")

    if not isinstance(period, int) or period <= 0 or not isinstance(atr_period, int) or atr_period <= 0:
        raise ValueError("Period and ATR period must be positive integers.")

    # Centre line: EMA of the typical price, kept as a local Series so no
    # temporary column is written to the caller's frame.
    typical_price = (df[close] + df[high] + df[low]) / 3
    centre_line = typical_price.ewm(span=period, adjust=False).mean()

    # ATR is defined elsewhere in this file; this code relies on it adding an
    # 'ATR' column to the frame it receives when called with inplace=True.
    # It is run on a scratch copy so that nothing it writes (and no existing
    # 'ATR' column of the caller) leaks into or is clobbered in the result.
    scratch = df.copy()
    ATR(scratch, inplace=True, period=atr_period, close_col=close, high_col=high, low_col=low)
    band_width = multiplier * scratch['ATR']

    result = df if inplace else df.copy()

    # Backfill leading NaNs; the values are assigned to `result` itself, so
    # inplace=True callers get the backfilled channels too.
    result['Upper Keltner Channel'] = (centre_line + band_width).bfill()
    result['Lower Keltner Channel'] = (centre_line - band_width).bfill()

    return None if inplace else result

In [20]:
# VI+ measures the strength of upward price movement.
# VI- measures the strength of downward price movement.

# if VI+ > VI- -> uptrend
# if VI+ < VI- -> downtrend

# the greater the difference between VI+ and VI- the stronger the trend

# when VI+ crosses over VI- indicates a potential trend reversal, from down to up (bullish entry signal)
# when VI- crosses over VI+ indicates a potential trend reversal, from up to down (bearish entry signal)

def Vortex_Indicator(df: pd.DataFrame, close: str='Close', high: str='High', low: str='Low', period: int = 14, inplace: bool = False) -> pd.DataFrame:
    """
    Calculates the Vortex Indicator (VI+, VI-) and adds them to the DataFrame.

    Per row:
        TR  = max(high - low, |high - previous close|, |previous close - low|)
        VM+ = |high - previous low|
        VM- = |low - previous high|
    VI+ / VI- are the rolling sums of VM+ / VM- over ``period`` rows divided by
    the rolling sum of TR over the same window. The first ``period`` rows are
    NaN because VM+ / VM- need a previous row and the rolling sum needs a
    full window; they are not filled.

    Args:
        df (pd.DataFrame): DataFrame containing the high, low and close columns.
        close (str): Name of the close price column (default: 'Close').
        high (str): Name of the high price column (default: 'High').
        low (str): Name of the low price column (default: 'Low').
        period (int): Lookback period for the calculation (default: 14).
        inplace (bool, optional): If True, modify the DataFrame in place. Defaults to False.

    Returns:
        pd.DataFrame or None: A copy of ``df`` with 'VI+' and 'VI-' columns added,
                              or None if inplace=True (``df`` itself is modified).

    Raises:
        TypeError: If df is not a Pandas DataFrame.
        KeyError: If the high, low or close column is missing.
        ValueError: If period is not a positive integer or if one of the columns (or more) has missing values.

    """

    # Input validation
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a Pandas DataFrame.")

    for col in (high, low, close):
        if col not in df.columns:
            raise KeyError(f"Column {col} not found in DataFrame.")
        if df[col].isnull().any():
            raise ValueError(f'Column {col} has missing values (NaN).')

    if not isinstance(period, int) or period <= 0:
        raise ValueError("Period must be a positive integer.")

    # All intermediates live in local Series: only 'VI+' and 'VI-' are written
    # to the output frame, so inplace=True no longer leaves temporary columns
    # behind in the caller's DataFrame.
    previous_close = df[close].shift(1)

    # True Range (TR): row-wise maximum of the three candidate ranges.
    # max(axis=1) skips NaN, so the first row falls back to high - low.
    true_range = pd.concat(
        [
            df[high] - df[low],
            (df[high] - previous_close).abs(),
            (previous_close - df[low]).abs(),
        ],
        axis=1,
        keys=['TR1', 'TR2', 'TR3'],
    ).max(axis=1)

    # Vortex Movement (VM)
    vm_plus = (df[high] - df[low].shift(1)).abs()
    vm_minus = (df[low] - df[high].shift(1)).abs()

    # Rolling sums over the lookback window
    sum_true_range = true_range.rolling(window=period).sum()
    sum_vm_plus = vm_plus.rolling(window=period).sum()
    sum_vm_minus = vm_minus.rolling(window=period).sum()

    result = df if inplace else df.copy()
    result['VI+'] = sum_vm_plus / sum_true_range
    result['VI-'] = sum_vm_minus / sum_true_range

    return None if inplace else result