In [None]:
# Standard data manipulation libraries
import pandas as pd
import numpy as np

# Regular expression operations
import re

# Natural Language Toolkit
import nltk
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.corpus import wordnet

# Machine learning libraries
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
import xgboost as xgb
from xgboost import XGBClassifier

# Deep learning libraries
import tensorflow_hub as hub

# Utility libraries
import random

# Balance data sets
from imblearn.over_sampling import SMOTE

# Evolutionary computation
from deap import base, creator, tools, algorithms

# Hypothesis testing
from scipy.stats import norm

In [None]:
# Load the combined posts/articles dataset produced by ProfitsBot.
# NOTE(review): this is an absolute path rooted at '/', so the notebook only
# runs on a machine where that directory exists — consider a configurable
# data directory.
csv_path = '/ProfitsBot_V0_OLLM/ds_builder/combined_data.csv'
df = pd.read_csv(csv_path)

# Load the Universal Sentence Encoder (large, v5) model from TensorFlow Hub.
# This only loads the model; the embeddings themselves are produced later by
# calling `use_model` on text.
use_url = "https://tfhub.dev/google/universal-sentence-encoder-large/5"
use_model = hub.load(use_url)

In [12]:
def average_glove_vectors(tokens):
    """Return the element-wise mean of the GloVe vectors of `tokens`.

    Tokens that are not keys of the module-level `glove_embeddings` dict are
    skipped. When none of the tokens is known, a zero vector is returned whose
    length matches the first vector stored in `glove_embeddings`.
    """
    known_vectors = []
    for token in tokens:
        if token in glove_embeddings:
            known_vectors.append(glove_embeddings[token])

    if not known_vectors:
        # Nothing to average: fall back to zeros of the embedding dimension.
        dimension = len(next(iter(glove_embeddings.values())))
        return np.zeros(dimension)

    return np.mean(known_vectors, axis=0)



# English stop words held in a set, so the per-token membership test in
# preprocess_text is a hash lookup rather than a list scan.
# NOTE(review): presumably the NLTK 'stopwords' corpus has already been
# downloaded on this machine — confirm, nothing in this notebook fetches it.
stop_words = set(stopwords.words('english'))
# Single WordNet lemmatizer instance shared by lemmatize_tokens.
lemmatizer = WordNetLemmatizer()



def lemmatize_tokens(tokens):
    """Lemmatise each token using a part of speech guessed from the token alone.

    Every token is POS-tagged in isolation with `nltk.pos_tag`; the first
    letter of the returned tag is mapped to a WordNet part of speech
    (J -> adjective, N -> noun, V -> verb, R -> adverb), and any other tag is
    treated as a noun. Returns the list of lemmas in the input order.
    """
    pos_by_tag_initial = {
        "J": wordnet.ADJ,
        "N": wordnet.NOUN,
        "V": wordnet.VERB,
        "R": wordnet.ADV,
    }

    lemmas = []
    for token in tokens:
        # pos_tag returns [(word, tag)]; only the tag's first letter is used.
        tag_initial = nltk.pos_tag([token])[0][1][0].upper()
        wordnet_pos = pos_by_tag_initial.get(tag_initial, wordnet.NOUN)
        lemmas.append(lemmatizer.lemmatize(token, wordnet_pos))

    return lemmas


def preprocess_text(text):
    """Clean a raw title and return its lemmatised, stop-word-free tokens.

    Steps, in order: lower-case, strip http(s) links, strip every character
    that is neither a word character nor whitespace, strip digits, tokenise,
    drop English stop words, lemmatise.

    Args:
        text: the raw title. Besides ``str`` the function tolerates
            * ``None`` / NaN / ``pd.NA`` (empty CSV cells are read by pandas
              as float NaN) -> returns ``[]`` instead of raising;
            * a ``list`` / ``tuple`` of tokens (the value this function itself
              produced) -> returned unchanged as a list, so re-running the
              cell that overwrites ``df['title']`` does not crash;
            * any other scalar -> converted with ``str()`` first.

    Returns:
        list[str]: the processed tokens.
    """
    # Already tokenised (e.g. the preprocessing cell was executed twice).
    if isinstance(text, (list, tuple)):
        return list(text)

    if not isinstance(text, str):
        # Missing title: nothing to tokenise.
        if text is None or pd.isna(text):
            return []
        text = str(text)

    # Get rid of links, symbols, digits, etc.
    # Links must be removed before punctuation, otherwise the URL pattern
    # would no longer match.
    text = text.lower()
    text = re.sub(r'https?://\S+', '', text)
    text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r'\d', '', text)

    tokens = word_tokenize(text)
    tokens = [word for word in tokens if word not in stop_words]
    tokens = lemmatize_tokens(tokens)

    return tokens

