In [45]:
import yfinance as yf
import pandas as pd
import numpy as np
import os
import pickle
import joblib
import plotly.graph_objects as go
pd.set_option('display.max_columns', None)

class CandleFit:
    def __init__(self, ticker: str, period: str = '5y'):
        """
        Initializes the CandleFit object with a specified ticker and period.

        Parameters:
        ticker (str): The ticker symbol of the stock.
        period (str): The period for which to download historical data.
        """
        self.ticker = ticker
        self.period = period
        self.data = self.get_ticker()
        self.features = self.get_price_features()
        self.threshold_dict: dict = None
      

    def get_ticker(self):
        """
        Downloads historical data for the specified ticker and period.

        Returns:
        pd.DataFrame: A DataFrame containing the historical data with formatted dates and column names.
        """
        try:
            ticker_obj = yf.Ticker(self.ticker)
            hist = ticker_obj.history(period=self.period)
            hist.index = pd.to_datetime(hist.index).strftime('%Y-%m-%d')
            hist.columns = [col.lower() for col in hist.columns]
            return hist
        except Exception as e:
            print(f"Error downloading aux: {e}")
            return pd.DataFrame()
    
    @staticmethod
    def load_dict(key: str) -> dict:
        """
        Loads a dictionary from a pickle file based on the provided key.

        Parameters:
        key (str): The key to search for in the dictionary.

        Returns:
        dict: The dictionary associated with the provided key.

        Raises:
        FileNotFoundError: If the pickle file is not found.
        KeyError: If the key is not found in the dictionary.
        ValueError: If there is an error loading the pickle file.
        """
        filepath = os.path.join('..', 'pkl', 'threshold_dicts.pkl')
        try:
            with open(filepath, 'rb') as file:
                data = pickle.load(file)
            
            for item in data:
                if key in item:
                    return item[key]
            raise KeyError(f"Key '{key}' not found in the threshold dictionary.")
        except FileNotFoundError:
            raise FileNotFoundError(f"The file '{filepath}' was not found.")
        except pickle.PickleError:
            raise ValueError("Error occurred while loading the pickle file.")
        
    def get_price_features(self):
        """
        Calculates various price and volume features from historical data.

        Returns:
        pd.DataFrame: A DataFrame containing the calculated features.
        """
        aux = self.data.copy()
        df = pd.DataFrame(index=aux.index)
        df.index = pd.to_datetime(df.index).strftime('%Y-%m-%d')
        df['return_rate'] = aux['close'].pct_change()
        df['volume_change'] = aux['volume'].diff()
        df['volume_var'] = aux['volume'].pct_change() + 1
        df['price_range'] = aux['high'] - aux['low']
        df['price_var'] = df['price_range'] / aux['low']
        df['price_change'] = aux['close'] - aux['open']
        df['close_vol'] = aux['close'].expanding().std()
        df['low_vol'] = aux['low'].expanding().std()
        df['high_vol'] = aux['high'].expanding().std()
        df['open_vol'] = aux['open'].expanding().std()
        df['upper_wick'] = aux['high'] - aux[['open', 'close']].max(axis=1)
        df['lower_wick'] = aux[['open', 'close']].min(axis=1) - aux['low']
        df['wick_change'] = df['upper_wick'] - df['lower_wick']
        df['wick_var'] = df['wick_change'] / df['lower_wick']
        df['wick_vol'] = df['wick_change'].abs().expanding().std()
        df = df.apply(pd.to_numeric, errors='coerce')
        df = pd.concat([aux, df], axis=1)
        
        return df

    def get_candle_features(self, 
                            doji_threshold: float = None, 
                            bullish_threshold : float = None,
                            bearish_threshold: float = None,
                            volatility_window: int = None):
                            
 
        """
        Calculates various candlestick features based on price data and a threshold dictionary.

        Parameters:
        threshold_dict (dict): A dictionary containing thresholds for calculating features. If None, it loads the default dictionary.

        Returns:
        pd.DataFrame: A DataFrame containing the calculated candlestick features.
        """
        
        aux = self.features.copy()
        aux = aux.loc[:, ~aux.columns.duplicated()]

        threshold_dict = self.load_dict(key='threshold_dict')
        threshold_dict = {
            'doji_threshold': doji_threshold if doji_threshold is not None else threshold_dict.get('doji_threshold'),
            'bullish_threshold': bullish_threshold if bullish_threshold is not None else threshold_dict.get('bullish_threshold'),
            'bearish_threshold': bearish_threshold if bearish_threshold is not None else threshold_dict.get('bearish_threshold'),
            'volatility_window': volatility_window if volatility_window is not None else threshold_dict.get('volatility_window')
        }
        
        df = pd.DataFrame(index=aux.index)
        df[f'std_{threshold_dict["volatility_window"]}'] = aux['price_change'].rolling(window=threshold_dict['volatility_window']).std().abs()
        df['bearish_threshold'] = pd.to_numeric(threshold_dict["bearish_threshold"] * df[f'std_{threshold_dict["volatility_window"]}'], errors='coerce').fillna(0)
        df['bullish_threshold'] = pd.to_numeric(threshold_dict["bullish_threshold"] * df[f'std_{threshold_dict["volatility_window"]}'], errors='coerce').fillna(0)
        df['is_bearish'] = (aux['close'] <= (aux['open'] - df['bearish_threshold'])).astype(int)
        df['is_bullish'] = (aux['close'] >= (aux['open'] + df['bullish_threshold'])).astype(int)
        df['is_doji'] = (abs(aux['close'] - aux['open']) <= threshold_dict['doji_threshold']).astype(int)
        df['is_bearish_open_gap'] = (aux['open'] < aux['close'].shift(1)).astype(int)
        df['is_bullish_open_gap'] = (aux['open'] > aux['close'].shift(1)).astype(int)
        self.threshold_dict = threshold_dict
        df = pd.concat([aux, df], axis=1)

        self.features = df
        return  self.features

    def fit_morning_star(self,
                         doji_threshold: float = None,
                         bullish_threshold : float = None,
                         bearish_threshold: float = None,
                         volatility_window: int = None):

        """
        Identifies the morning star candlestick pattern in the historical data.

        Parameters:
        threshold_dict (dict): A dictionary containing thresholds for the morning star pattern. If None, it loads the default dictionary.

        Returns:
        pd.DataFrame: A DataFrame with a column indicating the presence of the morning star pattern.
        """
        
        df = self.get_candle_features()
        df = df.loc[:, ~df.columns.duplicated(keep='last')]


        threshold_dict = self.load_dict(key='morning_star_dict')
        threshold_dict = {
            'doji_threshold': doji_threshold if doji_threshold is not None else threshold_dict.get('doji_threshold'),
            'bullish_threshold': bullish_threshold if bullish_threshold is not None else threshold_dict.get('bullish_threshold'),
            'bearish_threshold': bearish_threshold if bearish_threshold is not None else threshold_dict.get('bearish_threshold'),
            'volatility_window': volatility_window if volatility_window is not None else threshold_dict.get('volatility_window')
                }
        display(pd.DataFrame([threshold_dict]))
            # Condition 1: Two days ago was a bearish candle and the close of that day is lower than the open of that day adjusted by the threshold
        df['is_bearish_morning_star'] = ((df['is_bearish'].shift(2) == 1) & 
                                        (df['close'].shift(2) + threshold_dict['bearish_threshold'] * df['price_change'].shift(2) 
                                        <= df['open'].shift(2))).astype(int)

        # Condition 2: The previous day was a doji candle and had a bearish open gap
        df['is_bearish_open_gap_morning_star'] = (df['is_bearish_open_gap'].shift(1) == 1).astype(int)
        df['is_doji_morning_star'] = (df['is_doji'].shift(1) == 1).astype(int)

        # Condition 3: Today is a bullish candle and the price change from two days ago to today is significant
        df['is_bullish_morning_star'] = ((df['is_bullish'] == 1) & 
                                        (df['close'] - df['close'].shift(2) >= threshold_dict['bullish_threshold'] 
                                        * df['price_change'].shift(2).abs())).astype(int)

        # Combine all conditions to determine the morning star pattern
        df['is_morning_star'] = df[['is_bearish_morning_star', 'is_bearish_open_gap_morning_star', 'is_doji_morning_star', 
                                    'is_bullish_morning_star']].all(axis=1).astype(int)

        self.threshold_dict = threshold_dict
        
        self.features = df
        return self.features
    
    def get_movings(self, short:int = None, long:int = None, strategy: str = 'test'):

        """
        Calculates buy and sell signals based on moving averages for different strategies and parameters.

        Parameters:
        - threshold_dict (dict, optional): Dictionary containing moving average settings for different strategies. If any of parameters is None,
        it will be loaded with `self.load_dict(key='rolling_cross_dict')` and available strategies will be measured.  
        - short (int, optional): Time window for the short moving average. Not used directly in the function.
        - long_ (int, optional): Time window for the long moving average. Not used directly in the function.
        - signal_window (int, optional): Time window for signal calculation. Not used directly in the function.

        Returns:
        - pd.DataFrame: DataFrame with additional columns for moving averages and buy/sell signals.
        """
        df = self.get_candle_features()  
        
        if (short is None) != (long is None):
            raise ValueError("Please set both short and long to valid int or set both to None.")
        elif all(x is not None for x in (short, long)):
            threshold_dict = {f'{strategy}_{short}_{long}': {'short': short, 'long': long}}
        else:
            display('Loading standard strategies')
            threshold_dict = self.load_dict(key='rolling_cross_dict')
            display(threshold_dict)
            
    
        tolerance = 0.0001  
        for key, value in threshold_dict.items():
            strategy = key.split('_')[0] if '_' in key else key
            short = value['short']
            long = value['long']
            
            df[f'{strategy}_short_{short}'] = df['close'].rolling(window=short, min_periods=1).mean()
            df[f'{strategy}_long_{long}'] = df['close'].rolling(window=long, min_periods=1).mean()
            
            # Detectar cruzamento de médias móveis com margem de tolerância
            buy = (df[f'{strategy}_short_{short}'] > df[f'{strategy}_long_{long}'] + tolerance) & \
                (df[f'{strategy}_short_{short}'].shift(1) < df[f'{strategy}_long_{long}'].shift(1) + tolerance)
            
            sell = (df[f'{strategy}_short_{short}'] < df[f'{strategy}_long_{long}'] - tolerance) & \
                (df[f'{strategy}_short_{short}'].shift(1) > df[f'{strategy}_long_{long}'].shift(1) - tolerance)
            
            df[f'{strategy}_{short}_{long}'] = np.where(buy, 1, np.where(sell, -1, 0))
                        
            df = df.loc[:, ~df.columns.duplicated(keep='last')]

        self.features = df
        self.threshold_dict = threshold_dict  
        return self.features          

    def candlestick_chart(self, 
                      key: str = 'is_morning_star', 
                      plot_type: str = 'pattern',
                      height: int = 900, 
                      offset: float = 12):
        """
        Generates a candlestick chart with optional pattern or price action markers.

        Parameters:
        - key (str): Key for identifying the pattern or price action indicators. Defaults to 'is_morning_star'.
        - plot_type (str): Type of plot to generate. Options are 'pattern' or 'price_action'. Defaults to 'pattern'.
        - height (int): Height of the plot in pixels. Defaults to 900.
        - offset (float): Vertical offset for pattern markers. Defaults to 10.

        Returns:
        - go.Figure: A Plotly Figure object with the candlestick chart.
        """

        aux = self.features.copy()
        aux = aux.loc[:, ~aux.columns.duplicated(keep='last')]
        if key not in aux.columns:
            print(f"Key '{key}' not found in aux columns.")
            return None
            
        if plot_type == 'pattern':     
            markers = aux[aux[key] == 1].copy()
            markers['close'] = markers['low'] - offset
            markers = markers.dropna(subset=['close'])
            marker_trace = go.Scatter(
                x=markers.index,
                y=markers['close'],
                mode='markers',
                marker=dict(
                    color='blue',
                    size=8,
                    symbol='triangle-up'
                ),
                name=key,
                yaxis="y"
            )
        
        elif plot_type == 'moving_cross':
            strategy, short, long = key.split('_')
            short_col = f'{strategy}_short_{short}'
            long_col = f'{strategy}_long_{long}'
            signal_col = f'{strategy}_{short}_{long}'
            short_trace = go.Scatter(
                x=aux.index,
                y=aux[short_col],
                mode='lines',
                name=f'{strategy.capitalize()} Short {short}',
                line=dict(color='purple')  
            )
            long_trace = go.Scatter(
                x=aux.index,
                y=aux[long_col],
                mode='lines',
                name=f'{strategy.capitalize()} Long {long}',
                line=dict(color='orange')  
            )

            markers = aux[aux[signal_col] != 0].copy()  
            markers['close'] = markers.apply(
                lambda row: row['low'] - offset if row[signal_col] == 1 else row['high'] + offset,
                axis=1
            )
            markers['color'] = markers[signal_col].apply(lambda x: 'green' if x == 1 else 'red')
            markers['symbol'] = markers[signal_col].apply(lambda x: 'arrow-up' if x == 1 else 'arrow-down')
            
            marker_trace = go.Scatter(
                x=markers.index,
                y=markers['close'],
                mode='markers',
                marker=dict(
                    color=markers['color'],
                    size=8,
                    symbol=markers['symbol']
                ),
                name='Signal',
                yaxis="y"
            )
            
            trace = go.Candlestick(
                x=aux.index,
                open=aux["open"],
                high=aux["high"],
                low=aux["low"],
                close=aux["close"],
                name=self.ticker,
                yaxis="y"
            )
            
            volume_colors = ['green' if aux['volume'][i] > aux['volume'][i-1] else 'red' for i in range(1, len(aux))]
            volume_colors.insert(0, 'green')
            volume_trace = go.Bar(
                x=aux.index,
                y=aux['volume'],
                marker_color=volume_colors,
                name='Volume',
                yaxis="y2"
            )
            
            layout = go.Layout(
                title=f"{self.ticker} Candlestick Chart markers: {key.capitalize()}",
                xaxis=dict(title="Date"),
                yaxis=dict(title="Price", domain=[0.3, 1]),
                yaxis2=dict(title="Volume", domain=[0, 0.2]),
                height=height,
                barmode='relative'
            )
            
            fig = go.Figure(data=[trace, short_trace, long_trace, marker_trace, volume_trace], layout=layout)
            return fig
        
    @staticmethod      
    def get_trades(df, 
               strategy: str = None,
               reward_risk_ratio: list = None, 
               price_col: str = 'close',
               trade_period=7,
               target_return=0.01):
    
        cols = ['open', 'high', 'low', 'close']
        shifted_cols = []
        stop_loss_cols = []

        for col in cols:
            for i in range(2, trade_period + 1):
                df[f'{col}_{i}'] = df[col].shift(-i)
                shifted_cols.append(f'{col}_{i}')
        
        has_signals = (df[strategy] == 1) | (df[strategy] == -1)
        if not has_signals.any():
            return "The strategy did not generate buy or sell signals."
        
        df = df[cols + [strategy] + shifted_cols][has_signals].copy()
        df['side'] = df[strategy].apply(lambda x: 'long' if x == 1 else 'short')
        df['target_price'] = df.apply(lambda row: (1 + target_return) * row[price_col] if row[strategy] == 1 else (1 - target_return) * row[price_col], axis=1)
        if reward_risk_ratio is None:      
            reward_risk_ratio = np.arange(0.25, 2.25, 0.25)
        
        for rw in reward_risk_ratio:
            df[f'stop_loss_reward_risk_ratio_{rw}'] = np.where(df['side'] == 'long', 
                                                                df[price_col] * (1 - target_return * rw),
                                                                df[price_col] * (1 + target_return * rw))
            stop_loss_cols.append(f'stop_loss_reward_risk_ratio_{rw}')

        
        for loss_col in stop_loss_cols:
            df[f'out_of_position_{loss_col}'] = np.nan
            df[f'result_{loss_col}'] = np.nan
            df[f'result_{loss_col}_value'] = np.nan
            
            for index, row in df.iterrows():
                stop_loss_val = row[loss_col]
                target_price = row['target_price']
                entry_price = row[price_col] 
                
                for i in range(2, trade_period + 1):
                    next_open = row[f'open_{i}']
                    next_high = row[f'high_{i}']
                    next_low = row[f'low_{i}']
                    next_close = row[f'close_{i}']
                    
                    if row['side'] == 'long':
                        price_vars = [next_open, next_low, next_high, next_close]
                        for price_var in price_vars:
                            if price_var > target_price:
                                df.at[index, f'out_of_position_{loss_col}'] = i
                                df.at[index, f'result_{loss_col}'] = 'profit'
                                df.at[index, f'result_{loss_col}_value'] = price_var - entry_price
                                break
                            elif price_var < stop_loss_val:
                                df.at[index, f'out_of_position_{loss_col}'] = i
                                df.at[index, f'result_{loss_col}'] = 'loss'
                                df.at[index, f'result_{loss_col}_value'] = entry_price - price_var
                                break
                    else:  # 'short'
                        price_vars = [next_open, next_high, next_low, next_close]
                        for price_var in price_vars:
                            if price_var < target_price:
                                df.at[index, f'out_of_position_{loss_col}'] = i
                                df.at[index, f'result_{loss_col}'] = 'profit'
                                df.at[index, f'result_{loss_col}_value'] = entry_price - price_var
                                break
                            elif price_var > stop_loss_val:
                                df.at[index, f'out_of_position_{loss_col}'] = i
                                df.at[index, f'result_{loss_col}'] = 'loss'
                                df.at[index, f'result_{loss_col}_value'] = entry_price - price_var
                                break
                    
                    if not np.isnan(df.at[index, f'out_of_position_{loss_col}']):
                        break
                
        return df

