In [7]:
import kagglehub
import pandas as pd
import json
import time
import uuid
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from kafka import KafkaProducer, KafkaConsumer,KafkaAdminClient
from datetime import datetime, timezone
from kafka.admin import NewTopic
from kafka.errors import KafkaError, NoBrokersAvailable

In [9]:
#data generation
path = kagglehub.dataset_download("hershyandrew/amzn-dpz-btc-ntfx-adjusted-may-2013may2019")
df = pd.read_csv(f"{path}/portfolio_data.csv")

In [10]:
#predict: How can we attempt to predict future stock behavior?

In [13]:
#closing value of the stock
df = df.melt(id_vars=["Date"], var_name="Stock", value_name="Close")
df = df.sort_values("Date")
df["Date"] = pd.to_datetime(df["Date"], errors='coerce')
df = df.head(100)

In [88]:
# admin_client = KafkaAdminClient(bootstrap_servers=KAFKA_BROKER)
# try:
#     admin_client.delete_topics([INPUT_TOPIC, OUTPUT_TOPIC])
#     time.sleep(2)
# except Exception as e:
#     print(f"Warning: Unable to delete topics. They may not exist. {e}")

#This was implemented to clear out the previously created topicx

In [17]:
df.head()

Unnamed: 0,Date,Stock,Close
4736,2014-01-10,NFLX,47.44857
176,2014-01-10,AMZN,397.660004
1696,2014-01-10,DPZ,67.339592
3216,2014-01-10,BTC,957.76001
5491,2017-01-10,NFLX,129.889999


In [19]:
df.size

300

In [21]:
df.dropna(inplace=True)  # Drop any invalid datetime values

In [23]:
KAFKA_BROKER = "localhost:9092"
INPUT_TOPIC = "stock-datas"
OUTPUT_TOPIC = "predictions"

Producer

In [72]:
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

In [19]:
print("Sending stock data to Kafka...")
for _, row in df.iterrows():
    message = {
        "id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "stock_symbol": row["Stock"],
        "date": str(row["Date"].date()),  # Ensure Date is string format
        "close": float(row["Close"])
    }
    producer.send(INPUT_TOPIC, value=message)
    print(f"Sent: {message}")
    time.sleep(0.1)

Sending stock data to Kafka...
Sent: {'id': '42e43cf9-0562-4871-9997-2dfaec35108b', 'timestamp': '2025-03-16T14:54:46.598597+00:00', 'stock_symbol': 'NFLX', 'date': '2014-01-10', 'close': 47.44857}
Sent: {'id': '145d064f-f52f-473e-af91-25d6a4581f50', 'timestamp': '2025-03-16T14:54:46.922135+00:00', 'stock_symbol': 'AMZN', 'date': '2014-01-10', 'close': 397.660004}
Sent: {'id': 'ae526efc-15d9-4061-94f0-97ce62799817', 'timestamp': '2025-03-16T14:54:47.023287+00:00', 'stock_symbol': 'DPZ', 'date': '2014-01-10', 'close': 67.339592}
Sent: {'id': 'bea754a9-5ba9-4fa4-bbfa-05a50f0a6391', 'timestamp': '2025-03-16T14:54:47.124498+00:00', 'stock_symbol': 'BTC', 'date': '2014-01-10', 'close': 957.76001}
Sent: {'id': '122b25f7-a4b8-4315-8006-a296ad9857dc', 'timestamp': '2025-03-16T14:54:47.226607+00:00', 'stock_symbol': 'NFLX', 'date': '2017-01-10', 'close': 129.889999}
Sent: {'id': '2b5e653c-12d0-4b5c-9f68-94a6a5e2dcc6', 'timestamp': '2025-03-16T14:54:47.327757+00:00', 'stock_symbol': 'AMZN', 'dat

Consumer

In [25]:
consumer = KafkaConsumer(
    INPUT_TOPIC,
    bootstrap_servers=KAFKA_BROKER,
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    auto_offset_reset="earliest"
)

In [27]:
#listening to the producer topic

In [31]:
def kafka_consumer():
    print("Listening for stock data...")
    data_list = []
    for message in consumer:
        data = message.value
        data_list.append([data["date"], data["stock_symbol"], data["close"]])
        print(f"Received: {data}")
        if len(data_list) >= 100:
            consumer.close()
            return pd.DataFrame(data_list, columns=["Date", "Stock", "Close"])
    return data_list

df = kafka_consumer()

Listening for stock data...
Received: {'id': '42e43cf9-0562-4871-9997-2dfaec35108b', 'timestamp': '2025-03-16T14:54:46.598597+00:00', 'stock_symbol': 'NFLX', 'date': '2014-01-10', 'close': 47.44857}
Received: {'id': '145d064f-f52f-473e-af91-25d6a4581f50', 'timestamp': '2025-03-16T14:54:46.922135+00:00', 'stock_symbol': 'AMZN', 'date': '2014-01-10', 'close': 397.660004}
Received: {'id': 'ae526efc-15d9-4061-94f0-97ce62799817', 'timestamp': '2025-03-16T14:54:47.023287+00:00', 'stock_symbol': 'DPZ', 'date': '2014-01-10', 'close': 67.339592}
Received: {'id': 'bea754a9-5ba9-4fa4-bbfa-05a50f0a6391', 'timestamp': '2025-03-16T14:54:47.124498+00:00', 'stock_symbol': 'BTC', 'date': '2014-01-10', 'close': 957.76001}
Received: {'id': '122b25f7-a4b8-4315-8006-a296ad9857dc', 'timestamp': '2025-03-16T14:54:47.226607+00:00', 'stock_symbol': 'NFLX', 'date': '2017-01-10', 'close': 129.889999}
Received: {'id': '2b5e653c-12d0-4b5c-9f68-94a6a5e2dcc6', 'timestamp': '2025-03-16T14:54:47.327757+00:00', 'stock_

Creating and Training the model

In [33]:
df["Date"] = pd.to_datetime(df["Date"], errors='coerce')  # Ensure correct datetime conversion
df.dropna(subset=["Date"], inplace=True)  # Drop invalid datetime values
df = df.sort_values("Date")
scaler = MinMaxScaler(feature_range=(0, 1))
df["Close"] = scaler.fit_transform(df[["Close"]])

In [35]:
#Training the model

In [54]:
def create_sequences(data, time_steps=5):
    X, y = [], []
    for i in range(len(data) - time_steps):
        X.append(data[i : i + time_steps])
        y.append(data[i + time_steps])
    return np.array(X), np.array(y)

time_steps = 5
X_train, y_train = create_sequences(df["Close"].values, time_steps)
X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))

