In [1]:
# Import libraries

# sparkML
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# sparkSQL
from pyspark.sql.functions import udf, col, when
from pyspark.sql.types import StringType, DoubleType
from pyspark.sql import SparkSession

# other
from sklearn.metrics import classification_report
from nltk.stem import WordNetLemmatizer
import findspark
import nltk
import csv

In [2]:
findspark.init()

In [3]:
# Download nltk functions --> Necessary to remove stopwords and more to get better accuracy in our NLP ML model
nltk.download("punkt")
nltk.download("stopwords")
nltk.download("wordnet")

[nltk_data] Downloading package punkt to /home/ubuntu/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /home/ubuntu/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /home/ubuntu/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [4]:
spark = SparkSession.builder.appName("ModelTraining")

spark.config("spark.executor.cores", "4")  # Use all 4 available cores per executor
spark.config("spark.driver.cores", "1")  # Use 1 core for the driver
spark.config("spark.default.parallelism", "4")

spark = spark.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/19 14:39:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# UDF for preprocessing
def preprocess_text(text):
    '''Remove stop words, tokenize and clean the data from the title column. Original: |Cis Men of Reddit, if you were a woman, what would you like about Men?  --> PROCESSED: men reddit woman would like men''' 
    tokens = nltk.word_tokenize(text.lower(), language="english")
    tokens = [word for word in tokens if word.isalnum() and len(word) > 1]
    tokens = [lemmatizer.lemmatize(word, pos="v") for word in tokens if word not in stop_words]
    return ' '.join(tokens)

In [6]:

# Load the labeled dataset into spark
df = spark.read.csv("labeled-training-dataset.csv", header=True, inferSchema=True)

# We need a copy of the column, as we are going to tokenize and vektorize the title to better classification
df = df.withColumn("original_title", df["title"])

# Data Preprocessing
lemmatizer = WordNetLemmatizer()
stop_words = set(nltk.corpus.stopwords.words("english"))

# Create a list of unique values of the topic_name column in the csv file. Eventually, you will find some rows that contain a really rare character combination, so you avoid it creating specific previous topics.
TARGET_CLASSES = ["money", "food", "job", "life", "music", "media", "movie", "sexual", "health", "kid", "game", "book", "tech", "relationships"]

# Filter out the elements that are not falling in any of the classes due to a SyntaxError in the Reddit Sentence --> "Is calling someone a ""plaything"" a porn term? If so, what type of porn/ kink content?",sexual [Look at that combination of double ""]
df = df.filter(df["topic_name"].isin(TARGET_CLASSES))

preprocess_udf = udf(preprocess_text, StringType())
df = df.withColumn("title", preprocess_udf(df["title"]))

# Convert the topic_name column to numeric
indexer = StringIndexer(inputCol="topic_name", outputCol="label")
indexed_df = indexer.fit(df).transform(df)
indexed_df = indexed_df.withColumn("label", col("label").cast("integer"))

# Split the dataset
(train_df, test_df) = indexed_df.randomSplit([0.8, 0.2], seed=42)

# TF-IDF Vectorization --> Classic steps prior to train a Multiclassification model in NLP
tokenizer = Tokenizer(inputCol="title", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
cv = CountVectorizer(inputCol="filtered_words", outputCol="raw_features", vocabSize=1500)
idf = IDF(inputCol="raw_features", outputCol="features")

# Logistic Regression model
lr = LogisticRegression(labelCol="label", featuresCol="features")

# Modify the pipeline to use Logistic Regression
pipeline = Pipeline(stages=[tokenizer, remover, cv, idf, lr])

# Hyperparameter Tuning for Logistic Regression
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5]).addGrid(lr.elasticNetParam, [0.0, 0.1, 0.5]).build()

evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")

crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, seed=42)

                                                                                

In [7]:
# Fit the model
cvModel = crossval.fit(train_df)

# Make predictions
predictions = cvModel.transform(test_df)

23/09/19 14:40:29 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


In [8]:
# Model Evaluation
accuracy = evaluator.evaluate(predictions)
print("Accuracy: {:.4f}".format(accuracy))

# Classification Report
y_true = predictions.select("label").rdd.flatMap(lambda x: x).collect()
y_pred = predictions.select("prediction").rdd.flatMap(lambda x: x).collect()
report = classification_report(y_true, y_pred, target_names=TARGET_CLASSES, output_dict=True)

print("Classification Report for Testing Data:")
for topic, metrics in report.items():
    if topic == 'accuracy':
        print(f"Accuracy: {metrics:.4f}")
    else:
        print(f"Topic: {topic}")
        print(f"Precision: {metrics['precision']:.4f}")
        print(f"Recall: {metrics['recall']:.4f}")
        print(f"F1-Score: {metrics['f1-score']:.4f}")
        print(f"Support: {metrics['support']:.0f}")
        print()

Accuracy: 0.6636


                                                                                

Classification Report for Testing Data:
Topic: money
Precision: 0.4828
Recall: 0.5185
F1-Score: 0.5000
Support: 27

Topic: food
Precision: 0.6774
Recall: 0.7241
F1-Score: 0.7000
Support: 29

Topic: job
Precision: 0.3778
Recall: 0.6538
F1-Score: 0.4789
Support: 26

Topic: life
Precision: 0.8182
Recall: 0.7500
F1-Score: 0.7826
Support: 24

Topic: music
Precision: 0.7000
Recall: 0.5385
F1-Score: 0.6087
Support: 26

Topic: media
Precision: 0.8000
Recall: 0.6957
F1-Score: 0.7442
Support: 23

Topic: movie
Precision: 0.4286
Recall: 0.5000
F1-Score: 0.4615
Support: 30

Topic: sexual
Precision: 0.7059
Recall: 0.5455
F1-Score: 0.6154
Support: 22

Topic: health
Precision: 0.8750
Recall: 0.8750
F1-Score: 0.8750
Support: 24

Topic: kid
Precision: 0.9048
Recall: 0.7600
F1-Score: 0.8261
Support: 25

Topic: game
Precision: 0.9444
Recall: 0.8095
F1-Score: 0.8718
Support: 21

Topic: book
Precision: 0.6667
Recall: 0.4706
F1-Score: 0.5517
Support: 17

Topic: tech
Precision: 0.8125
Recall: 0.8125
F1-Score:

In [12]:
# The accuracy is good enough (as tested offline with new data) --> we save the model:
model_path = "trained-model"
cvModel.bestModel.save(model_path)

                                                                                

In [1]:
spark.stop()

NameError: name 'spark' is not defined