In [1]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
!pip install pyspark mlflow

Collecting mlflow
  Downloading mlflow-3.4.0-py3-none-any.whl.metadata (30 kB)
Collecting mlflow-skinny==3.4.0 (from mlflow)
  Downloading mlflow_skinny-3.4.0-py3-none-any.whl.metadata (31 kB)
Collecting mlflow-tracing==3.4.0 (from mlflow)
  Downloading mlflow_tracing-3.4.0-py3-none-any.whl.metadata (19 kB)
Collecting docker<8,>=4.0.0 (from mlflow)
  Downloading docker-7.1.0-py3-none-any.whl.metadata (3.8 kB)
Collecting fastmcp<3,>=2.0.0 (from mlflow)
  Downloading fastmcp-2.12.3-py3-none-any.whl.metadata (17 kB)
Collecting graphene<4 (from mlflow)
  Downloading graphene-3.4.3-py2.py3-none-any.whl.metadata (6.9 kB)
Collecting gunicorn<24 (from mlflow)
  Downloading gunicorn-23.0.0-py3-none-any.whl.metadata (4.4 kB)
Collecting databricks-sdk<1,>=0.20.0 (from mlflow-skinny==3.4.0->mlflow)
  Downloading databricks_sdk-0.66.0-py3-none-any.whl.metadata (39 kB)
Collecting opentelemetry-proto<3,>=1.9.0 (from mlflow-skinny==3.4.0->mlflow)
  Downloading opentelemetry_proto-1.37.0-py3-none-any.w

In [2]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("bittlingmayer/amazonreviews")

print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/bittlingmayer/amazonreviews?dataset_version_number=7...


100%|██████████| 493M/493M [00:16<00:00, 31.4MB/s]

Extracting files...





Path to dataset files: /root/.cache/kagglehub/datasets/bittlingmayer/amazonreviews/versions/7


In [3]:
!cp /root/.cache/kagglehub/datasets/bittlingmayer/amazonreviews/versions/7/train.ft.txt.bz2 /content/train.ft.txt.bz2

In [4]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Initialize Spark
spark = SparkSession.builder.appName("AmazonSentiment").getOrCreate()

# Load and parse the data (assuming file is at /content/train.ft.txt.bz2)
df = spark.read.option("delimiter", " ").csv("/content/train.ft.txt.bz2")
df = df.withColumn("label", F.when(F.col("_c0") == "__label__2", 1).otherwise(0)) \
       .withColumn("text", F.col("_c1")) \
       .select("label", "text")

df.show(5, truncate=50)

+-----+---------+
|label|     text|
+-----+---------+
|    1|  Stuning|
|    1|      The|
|    1|Amazing!:|
|    1|Excellent|
|    1|Remember,|
+-----+---------+
only showing top 5 rows



In [5]:
import mlflow
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Set the experiment name
mlflow.set_experiment("Amazon Review Sentiment Analysis")

2025/09/24 13:20:54 INFO mlflow.tracking.fluent: Experiment with name 'Amazon Review Sentiment Analysis' does not exist. Creating a new experiment.


<Experiment: artifact_location='file:///content/mlruns/576981139912121076', creation_time=1758720054097, experiment_id='576981139912121076', last_update_time=1758720054097, lifecycle_stage='active', name='Amazon Review Sentiment Analysis', tags={}>

In [8]:

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
import pyspark.sql.functions as F

# === For faster development, work with a smaller sample ===
# df_sampled = df.sample(fraction=0.05, seed=42) # Use 5% of the data
# (train_data, test_data) = df_sampled.randomSplit([0.8, 0.2], seed=42)

# Using the full dataset
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=42)

# --- SOLUTION 1: CACHE THE DATA ---
train_data.cache()
test_data.cache()

