In [1]:
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
import pandas as pd

In [235]:
def read_csv(stock):
    filepath = f"C:/Users/danie/Documents/Software/Python-Finance-QuantConnect/DATA/{stock}.csv"
    df = pd.read_csv(filepath)
    print("len(df): ", len(df))
    return df

In [236]:
def LR_SMA(df, n_days, n_sma):    
    # Calculate the simple moving average (SMA) for a window of n days
    df['SMA'] = df['Log Returns'].rolling(window=n_sma).mean()

    df = df.dropna(subset=['Log Returns', 'SMA'])
    df.reset_index(drop=True, inplace=True)
    
    # Initialize X and Y
    X = []
    Y = []
    
    # Populate X and Y
    for i in range(n_days*n_sma, len(df)):
        #print("i: ", i)
        X.append(df[['SMA']].iloc[i-n_days*n_sma:i:n_sma].values.flatten())
#         X.append(df[['SMA']].iloc[i-n_days*n_sma:i:n_sma].values.flatten())
        Y.append(df['SMA'].iloc[i])

    # split data
    test_size = int(len(X) * 0.2)

    # Training set
    X_train = X[:-test_size]
    y_train = Y[:-test_size]

    # Testing set
    X_test = X[-test_size:]
    y_test = Y[-test_size:]

    # Fit the model on the training data
    model = LinearRegression(n_jobs=-1).fit(X_train, y_train)
    # normalize = False (input data)
    # copy_x = True (overwrite input variables)
    # n_jobs = None (number of parallelism. -1 uses all available processors)

    r_sq = model.score(X_test, y_test)
#     print(f"coefficient of determination: {r_sq}")
#     print(f"intercept: {model.intercept_}")
#     print(f"slope: {model.coef_}")
    return r_sq

In [246]:
def technical_indicators(df, n_ma, n_future):
    """
    calculate technical indicators of the stock
    :returns: updated dataframe
    """
    df_tech = df
    df_tech['Daily Returns'] = df_tech["Adj Close"].pct_change(1)
    df_tech['Log Returns'] = np.log(1 + df_tech['Daily Returns'])
    df_tech.dropna(inplace=True)
    df_tech.reset_index(drop=True, inplace=True)
    
    # Calculate Exponential Moving Average (EMA)
    df_tech['EMA'] = df_tech['Log Returns'].ewm(span=n_ma, adjust=False).mean()
    # Calculate the simple moving average (SMA) for a window of n days
    df_tech['SMA'] = df_tech['Log Returns'].rolling(window=n_ma).mean()
    # SMA used for dependent variable
#     df_tech['SMA_future'] = 
    
    # Calculate the short-term EMA (12 periods)
    df_tech['EMA_12'] = df_tech['Adj Close'].ewm(span=12, adjust=False).mean()
    # Calculate the long-term EMA (26 periods)
    df_tech['EMA_26'] = df_tech['Adj Close'].ewm(span=26, adjust=False).mean()
    # Calculate the MACD line
    df_tech['MACD'] = df_tech['EMA_12'] - df_tech['EMA_26']
    # Calculate the Signal line
    df_tech['Signal_Line'] = df_tech['MACD'].ewm(span=9, adjust=False).mean()

    df_tech = df_tech.dropna(subset=['Log Returns', 'EMA'])
    df_tech.reset_index(drop=True, inplace=True)
    return df_tech

In [238]:
def LR_EMA(df, n_days, n_ema):    
    # Calculate Exponential Moving Average (EMA)
    df['EMA'] = df['Log Returns'].ewm(span=n_ema, adjust=False).mean()

    df = df.dropna(subset=['Log Returns', 'EMA'])
    df.reset_index(drop=True, inplace=True)
    
    # Initialize X and Y
    X = []
    Y = []
    
    # Populate X and Y
    for i in range(n_days*n_ema, len(df)):
        #print("i: ", i)
        X.append(df[['EMA']].iloc[i-n_days*n_ema:i:n_ema].values.flatten())
        Y.append(df['EMA'].iloc[i])

    # split data
    test_size = int(len(X) * 0.2)

    # Training set
    X_train = X[:-test_size]
    y_train = Y[:-test_size]

    # Testing set
    X_test = X[-test_size:]
    y_test = Y[-test_size:]

    # Fit the model on the training data
    model = LinearRegression(n_jobs=-1).fit(X_train, y_train)
    # normalize = False (input data)
    # copy_x = True (overwrite input variables)
    # n_jobs = None (number of parallelism. -1 uses all available processors)

    r_sq = model.score(X_test, y_test)
