In [None]:
!pip install pyspark
!pip install nltk



In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Twitter Sentiment Analysis").getOrCreate()

In [None]:
# Loading dataset
df = spark.read.csv('/content/drive/MyDrive/training.1600000.processed.noemoticon.csv', header=False, inferSchema=True)
df = df.selectExpr("_c0 as label", "_c5 as tweet")

df = df.filter((df.label == 0) | (df.label == 4))

from pyspark.sql.functions import when
df = df.withColumn("label", when(df.label == 4, 1).otherwise(0))

df.show(5)

+-----+--------------------+
|label|               tweet|
+-----+--------------------+
|    0|@switchfoot http:...|
|    0|is upset that he ...|
|    0|@Kenichan I dived...|
|    0|my whole body fee...|
|    0|@nationwideclass ...|
+-----+--------------------+
only showing top 5 rows


In [None]:
print("length of dataset is",df.count())
df.columns

length of dataset is 1600000


['label', 'tweet']

In [None]:
from pyspark.sql.functions import col, lower, regexp_replace, trim

def clean_text_df(df, input_col="tweet", output_col="clean_text"):
    df = df.withColumn(output_col, lower(col(input_col)))
    df = df.withColumn(output_col, regexp_replace(output_col, r"http\S+|www\S+", ""))
    df = df.withColumn(output_col, regexp_replace(output_col, r"@\w+", ""))
    df = df.withColumn(output_col, regexp_replace(output_col, r"#", ""))
    df = df.withColumn(output_col, regexp_replace(output_col, r"[^a-zA-Z\s]", ""))
    df = df.withColumn(output_col, regexp_replace(output_col, r"\s+", " "))
    df = df.withColumn(output_col, trim(output_col))
    return df
df = clean_text_df(df)
df.select("tweet", "clean_text").show(5, truncate=False)

+-------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------+
|tweet                                                                                                              |clean_text                                                                                              |
+-------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------+
|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D|a thats a bummer you shoulda got david carr of third day to do it d                                     |
|is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. B

In [None]:
from pyspark.ml.feature import (
    Tokenizer, StopWordsRemover,VectorAssembler, NGram,
    CountVectorizer, IDF
)

tokenizer = Tokenizer(
    inputCol="clean_text",
    outputCol="tokens"
)

remover = StopWordsRemover(
    inputCol="tokens",
    outputCol="filtered_tokens"
)

unigram_cv = CountVectorizer(
    inputCol="filtered_tokens",
    outputCol="unigram_features",
    vocabSize=20000,
    minDF=5
)

bigram = NGram(
    n=2,
    inputCol="filtered_tokens",
    outputCol="bigrams"
)

bigram_cv = CountVectorizer(
    inputCol="bigrams",
    outputCol="bigram_features",
    vocabSize=20000,
    minDF=5
)

assembler = VectorAssembler(
    inputCols=["unigram_features","bigram_features"],
    outputCol="raw_features",
)

idf = IDF(
    inputCol="raw_features",
    outputCol="features"
)

feature_stages = [
    tokenizer,
    remover,
    bigram,
    unigram_cv,
    bigram_cv,
    assembler,
    idf
]

In [None]:
train_df,test_df = df.randomSplit([0.8,0.2],seed= 42)
train_df.cache()
test_df.cache()

DataFrame[label: int, tweet: string, clean_text: string]

In [None]:
from pyspark.sql.functions import when,col

label_counts = df.groupBy('label').count().collect()

neg_count = [row["count"] for row in label_counts if row["label"] == 0][0]
pos_count = [row["count"] for row in label_counts if row["label"] == 0][0]

total = neg_count + pos_count

train_df = train_df.withColumn(
    "classWeightCol",
    when(col("label") == 1,total/pos_count).otherwise(total/neg_count)
)

test_df = test_df.withColumn("classWeightCol",col("label"))

In [None]:
from pyspark.ml.classification import (
    LogisticRegression,
    NaiveBayes,
    LinearSVC
)
models ={
    "naive_bayes": NaiveBayes(
        featuresCol="features",
        labelCol="label"
    ),
    "linear_svc": LinearSVC(
        featuresCol="features",
        labelCol="label"
    )
  }

lr = LogisticRegression(
        featuresCol="features",
        labelCol="label",
        weightCol="classWeightCol",
        regParam=0.1
    )

In [None]:
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
from pyspark.ml import Pipeline
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam,[0.001,0.01,0.1])
    .addGrid(lr.elasticNetParam,[0.0,0.5,1.0])
    .build()
)

evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1"
)

evaluator_auc = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

cv_pipeline = Pipeline(
    stages=feature_stages + [lr]
)

