In [6]:
spark = SparkSession.builder \
        .appName("Classification Model with PySpark") \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/21 14:38:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/21 14:38:04 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [7]:
!pip install pyspark

# Importing libraries and modules
import os
import logging
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# Configure logging ERROR level
logger = logging.getLogger("py4j")
logger.setLevel(logging.ERROR)


# Loading the dataset into a PySpark dataframe
df = spark.read.format("csv").option("header", "false").option("delimiter", "\t").load("amazon_cells_labelled.txt")

# Renaming the columns and casting the labels to integer type
df = df.toDF("text", "label")
df = df.withColumn("label", col("label").cast("int"))

# Tokenizing the text data
tokenizer = Tokenizer(inputCol="text", outputCol="words")
df = tokenizer.transform(df)

# Removing stop words from the text data
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="wordsfiltred")
df = stopwords_remover.transform(df)

# Compute the term frequency of the filtered words
hashing_tf = HashingTF(inputCol="wordsfiltred", outputCol="rawfeatures")
df = hashing_tf.transform(df)

# Computing the inverse document frequency of the raw features
idf = IDF(inputCol="rawfeatures", outputCol="features")
idf_model = idf.fit(df)
df = idf_model.transform(df)

# Splitting the data into training and testing sets
(training_data, testing_data) = df.randomSplit([0.8, 0.2], seed=42)

# Training a random forest classifier
random_forest = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=100, maxDepth=4)
rforest_model = random_forest.fit(training_data)

# Evaluating the performance of the random forest classifier on the testing data
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(rforest_model.transform(testing_data))
print("Accuracy: {:.1f}%".format(accuracy * 100))




23/04/21 14:38:48 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/04/21 14:38:49 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/04/21 14:38:50 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/04/21 14:38:50 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
23/04/21 14:38:59 WARN DAGScheduler: Broadcasting large task binary with size 1033.7 KiB
23/04/21 14:39:00 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB
23/04/21 14:39:02 WARN MemoryStore: Not enough space to cache rdd_45_0 in memory! (computed 419.2 MiB so far)
23/04/21 14:39:02 WARN BlockManager: Persisting block rdd_45_0 to disk instead.
23/04/21 14:39:04 WARN MemoryStore: Not enough space to cache rdd_45_0 in memory! (computed 419.2 MiB so far)
23/04/21 14:39:06 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB
23/04/21 14:39:07 WARN MemoryStore: Not enough space to cache rdd_45_0 in memory! (computed 419

Accuracy: 57.4%