In [46]:

# Build the analyser for GOOG; the constructor downloads the default '5y' history and derives the price features.
ticker = CandleFit('GOOG')
 

In [47]:
# No short/long windows are passed, so the strategies stored under 'rolling_cross_dict' are loaded and evaluated.
test = ticker.get_movings()



'Loading standard strategies'

{'sma_5_l0': {'short': 5, 'long': 10},
 'sma_5_20': {'short': 5, 'long': 20},
 'sma_10_30': {'short': 10, 'long': 30},
 'sma_10_50': {'short': 10, 'long': 50},
 'sma_20_50': {'short': 20, 'long': 50},
 'sma_20_100': {'short': 20, 'long': 100},
 'sma_50_100': {'short': 50, 'long': 100},
 'sma_50_150': {'short': 50, 'long': 150},
 'sma_100_200': {'short': 100, 'long': 200},
 'ema_5_10': {'short': 5, 'long': 10},
 'ema_5_20': {'short': 5, 'long': 20},
 'ema_10_30': {'short': 10, 'long': 30},
 'ema_10_50': {'short': 10, 'long': 50},
 'ema_20_50': {'short': 20, 'long': 50},
 'ema_20_100': {'short': 20, 'long': 100},
 'ema_50_100': {'short': 50, 'long': 100},
 'ema_50_150': {'short': 50, 'long': 150},
 'ema_100_200': {'short': 100, 'long': 200},
 'macd_9_12': {'short': 12, 'long': 26},
 'macd_9_30': {'short': 30, 'long': 60},
 'macd_12_26': {'short': 12, 'long': 26},
 'macd_12_50': {'short': 12, 'long': 50},
 'macd_26_50': {'short': 26, 'long': 50},
 'macd_26_100': {'short': 26, 'long': 100}}

