In [126]:
!pip install pyspark



In [127]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Twitter Sentiment Analysis") \
    .getOrCreate()

from pyspark.ml.feature import Word2Vec
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline



In [150]:
# Load CSV data into a Spark DataFrame
df = spark.read.csv("/content/train_data.csv", header=True, inferSchema=True)
df.show(20)

+---+-----+--------------------+
| id|label|               tweet|
+---+-----+--------------------+
|  1|    0| @user when a fat...|
|  2|    0|@user @user thank...|
|  3|    0|  bihday your maj...|
|  4|    0|#model   i love u...|
|  5|    0| factsguide: soci...|
|  6|    0|[2/2] huge fan fa...|
|  7|    0| @user camping to...|
|  8|    0|the next school y...|
|  9|    0|we won!!! love th...|
| 10|    0| @user @user welc...|
| 11|    0| â #ireland con...|
| 12|    0|we are so selfish...|
| 13|    0|i get to see my d...|
| 14|    1|@user #cnn calls ...|
| 15|    1|no comment!  in #...|
| 16|    0|ouch...junior is ...|
| 17|    0|i am thankful for...|
| 18|    1|retweet if you ag...|
| 19|    0|its #friday! ð...|
| 20|    0|as we all know, e...|
+---+-----+--------------------+
only showing top 20 rows



In [151]:
from pyspark.sql.functions import col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Lowercase the text if not already done
if "tweet" in df.columns:
    df = df.withColumn("tweet", lower(col("tweet")))

# Remove punctuation and special characters if not already done
if "tweet" in df.columns:
    df = df.withColumn("tweet", regexp_replace(col("tweet"), "[^a-zA-Z0-9\\s]", ""))

# Tokenize text if not already tokenized
if "words" not in df.columns:
    tokenizer = Tokenizer(inputCol="tweet", outputCol="words")
    df = tokenizer.transform(df)

# Remove stopwords if not already removed
if "filtered" not in df.columns:
    remover = StopWordsRemover(inputCol="words", outputCol="filtered")
    df = remover.transform(df)

df.show(20)

