In [2]:
#%%capture
#!wget "https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.0/spark-sql-kafka-0-10_2.12-3.5.0.jar"
#!wget "https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.5.0/spark-streaming-kafka-0-10_2.12-3.5.0.jar"

In [3]:
import os

# Spark reads PYSPARK_SUBMIT_ARGS only when the JVM is launched, so this cell has to
# run before the first SparkSession is created in the kernel. Versions must match the
# installed Spark (3.5.0, Scala 2.12). Structured Streaming's `format("kafka")` source
# comes from spark-sql-kafka; spark-streaming-kafka is the DStream connector.
_KAFKA_PACKAGES = [
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
]
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages " + ",".join(_KAFKA_PACKAGES) + " pyspark-shell"

In [None]:
!pip install river

In [None]:
import json
import os
import pickle
import tempfile

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType
from river import compose, feature_extraction, linear_model

# --- Configuration ---
# Pickle file holding the evolving River model between micro-batches.
# Override with the RIVER_MODEL_STATE_PATH environment variable; the default keeps
# the original /tmp location (note: /tmp is often wiped on reboot).
MODEL_STATE_PATH = os.environ.get(
    "RIVER_MODEL_STATE_PATH", "/tmp/river_sentiment_model_text.pkl"
)
# ---

# getOrCreate() reuses an already-running session in this kernel, so the
# --packages set via PYSPARK_SUBMIT_ARGS only apply if no session existed yet.
spark = SparkSession.builder.appName("RiverTextStream").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

In [None]:
def get_or_create_model(path=None):
    """Load the persisted River pipeline, or build a fresh one if none exists.

    Parameters
    ----------
    path : str, optional
        Pickle file to load from. Defaults to ``MODEL_STATE_PATH``.

    Returns
    -------
    object
        The unpickled object stored at ``path`` when that file exists; otherwise a
        new, untrained ``compose.Pipeline`` of ``BagOfWords`` (lowercased unigrams
        and bigrams) followed by ``LogisticRegression``.

    Raises
    ------
    Exception
        Any error from opening or unpickling an existing-but-unreadable file
        propagates unchanged.
    """
    state_path = MODEL_STATE_PATH if path is None else path

    if os.path.exists(state_path):
        print("Loading existing River model state...")
        # SECURITY: pickle.load can execute arbitrary code. Only point this at a
        # file this notebook wrote itself, never at untrusted input.
        with open(state_path, 'rb') as f:
            return pickle.load(f)

    print("Initializing new River Logistic Regression Pipeline...")

    # 1. Feature extraction: bag of words over lowercased unigrams and bigrams.
    # BagOfWords emits a sparse dict of token counts, so no fixed vocabulary is needed.
    text_vectorizer = feature_extraction.BagOfWords(
        lowercase=True,
        ngram_range=(1, 2),
    )

    # 2. Classifier: logistic regression updated one sample at a time.
    classifier = linear_model.LogisticRegression()

    # 3. Chain them: raw string -> token counts -> classifier.
    return compose.Pipeline(
        ('vectorizer', text_vectorizer),
        ('lr', classifier),
    )

In [None]:
def save_model(model, path=None):
    """Persist ``model`` with pickle, atomically replacing any previous state.

    The object is first written to a temporary file in the destination directory
    and then moved into place with ``os.replace``. A crash or pickling error
    mid-write therefore never leaves a truncated file for ``get_or_create_model``
    to fail on; the previous state file (if any) stays intact.

    Errors are printed rather than raised, so a failed save does not abort the
    streaming micro-batch.

    Parameters
    ----------
    model : object
        Any picklable object (here: the River pipeline).
    path : str, optional
        Destination file. Defaults to ``MODEL_STATE_PATH``.
    """
    target = MODEL_STATE_PATH if path is None else path
    # Same directory as the target so os.replace stays on one filesystem.
    directory = os.path.dirname(os.path.abspath(target))
    tmp_path = None
    try:
        fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".model-", suffix=".tmp")
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(model, f)
        os.replace(tmp_path, target)
        tmp_path = None  # ownership moved to `target`; nothing to clean up
        print(f"Model state saved to {target}")
    except Exception as e:
        print(f"Error saving model state: {e}")
    finally:
        # Remove a half-written temp file left behind by a failure above.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass

