# Project 5 : Predict the stock prices of Google, Facebook & Amazon

## MAP670G - Data Stream (2021-2022)
<blockquote> 
Alexandre PERBET<br>
Cyril NERIN<br>
Hugo RIALAN<br>
</blockquote>

__PART 2 : ON-LINE MACHINE LEARNING WITH RIVER AND KAFKA__

<br>
<img src="Google_Facebook_Amazon.PNG" width="600" height="600">
<br>

__Project 5 : Collect trading data using the Yahoo Finance API and use online regression to predict the stock prices of Google, Facebook & Amazon.__

__Option 2 :__ For each of these 5 countries, use the stock data of one major company.
For example, Google in the US, BNP Paribas in France and Alibaba in China; for Russia and England, use a major international company.
This option was part of the original project statement.

For each option, each group should use at least 3 different data streams, apply online and adaptive regression with RIVER (such as https://riverml.xyz/latest/api/tree/HoeffdingAdaptiveTreeRegressor/) and compare the performance with a batch regression model (scikit-learn).

__ToDo:__ Compare online regression with batch regression and discuss the performance.

__Bonus :__ Use recent stock market data (from January to March 2022).

__Online resources:__ 
You can use the yfinance Python library to collect Yahoo Finance data as a stream: https://pypi.org/project/yfinance/
You can compute time-series statistics and moving averages (MACD) for feature engineering with statsmodels: https://www.statsmodels.org/stable/tsa.html
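
As an illustration of the MACD feature mentioned above, here is a minimal sketch in plain Python rather than statsmodels or pandas. The spans 12, 26 and 9 are the conventional defaults and are an assumption here, not something fixed by the project statement; the prices are synthetic.

```python
def ema(values, span):
    """Exponential moving average with smoothing factor 2 / (span + 1)."""
    alpha = 2.0 / (span + 1)
    out = []
    for v in values:
        # Seed with the first value, then blend each new value with the previous average.
        out.append(v if not out else alpha * v + (1 - alpha) * out[-1])
    return out


def macd(prices, fast=12, slow=26, signal=9):
    """Return (macd_line, signal_line, histogram), each aligned with `prices`."""
    fast_ema = ema(prices, fast)
    slow_ema = ema(prices, slow)
    macd_line = [f - s for f, s in zip(fast_ema, slow_ema)]
    signal_line = ema(macd_line, signal)
    histogram = [m - s for m, s in zip(macd_line, signal_line)]
    return macd_line, signal_line, histogram


# Synthetic, steadily rising prices (illustrative only, not market data).
prices = [100.0 + 0.5 * i for i in range(60)]
macd_line, signal_line, histogram = macd(prices)
print(round(macd_line[-1], 3), round(signal_line[-1], 3))
```

Because each value depends only on the previous average, the same recurrence can be updated one observation at a time, which suits a streaming setting.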


In [12]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
%load_ext autoreload
%autoreload 2
%matplotlib inline

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Libraries

In [13]:
!pip3 install --quiet -r requirements.txt

In [14]:
import sys 
print("Python version: {}". format(sys.version))

import IPython
# Import the display function directly (a separate "from IPython import display" would be shadowed by it)
from IPython.display import Markdown, display
print("IPython version: {}". format(IPython.__version__))

import numpy as np
print("NumPy version: {}". format(np.__version__))

import scipy as sp
# Aliased so that river's "stats" module, imported below, does not silently shadow it
from scipy import stats as sp_stats
print("SciPy version: {}". format(sp.__version__))

import pandas as pd
print("pandas version: {}". format(pd.__version__))

import matplotlib
import matplotlib.pyplot as plt
print("matplotlib version: {}". format(matplotlib.__version__))

import seaborn as sns
print("seaborn version : {}". format(sns.__version__))
sns.set()

import kafka
from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer
from kafka import KafkaConsumer
print("kafka version : {}". format(kafka.__version__))

import yfinance as yf
print("yfinance version : {}". format(yf.__version__))

import statsmodels
print("statsmodels version : {}". format(statsmodels.__version__))
import statsmodels.api as sm
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.stattools import adfuller, kpss

import sklearn
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error, mean_absolute_error, r2_score
print("scikit-learn version : {}". format(sklearn.__version__))

import river
from river import base
from river import compose
from river import metrics
from river import preprocessing
from river import evaluate
from river import tree
from river import ensemble
from river import linear_model
from river import stats
from river import optim
print("river version : {}". format(river.__version__))

import ipywidgets as widgets
print("ipywidgets version : {}". format(widgets.__version__))

import time
from time import perf_counter
from datetime import datetime, date, timedelta

from enum import Enum, auto

import math

import pprint

import json

import urllib.request

import warnings
warnings.filterwarnings('ignore')

print("====================================")

Python version: 3.9.7 (default, Sep 16 2021, 16:59:28) [MSC v.1916 64 bit (AMD64)]
IPython version: 7.27.0
NumPy version: 1.20.3
SciPy version: 1.7.1
pandas version: 1.3.2
matplotlib version: 3.4.2
seaborn version : 0.11.2
kafka version : 2.0.2
yfinance version : 0.1.70
statsmodels version : 0.13.2
scikit-learn version : 1.0.1
river version : 0.9.0
ipywidgets version : 7.6.5


## Utility functions

In [15]:
def printmd(text, couleur=None):
    """
    Display the string `text` as Markdown (so styles such as **bold** apply),
    optionally colored with the CSS color `couleur`.
    """
    if couleur is None:
        display(Markdown(text))
    else:
        colorstr = "<span style='color:{}'>{}</span>".format(couleur, text)
        display(Markdown(colorstr))

## Launching the servers
<p><b>LAUNCHING ZOOKEEPER AND KAFKA SERVER ON WINDOWS</b></p>
<p>In a <b>first terminal</b>, run the following commands:</p>
<p>cd %KAFKA_DIR%</p>
<p>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties</p>
<p>  </p>
<p>In a <b>second terminal</b>, run the following commands:</p>
<p>cd %KAFKA_DIR%</p>
<p>.\bin\windows\kafka-server-start.bat .\config\server.properties</p>
<p>  -- </p>
<p>We assume that Zookeeper is running on its default address localhost:2181 and Kafka on localhost:9092.</p>

# Processing of stock market data in Kafka

![Archi_KAFKA_2.PNG](attachment:Archi_KAFKA_2.PNG)

__We create a Kafka topic for each company and stream the stock data retrieved with yfinance into these topics.__

__In another notebook, we will retrieve this stock market data to apply a Machine Learning model with RIVER.__

__In the last notebook, we plot the prediction results from the data stored in the Kafka `predict__` topics.__
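
The consumer in this notebook decodes each Kafka message as UTF-8 JSON and reads the keys `date` and `Close`. A minimal sketch of that message format follows; the helper names `encode_record` and `decode_record` are hypothetical, and the producer notebook may build its messages differently.

```python
import json
import math


def encode_record(timestamp_ms, close):
    """Serialize one quote as UTF-8 JSON bytes (hypothetical helper)."""
    return json.dumps({"date": timestamp_ms, "Close": close}).encode()


def decode_record(raw):
    """Inverse of encode_record: bytes -> dict, as the consumer does."""
    return json.loads(raw.decode())


# Values taken from the sample output shown later in this notebook.
payload = encode_record(1609785000000, 1724.39501953125)
record = decode_record(payload)
print(record["date"], record["Close"])

# Python's json module round-trips a missing value as NaN,
# which is why the consumer checks math.isnan before learning.
missing = decode_record(encode_record(1609785000000, float("nan")))
print(math.isnan(missing["Close"]))
```

Note that `NaN` is not valid in strict JSON; this round trip relies on Python's default `json` behavior on both ends.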

## Stock market data identifiers

In [16]:
All_the_companies = ["google",
                     "facebook",
                     "amazon",
                     "total",
                     "gazprom",
                     "alibaba",
                     "bnp_paribas",
                     "BP",
                     "ferrari"]

## Notebook settings

In [17]:
trace_on_line_ML = True
Nb_to_print = 500
pp = pprint.PrettyPrinter()

## Creation of topics if needed

In [18]:
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092", client_id="Project_DataStream")

topic_name = {}
topic_predict_name = {}

for company in All_the_companies:
    topic_name[company] = company
    topic_predict_name[company] = "predict__{}".format(company)

    # The stock data
    if topic_name[company] not in admin_client.list_topics():
        topic_list = []
        topic_list.append(NewTopic(name=topic_name[company], num_partitions=1, replication_factor=1))
        admin_client.create_topics(new_topics=topic_list, validate_only=False)

    # The predict results
    if topic_predict_name[company] not in admin_client.list_topics():
        topic_list = []
        topic_list.append(NewTopic(name=topic_predict_name[company], num_partitions=1, replication_factor=1))
        admin_client.create_topics(new_topics=topic_list, validate_only=False)

# Stock market prediction using online regression

## Select the company

In [19]:
No_Company = 0              # value from 0 to 8

The_selected_company = All_the_companies[No_Company]
print("The selected company is {}".format(The_selected_company))

The selected company is google


## RIVER Regression model

In [20]:
def get_timestamp_date(x):
    return {'timestamp_date': x}

model = compose.Pipeline(
    ('timestamp_date', compose.FuncTransformer(get_timestamp_date)),    
    ('scale', preprocessing.StandardScaler()),
    ('lin_reg', linear_model.LinearRegression(intercept_lr=0, optimizer=optim.SGD(0.03)))
)

model = preprocessing.TargetStandardScaler(regressor=model)

model

TargetStandardScaler (
  regressor=Pipeline (
    steps=OrderedDict([('timestamp_date', FuncTransformer (
  func="get_timestamp_date"
)), ('scale', StandardScaler (
  with_std=True
)), ('lin_reg', LinearRegression (
  optimizer=SGD (
    lr=Constant (
      learning_rate=0.03
    )
  )
  loss=Squared ()
  l2=0.
  intercept_init=0.
  intercept_lr=Constant (
    learning_rate=0
  )
  clip_gradient=1e+12
  initializer=Zeros ()
))])
  )
)
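
To make the role of the target scaling step more concrete, here is a minimal plain-Python sketch of online target standardization using Welford's running mean and variance. It illustrates the general idea only and is not RIVER's implementation: the class name `RunningTargetScaler`, the use of the sample variance, and the fallback to 0 when the variance is zero are all assumptions.

```python
class RunningTargetScaler:
    """Standardize a target with statistics updated one value at a time (hypothetical helper)."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0  # running sum of squared deviations from the mean

    def update(self, y):
        # Welford's update: numerically stable, constant memory.
        self.n += 1
        delta = y - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (y - self.mean)

    @property
    def std(self):
        # Sample standard deviation; 0.0 until at least two values are seen.
        return (self.m2 / (self.n - 1)) ** 0.5 if self.n > 1 else 0.0

    def scale(self, y):
        return (y - self.mean) / self.std if self.std > 0 else 0.0

    def unscale(self, z):
        return z * self.std + self.mean


scaler = RunningTargetScaler()
for y in [2.0, 4.0, 6.0]:
    scaler.update(y)

print(scaler.mean, scaler.std)   # running mean and standard deviation
print(scaler.scale(6.0))         # standardized target
print(scaler.unscale(1.0))       # back to the original scale
```

With a scaler of this kind, a prediction made before any value has been seen maps back to 0 on the original scale, which would explain a first prediction far from the true price.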

## Kafka Consumer : get stock market data and run online Machine Learning

In [21]:
def evaluate_model(model, The_selected_company, The_value_type="Close"): 
    
    if trace_on_line_ML:
        print("-----  ON-LINE MACHINE LEARNING FOR {} ----".format(The_selected_company))

    metric = metrics.Rolling(metrics.MAE(), 7)
    topic_name = The_selected_company
    topic_predict_name = "predict__{}".format(The_selected_company)
    consumer_group_name = "{}_on_line_ML".format(The_selected_company)

    consumer = KafkaConsumer(topic_name,
                             bootstrap_servers='localhost:9092',
                             group_id=consumer_group_name)

    producer = KafkaProducer(bootstrap_servers="localhost:9092")

    stock_market = {}
    nb_msg = 0
    # The first prediction is far from the target. It is ignored to make the results plot more readable.
    result_to_store = False
    
    # Init time counter
    temps = perf_counter()
    
    try:
        # Infinite loop : the consumer blocks until a new message arrives
        for message in consumer:
            stock_market = json.loads(message.value.decode())
            x = stock_market["date"]
            y = stock_market[The_value_type]

            # There may be some missing data
            if not math.isnan(y):
                nb_msg += 1

                # Test-then-train: predict before the true value is seen, then update the model with it
                y_pred = model.predict_one(x)
                model.learn_one(x, y)
            
                # Update the error metric
                MAE = metric.update(y, y_pred).get()
                
                if result_to_store:
                    # The true value, the prediction, the rolling MAE and the elapsed time
                    # (wall-clock, stored under the key "CPU_time") are sent to the topic <predict>
                    predict_result = {}
                    predict_result["date"] = x
                    predict_result["y_true"] = y
                    predict_result["y_pred"] = y_pred
                    predict_result["CPU_time"] = perf_counter() - temps
                    predict_result["MAE"] = MAE           
                    producer.send(topic_predict_name, json.dumps(predict_result).encode())
                else:
                    result_to_store = True

                if nb_msg == 5:
                    if trace_on_line_ML:
                        print("")
                        pp.pprint(predict_result)
                        print("")

                if nb_msg % Nb_to_print == 0:
                    if trace_on_line_ML:
                        print("{} - {} prediction values sent to the Kafka topic {}" \
                                .format(time.strftime("%d/%m/%Y %H:%M:%S"),
                                        nb_msg,
                                        topic_predict_name))
    except KeyboardInterrupt:
        if trace_on_line_ML:
            print ("\n-------  END OF ON-LINE MACHINE LEARNING  -------")
    except Exception as e:
        print("An error has occurred")
        print(e)
    finally:
        # Deliver any buffered predictions and release the Kafka connections
        producer.flush()
        producer.close()
        consumer.close()

In [22]:
evaluate_model(model, The_selected_company)

-----  ON-LINE MACHINE LEARNING FOR google ----

{'CPU_time': 0.6017079000002923,
 'MAE': 359.52136674721476,
 'date': 1609785000000,
 'y_pred': 1726.6958917122881,
 'y_true': 1724.39501953125}

27/03/2022 15:01:28 - 500 prediction values sent to the Kafka topic predict__google
27/03/2022 15:01:28 - 1000 prediction values sent to the Kafka topic predict__google
27/03/2022 15:01:28 - 1500 prediction values sent to the Kafka topic predict__google
27/03/2022 15:01:28 - 2000 prediction values sent to the Kafka topic predict__google

-------  END OF ON-LINE MACHINE LEARNING  -------
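
In the sample output above, the MAE is about 359.5 at the fifth message even though `y_pred` is within about 2.3 of `y_true`. This is consistent with a rolling window of 7 that still contains one very large error from the first, untrained prediction. The sketch below reproduces the effect in plain Python; `RollingMAE` is a hypothetical stand-in for the RIVER metric, and the numbers are made up for illustration.

```python
from collections import deque


class RollingMAE:
    """Mean absolute error over the last `window_size` updates (hypothetical stand-in)."""

    def __init__(self, window_size):
        self.errors = deque(maxlen=window_size)  # old errors drop out automatically

    def update(self, y_true, y_pred):
        self.errors.append(abs(y_true - y_pred))
        return self

    def get(self):
        return sum(self.errors) / len(self.errors) if self.errors else 0.0


metric = RollingMAE(7)

# Assumed scenario: the first prediction is 0.0, the following ones are off by 2.
metric.update(1750.0, 0.0)
for _ in range(4):
    metric.update(1750.0, 1748.0)
mae_after_5 = metric.get()   # (1750 + 4 * 2) / 5 = 351.6

# Three more updates push the first error out of the 7-value window.
for _ in range(3):
    metric.update(1750.0, 1748.0)
mae_after_8 = metric.get()   # 2.0

print(mae_after_5, mae_after_8)
```

Under this assumption the reported MAE only becomes representative once the window no longer contains the initial error, that is from the eighth valid message onward.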
