In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from kafka import KafkaConsumer
import json

# Obtain the SparkSession for this application via the builder's getOrCreate()
spark = (
    SparkSession.builder
    .appName("KafkaStreamProcessing")
    .getOrCreate()
)

# Restore the pipeline model previously saved under 'trained_model'
model = PipelineModel.load('trained_model')

# Connection parameters for the Kafka subscription
topic_name = 'your_topic_name'
bootstrap_servers = 'localhost:9092'
consumer_group_id = 'your_consumer_group_id'


def _decode_json_value(raw):
    """Decode a UTF-8 encoded message payload and parse it as JSON."""
    return json.loads(raw.decode('utf-8'))


# Subscribe to the topic; each message value is passed through the JSON decoder
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=bootstrap_servers,
    group_id=consumer_group_id,
    value_deserializer=_decode_json_value,
)

# Destination path handed to the prediction writer
output_file = 'output_predictions.txt'

# Function to predict output and write to file
def predict_and_write(df):
    """Run the loaded pipeline model on ``df`` and append the result as CSV.

    Args:
        df: DataFrame handed to ``model.transform``.

    The ``input_data`` and ``prediction`` columns of the transformed frame are
    appended, without a header row, to the path stored in ``output_file``.
    """
    scored = model.transform(df)

    # NOTE(review): assumes the pipeline output carries an "input_data"
    # column -- TODO confirm against the trained pipeline.
    selected = scored.select("input_data", "prediction")

    # NOTE(review): Spark's CSV writer presumably treats output_file as a
    # directory of part files rather than a single text file -- confirm that
    # the ".txt" name is intended.
    writer = selected.write.mode("append")
    writer.csv(output_file, header=False)

# Names of the JSON fields fed to the model, in DataFrame column order.
# NOTE(review): the original row contained a literal `...` placeholder between
# 'feature2' and 'featureN'; replace this tuple with the real feature names
# expected by the trained pipeline.
feature_columns = ('feature1', 'feature2', 'featureN')

# Listen to the Kafka topic and score each message as a single-row DataFrame
try:
    for message in consumer:
        # The consumer's value_deserializer has already parsed the JSON payload
        data_dict = message.value

        # Skip malformed payloads instead of letting one bad message stop the
        # whole long-running consumer.
        if not isinstance(data_dict, dict):
            print("Skipping message at offset", message.offset,
                  "- payload is not a JSON object")
            continue
        missing = [name for name in feature_columns if name not in data_dict]
        if missing:
            print("Skipping message at offset", message.offset,
                  "- missing fields:", missing)
            continue

        # Build one row whose values follow the order of feature_columns
        data_list = [tuple(data_dict[name] for name in feature_columns)]

        # Pass the column names as the schema so Spark infers the types from
        # the row; the pipeline stage does not provide a ready-made schema
        # attribute to reuse here.
        df = spark.createDataFrame(data_list, schema=list(feature_columns))

        # Predict output and write to file
        predict_and_write(df)
finally:
    # Close the consumer when the loop ends or is interrupted
    consumer.close()