# Load GloVe embeddings into a dict: word -> float32 numpy vector.
# Each line of the file is "<word> <v1> <v2> ...", split on whitespace.
# NOTE(review): the directory is spelled 'emebeddings' — confirm this matches
# the folder on disk. The path is also absolute (rooted at '/').
glove_path = '/emebeddings/glove.6B.50d.txt'
glove_embeddings = {}
with open(glove_path, encoding='utf-8') as file:
    for line in file:
        values = line.split()
        # First field is the word, the remaining fields are the vector.
        word = values[0]
        vector = np.asarray(values[1:], dtype='float32')
        glove_embeddings[word] = vector


# Apply preprocessing to the title column of the data frame.
# NOTE: this overwrites the raw titles in place with token lists, so the
# original strings are no longer available in `df` after this line runs.
df['title'] = df['title'].apply(preprocess_text)    

In [14]:
# TF-IDF features: each token list in df['title'] is glued back into one
# space-separated string, then the vectorizer is fitted on all documents.
tfidf_vectorizer = TfidfVectorizer()
X_tfidf = tfidf_vectorizer.fit_transform(df['title'].map(' '.join))

In [13]:
# Corpus: one space-joined string per row, built from the token lists in df['title']
corpus = [' '.join(tokens) for tokens in df['title']]

# Function to convert a single text into its GloVe embedding
def text_to_glove_embedding(text, dim=None):
    """Return the SUM of the GloVe vectors of the whitespace-separated words.

    Words that are not keys of the module-level `glove_embeddings` dict are
    ignored. Note that the vectors are summed, not averaged, so longer texts
    produce larger-magnitude embeddings.

    Args:
        text: space-separated tokens.
        dim: length of the returned vector. When omitted it is taken from the
            loaded `glove_embeddings` (so other GloVe files than the 50d one
            work), falling back to 50 when no embeddings are loaded.

    Returns:
        numpy.ndarray of shape ``(dim,)``; all zeros when no word is known.
    """
    if dim is None:
        sample_vector = next(iter(glove_embeddings.values()), None)
        dim = 50 if sample_vector is None else len(sample_vector)

    embedding = np.zeros(dim)
    for word in text.split():
        if word in glove_embeddings:
            embedding += glove_embeddings[word]
    return embedding

# One GloVe embedding row per corpus document.
X_glove = np.array(list(map(text_to_glove_embedding, corpus)))

In [15]:
# Function to obtain the USE embedding of one text
def text_to_use_embedding(text):
    """Run `use_model` on a one-element batch and return the first result as a numpy array."""
    batch_output = use_model([text])
    return batch_output[0].numpy()

# Embed every corpus document with USE, one `use_model` call per text.
# NOTE: despite the "utf" in the name this holds the output of
# text_to_use_embedding (USE embeddings); the same 'UTF' label is used in the
# `embeddings` list.
X_utf = np.array([text_to_use_embedding(text) for text in corpus])

In [16]:
# Fitness definition and individual type for the GA.
# The fitness value is the accuracy returned by `eval`, which has to be
# MAXIMISED, so the weight is +1.0. With a weight of -1.0 DEAP minimises the
# value, i.e. selection would favour the least accurate configurations.
# An individual is a plain list of genes, e.g. [0, 1, 1, 1, 0].
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, typecode="d", fitness=creator.FitnessMax, strategy=None)