#     print(f"coefficient of determination: {r_sq}")
#     print(f"intercept: {model.intercept_}")
#     print(f"slope: {model.coef_}")
    return r_sq

In [254]:
def LR_tomorrow(df, n_days, n_ma, n_future):  
    
    df_tech = technical_indicators(df, n_ma, n_future)
    
    # Initialize X and Y
    X = []
    Y = []
    
    
#     n_days:  5
#     n_ma:  19
#     n_future:  6
#     n_days*n_ma = 5 * 19 = 100 - 5 = 95
    
    # Populate X and Y
    
    for i in range(n_days*n_ma, len(df_tech)-(-n_ma+1+n_future)):
        emas = df_tech[['EMA']].iloc[i-n_days*n_ma:i:n_ma].values.flatten()
#         for ema in emas:
#             if ema is None:
#                 print("I'm None")
        
#         if not emas.size or pd.isna(emas).any():
#             print(f"Empty or NaN values in emas at index {i}, emas: {emas}")
        X.append(emas)
        start = i - n_ma + 1
        end = start + n_future
        mean_log_returns = df_tech['Log Returns'].iloc[start:end].mean()
        Y.append(mean_log_returns)

    # split data
    test_size = int(len(X) * 0.2)

    # Training set
    X_train = X[:-test_size]
    y_train = Y[:-test_size]

    # Testing set
    X_test = X[-test_size:]
    y_test = Y[-test_size:]

#     if not X_train:
#         print(f"range({n_days*n_ma}, {len(df_tech)-(-n_ma+1+n_future)})")
#         print(f"emas: {emas}")
#         print("n_days: ", n_days)
#         print("n_ma: ", n_ma)
#         print("n_future: ", n_future)
#         print("test_size: ", test_size)
#         print("X is empty, check your loop conditions and input data.")
#         raise ValueError(f"")
    
    # Fit the model on the training data
    model = LinearRegression(n_jobs=-1).fit(X_train, y_train)
    # normalize = False (input data)
    # copy_x = True (overwrite input variables)
    # n_jobs = None (number of parallelism. -1 uses all available processors)

    r_sq = model.score(X_test, y_test)
#     print(f"coefficient of determination: {r_sq}")
#     print(f"intercept: {model.intercept_}")
#     print(f"slope: {model.coef_}")
    return r_sq

In [None]:
stocks = ['AAPL','BAC','COST','C','DG','FB','HSBC','JPM']
best = {}
secondbest = {}
for stock in stocks:
    df = read_csv(stock)
    best[stock] = {'r_sq': -100}
    secondbest[stock] = {'r_sq': -100}
    
     # Get the ranges for the current stock
    stock_ranges = ranges[stock]
    print(f"stock_ranges: {stock_ranges}")
    days_list = list(range(*stock_ranges['days']))
    n_mas = list(range(*stock_ranges['n_ma']))
    n_futures = list(range(*stock_ranges['n_future']))
    
    
    for days in days_list:
        for n_ma in n_mas:
            for n_future in n_futures:
                r_sq = LR_tomorrow(df.copy(), days, n_ma, n_future)
                if r_sq > best[stock]['r_sq']:
                    if best[stock]['r_sq'] > secondbest[stock]['r_sq']:
                        secondbest[stock] = best[stock]
                    best[stock] = {'r_sq': r_sq, 'days': days, 
                                   'n_ma': n_sma, 'n_future': n_future}
    print(f"best[{stock}] is: {best[stock]}")
# print("best:\n", best)
# print("secondbest:\n", secondbest)
# printfunc(best)
print_comparison3(best, secondbest)

len(df):  1258
best[AAPL] is: {'r_sq': 0.013318159352890158, 'days': 6, 'n_ma': 13, 'n_future': 8}
len(df):  1259
best[BAC] is: {'r_sq': -0.0011984628165084832, 'days': 4, 'n_ma': 13, 'n_future': 1}
len(df):  1258
best[COST] is: {'r_sq': 0.18192839058993204, 'days': 9, 'n_ma': 13, 'n_future': 22}
len(df):  1259
best[C] is: {'r_sq': 0.007217831487729143, 'days': 2, 'n_ma': 13, 'n_future': 1}
len(df):  1258
best[DG] is: {'r_sq': 0.008895093752942618, 'days': 7, 'n_ma': 13, 'n_future': 2}
len(df):  1258
best[FB] is: {'r_sq': 0.0429900015005491, 'days': 1, 'n_ma': 13, 'n_future': 8}
len(df):  1259
best[HSBC] is: {'r_sq': 0.01823042043719425, 'days': 3, 'n_ma': 13, 'n_future': 1}
len(df):  1259