# Define pipeline stages
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
hashingTF = HashingTF(inputCol="filtered_words", outputCol="rawFeatures", numFeatures=20000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Create the full pipeline
full_pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

with mlflow.start_run():
    mlflow.log_param("regParam", lr.getRegParam())
    mlflow.log_param("maxIter", lr.getMaxIter())

    # Train the model
    print("Fitting the model...")
    model = full_pipeline.fit(train_data)

    # Make predictions
    print("Transforming test data...")
    predictions = model.transform(test_data)

    # --- SOLUTION 1: CACHE PREDICTIONS ---
    predictions.cache()
    print("Predictions cached.")

    # --- LOG METRICS ---
    print("Evaluating metrics...")
    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")
    auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

    # --- SOLUTION 2: EFFICIENT ACCURACY CALCULATION ---
    # Cast boolean to integer (true=1, false=0) and take the average
    accuracy_df = predictions.withColumn('correct', F.when(F.col('label') == F.col('prediction'), 1).otherwise(0))
    accuracy = accuracy_df.select(F.avg('correct')).first()[0]

    mlflow.log_metric("AUC", auc)
    mlflow.log_metric("Accuracy", accuracy)

    # Unpersist the cached dataframes to free up memory
    train_data.unpersist()
    test_data.unpersist()
    predictions.unpersist()

    # --- LOG MODEL ---
    print("Logging model...")
    mlflow.spark.log_model(model, "spark-lr-model")

    print(f"Model logged! AUC: {auc:.4f}, Accuracy: {accuracy:.4f}")

Fitting the model...
Transforming test data...
Predictions cached.
Evaluating metrics...
Logging model...




Model logged! AUC: 0.7775, Accuracy: 0.6995


In [7]:
import mlflow
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    Tokenizer, StopWordsRemover, HashingTF, IDF, NGram, VectorAssembler
)
# Make sure to import the model you are actually using
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# --- Use a small data sample for testing ---
# Ensure df and spark are defined from your earlier cells
df_sampled = df.sample(fraction=0.01, seed=42)
(train_data, test_data) = df_sampled.randomSplit([0.8, 0.2], seed=42)

# --- 1. Define Feature Engineering Stages ---
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
ngram = NGram(n=2, inputCol="filtered_words", outputCol="ngrams")
hashingTF_words = HashingTF(inputCol="filtered_words", outputCol="hashed_words", numFeatures=5000)
hashingTF_ngrams = HashingTF(inputCol="ngrams", outputCol="hashed_ngrams", numFeatures=5000)
assembler = VectorAssembler(inputCols=["hashed_words", "hashed_ngrams"], outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")

# --- 2. Define the Model (use the fast one for testing) ---
lr = LogisticRegression(featuresCol="features", labelCol="label")

# --- 3. Assemble the Full Pipeline (define it only ONCE) ---
pipeline = Pipeline(stages=[
    tokenizer, remover, ngram,
    hashingTF_words, hashingTF_ngrams,
    assembler, idf,
    lr  # <-- Use the faster model in the pipeline
])

# --- 4. Set up Hyperparameter Tuning ---
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.01])  # Add a second option to see tuning work
             .addGrid(lr.maxIter, [10])
             .build())

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2)

# --- 5. Run Training and Log with MLflow ---
with mlflow.start_run():
    print("Starting cross-validation...")
    cvModel = crossval.fit(train_data)
    print("Cross-validation complete.")

    best_model = cvModel.bestModel
    predictions = best_model.transform(test_data)

    auc = evaluator.evaluate(predictions)
    accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())

    print(f"Best Model Found! AUC: {auc:.4f}, Accuracy: {accuracy:.4f}")

    # --- FIX: Log the CORRECT parameters for the LogisticRegression model ---
    best_lr_model = best_model.stages[-1]
    mlflow.log_param("best_regParam", best_lr_model.getRegParam())
    mlflow.log_param("best_maxIter", best_lr_model.getMaxIter())

    mlflow.log_metric("AUC", auc)
    mlflow.log_metric("Accuracy", accuracy)

    input_example = test_data.limit(5).toPandas()
    mlflow.spark.log_model(
        best_model,
        "spark-lr-cv-model",
        input_example=input_example
    )

Starting cross-validation...


KeyboardInterrupt: 

In [12]:
# Install the pyngrok library
!pip install pyngrok