In [17]:
# Define the candidate classifiers, keyed by display name.
# `eval` selects a classifier by its POSITION in this dict's keys, so the
# insertion order here is significant. The instances are shared: `eval`
# reconfigures them with set_params before every fit.
# SVC is created with probability=True because `eval` calls predict_proba.
classifiers = {
    'KNN': KNeighborsClassifier(n_neighbors=3),
    'Random Forest': RandomForestClassifier(n_estimators=50, random_state=42),
    'SVM': SVC(kernel='linear', C=1.0, random_state=42,probability=True),
}

# Balance the class distribution with SMOTE oversampling
def apply_smote(X, y):
    """Resample (X, y) with SMOTE (sampling_strategy='auto', seed 42) and return the resampled pair."""
    sampler = SMOTE(sampling_strategy='auto', random_state=42)
    return sampler.fit_resample(X, y)


# Search space shared by the GA genes; `eval` indexes both lists with a gene
# value taken modulo 3, so each list must keep exactly three entries.
# alpha_grid: candidate decision thresholds applied to the averaged probability.
alpha_grid = [0.47,0.5,0.55]
# embeddings: labels of the available text representations ('UTF' = USE embeddings).
embeddings = ['TF-IDF','GloVe','UTF']

# Truthy: `eval` averages probabilities over a multi-day window.
# Set to 0 for the initial attempt (plain per-post model.predict).
multi_day = 1 

def eval(individual):
    """Score one GA individual by training and testing the pipeline it encodes.

    NOTE: the name shadows the built-in ``eval``; it is kept because the
    toolbox registers this function under that name.

    Every gene is read as ``int(gene) % 3`` so that values pushed out of range
    (or turned into floats) by mutation still select a valid option:

    * ``individual[0]``  -> classifier (position in ``classifiers``)
    * ``individual[1]``  -> embedding (position in ``embeddings``)
    * ``individual[2]``  -> first hyper-parameter of the chosen classifier
    * ``individual[3]``  -> second hyper-parameter of the chosen classifier
    * ``individual[-1]`` -> decision threshold (position in ``alpha_grid``)

    The first 80% of the rows of ``df`` form the training set (balanced with
    SMOTE), the remaining 20% the test set. When ``multi_day`` is truthy, one
    label is predicted per test DATE from the average class-0 probability of
    that day and the two preceding test days, and that label is assigned to
    every post of the day; otherwise ``model.predict`` is used per post.

    Returns:
        tuple: one-element tuple ``(accuracy,)`` on the test rows, as DEAP
        expects for a single-objective fitness.
    """

    def gene(position):
        # Truncate to int and wrap into {0, 1, 2}: mutation may leave floats
        # or out-of-range values in a gene.
        return int(individual[position]) % 3

    # Three options per hyper-parameter; the dict order inside each entry
    # decides which one is driven by gene 2 and which by gene 3.
    param_grid = {
        'KNN': {
            'n_neighbors': [3, 5, 7],
            'weights': ['uniform', 'distance', 'uniform'],
        },
        'Random Forest': {
            'n_estimators': [50, 100, 200],
            'max_depth': [None, 10, 20],
        },
        'SVM': {
            'C': [0.1, 1, 10],
            'kernel': ['linear', 'rbf', 'poly'],
        },
    }

    embedding_method = embeddings[gene(1)]
    threshold = alpha_grid[gene(-1)]

    split_index = int(len(df) * 0.8)

    # Reuse the feature matrices computed once at notebook level instead of
    # re-embedding the whole corpus on every fitness evaluation (the USE
    # model call in particular is expensive and its result does not change).
    if embedding_method == 'TF-IDF':
        X_embedding = X_tfidf
    elif embedding_method == 'GloVe':
        X_embedding = X_glove
    elif embedding_method == 'UTF':
        X_embedding = X_utf

    # Chronological-style split by row position (no shuffling).
    X_train = X_embedding[:split_index]
    y_train = df['sell'][:split_index]
    X_test = X_embedding[split_index:]
    y_test = df['sell'][split_index:]
    test_dates = df['Date'][split_index:].to_numpy()
    multi_dates = sorted(set(test_dates))

    # Oversample the training set only; the test set keeps its distribution.
    X_train, y_train = apply_smote(X_train, y_train)

    classifier_name = list(classifiers.keys())[gene(0)]

    # Pick one value for each of the classifier's two hyper-parameters.
    (first_name, first_options), (second_name, second_options) = param_grid[classifier_name].items()
    params = {
        first_name: first_options[gene(2)],
        second_name: second_options[gene(3)],
    }

    # set_params reconfigures the shared estimator instance in place; both
    # tunable parameters are overwritten on every call.
    model = classifiers[classifier_name].set_params(**params)
    model.fit(X_train, y_train)

    if multi_day:
        # Column 0 is the probability of model.classes_[0]
        # (presumably label 0 = "do not sell" — confirm against the data).
        class0_probs = model.predict_proba(X_test)[:, 0]

        # Where we store all the probabilities for each day: date -> [p, ...]
        day_hashmap = {}
        for date, prob in zip(test_dates, class0_probs):
            day_hashmap.setdefault(date, []).append(prob)

        # One decision per date from a 3-day window (today + the two previous
        # test days; at the start of the range the earliest day is reused).
        day_prediction = {}
        for i, date in enumerate(multi_dates):
            window_probs = (
                day_hashmap[date]
                + day_hashmap[multi_dates[max(0, i - 1)]]
                + day_hashmap[multi_dates[max(0, i - 2)]]
            )
            average = sum(window_probs) / len(window_probs)
            day_prediction[date] = 0 if average >= threshold else 1

        # Map the per-day decision back onto each test ROW. Emitting the
        # predictions in sorted-date order would only line up with y_test if
        # the rows were already sorted and grouped by date.
        predicted = [day_prediction[date] for date in test_dates]
    else:
        # Initial attempt: independent prediction per post
        predicted = model.predict(X_test)

    accuracy = accuracy_score(y_test, np.array(predicted))

    # Fitness = accuracy on the test rows
    fitness_values = (accuracy,)
    return fitness_values