In [211]:
ranges = {'AAPL': {'days': (4, 11), 'n_ma': (9, 16), 'n_future': (5, 11)},
         'BAC':   {'days': (1, 6),  'n_ma': (9, 15), 'n_future': (1, 5)},
         'COST':  {'days': (8, 15), 'n_ma': (15, 25),'n_future': (18, 24)},
         'C':     {'days': (1, 6),  'n_ma': (1, 11), 'n_future': (1, 5)},
         'DG':    {'days': (3, 11), 'n_ma': (7, 20), 'n_future': (1, 5)},
         'FB':    {'days': (1, 9), 'n_ma': (8, 21), 'n_future': (4, 11)},
         'HSBC':  {'days': (1, 6), 'n_ma': (1, 6), 'n_future': (1, 5)},
         'JPM':   {'days': (1, 6), 'n_ma': (10, 15), 'n_future': (1, 5)}}

# Ticker        R^2                 Days       n_ma       n_future
# -----------------------------------------------------------------
# AAPL      0.02480 (   0.02455)      6 (  6)     17 ( 17)         8 (        6)
# BAC       0.00026 (  -0.00018)      4 (  3)     12 ( 12)         1 (        1)
# COST      0.06092 (   0.05711)      9 (  7)     16 ( 16)        10 (       10)
# C         0.00722 (   0.00099)      2 (  1)      1 ( 20)         1 (        2)
# DG        0.01292 (   0.00944)      7 (  5)      9 ( 18)         2 (        2)
# FB        0.06486 (   0.05884)     10 (  6)     10 ( 18)         5 (        8)
# HSBC      0.01981 (   0.01823)      7 (  3)      2 (  1)         1 (        1)
# JPM       0.00436 (   0.00224)      4 (  3)     12 ( 12)         2 (        2)

In [209]:
import statistics
import math

def print_comparison3(best, secondbest):
    # Print the header
    header = f"{'Ticker':<6} {'R^2':>10} {'':>13} {'Days':>6} {'':>4} {'n_ma':>6} {'':>4} {'n_future':>6}"
    print(header)
    print('-' * len(header))

    # Initialize lists to hold the values for calculating the summary statistics
    r2_values = []
    r2_values_second = []
    days_values = []
    days_values_second = []
    n_ma_values = []
    n_ma_values_second = []
    n_future_values = []
    n_future_values_second = []

    # Print each item and collect values for the summary
    for ticker in best:
        best_metrics = best[ticker]
        secondbest_metrics = secondbest.get(ticker, {})
        
        # Append values for best and second best performance
        r2_values.append(best_metrics['r_sq'])
        r2_values_second.append(secondbest_metrics.get('r_sq', float('nan')))
        days_values.append(best_metrics['days'])
        days_values_second.append(secondbest_metrics.get('days', float('nan')))
        n_ma_values.append(best_metrics['n_ma'])
        n_ma_values_second.append(secondbest_metrics.get('n_ma', float('nan')))
        n_future_values.append(best_metrics['n_future'])
        n_future_values_second.append(secondbest_metrics.get('n_future', float('nan')))
        
        # Format and print the line for each ticker
        line = f"{ticker:<6} {best_metrics['r_sq']:>10.5f} ({secondbest_metrics.get('r_sq', 'n/a'):>10.5f}) "
        line += f"{best_metrics['days']:>6} ({secondbest_metrics.get('days', 'n/a'):>3}) "
        line += f"{best_metrics['n_ma']:>6} ({secondbest_metrics.get('n_ma', 'n/a'):>3})"
        line += f" {best_metrics['n_future']:>6} ({secondbest_metrics.get('n_future', 'n/a'):>3})"
        print(line)

    # Calculate the summary statistics for best and second best performances
    def calculate_summary(values):
        # Filter out nan values for accurate calculation
        filtered_values = [v for v in values if not math.isnan(v)]
        average = statistics.mean(filtered_values)
        median = statistics.median(filtered_values)
        stdev = statistics.stdev(filtered_values) if len(filtered_values) > 1 else 0
        return average, median, stdev

    # Calculate and print the best performance summary
    r2_avg, r2_med, r2_stdev = calculate_summary(r2_values)
    days_avg, days_med, days_stdev = calculate_summary(days_values)
    n_ma_avg, n_ma_med, n_ma_stdev = calculate_summary(n_ma_values)
    n_future_avg, n_future_med, n_future_stdev = calculate_summary(n_future_values)
    
    # Calculate and print the second best performance summary
    r2_avg_second, r2_med_second, r2_stdev_second = calculate_summary(r2_values_second)
    days_avg_second, days_med_second, days_stdev_second = calculate_summary(days_values_second)
    n_ma_avg_second, n_ma_med_second, n_ma_stdev_second = calculate_summary(n_ma_values_second)
    n_future_avg_second, n_future_med_second, n_future_stdev_second = calculate_summary(n_future_values_second)


    # Print the rows for Average, Median, and StDev with the calculated values
    print("\nPerformance Summary:")
    print(f"{'Metric':<8} {'R^2':>8.5} {'':>10} {'Days':>8.5} {'':>10} {'n_ma':>8.5} {'':>10} {'n_future':>8.5}")
    # Print the rows for Average, Median, and StDev with the best and second-best values
    print(f"{'Average':<8} {r2_avg:>8.5f} ({r2_avg_second:>8.5f}) {days_avg:>5.2f} ({days_avg_second:>5.2f}) {n_ma_avg:>5.2f} ({n_ma_avg_second:>5.2f}) {n_future_avg:>5.2f} ({n_future_avg_second:>5.2f})")
    print(f"{'Median':<8} {r2_med:>8.5f} ({r2_med_second:>8.5f}) {days_med:>5.2f} ({days_med_second:>5.2f}) {n_ma_med:>5.2f} ({n_ma_med_second:>5.2f}) {n_future_med:>5.2f} ({n_future_med_second:>5.2f})")
    print(f"{'StDev':<8} {r2_stdev:>8.5f} ({r2_stdev_second:>8.5f}) {days_stdev:>5.2f} ({days_stdev_second:>5.2f}) {n_ma_stdev:>5.2f} ({n_ma_stdev_second:>5.2f}) {n_future_stdev:>5.2f} ({n_future_stdev_second:>5.2f})")

