In [None]:
import re
from modules.Twitter import Twitter
import logging
import coloredlogs
import time
import os
import pandas as pd
from modules.Models import *
from modules.NewsHeadLine import NewsHeadLine
import yfinance
from datetime import datetime

In [None]:
# Logging setup: one timestamped DEBUG log file per run under ./output/logs,
# plus coloredlogs output using the same record layout.
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
LOG_DATE_FORMAT = '%m/%d/%Y %I:%M:%S %p'

# basicConfig opens the log file straight away and raises FileNotFoundError when
# the target directory is missing, so make sure it exists first.
log_dir = f'{os.getcwd()}/output/logs'
os.makedirs(log_dir, exist_ok=True)

logging.basicConfig(filename=f'{log_dir}/{time.strftime("%m-%d-%Y %I-%M%p")}.log',
                    encoding='utf-8',
                    level=logging.DEBUG,
                    format=LOG_FORMAT,
                    datefmt=LOG_DATE_FORMAT)
coloredlogs.install(fmt=LOG_FORMAT,
                    datefmt=LOG_DATE_FORMAT,
                    level="debug")
# Keep matplotlib's own logger at WARNING so its DEBUG records stay out of the log.
logging.getLogger('matplotlib').setLevel(logging.WARNING)

In [None]:
# --- model building --- #

import pandas as pd
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.feature_extraction.text import CountVectorizer
from nltk.sentiment import SentimentIntensityAnalyzer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
import nltk
from nltk.stem.snowball import SnowballStemmer
import re
from sklearn import svm
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split

In [None]:
# model helper functions

def prediction_to_str(prediction):
    '''Translate a numeric sentiment prediction into its label.

    Args:
        prediction: The class value (0, 1 or -1), or a container whose first
            element is that value: a list, a tuple, or any object exposing
            ``tolist()`` (for example the array returned by ``model.predict``).

    Returns:
        str: "NEUTRAL" for 0, "POSITIVE" for 1, "NEGATIVE" for -1, and "N/A"
        for any other value, including an empty container.
    '''
    # Arrays and array scalars become plain Python objects first, so the
    # comparisons below never run element-wise on an array.
    if hasattr(prediction, "tolist"):
        prediction = prediction.tolist()
    if isinstance(prediction, (list, tuple)):
        # Only the first element is considered; an empty container has no
        # prediction and falls through to "N/A" instead of raising IndexError.
        prediction = int(prediction[0]) if len(prediction) else None
    Logger.debug("Prediction %s ", prediction)
    if prediction == 0:
        return "NEUTRAL"
    elif prediction == 1:
        return "POSITIVE"
    elif prediction == -1:
        return "NEGATIVE"
    else:
        return "N/A"
def prediction_to_color(prediction):
    '''Pick the plot colour for a sentiment class value.

    Args:
        prediction: The class value to look up.

    Returns:
        str: "yellow" for 0, "green" for 1, "red" for -1, "blue" otherwise.
    '''
    palette = ((0, "yellow"), (1, "green"), (-1, "red"))
    for class_value, color in palette:
        if prediction == class_value:
            return color
    return "blue"
    
def model_analysis(Y_test, prediction):
    '''Write the accuracy score and the classification report to the debug log.

    Args:
        Y_test: Reference labels, passed as the first argument to
            ``accuracy_score`` and ``classification_report``.
        prediction: Predicted labels, passed as the second argument to both.
    '''
    Logger.debug('Model accuracy score\n%s', accuracy_score(Y_test, prediction))
    Logger.debug('Classification report\n%s', classification_report(Y_test, prediction))

In [None]:
# Load the semicolon-separated training set from ./input. Column names are
# supplied explicitly, so the first line of the file is read as data.
training_data_path = f"{os.getcwd()}/input/training_data.csv"
dataframe = pd.read_csv(
    training_data_path,
    sep=';',
    names=['ID', 'Ticker', 'Date', 'Text', 'Sentiment'],
)

In [None]:
# Preview the first rows of the training data.
dataframe.head()

In [None]:
# Preview the last rows of the training data.
dataframe.tail()

In [None]:
# Column dtypes and non-null counts.
dataframe.info()

In [None]:
# Number of rows per Sentiment value (class balance).
dataframe.Sentiment.value_counts()

