In [None]:
import json
import os
import smtplib
from email.mime.text import MIMEText

from kafka import KafkaConsumer
from pyspark.ml.feature import VectorAssembler, MinMaxScalerModel
from pyspark.ml.regression import LinearRegressionModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date
from pyspark.sql.functions import date_format

In [None]:
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Root HDFS directory holding the saved artifacts, and the tickers that have one.
HDFS_PATH = 'hdfs://10.84.129.52:9000/trab/g05'
TICKERS = ['AAPL', 'MSFT', 'GOOG', 'AMZN', 'V']


def _decode_json_payload(raw_bytes):
    """Decode a UTF-8 encoded Kafka message value and parse it as JSON."""
    return json.loads(raw_bytes.decode('utf-8'))


# Consumer for the 'g05in' topic; every message value is run through the
# JSON deserializer above before it reaches the consuming loop.
consumer = KafkaConsumer(
    'g05in',
    bootstrap_servers='10.204.131.11:9092',
    value_deserializer=_decode_json_payload,
)

# Load the saved regression model for each ticker
models = {
    ticker: LinearRegressionModel.load(f'{HDFS_PATH}/models/{ticker}/{ticker}_model')
    for ticker in TICKERS
}

# Load the saved MinMaxScaler model for each ticker
scalers = {
    ticker: MinMaxScalerModel.load(f'{HDFS_PATH}/models/{ticker}/{ticker}_scaler')
    for ticker in TICKERS
}

In [None]:
def preprocess_real_time_data(ticker, data):
    """Build the scaled single-row feature DataFrame for one real-time quote.

    Args:
        ticker: Key used to look up the fitted scaler in ``scalers``.
        data: Mapping providing the 'Low', 'Open', 'Volume', 'High' and
            'Close' values of the quote.

    Returns:
        The DataFrame produced by the ticker's scaler model. It holds the
        columns 'Date', 'Low', 'Open', 'Volume', 'High', 'Close',
        'Close_Open_Diff', the assembled 'features' vector, plus whatever
        output column the scaler model appends.

    Raises:
        KeyError: If ``ticker`` has no scaler or ``data`` lacks a quote field.
    """
    # Resolve the scaler first so an unknown ticker fails before any Spark work.
    ticker_scaler = scalers[ticker]

    quote_columns = ["Low", "Open", "Volume", "High", "Close"]
    quote_row = [data[column] for column in quote_columns]
    quote_df = spark.createDataFrame([quote_row], quote_columns)

    # Stamp the row with today's date rendered as 'dd-MM-yyyy'
    quote_df = quote_df.withColumn("Date", date_format(current_date(), 'dd-MM-yyyy'))

    # Put 'Date' first, followed by the raw quote columns
    quote_df = quote_df.select('Date', *quote_columns)

    # Derived feature: difference between 'Close' and 'Open'
    quote_df = quote_df.withColumn('Close_Open_Diff', quote_df['Close'] - quote_df['Open'])

    # Assemble the model inputs into a single vector column
    feature_assembler = VectorAssembler(
        inputCols=["Low", "Open", "Volume", "High", "Close_Open_Diff"],
        outputCol="features",
    )
    assembled_df = feature_assembler.transform(quote_df)

    # Normalize the assembled features with the ticker's MinMaxScaler model
    return ticker_scaler.transform(assembled_df)

In [None]:
def send_email(subject, body):
    """Send a plain-text alert e-mail through Gmail's SMTP server.

    Credentials are read from the environment so that no password lives in
    the notebook:

    * ``GMAIL_APP_PASSWORD`` - required; password used for the SMTP login.
    * ``GMAIL_USER`` - optional sender/login address; defaults to the
      account this notebook was originally written for.
    * ``ALERT_RECIPIENT`` - optional recipient; defaults to the sender.

    Args:
        subject: Text placed in the ``Subject`` header.
        body: Plain-text message body.

    Raises:
        RuntimeError: If ``GMAIL_APP_PASSWORD`` is unset or empty.
        smtplib.SMTPException: If the TLS upgrade, login or delivery fails.
        OSError: If the SMTP server cannot be reached.
    """
    gmail_user = os.environ.get("GMAIL_USER", "pytest571@gmail.com")
    to = os.environ.get("ALERT_RECIPIENT", gmail_user)

    # NOTE: an app password used to be hard-coded here; anything that was
    # committed with the notebook should be considered leaked and revoked.
    gmail_pwd = os.environ.get("GMAIL_APP_PASSWORD")
    if not gmail_pwd:
        raise RuntimeError(
            "GMAIL_APP_PASSWORD is not set; export it before sending alert e-mails."
        )

    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = gmail_user
    msg['To'] = to

    # The context manager issues QUIT and closes the socket even when
    # starttls/login/send_message raises; the timeout keeps a stalled SMTP
    # server from blocking the caller indefinitely.
    with smtplib.SMTP('smtp.gmail.com', 587, timeout=30) as server:
        server.starttls()
        server.login(gmail_user, gmail_pwd)
        server.send_message(msg)

In [None]:
# Relative gap (in percent of the live close) between the live and the
# predicted value at or above which an alert e-mail is sent.
ALERT_THRESHOLD_PCT = 10

try:
    # Consume messages from Kafka until the cell is interrupted
    for message in consumer:
        message_dict = message.value

        # The consumer's value_deserializer already applies json.loads, so the
        # value is normally a dict here. Decode a second time only when the
        # producer double-encoded the payload and we are still holding text;
        # calling json.loads on a dict would raise TypeError.
        if isinstance(message_dict, (str, bytes, bytearray)):
            message_dict = json.loads(message_dict)
        if not isinstance(message_dict, dict):
            print(f"Skipping message: expected a JSON object, got {type(message_dict).__name__}.")
            continue

        # Each entry maps a ticker to its latest quote
        for ticker, data in message_dict.items():
            # Only tickers with a loaded model can be scored
            if ticker not in models:
                print(f"Skipping {ticker}: no model loaded for this ticker.")
                continue

            # Preprocess the real-time data
            preprocessed_features_df = preprocess_real_time_data(ticker, data)

            # Use the model to make a prediction (the frame has exactly one row)
            prediction = models[ticker].transform(preprocessed_features_df).select("prediction").first()[0]

            # Compare the predicted value with the real-time value
            real_time_value = data['Close']
            difference = real_time_value - prediction

            # A zero close would make the relative difference undefined
            if real_time_value == 0:
                print(f"Skipping {ticker}: real-time value is 0, percentage difference is undefined.")
                continue

            # Calculate the percentage difference
            percentage_difference = abs(difference / real_time_value) * 100

            report = f"For {ticker}, the real-time value is {real_time_value}, the predicted value is {prediction}, and the difference is {difference} ({percentage_difference}%)."

            # If the percentage difference reaches the threshold, send an email
            if percentage_difference >= ALERT_THRESHOLD_PCT:
                subject = f"Alert: Buy/Sell opportunity for {ticker} stocks"
                try:
                    send_email(subject, report)
                except Exception as exc:
                    # A failed alert must not stop the stream; report and continue.
                    print(f"Could not send alert e-mail for {ticker}: {exc}")

            # Print the result
            print(report)

except KeyboardInterrupt:
    # Interrupting the cell is the normal way to stop the loop.
    pass
finally:
    # Release the Kafka connection and the Spark session on every exit path,
    # not only on a manual interrupt.
    consumer.close()
    spark.stop()