crossval = CrossValidator(
    estimator=cv_pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=2,
    parallelism=2
)

In [None]:
from pyspark.ml.evaluation import (
    MulticlassClassificationEvaluator,
    BinaryClassificationEvaluator
)

def evaluate_predictions(predictions):
    f1 = MulticlassClassificationEvaluator(
        labelCol="label",
        predictionCol="prediction",
        metricName="f1"
    ).evaluate(predictions)

    auc = BinaryClassificationEvaluator(
        labelCol="label",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC"
    ).evaluate(predictions)

    return f1,auc


In [None]:
from pyspark.ml import Pipeline

results = {}

for name,model in models.items():
  print(f"\nTraining {name}...")

  pipeline = Pipeline(stages=feature_stages+[model])
  fitted_model = pipeline.fit(train_df)
  predictions = fitted_model.transform(test_df)
  f1,auc = evaluate_predictions(predictions)

  results[name] = {
      "model": fitted_model,
      "f1": f1,
      "auc": auc
  }
  print(f"{name} -> F1:{float(f1):.4f}, ROC_AUC:{float(auc):.4f}")

print("Training logistic regression...")

cv_model = crossval.fit(train_df)

cv_predictions = cv_model.transform(test_df)

f1_lr = evaluator.evaluate(cv_predictions)
auc_lr = evaluator_auc.evaluate(cv_predictions)

print(f"\nCross-Validated Logistic Regression -> F1: {f1_lr:.4f}, ROC-AUC: {auc_lr:.4f}")

results["logistic_regression_cv"] = {
    "model": cv_model,
    "f1": float(f1_lr),
    "auc": float(auc_lr)
}


Training naive_bayes...
naive_bayes -> F1:0.7731, ROC_AUC:0.5494

Training linear_svc...
linear_svc -> F1:0.7881, ROC_AUC:0.8631
Training logistic regression...

Cross-Validated Logistic Regression -> F1: 0.7897, ROC-AUC: 0.8675


In [None]:
best_model_name = max(results,key=lambda x: results[x]["f1"])
best_model = results[best_model_name]["model"]

preds = best_model.transform(test_df)

print("FALSE POSITIVES:")
preds.filter(
    (col("label") == 0) & (col("prediction") == 1)
).select("tweet").show(5,truncate=False)

print("FALSE NEGATIVES:")
preds.filter(
    (col("label") == 1) & (col("prediction") == 0)
).select("tweet").show(5,truncate=False)