# Kill any existing mlflow and ngrok processes to start fresh
!killall mlflow
!killall ngrok

from pyngrok import ngrok
import os

# Terminate open tunnels if any exist
ngrok.kill()

# Set your ngrok authtoken (optional but recommended, get one from ngrok.com)
ngrok.set_auth_token("338tbSnswQ3CUcZHfDrk8s0z4ra_7me8sxm24gMs6nV73Txon")

# Set the MLFLOW_TRACKING_URI so MLflow knows where to store experiments
# This will create an mlruns directory in your Colab environment
os.environ["MLFLOW_TRACKING_URI"] = "mlruns"

# Run MLflow UI in the background
get_ipython().system_raw("mlflow ui --port 5000 &")

# Create a public URL to the local port 5000
public_url = ngrok.connect(5000)
print("✅ MLflow UI is running. Access it at:")
print(public_url)





✅ MLflow UI is running. Access it at:
NgrokTunnel: "https://terisa-carriable-colourably.ngrok-free.dev" -> "http://localhost:5000"


In [11]:
import mlflow
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, Bidirectional, LSTM, Dense, TextVectorization
import numpy as np

# --- 1. Use Spark to prepare the data and convert to Pandas ---
# We are still using the sampled data for speed in Colab
df_sampled = df.sample(fraction=0.02, seed=42)
(train_df_spark, test_df_spark) = df_sampled.randomSplit([0.8, 0.2], seed=42)

# Convert the Spark DataFrames to Pandas for Keras
train_pdf = train_df_spark.toPandas()
test_pdf = test_df_spark.toPandas()

# Extract text and labels as NumPy arrays
X_train = train_pdf['text'].to_numpy()
y_train = train_pdf['label'].to_numpy()
X_test = test_pdf['text'].to_numpy()
y_test = test_pdf['label'].to_numpy()

# --- 2. Set up MLflow for automatic logging ---
mlflow.set_experiment("Amazon Review Sentiment (BiLSTM)")
# autolog() will automatically track metrics, parameters, and the model
mlflow.tensorflow.autolog()

# --- 3. Build the Keras BiLSTM Model ---
# Keras's TextVectorization layer handles tokenization, integers, and padding
max_features = 10000  # Max vocabulary size
sequence_length = 250   # Max length of a review to consider

vectorize_layer = TextVectorization(
    max_tokens=max_features,
    output_mode='int',
    output_sequence_length=sequence_length)

# Adapt the layer to our training text
vectorize_layer.adapt(X_train)

# Define the model architecture
model = Sequential([
    # Input layer that converts text to integer sequences
    vectorize_layer,
    # Embedding layer that learns vector representations for each word
    Embedding(max_features + 1, 128),
    # The Bidirectional LSTM layer
    Bidirectional(LSTM(64)),
    # Final output layer
    Dense(1, activation='sigmoid')
])

model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['accuracy'])

# --- 4. Train the Model (inside an mlflow run) ---
with mlflow.start_run():
    print("Starting BiLSTM model training...")

    model.fit(X_train, y_train,
              epochs=3,
              validation_data=(X_test, y_test),
              batch_size=32)

    print("\nTraining complete.")

2025/09/24 13:39:23 INFO mlflow.tracking.fluent: Experiment with name 'Amazon Review Sentiment (BiLSTM)' does not exist. Creating a new experiment.


Starting BiLSTM model training...


Epoch 1/3
[1m1811/1811[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step - accuracy: 0.6784 - loss: 0.5631



[1m1811/1811[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 24ms/step - accuracy: 0.6784 - loss: 0.5631 - val_accuracy: 0.7289 - val_loss: 0.5147
Epoch 2/3
[1m1811/1811[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m42s[0m 23ms/step - accuracy: 0.7916 - loss: 0.4531 - val_accuracy: 0.7302 - val_loss: 0.5280
Epoch 3/3
[1m1811/1811[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m82s[0m 23ms/step - accuracy: 0.7940 - loss: 0.4191 - val_accuracy: 0.7294 - val_loss: 0.5417
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 248ms/step





Training complete.
