Definde a logger method to log the messages to another file.

In [1]:
import os

def logger(input_string: str) -> None:
    file_path = "output/log.txt"

    if not os.path.exists(file_path):
        with open(file_path, "w") as file:
            file.write(input_string + "\n")
    else:
        with open(file_path, "a") as file:
            file.write(input_string + "\n")
    print("Logger message: " + input_string)

Start the Script by adding a message to the logger file.

In [2]:
import pandas as pd

time_start = pd.Timestamp.now()

logger("\n\n_________________________________________________________________")
logger("New script run: " + str(time_start))

Logger message: 

_________________________________________________________________
Logger message: New script run: 2024-06-05 23:42:25.213920


Read in the raw data and convert it to pandas dataframe.

In [3]:
data = pd.read_csv("data_raw.csv")

Convert first column to datetime format.

In [4]:
data["date"] = pd.to_datetime(data[data.columns[0]], format="%Y%m%d")
data.drop(data.columns[0], axis=1, inplace=True)
data.sort_values(by='date', inplace=True)
data.reset_index(drop=True, inplace=True)

On weekend the exchange is closed so we need to fill the missing values with linear interpolation

In [5]:
min_date = data["date"].min()
max_date = data["date"].max()
all_dates = pd.date_range(start=min_date, end=max_date)

todo_data = ["date_global_quote","open_price","high_price","low_price","closing_price","volume"]
data_helper = data[todo_data].copy()
#get a list with only the dates we have data for
data_helper["date_global_quote"] = pd.to_datetime(data_helper["date_global_quote"])
data_helper.drop_duplicates(keep='first', inplace=True)

#merge the list of all dates with the data we have
data_with_missing_dates = pd.merge(pd.DataFrame({"date_global_quote": all_dates}), data_helper, on="date_global_quote", how="left")
data_with_missing_dates.sort_values(by="date_global_quote", inplace=True) 
data_with_missing_dates.interpolate(method='linear', inplace=True)

data[todo_data] = data_with_missing_dates[todo_data]

Add the target, which is the percentage the stock price will increase or decrease the next day.

In [6]:
data['mean_price'] = data[['open_price', 'low_price', 'high_price', 'closing_price']].mean(axis=1)

data["next_day_percentage"] = (data["mean_price"].shift(-1) / data["mean_price"] - 1)*100
data.fillna({'next_day_percentage': 0}, inplace=True)

Add features

In [7]:
data['weekday'] = data['date'].dt.dayofweek #add weekday column to the dataframe

data["price_change_1"] = data["mean_price"] / data["mean_price"].shift(1)#calculate the price change last day to current day
data.fillna({'price_change_1': 1}, inplace=True)

data["price_change_3"] = data["mean_price"] / data["mean_price"].shift(3)#calculate the price change 3 rdlast day to current day
data.fillna({'price_change_1': 1}, inplace=True)

Download the needed Finbert model and tokenizer if not already downloaded.

In [8]:
from transformers import BertTokenizer, BertForSequenceClassification, pipeline

finbert = BertForSequenceClassification.from_pretrained('.venv/Transformer/Finbert_Offline',num_labels=3)
tokenizer = BertTokenizer.from_pretrained('.venv/Transformer/Tokenizer_Offline')
nlp = pipeline("sentiment-analysis", model=finbert, tokenizer=tokenizer)


if not os.path.exists('.venv/Transformer'):
    finbert = BertForSequenceClassification.from_pretrained('yiyanghkust/finbert-tone',num_labels=3)
    finbert.save_pretrained('.venv/Transformer/Finbert_Offline')
    tokenizer = BertTokenizer.from_pretrained('yiyanghkust/finbert-tone')
    tokenizer.save_pretrained('.venv/Transformer/Tokenizer_Offline')
    logger("Transfomers were downloaded because no folder was found")
else: logger(".venv/Transformer were found and reused offline")

Logger message: .venv/Transformer were found and reused offline


Finbert function that takes in string and returns the sentiment score.

