<a href="https://colab.research.google.com/github/canamac/zeka-can/blob/main/daralma.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd
import numpy as np

def calculate_devisso_trend_signals(df, zigzag_high_period=10, zigzag_low_period=10, min_movement_percent=0.16,
                                    ma_period=20, std_dev_multiplier=2.0, daralma_esik=0.5, ma_length=10):
    """
    Calculate Devis'So Trend signals based on the provided DataFrame with OHLC data.

    Parameters:
    df (pd.DataFrame): DataFrame containing columns ['open', 'high', 'low', 'close', 'volume']
    zigzag_high_period (int): Period for ZigZag high calculation
    zigzag_low_period (int): Period for ZigZag low calculation
    min_movement_percent (float): Minimum percentage movement for signal validation
    ma_period (int): Period for moving averages
    std_dev_multiplier (float): Standard deviation multiplier for bands
    daralma_esik (float): Threshold for contraction condition
    ma_length (int): Length for signal moving averages

    Returns:
    pd.DataFrame: Original DataFrame with added signal columns
    """

    # Copy dataframe to avoid modifying original
    df = df.copy()

    # Helper functions for crossover/crossunder
    def crossover(s1, s2):
        return (s1.shift(1) < s2.shift(1)) & (s1 > s2)

    def crossunder(s1, s2):
        return (s1.shift(1) > s2.shift(1)) & (s1 < s2)

    # Calculate ZigZag high/low points
    df['zigzag_high'] = df['high'] == df['high'].rolling(zigzag_high_period).max()
    df['zigzag_low'] = df['low'] == df['low'].rolling(zigzag_low_period).min()

    # Calculate minimum movement
    df['min_movement'] = df['close'] * min_movement_percent / 100

    # Calculate previous ZigZag values
    for side in ['high', 'low']:
        df[f'zigzag_{side}_value'] = df[side].where(df[f'zigzag_{side}'])
        df[f'prev_zigzag_{side}_value'] = df[f'zigzag_{side}_value'].ffill().shift(1)
        df[f'prev_zigzag_{side}_close'] = df['close'].where(df[f'zigzag_{side}']).ffill().shift(1)

    # Calculate LongMumu and ShortMumu signals
    df['LongMumu'] = (df['zigzag_high'] &
                     (df['high'] > df['prev_zigzag_high_value']) &
                     (df['close'] - df['prev_zigzag_high_close'] >= df['min_movement']))

    df['ShortMumu'] = (df['zigzag_low'] &
                      (df['low'] < df['prev_zigzag_low_value']) &
                      (df['prev_zigzag_low_close'] - df['close'] >= df['min_movement']))

    # Calculate moving averages and bands
    df['ma'] = df['close'].rolling(ma_period).mean()
    df['std_dev'] = df['close'].rolling(ma_period).std()
    df['upper_band'] = df['ma'] + std_dev_multiplier * df['std_dev']
    df['lower_band'] = df['ma'] - std_dev_multiplier * df['std_dev']

    # Calculate long/short signals with band conditions
    df['long_signal'] = df['LongMumu'] & (df['close'] > df['ma']) & (df['close'] > df['upper_band'])
    df['short_signal'] = df['ShortMumu'] & (df['close'] < df['ma']) & (df['close'] < df['lower_band'])

    # Calculate pullback levels
    df['long_pullback_level'] = np.where(df['LongMumu'], df['close'], np.nan).ffill()
    df['short_pullback_level'] = np.where(df['ShortMumu'], df['close'], np.nan).ffill()

    # Calculate consecutive up/down counts
    for side in ['up', 'down']:
        consecutive = []
        count = 0
        for val in df[f'LongMumu' if side == 'up' else 'ShortMumu']:
            count = count + 1 if val else 0
            consecutive.append(count)
        df[f'consecutive_{side}'] = consecutive

    # Calculate moving averages of consecutive counts
    df['ma_consecutive_up'] = df['consecutive_up'].rolling(ma_period).mean()
    df['ma_consecutive_down'] = df['consecutive_down'].rolling(ma_period).mean()

    # Calculate crossover signals
    df['crossover_up'] = crossover(df['ma_consecutive_up'], df['ma_consecutive_down'])
    df['crossover_down'] = crossunder(df['ma_consecutive_up'], df['ma_consecutive_down'])

    # Calculate potential candles
    df['shortPotentialCandles'] = ((df['close'] > df['ma']) &
                                  crossunder(df['close'], df['long_pullback_level']) &
                                  ~crossunder(df['close'], df['upper_band']))

    df['longPotentialCandles2'] = (crossover(df['close'], df['short_pullback_level']) &
                                  crossover(df['close'], df['shortMumu_moving_avg']) &
                                  (df['close'] < df['ma']))

    # Calculate signal moving averages
    df['allSignals'] = np.where(df['LongMumu'], df['low'],
                               np.where(df['ShortMumu'], df['high'], np.nan))
    df['ma_allSignals'] = df['allSignals'].rolling(ma_length).mean()

    df['allSignals2'] = np.where(df['shortPotentialCandles'], df['low'],
                                np.where(df['longPotentialCandles2'], df['high'], np.nan))
    df['ma_allSignals2'] = df['allSignals2'].rolling(ma_length).mean()

    # Calculate combined average
    df['ma_combined'] = (df['ma_allSignals'] + df['ma_allSignals2']) / 2

    # Calculate daralma condition
    df['daralma_kosulu'] = (df['ma_allSignals'] - df['ma_allSignals2']).abs() < daralma_esik

    # Generate final signals
    df['shortCrossover'] = crossunder(df['close'], df['ma_allSignals2'])
    df['longCrossover'] = crossover(df['close'], df['ma_allSignals2'])
    df['cross'] = crossover(df['close'], df['ma_allSignals2']) & crossover(df['close'], df['ma_allSignals'])

    # Create buy/sell signals
    df['BUY'] = df['longCrossover'] & df['daralma_kosulu']
    df['SELL'] = df['shortCrossover'] & df['daralma_kosulu']
    df['TRADE'] = df['cross'] & df['daralma_kosulu']

    return df

# Example usage:
# Assuming you have a DataFrame 'ohlc_data' with OHLCV columns
# signals_df = calculate_devisso_trend_signals(ohlc_data)
# print(signals_df[['date', 'close', 'BUY', 'SELL', 'TRADE']].tail(20))