+---+-----+--------------------+--------------------+--------------------+
| id|label|               tweet|               words|            filtered|
+---+-----+--------------------+--------------------+--------------------+
|  1|    0| user when a fath...|[, user, when, a,...|[, user, father, ...|
|  2|    0|user user thanks ...|[user, user, than...|[user, user, than...|
|  3|    0|  bihday your maj...|[, , bihday, your...|[, , bihday, maje...|
|  4|    0|model   i love u ...|[model, , , i, lo...|[model, , , love,...|
|  5|    0| factsguide socie...|[, factsguide, so...|[, factsguide, so...|
|  6|    0|22 huge fan fare ...|[22, huge, fan, f...|[22, huge, fan, f...|
|  7|    0| user camping tom...|[, user, camping,...|[, user, camping,...|
|  8|    0|the next school y...|[the, next, schoo...|[next, school, ye...|
|  9|    0|we won love the l...|[we, won, love, t...|[won, love, land,...|
| 10|    0| user user welcom...|[, user, user, we...|[, user, user, we...|
| 11|    0|  ireland cons

### **word2vec**

In [152]:
from pyspark.ml.feature import Word2Vec

word2vec = Word2Vec(vectorSize=100, minCount=0, inputCol="filtered", outputCol="features")
model = word2vec.fit(df)
df = model.transform(df)

In [145]:
df.show(10)

+---+-----+--------------------+--------------------+--------------------+--------------------+
| id|label|               tweet|               words|            filtered|            features|
+---+-----+--------------------+--------------------+--------------------+--------------------+
|  1|    0| user when a fath...|[, user, when, a,...|[, user, father, ...|(1000,[0,1,164,16...|
|  2|    0|user user thanks ...|[user, user, than...|[user, user, than...|(1000,[0,1,19,22,...|
|  3|    0|  bihday your maj...|[, , bihday, your...|[, , bihday, maje...|(1000,[0,17],[2.0...|
|  4|    0|model   i love u ...|[model, , , i, lo...|[model, , , love,...|(1000,[0,2,7,8,27...|
|  5|    0| factsguide socie...|[, factsguide, so...|[, factsguide, so...|(1000,[0,199],[4....|
|  6|    0|22 huge fan fare ...|[22, huge, fan, f...|[22, huge, fan, f...|(1000,[15,109,172...|
|  7|    0| user camping tom...|[, user, camping,...|[, user, camping,...|(1000,[0,1,57],[1...|
|  8|    0|the next school y...|[the, ne

logistic regression

In [156]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Convert label to numerical format
indexer = StringIndexer(inputCol="label", outputCol="label_index")
lr = LogisticRegression(featuresCol="features", labelCol="label_index",threshold=0.3)

pipeline = Pipeline(stages=[indexer, lr])

# Split data into training and test sets
train, test = df.randomSplit([0.8, 0.2], seed=12345)

# Train the model
model = pipeline.fit(train)


In [157]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions
predictions = model.transform(test)

# Evaluate the model using F1 score
evaluator = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print(f"Test F1 Score = {f1_score}")


Test F1 Score = 0.9328921019120989


SVM

In [133]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Convert label to numerical format
indexer = StringIndexer(inputCol="label", outputCol="label_index")
svm = LinearSVC(featuresCol="features", labelCol="label_index")

pipeline = Pipeline(stages=[indexer, svm])

# Split data into training and test sets
train, test = df.randomSplit([0.8, 0.2], seed=12345)

# Train the model
model = pipeline.fit(train)

In [134]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions
predictions = model.transform(test)

# Evaluate the model using F1 score
evaluator = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print(f"Test F1 Score = {f1_score}")

Test F1 Score = 0.907774579566599


## **TF IDF**

In [146]:
# Feature extraction using TF-IDF

from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20000)
featurizedData = hashingTF.transform(df)

idf = IDF(inputCol="rawFeatures", outputCol="tfidfFeatures")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)


In [147]:
df.show(10)

+---+-----+--------------------+--------------------+--------------------+--------------------+
| id|label|               tweet|               words|            filtered|            features|
+---+-----+--------------------+--------------------+--------------------+--------------------+
|  1|    0| user when a fath...|[, user, when, a,...|[, user, father, ...|(1000,[0,1,164,16...|
|  2|    0|user user thanks ...|[user, user, than...|[user, user, than...|(1000,[0,1,19,22,...|
|  3|    0|  bihday your maj...|[, , bihday, your...|[, , bihday, maje...|(1000,[0,17],[2.0...|
|  4|    0|model   i love u ...|[model, , , i, lo...|[model, , , love,...|(1000,[0,2,7,8,27...|
|  5|    0| factsguide socie...|[, factsguide, so...|[, factsguide, so...|(1000,[0,199],[4....|
|  6|    0|22 huge fan fare ...|[22, huge, fan, f...|[22, huge, fan, f...|(1000,[15,109,172...|
|  7|    0| user camping tom...|[, user, camping,...|[, user, camping,...|(1000,[0,1,57],[1...|
|  8|    0|the next school y...|[the, ne

logistic regression

In [158]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Convert label to numerical format
indexer = StringIndexer(inputCol="label", outputCol="label_index")
lr = LogisticRegression(featuresCol="features", labelCol="label_index",threshold=0.3)

pipeline = Pipeline(stages=[indexer, lr])

# Split data into training and test sets
train, test = rescaledData.randomSplit([0.8, 0.2], seed=12345)

# Train the model
model = pipeline.fit(train)


In [159]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions
predictions = model.transform(test)

# Evaluate the model using F1 score
evaluator = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print(f"Test F1 Score = {f1_score}")

Test F1 Score = 0.9362110670558152


SVM

In [138]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Convert label to numerical format
indexer = StringIndexer(inputCol="label", outputCol="label_index")
svm = LinearSVC(featuresCol="features", labelCol="label_index")

pipeline = Pipeline(stages=[indexer, svm])

# Split data into training and test sets
train, test = df.randomSplit([0.8, 0.2], seed=12345)

# Train the model
model = pipeline.fit(train)


In [139]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions
predictions = model.transform(test)

# Evaluate the model using F1 score
evaluator = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print(f"Test F1 Score = {f1_score}")

Test F1 Score = 0.907774579566599


## **Bag of Words**

In [148]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Assuming `df` is your DataFrame containing the data

# Drop the existing "features" column if it exists
if "features" in df.columns:
    df = df.drop("features")

# Convert text data into numerical features using Bag of Words (CountVectorizer)
cv = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=1000)  # Adjust vocabSize as needed
model = cv.fit(df)
df = model.transform(df)

In [149]:
df.show(10)

+---+-----+--------------------+--------------------+--------------------+--------------------+
| id|label|               tweet|               words|            filtered|            features|
+---+-----+--------------------+--------------------+--------------------+--------------------+
|  1|    0| user when a fath...|[, user, when, a,...|[, user, father, ...|(1000,[0,1,164,16...|
|  2|    0|user user thanks ...|[user, user, than...|[user, user, than...|(1000,[0,1,19,22,...|
|  3|    0|  bihday your maj...|[, , bihday, your...|[, , bihday, maje...|(1000,[0,17],[2.0...|
|  4|    0|model   i love u ...|[model, , , i, lo...|[model, , , love,...|(1000,[0,2,7,8,27...|
|  5|    0| factsguide socie...|[, factsguide, so...|[, factsguide, so...|(1000,[0,199],[4....|
|  6|    0|22 huge fan fare ...|[22, huge, fan, f...|[22, huge, fan, f...|(1000,[15,109,172...|
|  7|    0| user camping tom...|[, user, camping,...|[, user, camping,...|(1000,[0,1,57],[1...|
|  8|    0|the next school y...|[the, ne

logistic regression

In [141]:

# Convert label to numerical format
indexer = StringIndexer(inputCol="label", outputCol="label_index")

# Logistic Regression classifier
lr = LogisticRegression(featuresCol="features", labelCol="label_index")

pipeline = Pipeline(stages=[indexer, lr])

# Split data into training and test sets
train, test = df.randomSplit([0.8, 0.2], seed=12345)

# Train the model
model = pipeline.fit(train)


In [142]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# # Check if "features" column exists in test dataset and drop it if it does
# if "features" in test.columns:
#     test = test.drop("features")

# Make predictions on the test set
predictions = model.transform(test)

# Evaluate the model using F1 score
evaluator = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="f1")

# Calculate the F1 score
f1_score = evaluator.evaluate(predictions)

# Print the F1 score
print(f"Test F1 Score = {f1_score}")


Test F1 Score = 0.9406806479085082


In [143]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Convert label to numerical format
indexer = StringIndexer(inputCol="label", outputCol="label_index")
svm = LinearSVC(featuresCol="features", labelCol="label_index")

pipeline = Pipeline(stages=[indexer, svm])

# Split data into training and test sets
train, test = df.randomSplit([0.8, 0.2], seed=12345)

# Train the model
model = pipeline.fit(train)

In [144]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions
predictions = model.transform(test)

# Evaluate the model using F1 score
evaluator = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print(f"Test F1 Score = {f1_score}")

Test F1 Score = 0.9430677831262497