In [9]:
# One-sided two-proportion z-test: is the best model's accuracy significantly
# higher than the baseline's?
# Accuracy of best model vs linear
accuracy_1 = 0.538
accuracy_2 = 0.5

# Size of the test set (same test set for both models)
n_1 = n_2 = 9544

# Pooled sample proportion under H0 (both accuracies equal)
p_hat = (accuracy_1 * n_1 + accuracy_2 * n_2) / (n_1 + n_2)

# Test statistic
z = (accuracy_1 - accuracy_2) / np.sqrt(p_hat * (1 - p_hat) * (1/n_1 + 1/n_2))

# Upper-tail probability. norm.sf is the survival function (1 - cdf) computed
# directly, which avoids the cancellation error of `1 - norm.cdf(z)` when the
# cdf is very close to 1.
p_value = norm.sf(z)

# Print results
print("Test Statistic (z):", z)
print("P-value:", p_value)

# Significance level
alpha = 0.05

# Make decision
if p_value < alpha:
    print("Reject null hypothesis: The first algorithm is statistically significantly better.")
else:
    print("Fail to reject null hypothesis: There is not enough evidence to suggest a difference.")


5.25384912533967
7.447645733460462e-08
Test Statistic (z): 5.25384912533967
P-value: 7.447645733460462e-08
Reject null hypothesis: The first algorithm is statistically significantly better.