In [None]:
# Pie chart of the share of each Sentiment value, labelled with whole-number percentages.
dataframe.Sentiment.value_counts().plot(kind="pie",autopct="%1.0f%%")

In [None]:
# Missing values per column (these rows are dropped in the cleaning cell below).
dataframe.isnull().sum()

In [None]:
# Pie chart of the share of rows per Ticker value, labelled with whole-number percentages.
dataframe.Ticker.value_counts().plot(kind="pie",autopct="%1.0f%%")

In [None]:
# Drop incomplete rows, normalise the text with Preprocess.clean_text and make
# the sentiment label an integer, all in one chain.
dataframe = (
    dataframe
    .dropna()
    .assign(
        Text=lambda frame: frame['Text'].apply(Preprocess.clean_text),
        Sentiment=lambda frame: frame['Sentiment'].astype(int),
    )
)

In [None]:
# Features are the cleaned texts, targets the integer sentiment labels;
# 30% of the rows are held out for testing with a fixed seed.
X, Y = dataframe['Text'].values, dataframe['Sentiment'].values
X_train, X_test, Y_train, Y_test = train_test_split(
    X,
    Y,
    test_size=0.3,
    random_state=42,
)

In [None]:
# Fit the TF-IDF vectoriser on the training texts only, then transform both splits with it.
# NOTE(review): X_train / X_test are overwritten with their transformed versions,
# so this cell can only be re-run after re-running the split cell above.
vector = TfidfVectorizer().fit(X_train)

X_train, X_test = vector.transform(X_train), vector.transform(X_test)

In [None]:
# Model selection: 5-fold cross-validated grid search over a small
# hyper-parameter grid for each candidate classifier.
#
# The estimators that use randomness get a fixed random_state (the same value as
# the train/test split) so the scores in the table below are reproducible.

model_params = {
    'svm': {
        'model': svm.LinearSVC(max_iter=10000, random_state=42),
        'params': {
            'C': [1, 10, 20],
        }
    },
    'random_forest': {
        'model': RandomForestClassifier(random_state=42),
        'params': {
            'n_estimators': [1, 5, 10]
        }
    },
    'logistic_regression': {
        'model': LogisticRegression(max_iter=10000),
        'params': {
            'C': [1, 5, 10]
        }
    },
    'gradient_boost': {
        'model': GradientBoostingClassifier(random_state=42),
        'params': {
            'n_estimators': [1, 5, 10]
        }
    }
}

# One row per candidate: its best mean CV score and the parameters that achieved it.
scores = []
for model_name, mp in model_params.items():
    clf = GridSearchCV(mp['model'], mp['params'],
                       cv=5, return_train_score=False)
    clf.fit(X_train, Y_train)
    scores.append({
        'model': model_name,
        'best_score': clf.best_score_,
        'best_params': clf.best_params_
    })
model_selection = pd.DataFrame(scores, columns=['model', 'best_score', 'best_params'])
model_selection

In [None]:
# Train the final classifier on the full training split and predict the test split.
# max_iter matches the value used for LinearSVC in the grid-search cell (10000),
# so the model fitted here is configured like the one that was cross-validated;
# random_state makes the fit reproducible.
model = svm.LinearSVC(max_iter=10000, C=1, random_state=42)
model.fit(X_train, Y_train)
model_str = "Linear Support vector Machine"  # label used in classify()'s log message
predictions = model.predict(X_test)

In [None]:
# Text report of the classification metrics on the held-out test split.
# print() is needed so the report's line breaks render.
print(classification_report(Y_test, predictions))

In [None]:
# Confusion matrix of true labels versus predictions on the test split.
print(confusion_matrix(Y_test, predictions))

In [None]:
# The same confusion matrix drawn as an annotated heatmap.
# NOTE(review): `sns` is not imported in any visible cell -- presumably it is
# re-exported by `from modules.Models import *`; confirm, or add
# `import seaborn as sns` to the import cell.
sns.heatmap(confusion_matrix(Y_test, predictions), annot=True, fmt='g')