print(len(X_train))

model = Sequential([
    LSTM(50, return_sequences=True, input_shape=(time_steps, 1)),
    LSTM(50, return_sequences=False),
    Dense(25),
    Dense(1)
])

model.compile(optimizer="adam", loss="mean_squared_error")
print("Training model...")
model.fit(X_train, y_train, epochs=10, batch_size=16)
model.save("stock_prediction_model.h5")
print("Model trained and saved!")

95
Training model...
Epoch 1/10


  super().__init__(**kwargs)


[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 6ms/step - loss: 0.0487
Epoch 2/10
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.0320 
Epoch 3/10
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.0308 
Epoch 4/10
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.0219 
Epoch 5/10
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step - loss: 0.0265 
Epoch 6/10
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step - loss: 0.0213 
Epoch 7/10
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.0419 
Epoch 8/10
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.0229 
Epoch 9/10
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.0233 
Epoch 10/10
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.0185 




Model trained and saved!


In [56]:
PREDICTION_TOPIC = "predicted-value"

In [90]:
def kafka_producer():
    seen_predictions = set()
    total_messages = 0
    total_latency = 0.0
    print("Sending stock predictions to Kafka...")
    for i in range(len(X_train)):
        input_seq = X_train[i].reshape(1, time_steps, 1)
        start_time = time.time()
        predicted_price = float(scaler.inverse_transform(model.predict(input_seq).reshape(-1, 1))[0][0])
        latency = time.time() - start_time
        total_latency += latency
        total_messages += 1
        message = {
            "id": str(uuid.uuid4()),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "stock_symbol": df.iloc[i + time_steps]["Stock"],
            "date": str(df.iloc[i + time_steps]["Date"].date()),
            "original_price": float(scaler.inverse_transform([[df.iloc[i + time_steps]["Close"]]])[0][0]),
            "predicted_price": predicted_price,
            "latency": round(latency, 6)
        }
        message_key = (message["stock_symbol"], message["date"])
        if message_key not in seen_predictions:
            seen_predictions.add(message_key)
            producer.send(PREDICTION_TOPIC, value=message)
            print(f"Sent Prediction: {message}")
            time.sleep(0.1)
    print(f"Total messages processed: {total_messages}")
    print(f"Average Prediction Latency: {total_latency / total_messages:.6f} seconds")

In [92]:
kafka_producer()

Sending stock predictions to Kafka...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
Sent Prediction: {'id': '4b75845f-ccf8-4026-85d4-276b254c01a5', 'timestamp': '2025-03-16T15:21:03.270408+00:00', 'stock_symbol': 'BTC', 'date': '2014-01-13', 'original_price': 922.909973, 'predicted_price': 773.8822021484375, 'latency': 0.074139}
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
Sent Prediction: {'id': '8c6b2cec-f9f6-4259-b2ab-36d0de4fa517', 'timestamp': '2025-03-16T15:21:03.432232+00:00', 'stock_symbol': 'AMZN', 'date': '2014-01-13', 'original_price': 390.980011, 'predicted_price': 809.6038818359375, 'latency': 0.059555}
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
Sent Prediction: {'id': 'f6d8f8e9-01f3-4990-8edb-4626813d5d5d', 'timestamp': '2025-03-16T15:21:03.593885+00:00', 'stock_symbol': 'NFLX', 'date': '2014-01-13', 'original_price': 48.115715, 'predicted_price': 817.6865234375, 'latency': 0.059483}
[1m1/1