In [9]:
import ast #for converting string to list
def finbert(input_string):
    # Return 0 immediately if input is "0"
    if input_string == "0":
        return 0

    text_list = ast.literal_eval(input_string)  # Convert string to list
    if not text_list:
        return 0

    scores = []
    for text in text_list:
        # Truncate text to fit tokenization limit
        while len(tokenizer.tokenize(text)) > 500:
            text = text[:-1]

        # Analyze text with finbert
        results = nlp(text)
        for item in results:
            score = item['score']
            label = item['label']
            # Convert label to numerical value and multiply by score
            if label == "Neutral":
                scores.append(0)
            elif label == "Negative":
                scores.append(-1 * score)
            elif label == "Positive":
                scores.append(1 * score)
    score = sum(scores) / len(scores) if scores else 0
    print(str(score) + " --> " + str(text_list))
    return score

In [10]:
import pandas as pd

columns_to_check = ["com_title_finbert", "ceo_title_finbert"]
columns_exist = all(column in data.columns for column in columns_to_check)

if not columns_exist: #if the columns not already exists
    local_time_start = pd.Timestamp.now()
    data_finbert = pd.DataFrame()
    data_finbert[["com_title_finbert", "ceo_title_finbert"]] = data[["com_title_list", "ceo_title_list"]].apply(lambda col: col.map(finbert))
    local_time_end = pd.Timestamp.now()
    time_delta = local_time_end - local_time_start
    logger("Finbert was applied to the data it took "+ str(time_delta))
    
    data[["com_title_finbert","ceo_title_finbert"]] = data_finbert[["com_title_finbert","ceo_title_finbert"]]

    # Read the CSV file
    data_raw = pd.read_csv('data_raw.csv')
    # Add the two columns
    data_raw['com_title_finbert'] = data['com_title_finbert']
    data_raw['ceo_title_finbert'] = data['ceo_title_finbert']
    # Close the file
    data_raw.to_csv('data_raw.csv', index=False)

In [11]:
data.fillna(0.0, inplace=True)
data_no_dates = data.drop(columns=data.filter(like='date').columns)

We only need the parameters, that have several unique values, so we can use them to predict the target. Also we cant use text parameters.

In [12]:
features = []
needed_unique = 5

try:
    for column in data_no_dates.columns[1:]:#sorts the columns in number, text, norchanging
        if pd.to_numeric(data_no_dates[column], errors='coerce').notnull().all():# Check if the column is numeric
            data_no_dates[column] = pd.to_numeric(data_no_dates[column])# Convert to number
            if data_no_dates[column].nunique() >= needed_unique:#check if all calues are the same. if so we dont need them    
                features.append(column)
    data_features = data_no_dates[features]
    logger("Features: " + str(features))
except Exception as e:
    logger("Error: " + str(e))

Logger message: Features: ['ceo_news_amount', 'alpha_news_amount', 'alpha_news_sentiment_mean', 'open_price', 'high_price', 'low_price', 'closing_price', 'volume', 'currency_exchange_rate', 'cpi', 'eps', 'gdp', 'retail_sales', 'market_capitalization', 'pe_ratio', 'peg_ratio', 'eps.1', 'diluted_eps_ttm', 'analyst_target_price', 'trailing_pe', 'forward_pe', 'price_to_sales_ratio_ttm', 'price_to_book_ratio', 'ev_to_revenue', 'ev_to_ebitda', 'beta', 'week_high_52', 'week_low_52', 'day_moving_average_50', 'day_moving_average_200', 'otherCurrentAssets', 'com_title_finbert', 'ceo_title_finbert', 'mean_price', 'next_day_percentage', 'weekday', 'price_change_1', 'price_change_3']


Standardize the data by subtracting the mean and dividing by the standard deviation (Z-Score Normalization)

In [13]:
data_features = data_features.drop(['next_day_percentage'], axis=1)
data_normalized = data_features.sub(data_features.mean(axis=0), axis=1).div((data_features.max(axis=0)-data_features.min(axis=0)), axis=1)