In [None]:
def classify(data):
    '''Predict the sentiment class of one piece of text with the trained model.

    The text is cleaned with ``Preprocess.clean_text`` (the same step applied to
    the training texts), vectorised with the fitted ``vector`` and classified by
    ``model``. The result and the ``SENTIMENT_ANALYZER`` polarity scores for the
    original text are written to the debug log.

    Args:
        data: The raw text to classify.

    Returns:
        Whatever ``model.predict`` returns for the single input.
    '''
    # The vectoriser was fitted on cleaned text, so inference must clean too;
    # otherwise tokens are produced differently from training.
    # NOTE(review): assumes Preprocess.clean_text accepts a single string, as it
    # does when applied to the Text column -- confirm.
    cleaned = Preprocess.clean_text(data)
    prediction = model.predict(vector.transform([cleaned]))

    label = prediction_to_str(prediction)
    Logger.debug("Classification using %s for '%s' is %s",
                 model_str,
                 data,
                 label)

    # The analyzer still receives the original text, exactly as before.
    Logger.debug("Classification using sentiment analyzer for '%s' is %s",
                 data,
                 SENTIMENT_ANALYZER.polarity_scores(data))

    return prediction


# Quick smoke test of the classifier on an obviously negative sentence.
classify("AMAZON is the worst")

In [None]:
def diff_seconds(d1, d2):
    '''Return the absolute time difference between two timestamps, in seconds.

    Args:
        d1 (str): Timestamp formatted as "%Y-%m-%d %H:%M:%S".
        d2 (str): Timestamp formatted as "%Y-%m-%d %H:%M:%S".

    Returns:
        int: Whole seconds between the two instants, regardless of order.

    Raises:
        ValueError: If either string does not match the expected format.
    '''
    timestamp_format = "%Y-%m-%d %H:%M:%S"
    first = datetime.strptime(d1, timestamp_format)
    second = datetime.strptime(d2, timestamp_format)
    # timedelta.seconds is only the seconds *field* (0..86399): it drops whole
    # days and, for a negative delta, yields 86400 - x instead of x.
    # total_seconds() is the real signed duration.
    return int(abs((second - first).total_seconds()))
def is_same_day(d1, d2):
    '''Tell whether two timestamps fall on the same calendar date.

    Args:
        d1 (str): Timestamp formatted as "%Y-%m-%d %H:%M:%S".
        d2 (str): Timestamp formatted as "%Y-%m-%d %H:%M:%S".

    Returns:
        bool: True when both timestamps share year, month and day.

    Raises:
        ValueError: If either string does not match the expected format.
    '''
    timestamp_format = "%Y-%m-%d %H:%M:%S"
    first = datetime.strptime(d1, timestamp_format)
    second = datetime.strptime(d2, timestamp_format)
    # Comparing dates instead of timedelta.days: .days is -1 for any negative
    # delta (so same-day pairs with d2 < d1 were rejected) and 0 for any gap
    # under 24h (so pairs straddling midnight were accepted).
    return first.date() == second.date()
def get_financial_data(tickers, period, interval):
    '''Download market data through ``yfinance.download``.

    Args:
        tickers: Forwarded unchanged as ``tickers`` (this notebook passes "TSLA").
        period: Forwarded unchanged as ``period`` (this notebook passes "3d" and "7d").
        interval: Forwarded unchanged as ``interval`` (this notebook passes "30m").

    Returns:
        The object returned by ``yfinance.download``; the plotting cells read
        its ``Open`` attribute.
    '''
    return yfinance.download(tickers=tickers, period=period, interval=interval)

In [None]:
import matplotlib.pyplot as plt
import json
import time
import ast

plt.rcParams["figure.figsize"] = (7.5,13)
plt.rcParams["figure.autolayout"] = True
plt.rcParams['font.size'] = 12

# --- plot financial data: TSLA opening prices, last 3 days, 30-minute bars ---

financial_data = get_financial_data("TSLA", "3d", "30m")

datetimes = []
pricing = []

# to_json(date_format='iso') keys every price by an ISO timestamp string. Keep
# "YYYY-MM-DD HH:MM:SS" (date, then the time up to the first "."), which is the
# format diff_seconds / is_same_day parse.
for key, value in json.loads(financial_data.Open.to_json(date_format='iso')).items():
    parsed = key.split("T")
    timestamp = parsed[0] + " " + parsed[1].split(".")[0]
    datetimes.append(timestamp)
    pricing.append(value)

plt.plot(datetimes, pricing, color="green")
plt.xticks(datetimes, rotation=80)
plt.ylabel("Stock price")
plt.xlabel("Date timestamp")
plt.title("Price fluctuation of Tesla ($TSLA)")
plt.show()

