In [6]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Amazon Reviews Sentiment Analysis") \
    .getOrCreate()

# Load Data
data = spark.read.csv('test.csv', inferSchema=True, header=True)

# Preprocessing Steps
# Tokenize words
regexTokenizer = RegexTokenizer(inputCol="content", outputCol="words", pattern="\\W")

# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Convert words to feature vectors
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Convert label to numeric
label_stringIdx = StringIndexer(inputCol="label", outputCol="labelIndex")

# Pipeline
pipeline = Pipeline(stages=[regexTokenizer, remover, hashingTF, idf, label_stringIdx])

# Apply transformations
pipelineModel = pipeline.fit(data)
dataset = pipelineModel.transform(data)

dataset.show(5)


23/12/11 22:05:54 WARN StopWordsRemover: Default locale set was [en_SA]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
                                                                                

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|               title|             content|               words|            filtered|         rawFeatures|            features|labelIndex|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    1|            Great CD|"My lovely Pat ha...|[my, lovely, pat,...|[lovely, pat, one...|(262144,[3370,218...|(262144,[3370,218...|       1.0|
|    1|One of the best g...|Despite the fact ...|[despite, the, fa...|[despite, fact, p...|(262144,[6946,844...|(262144,[6946,844...|       1.0|
|    0|Batteries died wi...|I bought this cha...|[i, bought, this,...|[bought, charger,...|(262144,[1578,576...|(262144,[1578,576...|       0.0|
|    1|works fine, but M...|Check out Maha En...|[check, out, maha...|[check, maha, ene...|(262144,[82005,10...|(262144,[82005,10.

23/12/11 22:06:03 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


In [7]:
train_data, test_data = dataset.randomSplit([0.7, 0.3])


In [8]:
# Define Logistic Regression model
lr = LogisticRegression(featuresCol='features', labelCol='labelIndex')

# Train the model
lrModel = lr.fit(train_data)

# Make predictions
predictions = lrModel.transform(test_data)

# Evaluate the model
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="labelIndex", rawPredictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy: ", accuracy)


23/12/11 22:06:26 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/11 22:06:35 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/11 22:06:35 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/12/11 22:06:35 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/11 22:06:43 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/11 22:06:43 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/11 22:06:43 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/11 22:06:43 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/11 22:06:43 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/11 22:06:44 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/11 22:06:44 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/11 22:06:44 WARN DAGSchedul

Accuracy:  0.7625791932943319


                                                                                

## Random Forest Classifier

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Amazon Reviews Classification") \
        .config("spark.executor.memory", "4g") \
.config("spark.executor.memoryOverhead", "1g") \
.config("spark.memory.fraction", "0.8") \
.config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()

data = spark.read.csv('test.csv', inferSchema=True, header=True)


23/12/11 22:44:35 WARN Utils: Your hostname, Hadis-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.20.10.5 instead (on interface en0)
23/12/11 22:44:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/11 22:44:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [2]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml import Pipeline

# Tokenize words
regexTokenizer = RegexTokenizer(inputCol="content", outputCol="words", pattern="\\W")

# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Feature transformation using TF-IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Convert label to numeric
label_stringIdx = StringIndexer(inputCol="label", outputCol="labelIndex")

# Pipeline
pipeline = Pipeline(stages=[regexTokenizer, remover, hashingTF, idf, label_stringIdx])

# Apply transformations
dataset = pipeline.fit(data).transform(data)


23/12/11 22:44:43 WARN StopWordsRemover: Default locale set was [en_SA]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
                                                                                

In [3]:
train_data, test_data = dataset.randomSplit([0.7, 0.3])


In [4]:
from pyspark.ml.classification import GBTClassifier

# Define the model
gbt_classifier = GBTClassifier(featuresCol='features', labelCol='labelIndex', maxDepth=3)  # default is 5

# Train the model
gbt_model = gbt_classifier.fit(train_data)

# Predict on test data
gbt_predictions = gbt_model.transform(test_data)

# Evaluate the model
gbt_evaluator = BinaryClassificationEvaluator(labelCol="labelIndex")
gbt_accuracy = gbt_evaluator.evaluate(gbt_predictions)
print("Gradient-Boosted Trees Classifier Accuracy: ", gbt_accuracy)


23/12/11 22:45:04 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/11 22:45:07 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/11 22:45:17 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
23/12/11 22:45:52 WARN DAGScheduler: Broadcasting large task binary with size 1033.7 KiB
23/12/11 22:45:53 WARN DAGScheduler: Broadcasting large task binary with size 7.8 MiB
23/12/11 22:45:58 WARN MemoryStore: Not enough space to cache rdd_49_7 in memory! (computed 65.0 MiB so far)
23/12/11 22:45:58 WARN BlockManager: Persisting block rdd_49_7 to disk instead.
23/12/11 22:46:01 WARN TaskMemoryManager: Failed to allocate a page (4194288 bytes), try again.
23/12/11 22:46:01 WARN TaskMemoryManager: Failed to allocate a page (4194288 bytes), try again.
23/12/11 22:46:01 WARN BlockManager: Block rdd_49_6 could not be removed as it was not found on disk or in memory
23/12/11 22:46:01 WARN TaskMemoryManager: Failed to allocate a page (419



23/12/11 22:46:01 WARN BlockManager: Block rdd_49_2 could not be removed as it was not found on disk or in memory
23/12/11 22:46:01 WARN BlockManager: Block rdd_49_3 could not be removed as it was not found on disk or in memory
23/12/11 22:46:01 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 53)
java.lang.OutOfMemoryError: Java heap space
23/12/11 22:46:01 ERROR Executor: Exception in task 3.0 in stage 11.0 (TID 56)
java.lang.OutOfMemoryError: Java heap space
23/12/11 22:46:02 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 0.0 in stage 11.0 (TID 53),5,main]
java.lang.OutOfMemoryError: Java heap space
23/12/11 22:46:02 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker for task 3.0 in stage 11.0 (TID 56),5,main]
java.lang.OutOfMemoryError: Java heap space
23/12/11 22:46:02 WARN TaskSetManager: Lost task 0.0 in stage 11.0 (TID 53) (172.20.10.5

Py4JJavaError: An error occurred while calling o251.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 53) (172.20.10.5 executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1046)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1045)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:738)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:737)
	at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:663)
	at org.apache.spark.ml.tree.impl.RandomForest$.runBagged(RandomForest.scala:208)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.boost(GradientBoostedTrees.scala:367)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.run(GradientBoostedTrees.scala:61)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$1(GBTClassifier.scala:201)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.GBTClassifier.train(GBTClassifier.scala:170)
	at org.apache.spark.ml.classification.GBTClassifier.train(GBTClassifier.scala:58)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:114)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.OutOfMemoryError: Java heap space


23/12/11 22:46:02 ERROR Executor: Exception in task 6.0 in stage 11.0 (TID 59): Java heap space
23/12/11 22:46:02 ERROR Executor: Exception in task 1.0 in stage 11.0 (TID 54): Java heap space
