In [1]:
# TO RUN THIS NOTEBOOK, Zookeeper and Kafka MUST ALREADY BE RUNNING (and Cassandra must be reachable on localhost:9042).
# OPEN 2 GIT BASH CONSOLES AND RUN ONE OF THE FOLLOWING COMMANDS IN EACH:
# context/kafka/bin/zookeeper-server-start.sh context/kafka/config/zookeeper.properties
# context/kafka/bin/kafka-server-start.sh context/kafka/config/server.properties

In [2]:
from kafka import KafkaProducer
from cassandra.cluster import Cluster
import json
import time
from statsmodels.tsa.arima.model import ARIMA
import pandas as pd

#################################################################################################
# OPEN 2 GIT BASH CONSOLES AND RUN ONE OF THE FOLLOWING COMMANDS IN EACH:
# context/kafka/bin/zookeeper-server-start.sh context/kafka/config/zookeeper.properties
# context/kafka/bin/kafka-server-start.sh context/kafka/config/server.properties
#################################################################################################

# Connection settings — change these if the services are not running locally on the default ports.
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
CASSANDRA_HOSTS = ['localhost']
CASSANDRA_PORT = 9042
CASSANDRA_KEYSPACE = 'dmsb_tfm'

# Set up Kafka producer; message values are serialized as UTF-8 encoded JSON.
producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Set up Cassandra connection and select the keyspace used by the queries below.
cluster = Cluster(CASSANDRA_HOSTS, port=CASSANDRA_PORT)
try:
    session = cluster.connect()
    session.set_keyspace(CASSANDRA_KEYSPACE)
except Exception:
    # Don't leak the producer / cluster if Cassandra is unreachable or the keyspace is missing.
    producer.close()
    cluster.shutdown()
    raise

# Prediction-loop settings.
POLL_INTERVAL_SECONDS = 1.0      # pause between iterations so the loop does not busy-spin against Cassandra
CANDLESTICK_QUERY_LIMIT = 5000   # max rows read from `candlesticks` per iteration
PREDICTION_TOPIC = 'candlestickStream'
OHLC_COLUMNS = ('open', 'high', 'low', 'close')


def _forecast_next(values, order):
    """Fit an ARIMA model to ``values`` and return its one-step-ahead forecast.

    Args:
        values: List of observations, oldest first.
        order: ``(p, d, q)`` tuple passed to statsmodels' ``ARIMA``.

    Returns:
        The first element of the fitted model's ``forecast()`` output.
    """
    return ARIMA(values, order=order).fit().forecast()[0]


try:
    while True:
        # Read the model configuration from the first row of the `configuration` table.
        config_row = session.execute("SELECT * FROM configuration LIMIT 1").one()
        if config_row is None:
            print('No row found in the configuration table; waiting...')
            time.sleep(POLL_INTERVAL_SECONDS)
            continue
        # NOTE(review): `symbol` is read but not used to filter `candlesticks` — confirm intent.
        symbol = config_row.symbol
        candle_number = config_row.candle_number
        ARIMA_p = config_row.arima_p
        ARIMA_d = config_row.arima_d
        ARIMA_q = config_row.arima_q

        result = session.execute(f"SELECT * FROM candlesticks LIMIT {int(CANDLESTICK_QUERY_LIMIT)}")
        rows = [row._asdict() for row in result]
        if not rows:
            # An empty result would give a column-less DataFrame and a KeyError on sort.
            print('No rows found in the candlesticks table; waiting...')
            time.sleep(POLL_INTERVAL_SECONDS)
            continue

        # Sort chronologically so .iloc[-1] / .tail() select the most recent candles.
        df_candlesticks = pd.DataFrame(rows).sort_values(by='start_time', ascending=True)

        # The prediction is labelled with the start_time and symbol of the most recent
        # candle, which may still be open.
        prediction_start_time = df_candlesticks['start_time'].iloc[-1]
        prediction_symbol = df_candlesticks['symbol'].iloc[-1]

        # Only closed candles feed the model, limited to the configured number of candles.
        df_kline_closed = df_candlesticks[df_candlesticks['kline_closed'].eq(True)].tail(candle_number)
        if df_kline_closed.empty:
            print('No closed candles available yet; waiting...')
            time.sleep(POLL_INTERVAL_SECONDS)
            continue

        # Fit one ARIMA model per OHLC column and take its next-step forecast.
        arima_order = (ARIMA_p, ARIMA_d, ARIMA_q)
        predictions = {column: _forecast_next(df_kline_closed[column].tolist(), arima_order)
                       for column in OHLC_COLUMNS}
        open_prediction = predictions['open']
        high_prediction = predictions['high']
        low_prediction = predictions['low']
        close_prediction = predictions['close']

        message = {'symbol': prediction_symbol, 'start_time': str(prediction_start_time),
                   'open': open_prediction, 'high': high_prediction,
                   'low': low_prediction, 'close': close_prediction}

        # Send the message to the Kafka topic.
        try:
            producer.send(PREDICTION_TOPIC, value=message)
        except Exception as e:
            print(f'Error sending message: {e}')

        # Flush messages to ensure they are sent immediately.
        producer.flush()

        time.sleep(POLL_INTERVAL_SECONDS)

except KeyboardInterrupt:
    # Ctrl+C / "Interrupt kernel" is the normal way to stop the loop.
    pass
finally:
    # Always release connections, including when an unexpected error ends the loop.
    producer.close()
    session.shutdown()
    cluster.shutdown()
    print('All connections have been closed')