# --- plot sentiment: number of TSLA posts per sentiment class around each price bar ---

# One counter per price timestamp, keyed by sentiment class.
data = {}
for currdate in datetimes:
    data[currdate] = {1: 0, -1: 0, 0: 0}

# Read the posts straight from the DataFrame. The earlier to_json +
# ast.literal_eval round-trip evaluated JSON text as a Python literal, which
# raises on null/true/false and leaves JSON escapes such as "\/" undecoded.
tsla_posts = dataframe[dataframe['Ticker'].str.contains("TSLA", na=False)]
window_minutes = 15  # a post counts towards a bar when at most this many whole minutes apart
for post_date, sentiment in zip(tsla_posts["Date"], tsla_posts["Sentiment"]):
    sentiment = int(sentiment)

    for key, val in data.items():
        if is_same_day(post_date, key) and diff_seconds(post_date, key) // 60 <= window_minutes:
            val[sentiment] += 1

# One line per sentiment class (-1, 0, 1) over the same timestamps as the price plot.
for i in range(-1, 2):
    x_arr = list(data.keys())
    y_arr = [counts[i] for counts in data.values()]
    plt.plot(x_arr, y_arr, label=prediction_to_str(i), color=prediction_to_color(i))
plt.xticks(datetimes, rotation=80)
plt.ylabel("Amount of posts")
plt.xlabel("Date timestamp")
plt.title("Online posts related to $TSLA")
plt.legend()
plt.show()



In [None]:
import matplotlib.pyplot as plt
import json
import time
import ast

plt.rcParams["figure.figsize"] = (7.5,13)
plt.rcParams["figure.autolayout"] = True
plt.rcParams['font.size'] = 12

# --- plot financial data: TSLA opening prices, last 7 days, 30-minute bars ---
# NOTE(review): apart from the "7d" period this cell repeats the 3-day cell
# above; a shared, parameterised helper would remove the duplication.

financial_data = get_financial_data("TSLA", "7d", "30m")

datetimes = []
pricing = []

# to_json(date_format='iso') keys every price by an ISO timestamp string. Keep
# "YYYY-MM-DD HH:MM:SS" (date, then the time up to the first "."), which is the
# format diff_seconds / is_same_day parse.
for key, value in json.loads(financial_data.Open.to_json(date_format='iso')).items():
    parsed = key.split("T")
    timestamp = parsed[0] + " " + parsed[1].split(".")[0]
    datetimes.append(timestamp)
    pricing.append(value)

plt.plot(datetimes, pricing, color="green")
plt.xticks(datetimes, rotation=80)
plt.ylabel("Stock price")
plt.xlabel("Date timestamp")
plt.title("Price fluctuation of Tesla ($TSLA)")
plt.show()

# --- plot sentiment: number of TSLA posts per sentiment class around each price bar ---

# One counter per price timestamp, keyed by sentiment class.
data = {}
for currdate in datetimes:
    data[currdate] = {1: 0, -1: 0, 0: 0}

# Read the posts straight from the DataFrame. The earlier to_json +
# ast.literal_eval round-trip evaluated JSON text as a Python literal, which
# raises on null/true/false and leaves JSON escapes such as "\/" undecoded.
tsla_posts = dataframe[dataframe['Ticker'].str.contains("TSLA", na=False)]
window_minutes = 15  # a post counts towards a bar when at most this many whole minutes apart
for post_date, sentiment in zip(tsla_posts["Date"], tsla_posts["Sentiment"]):
    sentiment = int(sentiment)

    for key, val in data.items():
        if is_same_day(post_date, key) and diff_seconds(post_date, key) // 60 <= window_minutes:
            val[sentiment] += 1

# One line per sentiment class (-1, 0, 1) over the same timestamps as the price plot.
for i in range(-1, 2):
    x_arr = list(data.keys())
    y_arr = [counts[i] for counts in data.values()]
    plt.plot(x_arr, y_arr, label=prediction_to_str(i), color=prediction_to_color(i))
plt.xticks(datetimes, rotation=80)
plt.ylabel("Amount of posts")
plt.xlabel("Date timestamp")
plt.title("Online posts related to $TSLA")
plt.legend()
plt.show()