In [None]:
def get_monthly_returns(day_hashmap=None, data=None):
    """Report, per month of 2022, the percentage of days predicted correctly.

    Despite the name, this measures prediction accuracy, not financial return.

    Args:
        day_hashmap: mapping ``'YYYY-MM-DD' -> predicted sell label`` (one
            label per day). Defaults to an empty mapping, in which case every
            day counts as incorrect.
        data: DataFrame with at least ``'Date'`` (ISO date strings) and
            ``'sell'`` (true label) columns. Defaults to the notebook-level
            ``df``.

    Returns:
        pandas.Series indexed by monthly period with the percentage (0-100)
        of unique 2022 dates whose prediction equals the true label. The
        per-date frame and this series are also printed.
    """
    if day_hashmap is None:
        day_hashmap = {}
    if data is None:
        data = df

    # Just for 2022 - as in test set
    start_date = '2022-01-01'
    end_date = '2022-12-31'

    # Keep only rows inside the date range. The comparison is done on strings
    # and relies on the ISO 'YYYY-MM-DD' format sorting chronologically.
    in_range = (data['Date'] >= start_date) & (data['Date'] <= end_date)
    # One row per date: all predictions within a day are the same.
    # .copy() so the column assignments below do not write into a slice of
    # `data` (SettingWithCopyWarning / silently lost writes).
    unique_dates_df = data[in_range].drop_duplicates(subset=['Date']).copy()

    # Predictions for 2022, re-keyed by Timestamp so they match the parsed
    # 'Date' column below.
    date_sell_hashmap = {
        pd.to_datetime(date): value
        for date, value in day_hashmap.items()
        if date.startswith('2022')
    }
    unique_dates_df['Date'] = pd.to_datetime(unique_dates_df['Date'])

    # A day is correct only when it has a prediction and it equals the label.
    unique_dates_df['is_sell_correct'] = pd.Series(
        [
            date in date_sell_hashmap and date_sell_hashmap[date] == sell_value
            for date, sell_value in zip(unique_dates_df['Date'], unique_dates_df['sell'])
        ],
        index=unique_dates_df.index,
        dtype=bool,
    )
    print(unique_dates_df)

    month = unique_dates_df['Date'].dt.to_period('M')
    monthly_correct_percentages = unique_dates_df.groupby(month)['is_sell_correct'].mean() * 100
    print(monthly_correct_percentages)
    return monthly_correct_percentages

In [None]:
# NOTE(review): no per-day predictions are supplied to this call, so every
# 2022 date is scored as incorrect and each month reports 0%.
get_monthly_returns()

In [None]:
def profitability(day_hashmap=None, prices=None):
    """Simulate the value of a 1000-unit BTC-GBP position over 2022.

    Each day the position follows the close-to-close price change. On a day
    whose prediction is 0 (buy) the position is additionally increased by
    ``buy_percentage`` percent of its current value; on 1 (sell) or when the
    day has no prediction nothing else happens.
    NOTE(review): the "buy" step adds value without accounting for where the
    money comes from — confirm this is the intended strategy model.

    Args:
        day_hashmap: mapping ``'YYYY-MM-DD' -> predicted label`` (0 = buy,
            1 = sell). Defaults to an empty mapping (pure buy-and-hold).
        prices: DataFrame with ``'Date'`` (ISO date strings) and ``'Close'``
            columns. Defaults to the BTC-GBP daily report CSV.

    Returns:
        float: the final amount (also printed, as is each day's amount).
    """
    if day_hashmap is None:
        day_hashmap = {}
    if prices is None:
        csv_path = '../Market Model/Daily_Stock_Report/StocksBTC-GBP.csv'
        prices = pd.read_csv(csv_path)

    # 2022 filter (string comparison; relies on ISO 'YYYY-MM-DD' dates)
    start_date = '2022-01-01'
    end_date = '2022-12-31'
    filtered_df = prices[(prices['Date'] >= start_date) & (prices['Date'] <= end_date)]

    # Predictions for 2022 keyed by Timestamp, so each trading day looks up
    # ITS OWN signal. Indexing the signals by position would raise IndexError
    # when there are fewer signals than prices and would silently misalign
    # whenever the two date sets differ.
    date_sell_hashmap = {
        pd.to_datetime(date): value
        for date, value in day_hashmap.items()
        if date.startswith('2022')
    }

    # Initial investment
    initial_amount = 1000
    # Current investment amount
    current_amount = initial_amount
    # Percentage of current amount to buy
    buy_percentage = 1

    stock_prices = list(filtered_df['Close'])
    trading_days = list(pd.to_datetime(filtered_df['Date']))

    # Iterate through each day (the first day only provides the base price)
    for i in range(1, len(stock_prices)):
        daily_return = (stock_prices[i] - stock_prices[i - 1]) / stock_prices[i - 1]
        current_amount *= (1 + daily_return)
        print(current_amount)

        # Check buy/sell signal for the day; None when there is no prediction
        signal = date_sell_hashmap.get(trading_days[i])
        if signal == 0:
            # Buy: increase the position by buy_percentage percent
            current_amount += current_amount * (buy_percentage / 100)
        # signal == 1 (sell) or missing: leave the position untouched

    # Calculate final amount
    final_amount = current_amount
    print("Final amount:", final_amount)
    return final_amount

