In [2]:
import os
import pandas as pd
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import sqlite3
import numpy as np
from functools import reduce
from collections import Counter
from tqdm import tqdm

In [3]:
#set local path
database_path =  'c:\\Users\\iamch\\OneDrive\\Documents\\python\\stocks\\trading\\database.db'

In [4]:
#function to get data for an individual stock from database
def get_individual_data(ticker, database_path, table_name, days):
  conn = sqlite3.connect(database_path)
  cursor = conn.cursor()
  #end date is the latest date available, start_date is the end date minus number of days
  end_date = cursor.execute(f"SELECT MAX(date) FROM {table_name} WHERE ticker = '{ticker}'").fetchone()[0]
  start_date = datetime.strptime(end_date, '%Y-%m-%d').date()-timedelta(days=days)
  back_data = cursor.execute(f"SELECT * FROM {table_name} WHERE ticker='{ticker}' AND date BETWEEN '{start_date}' AND '{end_date}'").fetchall()
  #convert the result into a dataframe
  df = pd.DataFrame(back_data, columns=['ticker', 'Date', 'Open', 'High', 'Low', 'Close', 'Volume'])
  df['Date'] = pd.to_datetime(df['Date'])
  return df

In [6]:
#get all tickers from current database
def list_from_db(database_path, table_name):
  conn = sqlite3.connect(database_path)
  cursor = conn.cursor()

  #list of all stocks within database
  tickers = cursor.execute(f'SELECT ticker FROM {table_name} GROUP BY ticker').fetchall()
  #store as a list
  tickers_ls = [n[0] for n in tickers]

  conn.close()
  return tickers_ls

In [None]:
class SMASignal:

  def __init__(self, backcandles=3, short_SMA=5, long_SMA=20):
    self.backcandles = backcandles
    self.short_SMA = short_SMA
    self.long_SMA = long_SMA

  def calculate(self, df):
    #create a copy so it does not write on original dataframe
    df_copy = df.copy()
    pd.options.mode.chained_assignment = None #switch off warnings
    #calculate SMAs
    df_copy['SMA1'] = df_copy['Close'].rolling(window=self.short_SMA).mean()
    df_copy['SMA2'] = df_copy['Close'].rolling(window=self.long_SMA).mean()
    #first few rows are NaN and reset index
    df_copy = df_copy.dropna().reset_index(drop=True)
    #set columns for buy and sell signals
    df_copy['buy'] = 0
    df_copy['sell'] = 0
    #for each day, evaluate whether there is any day in the last (backcandles days) where price is above SMA2
    for row in range(self.backcandles, len(df_copy)):
      counter = 0
      #check all closing price below SMA2
      for i in range(row-self.backcandles, row):
        if df_copy.Close[i] > df_copy.SMA2[i]:
          counter = 1
      #if it none of the (backcandle days) above SMA2, and then the current day above SMA2
      if counter == 0 and df_copy.Close[row] > df_copy.SMA2[row]:
        df_copy.buy[row]=1
    #set sell signal
    for row in range(0, len(df_copy)):
      if df_copy.Close[row] < df_copy.SMA1[row]:
        df_copy.sell[row]=1
    return df_copy

In [None]:
class EMASignal:

  def __init__(self, backcandles=3, short_EMA=5, long_EMA=20):
    self.backcandles = backcandles
    self.short_EMA = short_EMA
    self.long_EMA = long_EMA

  def calculate(self, df):
    #create a copy so it does not write on original dataframe
    df_copy = df.copy()
    pd.options.mode.chained_assignment = None #switch off warnings
    #calculate EMAs
    df_copy['EMA1'] = df['Close'].ewm(span=self.short_EMA, adjust=False).mean()
    df_copy['EMA2'] = df['Close'].ewm(span=self.long_EMA, adjust=False).mean()
    #first few rows are NaN
    df_copy = df_copy.dropna().reset_index(drop=True)
    #set columns for buy and sell signals
    df_copy['buy'] = 0
    df_copy['sell'] = 0
    #for each day, evaluate whether there is any day in the last (backcandles days) where price is above EMA2
    for row in range(self.backcandles, len(df_copy)):
      counter = 0
      #check all closing price below EMA2
      for i in range(row-self.backcandles, row):
        if df_copy.Close[i] > df_copy.EMA2[i]:
          counter = 1
      #if it none of the (backcandle days) above EMA2, and then the current day above EMA2
      if counter == 0 and df_copy.Close[row] > df_copy.EMA2[row]:
        df_copy.buy[row]=1
    for row in range(0, len(df_copy)):
      if df_copy.Close[row] < df_copy.EMA1[row]:
        df_copy.sell[row]=1
    return df_copy