In [None]:
def process_batch_with_online_ml(micro_batch_df: pd.DataFrame):
    """Predict-then-learn over one micro-batch of labelled text.

    For each row, the current model first predicts ``text`` and is then updated
    with ``label``. So every prediction is made before the model has seen that
    row's label. The model is loaded once before the loop and saved once after it.

    This is called from ``foreachBatch`` after ``toPandas()``, i.e. on the Spark
    driver, so the model pickle lives on the driver's filesystem.

    Parameters
    ----------
    micro_batch_df : pd.DataFrame
        Must contain ``text`` and ``label`` columns.

    Returns
    -------
    pd.DataFrame
        A new frame with columns ``text``, ``label`` and ``prediction``; the input
        frame is not modified. ``prediction`` is -1 for rows where ``predict_one``
        raised. Rows whose ``label`` is missing are scored but not learned from.
    """
    model = get_or_create_model()

    predictions = []
    for text_sample, label in zip(micro_batch_df['text'], micro_batch_df['label']):
        # A. PREDICTION. River pipelines take the raw string; the BagOfWords step
        # turns it into token counts internally.
        try:
            y_pred = model.predict_one(text_sample)
        except Exception as e:
            # Keep the batch alive, but don't hide the failure.
            print(f"predict_one failed, using -1: {e!r}")
            y_pred = -1
        predictions.append(y_pred)

        # B. INCREMENTAL UPDATE. The Kafka schema's label is nullable and may reach
        # pandas as NaN/None; int(NaN) would raise and fail the whole batch.
        if pd.notna(label):
            model.learn_one(text_sample, int(label))

    # Persist once per batch so the next micro-batch continues from this state.
    save_model(model)

    return micro_batch_df.assign(prediction=predictions)[['text', 'label', 'prediction']]


In [None]:
def process_batch_mixed_mode(micro_batch_df: pd.DataFrame):
    """Predict every row; learn only from rows that carry a label.

    Each row is predicted with the model as it stands before that row's label is
    applied. Rows with a non-null ``label`` then update the model. The model is
    loaded before the loop and saved once after it.

    Parameters
    ----------
    micro_batch_df : pd.DataFrame
        Must contain ``text`` and ``label`` columns; ``label`` may be null.

    Returns
    -------
    pd.DataFrame
        A new frame with columns ``text``, ``label`` and ``raw_prediction``; the
        input frame is not modified. ``raw_prediction`` is None for rows whose
        ``text`` is not a string (e.g. null), and those rows are never learned from.
    """
    model = get_or_create_model()

    predictions = []
    for text_sample, true_label in zip(micro_batch_df['text'], micro_batch_df['label']):
        # A message that fails JSON parsing presumably yields a null `text`; passing
        # None into the BagOfWords step would raise and fail the whole batch.
        if not isinstance(text_sample, str):
            predictions.append(None)
            continue

        # 1. Prediction (always, before any update from this row).
        predictions.append(model.predict_one(text_sample))

        # 2. Conditional training: only when a label is present (a null label
        # shows up in pandas as NaN/None).
        if pd.notna(true_label):
            model.learn_one(text_sample, int(true_label))

    save_model(model)

    return micro_batch_df.assign(raw_prediction=predictions)[['text', 'label', 'raw_prediction']]

In [None]:

def apply_online_learning(micro_batch_df, batch_id):
    """foreachBatch callback: score and learn from one micro-batch, then display it.

    Spark calls this once per micro-batch with the batch's DataFrame and id. The
    batch is collected to the driver via ``toPandas()`` and run through
    ``process_batch_mixed_mode``; the scored rows are shown on the console.
    Empty batches are skipped without touching the model.
    """
    print(f"--- Processing Batch ID: {batch_id} ---")

    # Collects the whole micro-batch into driver memory.
    batch_pdf = micro_batch_df.toPandas()

    if not batch_pdf.empty:
        # process_batch_with_online_ml(batch_pdf) is the labels-required alternative.
        scored_pdf = process_batch_mixed_mode(batch_pdf)
        # Console "sink": round-trip through Spark for tabular display.
        spark.createDataFrame(scored_pdf).show(truncate=False)
    else:
        print("Empty batch, skipping ML update.")

In [None]:

# Kafka connection settings (override via environment variables).
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "10.224.177.98:8097")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "input")
CHECKPOINT_LOCATION = "/tmp/spark/river_checkpoint"

# Expected JSON payload: {"text": "...", "label": 0|1}. Both fields are nullable;
# process_batch_mixed_mode treats a null label as a prediction-only message.
kafka_schema = StructType([
    StructField("text", StringType(), True),
    StructField("label", IntegerType(), True),  # 0 or 1 for binary classification
])

# 1. Read stream from Kafka. "startingOffsets" only applies to a fresh checkpoint;
# on restart the query resumes from the offsets recorded in CHECKPOINT_LOCATION.
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "latest")
    .load()
)

# 2. Parse the JSON value into structured columns.
parsed_stream = raw_stream.select(
    from_json(col("value").cast("string"), kafka_schema).alias("data")
).select("data.*")

# 3. Online learning per micro-batch.
# NOTE: the River model is pickled outside Spark's checkpoint, so a micro-batch
# that Spark replays after a failure will be learned from a second time.
query = (
    parsed_stream.writeStream
    .outputMode("update")
    .foreachBatch(apply_online_learning)
    .option("checkpointLocation", CHECKPOINT_LOCATION)
    .start()
)

print(f"PySpark Stream started. Send data to '{KAFKA_TOPIC}'...")

# Blocks this cell (and the kernel) until the query stops or fails.
query.awaitTermination()

In [38]:
# Stop every streaming query still running in this Spark session
# (e.g. after interrupting the cell blocked in awaitTermination()).
for active_query in list(spark.streams.active):
    active_query.stop()