In [None]:
# NOTE(review): no per-day buy/sell predictions are supplied to this call.
profitability()

In [None]:
def xgBoost(X_train, y_train, X_test, y_test):
    """Train a 10-round XGBoost binary classifier and report its test accuracy.

    Uses the native ``xgb.train`` API on DMatrix inputs. The booster outputs
    probabilities for the ``binary:logistic`` objective, which are rounded to
    0/1 labels before scoring.

    Args:
        X_train, y_train: training features and binary labels.
        X_test, y_test: test features and binary labels.

    Returns:
        float: accuracy on the test set (also printed).
    """
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dtest = xgb.DMatrix(X_test, label=y_test)

    # Booster parameters. 'n_estimators' belongs to the scikit-learn wrapper
    # only; with xgb.train the number of trees is set by num_boost_round, so
    # it is not listed here.
    params = {
        # For binary classification
        'objective': 'binary:logistic',
        'max_depth': 3,
        'learning_rate': 0.1,
        # Use log loss as the evaluation metric
        'eval_metric': 'logloss',
    }

    # Only this booster is trained and scored; fitting a separate
    # XGBClassifier beforehand would be discarded work.
    model = xgb.train(params, dtrain, num_boost_round=10)

    predictions = model.predict(dtest)

    accuracy = accuracy_score(y_test, predictions.round())
    print(f'Accuracy: {accuracy}')
    return accuracy

In [None]:
# Run XGBoost once per embedding
for embedding_method in embeddings:

    # Reuse the feature matrices already computed above rather than
    # re-embedding the whole corpus for each iteration.
    if embedding_method == 'TF-IDF':
        X_embedding = X_tfidf
    elif embedding_method == 'GloVe':
        X_embedding = X_glove
    elif embedding_method == 'UTF':
        X_embedding = X_utf

    X_train, X_test, y_train, y_test = train_test_split(X_embedding, df['sell'], test_size=0.33, random_state=42)

    # xgBoost expects (X_train, y_train, X_test, y_test): train_test_split
    # returns the pieces in a different order, so they are reordered here.
    xgBoost(X_train, y_train, X_test, y_test)

In [18]:
def create_individual():
    """Build a random individual: five integer genes, each drawn from {0, 1, 2}.

    Gene order: classifier, embedding, param1, param2, alpha.
    """
    gene_names = ("classifier", "embedding", "param1", "param2", "alpha")
    return [random.randint(0, 2) for _ in gene_names]

# Set up the Genetic Algorithm operators on a DEAP toolbox
toolbox = base.Toolbox()
# individual: wrap the gene list from create_individual in creator.Individual
toolbox.register("individual", tools.initIterate, creator.Individual, create_individual)
# population: a list of individuals (size given at call time via n=...)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
# evaluate: the fitness function `eval` defined in this notebook (not the built-in)
toolbox.register("evaluate", eval)
toolbox.register("mate", tools.cxTwoPoint)
# Gaussian mutation turns the integer genes into floats, possibly outside
# 0..2; `eval` copes by reading every gene as int(gene) % 3.
toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=1, indpb=0.2)
toolbox.register("select", tools.selTournament, tournsize=3)

In [None]:
if __name__ == "__main__":

    # Seed the RNG used by create_individual and the DEAP operators so the
    # search can be reproduced on a fresh kernel.
    random.seed(42)

    # Set up the population and evolve
    hof = tools.HallOfFame(maxsize=5)
    population = toolbox.population(n=10)
    # The hall of fame is passed in so the best individuals seen in ANY
    # generation are kept, not just those surviving in the final population.
    algorithms.eaMuPlusLambda(population, toolbox, mu=10, lambda_=20, cxpb=0.7, mutpb=0.2, ngen=10, stats=None, halloffame=hof, verbose=True)

    # Best individual ever evaluated (the hall of fame is ordered best-first)
    best_individual = hof[0]
    print("Best Individual:", best_individual)
    # The fitness value IS the accuracy returned by `eval`; taking its
    # reciprocal would misreport it (and divide by zero for accuracy 0).
    print("Best Accuracy:", best_individual.fitness.values[0])