PCA- Principal Component Analysis to reduce the dimensionality of the data.

In [14]:
from sklearn.decomposition import PCA

pca = PCA(n_components='mle',svd_solver = 'full')
data_pca = pca.fit_transform(data_normalized)
#convert to dataframe
data_pca = pd.DataFrame(data_pca)

In [15]:
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt

plt.figure()
for column in data_normalized.columns:
    plt.plot(data['date'], data_normalized[column])
plt.xlabel('Date')
plt.ylabel('Value')
plt.xticks(data['date'][::10], rotation=45)
plt.tight_layout()
plt.savefig('output/Features.pdf', format='pdf')

We split the data in training and testing data.

In [16]:
from sklearn.model_selection import train_test_split
target = data['next_day_percentage'] #use the one from data so its not normalized
features = data_pca
X_train, X_test, y_train, y_test = train_test_split(features, target, test_size=0.3, random_state=42)

Make a list with all the Models and their parameters.

In [17]:
from sklearn.neural_network import MLPRegressor
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.svm import SVR
from sklearn.neighbors import KNeighborsRegressor

models = [
    {
        "name": "Linear Regression",
        "model": LinearRegression(),
        "param_grid": {}
    },
    {
        "name": "Decision Tree",
        "model": DecisionTreeRegressor(),
        "param_grid": {}
    },
    {
        "name": "Random Forest",
        "model": RandomForestRegressor(),
        "param_grid": {
            'n_estimators': [100, 200, 300, 1000],
            'max_features': [None, 'sqrt', 'log2'],
            'max_depth': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 4],
            'bootstrap': [True, False]
        }
    },
    {
        "name": "Gradient Boosting",
        "model": GradientBoostingRegressor(),
        "param_grid": {
            'n_estimators': [100,500,1000],
            'max_features': [None, 'sqrt', 'log2'],
            'max_depth': [10,50,100, None],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 4],
            'learning_rate': [0.01,0.1,0.5]
        }
    },
    {
        "name": "KNN",
        "model": KNeighborsRegressor(),
        "param_grid": {
            'n_neighbors': [3, 5, 11, 19],
            'weights': ['uniform', 'distance'],
            'metric': ['euclidean', 'manhattan']
        }
    },
    {
        "name": "Neural Network",
        "model": MLPRegressor(max_iter=10000),
        "param_grid": {
            'hidden_layer_sizes': [(100,), (50, 50), (100, 50, 25)],
            'activation': ['relu', 'tanh'],
            'solver': ['adam', 'lbfgs'],
            'alpha': [0.0001, 0.001, 0.01, 0.1],
            'learning_rate': ['constant', 'adaptive']
        }
     },
    {
        "name": "SVR",
        "model": SVR(),
        "param_grid": {
            'C': [0.1, 1, 10, 100, 1000],
            'gamma': [1, 0.1, 0.01, 0.001, 0.0001],
            'kernel': ['linear', 'rbf']
        }
    }
]

In [18]:
import csv

def save_models_to_csv(mse_list,r2_list,combined_list):
    # Open the modells.csv file in append mode
    with open('output/modells_mse.csv', 'a', newline='') as file:
        # Add the mse_list to the csv file
        csv_writer_mse = csv.writer(file)
        csv_writer_mse.writerow(mse_list)
    
    # Open the modells.csv file in append mode
    with open('output/modells_r2.csv', 'a', newline='') as file:
        # Add the r2_list to the csv file
        csv_writer_r2 = csv.writer(file)
        csv_writer_r2.writerow(r2_list)

    # Open the modells.csv file in append mode
    with open('output/modells_together.csv', 'a', newline='') as file:
        # Add the combined to the csv file
        csv_writer_r2 = csv.writer(file)
        csv_writer_r2.writerow(combined_list)

In [19]:
from sklearn.model_selection import RandomizedSearchCV, KFold
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score

today = pd.Timestamp.now().strftime("%Y-%m-%d")
description = 'nix'#input("Enter a description for this run: ")