In [105]:
def printfunc(data):
    # Print the header
    header = f"{'Ticker':<6} {'R^2':>10} {'Days':>6} {'n_SMA':>6}"
    print(header)
    print('-' * len(header))

    # Print each item
    for ticker, metrics in data.items():
        line = f"{ticker:<6} {metrics['r_sq']:>10.5f} {metrics['days']:>6} {metrics['n_sma']:>6}"
        print(line)

In [162]:
import statistics
import math

def print_comparison2(best, secondbest):
    # Print the header
    header = f"{'Ticker':<6} {'R^2':>10} {'':>13} {'Days':>6} {'':>4} {'n_SMA':>6}"
    print(header)
    print('-' * len(header))

    # Initialize lists to hold the values for calculating the summary statistics
    r2_values = []
    r2_values_second = []
    days_values = []
    days_values_second = []
    n_sma_values = []
    n_sma_values_second = []

    # Print each item and collect values for the summary
    for ticker in best:
        best_metrics = best[ticker]
        secondbest_metrics = secondbest.get(ticker, {})
        
        # Append values for best and second best performance
        r2_values.append(best_metrics['r_sq'])
        r2_values_second.append(secondbest_metrics.get('r_sq', float('nan')))
        days_values.append(best_metrics['days'])
        days_values_second.append(secondbest_metrics.get('days', float('nan')))
        n_sma_values.append(best_metrics['n_sma'])
        n_sma_values_second.append(secondbest_metrics.get('n_sma', float('nan')))
        
        # Format and print the line for each ticker
        line = f"{ticker:<6} {best_metrics['r_sq']:>10.5f} ({secondbest_metrics.get('r_sq', 'n/a'):>10.5f}) "
        line += f"{best_metrics['days']:>6} ({secondbest_metrics.get('days', 'n/a'):>3}) "
        line += f"{best_metrics['n_sma']:>6} ({secondbest_metrics.get('n_sma', 'n/a'):>3})"
        print(line)

    # Calculate the summary statistics for best and second best performances
    def calculate_summary(values):
        # Filter out nan values for accurate calculation
        filtered_values = [v for v in values if not math.isnan(v)]
        average = statistics.mean(filtered_values)
        median = statistics.median(filtered_values)
        stdev = statistics.stdev(filtered_values) if len(filtered_values) > 1 else 0
        return average, median, stdev

    # Calculate and print the best performance summary
    r2_avg, r2_med, r2_stdev = calculate_summary(r2_values)
    days_avg, days_med, days_stdev = calculate_summary(days_values)
    n_sma_avg, n_sma_med, n_sma_stdev = calculate_summary(n_sma_values)
    
    # Calculate and print the second best performance summary
    r2_avg_second, r2_med_second, r2_stdev_second = calculate_summary(r2_values_second)
    days_avg_second, days_med_second, days_stdev_second = calculate_summary(days_values_second)
    n_sma_avg_second, n_sma_med_second, n_sma_stdev_second = calculate_summary(n_sma_values_second)

    # Print the rows for Average, Median, and StDev with the calculated values
    print("\nPerformance Summary:")
    print(f"{'Metric':<8} {'R^2':>8.5} {'':>10} {'Days':>8.5} {'':>10} {'n_SMA':>8.5}")
    # Print the rows for Average, Median, and StDev with the best and second-best values
    print(f"{'Average':<8} {r2_avg:>8.5f} ({r2_avg_second:>8.5f}) {days_avg:>8.5f} ({days_avg_second:>8.5f}) {n_sma_avg:>8.5f} ({n_sma_avg_second:>8.5f})")
    print(f"{'Median':<8} {r2_med:>8.5f} ({r2_med_second:>8.5f}) {days_med:>8.5f} ({days_med_second:>8.5f}) {n_sma_med:>8.5f} ({n_sma_med_second:>8.5f})")
    print(f"{'StDev':<8} {r2_stdev:>8.5f} ({r2_stdev_second:>8.5f}) {days_stdev:>8.5f} ({days_stdev_second:>8.5f}) {n_sma_stdev:>8.5f} ({n_sma_stdev_second:>8.5f})")