In [None]:
class BBSignal:

  def __init__(self, window=14, std=1.5):
    self.window = window
    self.std = std

  def calculate(self, df):
    #create a copy so it does not write on original dataframe
    df_copy = df.copy()
    pd.options.mode.chained_assignment = None #switch off warnings
    #calculate the bands
    df_copy['SMA'] = df_copy['Close'].rolling(window=self.window).mean()
    df_copy['STD'] = df_copy['Close'].rolling(window=self.window).std()
    df_copy['Upper'] = df_copy['SMA'] + (df_copy['STD']*self.std)
    df_copy['Lower'] = df_copy['SMA'] - (df_copy['STD']*self.std)
    #first few rows are NaN
    df_copy = df_copy.dropna().reset_index(drop=True)
    #evalute buy and sell
    df_copy['buy'] = (df_copy['Close'] < df_copy['Lower']).astype(int)
    df_copy['sell'] = (df_copy['Close'] > df_copy['Upper']).astype(int)
    return df_copy

In [None]:
class RSISignal:

  def __init__(self, window=14, low=30, high=70):
    self.window = window
    self.low = low
    self.high = high

  def calculate(self, df):
    #Create a copy so it does not write on the original DataFrame
    df_copy = df.copy()
    pd.options.mode.chained_assignment = None #switch off warnings
    #Calculate price changes and gains/losses using rolling window
    delta = df_copy['Close'].diff()
    gains = delta.where(delta > 0, 0)
    losses = -delta.where(delta < 0, 0)
    # Calculate average gains and losses over the specified window
    avg_gains = gains.rolling(window=self.window).mean()
    avg_losses = losses.rolling(window=self.window).mean()
    # Calculate RS and RSI using the average gains and losses
    rs = avg_gains / avg_losses
    rsi = 100 - (100 / (1 + rs))
    # Set RSI values in the DataFrame
    df_copy['RSI'] = rsi
    df_copy = df_copy.dropna().reset_index(drop=True)
    # Evaluate buy/sell signals
    df_copy['buy'] = (df_copy['RSI'] < self.low).astype(int)
    df_copy['sell'] = (df_copy['RSI'] > self.high).astype(int)
    return df_copy

In [None]:
class EngulfSignal:

  def __init__(self):
    #no parameter needed
    pass

  def calculate(self, df):
    #Create a copy so it does not write on the original DataFrame
    df_copy = df.copy()
    pd.options.mode.chained_assignment = None #switch off warnings
    # Set columns for buy and sell signals using where() and astype()
    df_copy['buy'] = ((df_copy['Close'].shift() < df_copy['Open'].shift()) &
                      (df_copy['Open'] < df_copy['Close'].shift()) &
                      (df_copy['Close'] > df_copy['Open'].shift())).astype(int)
    df_copy['sell'] = ((df_copy['Close'].shift() > df_copy['Open'].shift()) &
                       (df_copy['Open'] > df_copy['Close'].shift()) &
                       (df_copy['Close'] < df_copy['Open'].shift())).astype(int)
    return df_copy

In [None]:
def combine_strategy(df_list):
  match_ls = ['Date', 'ticker', 'Open', 'High', 'Low', 'Close', 'Volume']
  #remove all other columns
  df_clean_list = []
  columns = ['Date', 'ticker', 'Open', 'High', 'Low', 'Close', 'Volume', 'buy', 'sell']
  for df in df_list:
    df = df[columns]
    df_clean_list.append(df)
  #merge the dataframes
  merged_df = df_clean_list[0]
  for n in df_clean_list[1:]:
    merged_df = pd.merge(merged_df, n, on=match_ls, how='inner')
    #new 'buy' if both df1 and df2 are 1
    merged_df['buy'] = (merged_df['buy_x'] & merged_df['buy_y']).astype(int)
    merged_df['sell'] = (merged_df['sell_x'] | merged_df['sell_y']).astype(int)
    #remove the extra columns
    merged_df = merged_df.drop(['buy_x', 'buy_y', 'sell_x', 'sell_y'], axis=1)
  return merged_df