FALSE POSITIVES:
+----------------------------------------------------------+
|tweet                                                     |
+----------------------------------------------------------+
|   #Battleground                                          |
|   Tell those girls your clocking out lol                 |
|   no shopping                                            |
|  (unsure)  (hassle)  (:  (music) http://plurk.com/p/yq08l|
|  Anyone in DFW wanna hang out?                           |
+----------------------------------------------------------+
only showing top 5 rows
FALSE NEGATIVES:
+-----------------------------------------------------------------------------------------------------------------------------------------+
|tweet                                                                                                                                    |
+----------------------------------------------------------------------------------------------------------------

In [None]:
from pyspark.sql import Row

def predict_text(model,spark,text):
  temp_df = spark.createDataFrame([Row(tweet = text)])
  temp_df = clean_text_df(temp_df)

  result = model.transform(temp_df).select("prediction","probability").collect()[0]

  return {
      "prediction": int(result["prediction"]),
      "probablities": list(result["probability"]),
      "confidence": max(result["probability"])
  }

In [None]:
output = predict_text(
    best_model,
    spark,
    "This movie was absolutely amazing!"
)

output

{'prediction': 1,
 'probablities': [np.float64(0.13719325956202927),
  np.float64(0.8628067404379707)],
 'confidence': np.float64(0.8628067404379707)}

In [None]:
!pip install mlflow

Collecting mlflow
  Downloading mlflow-3.9.0-py3-none-any.whl.metadata (31 kB)
Collecting mlflow-skinny==3.9.0 (from mlflow)
  Downloading mlflow_skinny-3.9.0-py3-none-any.whl.metadata (32 kB)
Collecting mlflow-tracing==3.9.0 (from mlflow)
  Downloading mlflow_tracing-3.9.0-py3-none-any.whl.metadata (19 kB)
Collecting Flask-CORS<7 (from mlflow)
  Downloading flask_cors-6.0.2-py3-none-any.whl.metadata (5.3 kB)
Collecting docker<8,>=4.0.0 (from mlflow)
  Downloading docker-7.1.0-py3-none-any.whl.metadata (3.8 kB)
Collecting graphene<4 (from mlflow)
  Downloading graphene-3.4.3-py2.py3-none-any.whl.metadata (6.9 kB)
Collecting gunicorn<24 (from mlflow)
  Downloading gunicorn-23.0.0-py3-none-any.whl.metadata (4.4 kB)
Collecting huey<3,>=2.5.4 (from mlflow)
  Downloading huey-2.6.0-py3-none-any.whl.metadata (4.3 kB)
Collecting skops<1 (from mlflow)
  Downloading skops-0.13.0-py3-none-any.whl.metadata (5.6 kB)
Collecting databricks-sdk<1,>=0.20.0 (from mlflow-skinny==3.9.0->mlflow)
  Downloa

In [None]:
import mlflow
import mlflow.spark

mlflow.set_experiment("Twitter Sentiment Analysis- pyspark")

2026/01/30 11:27:16 INFO alembic.runtime.plugins: setup plugin alembic.autogenerate.schemas
2026/01/30 11:27:16 INFO alembic.runtime.plugins: setup plugin alembic.autogenerate.tables
2026/01/30 11:27:16 INFO alembic.runtime.plugins: setup plugin alembic.autogenerate.types
2026/01/30 11:27:16 INFO alembic.runtime.plugins: setup plugin alembic.autogenerate.constraints
2026/01/30 11:27:16 INFO alembic.runtime.plugins: setup plugin alembic.autogenerate.defaults
2026/01/30 11:27:16 INFO alembic.runtime.plugins: setup plugin alembic.autogenerate.comments
2026/01/30 11:27:16 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
2026/01/30 11:27:16 INFO mlflow.store.db.utils: Updating database tables
2026/01/30 11:27:16 INFO alembic.runtime.migration: Context impl SQLiteImpl.
2026/01/30 11:27:16 INFO alembic.runtime.migration: Will assume non-transactional DDL.
2026/01/30 11:27:17 INFO alembic.runtime.migration: Running upgrade  -> 451aebb31d03, add metric step
2026/01/30 11:2

<Experiment: artifact_location='/content/mlruns/1', creation_time=1769772439224, experiment_id='1', last_update_time=1769772439224, lifecycle_stage='active', name='Twitter Sentiment Analysis- pyspark', tags={}>

In [None]:
best_f1 = results[best_model_name]["f1"]
best_auc = results[best_model_name]["auc"]

with mlflow.start_run(run_name=best_model_name):
  mlflow.log_param("model",best_model_name)
  mlflow.log_metric("f1_score",best_f1)
  mlflow.log_metric("roc_auc",best_auc)

  mlflow.spark.log_model(
      best_model,
      artifact_path="sentiment_model"
  )

print("best model logged to mlflow:",best_model_name)

best model logged to mlflow: logistic_regression_cv


In [None]:
import os
os.makedirs("/content/stream_input", exist_ok=True)

In [None]:
from pyspark.sql.functions import col

stream_df = spark.readStream.format("text").load("/content/stream_input")

stream_df = stream_df.withColumnRenamed("value","tweet")

In [None]:
from pyspark.ml.functions import vector_to_array
stream_df = clean_text_df(stream_df)

stream_predictions = best_model.transform(stream_df).withColumn("prob_array", vector_to_array("probability"))\
.withColumn("positive_prob", col("prob_array")[1]).select("tweet","prediction","positive_prob")

In [None]:
query = stream_predictions.writeStream.format("memory").queryName("sentiment_stream").outputMode("append").start()

In [None]:
print("Is stream active?", query.isActive)

Is stream active? False


In [None]:
with open("/content/stream_input/tweets1.txt", "w") as f:
    f.write("I love Spark streaming!\n")
    f.write("This movie was terrible\n")

In [None]:
with open("/content/stream_input/tweets2.txt", "w") as f:
    f.write("I love Spark streaming\n")
    f.write("This is the worst movie ever\n")

In [None]:
with open("/content/stream_input/tweets_101.txt", "w") as f:
    f.write("I love this product so much\n")
    f.write("This is horrible and disappointing\n")

In [None]:
spark.sql("SELECT * FROM sentiment_stream").show(truncate=False)

+----------------------------------+----------+-------------------+
|tweet                             |prediction|positive_prob      |
+----------------------------------+----------+-------------------+
|I love Spark streaming            |1.0       |0.734244627571385  |
|This is the worst movie ever      |0.0       |0.30183064616507205|
|I love Spark streaming!           |1.0       |0.734244627571385  |
|This movie was terrible           |0.0       |0.29626714471245286|
|I love this product so much       |1.0       |0.7707708784224886 |
|This is horrible and disappointing|0.0       |0.02016066995660326|
+----------------------------------+----------+-------------------+



In [None]:
query.stop()