In [62]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Importing Libraries



In [63]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, desc, split, explode,sum as _sum
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.sql.functions import lower, regexp_replace
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import HashingTF, IDF

Loading Dataset

In [64]:
df = spark.read.csv("/content/drive/MyDrive/IMDB Data Analysis/IMDB Dataset.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
    escape='"',
    quote='"')



In [65]:
df.printSchema()
df.show(5)


root
 |-- review: string (nullable = true)
 |-- sentiment: string (nullable = true)

+--------------------+---------+
|              review|sentiment|
+--------------------+---------+
|One of the other ...| positive|
|A wonderful littl...| positive|
|I thought this wa...| positive|
|Basically there's...| negative|
|Petter Mattei's "...| positive|
+--------------------+---------+
only showing top 5 rows



In [66]:
print("Total Rows:", df.count())
print("Total Columns:", len(df.columns))


Total Rows: 50000
Total Columns: 2


Data Cleaning

Remove duplicates

In [67]:
df = df.dropDuplicates()
print("Total Rows:", df.count())
print("Total Columns:", len(df.columns))

Total Rows: 49582
Total Columns: 2


Checking missing values

In [68]:
df.select([_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

+------+---------+
|review|sentiment|
+------+---------+
|     0|        0|
+------+---------+



Class Distribution

In [69]:
df.groupBy("sentiment").count().show()


+---------+-----+
|sentiment|count|
+---------+-----+
| positive|24884|
| negative|24698|
+---------+-----+



Text Cleaning

In [70]:
#Text clean
df_clean = df.withColumn("review", lower(col("review")))
df_clean = df_clean.withColumn("review", regexp_replace(col("review"), "[^a-zA-Z\\s]", ""))


In [71]:
df_clean.write.csv("/content/drive/MyDrive/IMDB Data Analysis/cleaned_data", header=True)

In [72]:
#Tokenize
tokenizer = RegexTokenizer(inputCol="review", outputCol="tokens", pattern="\\s+")
df_token = tokenizer.transform(df_clean)


In [73]:
#Remove stop words
remover = StopWordsRemover(inputCol="tokens", outputCol="clean_tokens")
df_final = remover.transform(df_token)

df_final.select("review", "clean_tokens", "sentiment").show(5, truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Convert Text â†’ Features (TF-IDF)

In [74]:
hashing = HashingTF(inputCol="clean_tokens", outputCol="rawFeatures", numFeatures=20000)
df_tf = hashing.transform(df_final)


In [75]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(df_tf)
df_tfidf = idfModel.transform(df_tf)


Encode Label

In [76]:


indexer = StringIndexer(inputCol="sentiment", outputCol="label")
df_labeled = indexer.fit(df_tfidf).transform(df_tfidf)


In [77]:
train, test = df_labeled.randomSplit([0.8, 0.2], seed=42)


Train a Model

Logistic Regression

In [78]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(train)


Evaluation

In [79]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

pred = model.transform(test)

acc = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy").evaluate(pred)
f1 = MulticlassClassificationEvaluator(labelCol="label", metricName="f1").evaluate(pred)

print("Accuracy:", acc)
print("F1 Score:", f1)


Accuracy: 0.8225855901890649
F1 Score: 0.8225600418055024


In [80]:
pred.select("review", "sentiment", "prediction").show(10, truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Linear SVM


In [81]:
from pyspark.ml.classification import LinearSVC

svm = LinearSVC(featuresCol="features", labelCol="label")
svm_model = svm.fit(train)
svm_pred = svm_model.transform(test)


Py4JJavaError: An error occurred while calling o1741.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1307.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1307.0 (TID 1270) (08d051d614c9 executor driver): org.apache.spark.SparkFileNotFoundException: File file:/content/drive/MyDrive/IMDB Data Analysis/IMDB Dataset.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:780)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:220)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:279)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkFileNotFoundException: File file:/content/drive/MyDrive/IMDB Data Analysis/IMDB Dataset.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:780)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:220)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:279)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label")

print("SVM Accuracy:", evaluator.evaluate(svm_pred, {evaluator.metricName: "accuracy"}))
print("SVM F1:", evaluator.evaluate(svm_pred, {evaluator.metricName: "f1"}))