In [175]:
print_comparison3(best, secondbest)

Ticker        R^2                 Days       n_SMA      n_future
----------------------------------------------------------------
COST      0.14959 (   0.14944)      9 (  9)     18 ( 17)     20 ( 20)

Performance Summary:
Metric        R^2                Days               n_SMA               n_fut
Average   0.14959 ( 0.14944)  9.00000 ( 9.00000) 18.00000 (17.00000) 20.00000 (20.00000)
Median    0.14959 ( 0.14944)  9.00000 ( 9.00000) 18.00000 (17.00000) 20.00000 (20.00000)
StDev     0.00000 ( 0.00000)  0.00000 ( 0.00000)  0.00000 ( 0.00000)  0.00000 ( 0.00000)


In [70]:
def LR_runs(df, n_days):    
    df.reset_index(drop=True, inplace=True)
        
    # Initialize X and Y
    X = []
    Y = []
    
    # Populate X and Y
    # n = max(n_days, n_sma)
#     print(df[['Adj Close','SMA']].head())
#     for i in range(n_days*n_sma, n_days*n_sma+5):
    for i in range(n_days, len(df)):
        #print("i: ", i)
        X.append(df[['Log Returns']].iloc[i-n_days:i].values.flatten())
        Y.append(df['Log Returns'].iloc[i])
#     print(f"len(X)= {len(X)}, X[{i}]: {X}")
#     print(f"len(Y)= {len(Y)}, Y[{i}]: {Y}")
#     print()
#     print(df['SMA'].head(10))

    # split data
    test_size = int(len(X) * 0.2)

    # Training set
    X_train = X[:-test_size]
    y_train = Y[:-test_size]

    # Testing set
    X_test = X[-test_size:]
    y_test = Y[-test_size:]

    # Fit the model on the training data
    model = LinearRegression().fit(X_train, y_train)
    # normalize = False (input data)
    # copy_x = True (overwrite input variables)
    # n_jobs = None (number of parallelism. -1 uses all available processors)

    r_sq = model.score(X_test, y_test)
    print(f"coefficient of determination: {r_sq}")
    print(f"intercept: {model.intercept_}")
    print(f"slope: {model.coef_}")
    return r_sq

In [71]:
days_list = [1,2,3,4,5,6]
for days in days_list:
    print(f"days: {days}")
    coefficient = LR_runs(df, days)
        

days: 1
coefficient of determination: 0.01554317486169876
intercept: 0.0017577578569271442
slope: [-0.12973506]
days: 2
coefficient of determination: 0.01598784011673471
intercept: 0.001760925670991291
slope: [ 0.01261604 -0.12768755]
days: 3
coefficient of determination: 0.017555901723623757
intercept: 0.0017996204465841287
slope: [-0.01348888  0.01197283 -0.12686698]
days: 4
coefficient of determination: 0.01764086609300375
intercept: 0.0017772875517949468
slope: [-0.00013494 -0.01355695  0.01355852 -0.12546436]
days: 5
coefficient of determination: 0.007790938205178843
intercept: 0.0016414388738599202
slope: [ 0.0603223   0.0078162  -0.01271823  0.01555989 -0.12674168]
days: 6
coefficient of determination: 0.009006850546967038
intercept: 0.0017045035362672253
slope: [-0.06040442  0.05210872  0.01240043 -0.01027712  0.01396719 -0.12562007]


In [156]:
lisst = list(range(10))
print(lisst[::2])

[0, 2, 4, 6, 8]