logger("Description: " + description)
mse_list = []
r2_list = []
combined_list = []

mse_list.append(today)
mse_list.append(description)
r2_list.append(today)
r2_list.append(description)
combined_list.append(today)
combined_list.append(description)

best_model = None

for model_info in models:#iterate through the models, predict and save the mse and r2
    local_time_start = pd.Timestamp.now()
    name = model_info["name"]
    model = model_info["model"]
    param_grid = model_info["param_grid"]
    
    estimator = model
    kfold = KFold(n_splits=10, random_state=42, shuffle=True)
    random_search = RandomizedSearchCV(estimator=estimator, param_distributions=param_grid, cv=kfold, n_jobs=-1, verbose=0)
    random_search.fit(X_train, y_train)
    model = model.__class__(**random_search.best_params_)

    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
    
    mse = mean_squared_error(y_test, predictions)
    mse_list.append(mse)

    r2 = r2_score(y_test, predictions)
    r2_list.append(r2)

    combined_metric = (1 - r2) * mse
    combined_list.append(combined_metric)
    
    local_time_end = pd.Timestamp.now()
    time_delta = local_time_end - local_time_start
    logger("Model:" + name + ", Mean Squared Error:" + str(mse) + ", R2 Score:" + str(r2) + " Time taken: " + str(time_delta))
    if best_model is None or combined_metric < best_model["score"]:
        best_model = {
            "name": name,
            "model": model,
            "score": combined_metric
        }

save_models_to_csv(mse_list, r2_list, combined_list)
logger("Models saved to modells.csv")


Logger message: Description: nix




Logger message: Model:Linear Regression, Mean Squared Error:4.047445906691022, R2 Score:0.10196776821073694 Time taken: 0 days 00:00:02.508848
Logger message: Model:Decision Tree, Mean Squared Error:5.166542587545772, R2 Score:-0.14633323767412465 Time taken: 0 days 00:00:00.101765




Logger message: Model:Random Forest, Mean Squared Error:4.184735087439878, R2 Score:0.07150655582378496 Time taken: 0 days 00:00:17.376413
Logger message: Model:Gradient Boosting, Mean Squared Error:4.28379560951455, R2 Score:0.04952737592322998 Time taken: 0 days 00:00:37.570438
Logger message: Model:KNN, Mean Squared Error:4.60102196004534, R2 Score:-0.020857625906832933 Time taken: 0 days 00:00:00.170841


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
  self.n_iter_ = _check_optimize_result("lbfgs", opt_res, self.max_iter)


Logger message: Model:Neural Network, Mean Squared Error:4.218704754521867, R2 Score:0.0639694925384312 Time taken: 0 days 00:01:16.672242
Logger message: Model:SVR, Mean Squared Error:4.68546926188778, R2 Score:-0.03959447889769807 Time taken: 0 days 00:00:07.278463
Logger message: Models saved to modells.csv


Get best model

In [20]:
model = best_model["model"]
score = best_model["score"]
logger("Best model: " + str(model) + " Mean Squared Error: " + str(mse))

Logger message: Best model: LinearRegression() Mean Squared Error: 4.68546926188778


Predict Values with best model

In [21]:
data["next_day_percentage_predicted"] = model.predict(features)

In [22]:
plt.figure()

plt.plot(data['date'], data[["next_day_percentage", "next_day_percentage_predicted"]])
plt.xlabel('Datum')
plt.ylabel('Preisänderung zum nächsten Tag in %')
plt.legend(["Tatsächlich", "Vorhersage"])

plt.xticks(data['date'][::10], rotation=45)
plt.tight_layout()
plt.savefig('output/Predictions.pdf', format="pdf")

In [23]:
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np