In [48]:
# Chart the 'sma_20_100' signal column together with its short/long moving-average lines.
ticker.candlestick_chart(key='sma_20_100', plot_type='moving_cross')

In [39]:
# Work on a copy of the computed features.
test = ticker.features.copy()

In [40]:
def get_trades(df, 
               strategy: str = None,
               reward_risk_ratio: list = None, 
               price_col: str = 'close',
               trade_period=7,
               target_return=0.01):
    """
    Simulates one trade per signal row and records whether the target or the stop-loss is hit first.

    A row whose `strategy` column equals 1 opens a long trade, -1 a short trade, both at
    `price_col`. For every reward/risk ratio a stop-loss level is derived and the bars at
    offsets 2..`trade_period` after the signal are scanned; the first price beyond the target
    or the stop closes the trade. The input frame is not modified.

    Parameters:
    df (pd.DataFrame): Frame with 'open', 'high', 'low', 'close' and the `strategy` column.
    strategy (str): Name of the signal column (values 1 / -1 / other).
    reward_risk_ratio (list): Multipliers of `target_return` used for the stop-loss distance.
        Defaults to np.arange(0.25, 2.25, 0.25).
    price_col (str): Column used as entry price. Defaults to 'close'.
    trade_period (int): Last bar offset that is scanned. Defaults to 7.
    target_return (float): Fractional distance of the target from the entry price. Defaults to 0.01.

    Returns:
    pd.DataFrame: One row per signal with the look-ahead prices, 'side', 'target_price' and,
    per ratio, 'stop_loss_reward_risk_ratio_<r>', 'out_of_position_<...>' (bar offset),
    'result_<...>' ('profit' / 'loss') and 'result_<...>_value' (signed P&L: positive for a
    profit, negative for a loss). Trades that never exit keep NaN in the result columns.
    str: A message when the strategy column holds no 1 / -1 signal.
    """
    cols = ['open', 'high', 'low', 'close']
    offsets = range(2, trade_period + 1)

    # Check whether there are any signals
    has_signals = (df[strategy] == 1) | (df[strategy] == -1)
    if not has_signals.any():
        return "The strategy did not generate buy or sell signals."

    # Create the shifted (look-ahead) columns on a private copy so the caller's frame is never modified.
    trades = df[cols + [strategy]].copy()
    for col in cols:
        for i in offsets:
            trades[f'{col}_{i}'] = df[col].shift(-i)
    trades = trades[has_signals].copy()

    is_long = trades[strategy] == 1
    entry = trades[price_col]
    trades['side'] = np.where(is_long, 'long', 'short')
    trades['target_price'] = np.where(is_long, (1 + target_return) * entry, (1 - target_return) * entry)

    if reward_risk_ratio is None:
        reward_risk_ratio = np.arange(0.25, 2.25, 0.25)

    stop_loss_cols = []
    for rw in reward_risk_ratio:
        loss_col = f'stop_loss_reward_risk_ratio_{rw}'
        trades[loss_col] = np.where(is_long,
                                    entry * (1 - target_return * rw),
                                    entry * (1 + target_return * rw))
        stop_loss_cols.append(loss_col)

    def _first_exit(row, stop_loss_val):
        """Return (bar offset, 'profit' | 'loss', signed P&L) of the first exit, or None."""
        going_long = row[strategy] == 1
        target_price = row['target_price']
        entry_price = row[price_col]
        # Within a bar, prices are probed in this order (longs see the low before the high).
        probe_order = ('open', 'low', 'high', 'close') if going_long else ('open', 'high', 'low', 'close')
        for i in offsets:
            for name in probe_order:
                price_var = row[f'{name}_{i}']
                if going_long:
                    hit_target = price_var > target_price
                    hit_stop = price_var < stop_loss_val
                    pnl = price_var - entry_price
                else:
                    hit_target = price_var < target_price
                    hit_stop = price_var > stop_loss_val
                    pnl = entry_price - price_var
                if hit_target:
                    return i, 'profit', pnl
                if hit_stop:
                    return i, 'loss', pnl
        return None

    for loss_col in stop_loss_cols:
        exits = [_first_exit(row, row[loss_col]) for _, row in trades.iterrows()]
        trades[f'out_of_position_{loss_col}'] = np.array([e[0] if e else np.nan for e in exits], dtype=float)
        # Object dtype: the column mixes the strings 'profit' / 'loss' with NaN.
        trades[f'result_{loss_col}'] = np.array([e[1] if e else np.nan for e in exits], dtype=object)
        trades[f'result_{loss_col}_value'] = np.array([e[2] if e else np.nan for e in exits], dtype=float)

    return trades


In [18]:
# Simulate trades on the 'sma_20_100' signal column with a 3% target return (other arguments keep their defaults).
tes = get_trades(test, 'sma_20_100',target_return=0.03)


In [19]:
# Show the result; get_trades returns a message string instead of a DataFrame when the column holds no 1 / -1 signal.
tes

'The strategy did not generate buy or sell signals.'