def plot_table(benchmark):
    df = pd.read_csv(f'output/modells_{benchmark}.csv')
    df.drop(['Date', 'Description'], axis=1, inplace=True)

    # Calculate mean value between all values of one row
    df['Mean'] = df.mean(axis=1)
    df['Improvement'] = df['Mean'] - df['Mean'].shift(1)

    # Create a colormap and reverse it based on the benchmark for main data
    if benchmark == 'r2':
        cmap = sns.color_palette("coolwarm_r", as_cmap=True)
    else:
        cmap = sns.color_palette("coolwarm", as_cmap=True)
    
    # Prepare 'Improvement' column for white background
    improvement_data = df['Improvement'].copy()
    df['Improvement'] = np.nan  # Set to NaN to mask and use default background (white)

    # Plot the main dataframe
    plt.figure(figsize=(12, 8))
    sns.heatmap(df, annot=True, cmap=cmap, fmt=".2f", cbar=False)
    
    # Add the 'Improvement' column text manually with default background
    for i in range(len(improvement_data)):
        plt.text(df.columns.get_loc('Improvement') + 0.5, i + 0.5, f'{improvement_data.iloc[i]:.2f}',
                 ha='center', va='center')

    plt.title(f"Model Performance {benchmark}")
    plt.xticks(rotation=45)  # Rotate column labels
    plt.tight_layout()
    plt.savefig(f"output/colored_table_{benchmark}.pdf", format="pdf")
    

plot_table('mse')
plot_table('r2')
plot_table('together')


Now we need to find the best threshold of the data to make the best predictions

In [24]:
import optuna
iterations = 1000
stock_value = 500
money_value = 500
fee = 1
sell_percentage = 0.5
data["depot_value"] = 0.0

In [25]:
def optuna_loop(trial, data, stock_value, money_value, sell_percentage):
    buy_threshold = trial.suggest_float('buy_threshold', 0, 10)
    sell_threshold = trial.suggest_float('sell_threshold', -10, 0)

    data["depot_value"] =  broker(stock_value, money_value, buy_threshold, sell_threshold, sell_percentage, data)
    last_depot_value = data['depot_value'].iloc[-1]
    return last_depot_value

def broker(stock_value, money_value, buy_threshold, sell_threshold, sell_percentage, data):
    stock_value = stock_value
    money_value = money_value
    depot_value = 0

    for index, row in data.iterrows():
        stock_value *= row["price_change_1"]
        if row['next_day_percentage_predicted'] >= buy_threshold:
            if money_value > 0:
                money_value -= fee
                stock_value += money_value
                money_value = 0
        elif row['next_day_percentage_predicted'] <= sell_threshold:
                if stock_value > 0:
                    money_value += stock_value * sell_percentage
                    stock_value -= stock_value * sell_percentage
                    money_value -= fee
        depot_value = stock_value + money_value
        data.at[index, 'depot_value'] = depot_value
    return data["depot_value"]

In [26]:
study = optuna.create_study(direction='maximize')
for _ in range(iterations):
    study.optimize(lambda trial: optuna_loop(trial, data, stock_value, money_value, sell_percentage), n_trials=1)

[I 2024-06-05 23:44:54,869] A new study created in memory with name: no-name-7974e21c-476a-4e52-8332-e31e4beb4902
[I 2024-06-05 23:44:54,905] Trial 0 finished with value: 917.9239733197228 and parameters: {'buy_threshold': 8.281297528479483, 'sell_threshold': -4.441919885256782}. Best is trial 0 with value: 917.9239733197228.
[I 2024-06-05 23:44:54,926] Trial 1 finished with value: 917.9239733197228 and parameters: {'buy_threshold': 6.77491167189046, 'sell_threshold': -7.026844304562623}. Best is trial 0 with value: 917.9239733197228.
[I 2024-06-05 23:44:55,152] Trial 2 finished with value: 917.9239733197228 and parameters: {'buy_threshold': 8.953899862291232, 'sell_threshold': -8.042784399999643}. Best is trial 0 with value: 917.9239733197228.
[I 2024-06-05 23:44:55,177] Trial 3 finished with value: 917.9239733197228 and parameters: {'buy_threshold': 8.957853649223932, 'sell_threshold': -7.52121833181458}. Best is trial 0 with value: 917.9239733197228.
[I 2024-06-05 23:44:55,200] Tria

In [27]:
optuna.visualization.plot_parallel_coordinate(study)

In [28]:
optuna.visualization.plot_optimization_history(study)

In [29]:
best_params = study.best_trial.params
buy_threshold = round(best_params['buy_threshold'],3)
sell_threshold = round(best_params['sell_threshold'],3)
stock_value = 1000
money_value = 0
logger("Buy threshold: " + str(buy_threshold) + "%, Sell threshold: " + str(sell_threshold)+"%")

Logger message: Buy threshold: 0.456%, Sell threshold: -0.185%


In [30]:
data["buy_or_sell"] = data["next_day_percentage_predicted"].apply(lambda x: 1 if x >= buy_threshold else (-1 if x <= sell_threshold else 0))
data["depot_value"] = broker(stock_value, money_value, buy_threshold, sell_threshold, sell_percentage, data)

In [31]:
hold_percentage = round(((data['mean_price'].iloc[-1] / data['mean_price'].iloc[0]) - 1) * 100, 2)
spai_percentage = round(((data['depot_value'].iloc[-1] / data['depot_value'].iloc[0]) - 1) * 100, 2)

logger("Hold percentage: " + str(hold_percentage) + "%, SPAI percentage: " + str(spai_percentage)+"%")

Logger message: Hold percentage: -16.42%, SPAI percentage: 31.86%


In [32]:
data['mean_price'] = data['mean_price'] / data['mean_price'].iloc[0]
data['depot_value'] = data['depot_value'] / data['depot_value'].iloc[0]

In [33]:
#get last value of buy_or_sell
today_buy_or_sell = data['buy_or_sell'].iloc[-1]
today = data['date'].iloc[-1]

if today_buy_or_sell == 1:
    print("Buy signal today ", today)
elif today_buy_or_sell == -1:
    print("Sell signal today", today)
elif today_buy_or_sell == 0:
    print("Hold signal today", today)

logger("Signal for today: " + str(today_buy_or_sell) + ", Date: " + str(today))

Hold signal today 2024-05-15 00:00:00
Logger message: Signal for today: 0, Date: 2024-05-15 00:00:00


In [53]:
plt.cla()
plt.clf()
plt.close()
plt.figure(figsize=(10, 6))

plt.plot(data['date'], data["mean_price"], label='Holding', marker='o', color='blue')
plt.plot(data['date'], data["depot_value"], label='Using SPAI', marker='x', color='purple')

buy_label_added = False
sell_label_added = False

for index, row in data.iterrows():
    if row['buy_or_sell'] >= 1:
        if not buy_label_added:
            plt.bar(row['date'], height=0.1, bottom=row["mean_price"] - 0.05, color='green', width=1, alpha=0.5, label ="buy")
            buy_label_added = True
        else:
            plt.bar(row['date'], height=0.1, bottom=row["mean_price"] - 0.05, color='green', width=1, alpha=0.5)
    elif row['buy_or_sell'] <= -1:
        if not sell_label_added:
            plt.bar(row['date'], height=0.1, bottom=row["mean_price"] - 0.05, color='red', width=1, alpha=0.5, label = "sell")
            sell_label_added = True
        else:
            plt.bar(row['date'], height=0.1, bottom=row["mean_price"] - 0.05, color='red', width=1, alpha=0.5)
plt.title(title)
plt.xlabel('Datum')
plt.ylabel('normierter Depotwert')
plt.legend()
plt.xticks(data['date'][::10], rotation=45)
plt.tight_layout()
plt.savefig('output/Simulation.pdf', format='pdf')
plt.show()


In [35]:
#save data to Output folder
data.to_csv('output/data_processed.csv', index=False)

In [36]:
time_end = pd.Timestamp.now()
time_delta = time_end - time_start
logger("Script ended: " + str(time_start))
logger("Script needed: " + str(time_delta))

Logger message: Script ended: 2024-06-05 23:42:25.213920
Logger message: Script needed: 0 days 00:03:06.417175
