## STEP 1. Join tables and filter data

#### 1.1 Prepare necessary libraries and load data

In [0]:
# Import necessary libraries and functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, translate, trim, explode, regexp_replace, col, lower
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.classification import NaiveBayes
from pyspark.sql.functions import col



In [0]:
# Creating Spark Session
spark = (SparkSession
         .builder
         .appName("ML Model")
         .getOrCreate())

sc = spark.sparkContext

In [0]:
# Read in the tables

posts = spark.read.parquet("/tmp/project/posts.parquet")
postType = spark.read.parquet("/tmp/project/PostType.parquet")
Users = spark.read.parquet("/tmp/project/user.parquet")

#### 1.2 Join the tables Posts and postTypes by it post type id

In [0]:
# at this moment, we only use Posts and posttypes to train the model. so let's join them iwith the posttype id. 

df = posts.join(postType, posts.PostTypeId == postType.id)
# display(df)

#### 1.3 Filter the data

In the posttypes table, there is a column called `Type` which indicates if the posts is a question or an answer. We only need the 'question' entires. For these 'Question' rows, we will run machine learning model on the join the 'Body' column of the 'Posts' table. To tell what topic this post is about.

In [0]:
# Filter the dataframe to only include questions
df = df.filter(col("Type") == "Question")
# display(df)

In [0]:
# Formatting the 'Body' and `Tag` columns for machine learning training
df = (df.withColumn('Body', regexp_replace(df.Body, r'<.*?>', '')) # Transforming HTML code to strings
      .withColumn("Tags", split(trim(translate(col("Tags"), "<>", " ")), " ")) # Making a list of the tags
)

# display(df)

#### 1.4 Create a checkpoint to save the dataframe to file only contain the `Body` and `Tag` we need. 

In [0]:
df = df.select(col("Body").alias("text"), col("Tags"))

In [0]:
# Producing the tags as individual tags instead of an array
# This is duplicating the posts for each possible tag
df = df.select("text", explode("Tags").alias("tags"))
# display(df)

## STEP 2. Based on the above dataframe, prepare data from machine learning

#### 2.1. Text Cleaning Preprocessing

`pyspark.sql.functions.regexp_replace` is used to process the text

1. Remove URLs such as `http://stackoverflow.com`
2. Remove special characters
3. Substituting multiple spaces with single space
4. Lowercase all text
5. Trim the leading/trailing whitespaces

In [0]:
# Preprocessing the data
df = df.withColumn('text', regexp_replace('text', r"http\S+", "")) \
                    .withColumn('text', regexp_replace('text', r"[^a-zA-z]", " ")) \
                    .withColumn('text', regexp_replace('text', r"\s+", " ")) \
                    .withColumn('text', lower('text')) \
                    .withColumn('text', trim('text')) 
# display(df)

In [0]:

df =  df.withColumn("tags", regexp_replace("tags", r"[^a-zA-z]", " ")) \
      .withColumn("tags", regexp_replace("tags", r"\s+", " ")) \
      .withColumn("tags", lower("tags")) \
      .withColumn("tags", trim("tags"))    

## STEP 3. Model training

### 3.1 Data Prepration 

In [0]:
# Preparing the data
# Step 1: Creating the joined table
df = posts.join(postType, posts.PostTypeId == postType.id)
# Step 2: Selecting only Question posts
df = df.filter(col("Type") == "Question")
# Step 3: Formatting the raw data
df = (df.withColumn('Body', regexp_replace(df.Body, r'<.*?>', ''))
      .withColumn("Tags", split(trim(translate(col("Tags"), "<>", " ")), " "))
)
# Step 4: Selecting the columns
df = df.select(col("Body").alias("text"), col("Tags"))
# Step 5: Getting the tags
df = df.select("text", explode("Tags").alias("tags"))

df =  df.withColumn("tags", regexp_replace("tags", r"[^a-zA-z]", " ")) \
      .withColumn("tags", regexp_replace("tags", r"\s+", " ")) \
      .withColumn("tags", lower("tags")) \
      .withColumn("tags", trim("tags"))  
# Step 6: Clean the text
df = df.withColumn('text', regexp_replace('text', r"http\S+", "")) \
                    .withColumn('text', regexp_replace('text', r"[^a-zA-z]", " ")) \
                    .withColumn('text', regexp_replace('text', r"\s+", " ")) \
                    .withColumn('text', lower('text')) \
                    .withColumn('text', trim('text')) 
# Step 7: Initializing the transfomers
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)
label_encoder = StringIndexer(inputCol = "tags", outputCol = "label")


### 3.2 Models

#### Model 1: Pipeline 

In [0]:
# Machine Learning
# Step 1: Train Test Split
train, test = df.randomSplit([0.9, 0.1], seed=20200819)
lr = LogisticRegression(maxIter=100)
# Step 3: Creating the pipeline
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, label_encoder, lr])
# Step 4: Fitting and transforming (predicting) using the pipeline
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

In [0]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
roc_auc2 = evaluator.evaluate(predictions)
accuracy2 = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())
print("Accuracy Score: {0:.4f}".format(accuracy2))
print("ROC-AUC: {0:.4f}".format(roc_auc2))

Accuracy Score: 0.3665
ROC-AUC: 0.3386


#### Model 2: Logistic Regression for classes greater than 1

In [0]:
#WORK
from pyspark.sql.functions import col
# Get the count of each class
class_counts = df.groupBy("tags").count()
# Filter the class_counts dataframe to keep only rows where count > 1
filtered_counts = class_counts.filter(col("count") > 1)
# Join the original dataframe with the filtered_counts dataframe
filtered_df = df.join(filtered_counts, "tags", "left")
# Drop the rows where count value = 1
filtered_df = filtered_df.filter(col("count").isNotNull())
filtered_counts = filtered_df.filter(col("count") > 1)

# Machine Learning
# Step 1: Train Test Split
train, test = filtered_counts.randomSplit([0.9, 0.1], seed=20200819)
lr = LogisticRegression(maxIter=100)

# Step 3: Creating the pipeline
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, label_encoder, lr])

# Step 4: Fitting and transforming (predicting) using the pipeline
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

In [0]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
roc_auc2 = evaluator.evaluate(predictions)
accuracy2 = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())
print("Accuracy Score: {0:.4f}".format(accuracy2))
print("ROC-AUC: {0:.4f}".format(roc_auc2))

Accuracy Score: 0.4634
ROC-AUC: 0.4427


#### Model 3: NaiveBayes for classes greater than 1


In [0]:
#WORK
# Get the count of each class
class_counts = df.groupBy("tags").count()
# Filter the class_counts dataframe to keep only rows where count > 1
filtered_counts = class_counts.filter(col("count") > 1)
# Join the original dataframe with the filtered_counts dataframe
filtered_df = df.join(filtered_counts, "tags", "left")
# Drop the rows where count value = 1
filtered_df = filtered_df.filter(col("count").isNotNull())
filtered_counts = filtered_df.filter(col("count") > 1)

# Machine Learning
# Step 1: Train Test Split
train, test = filtered_counts.randomSplit([0.9, 0.1], seed=20200819)

# Step 3: Creating the pipeline
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, label_encoder, nb])

nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# Step 4: Fitting and transforming (predicting) using the pipeline
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)


In [0]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
roc_auc2 = evaluator.evaluate(predictions)
accuracy2 = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())
print("Accuracy Score: {0:.4f}".format(accuracy2))
print("ROC-AUC: {0:.4f}".format(roc_auc2))

Accuracy Score: 0.4744
ROC-AUC: 0.4957


#### Model 4: Logisitic Regression for top 20 classes

In [0]:
# Get the count of each class
class_counts_for1 = df.groupBy("tags").count()

# Filter the class_counts dataframe to keep only rows where count > 1
filtered_counts = class_counts_for1.filter(col("count") > 1)
filtered_df.display()

# # Join the original dataframe with the filtered_counts dataframe
# filtered_df = df.join(filtered_counts, "tags", "left")

# # Drop the rows where count value = 1
# filtered_df = filtered_df.filter(col("count").isNotNull())

# # Display the filtered dataframe
# filtered_df.display()

In [0]:
# Get the count of each class
class_counts1 = df.groupBy("tags").count()

# Order the classes based on count in descending order
top_20_tags = class_counts1.orderBy(col("count").desc()).limit(20)

# Print the order of the tags
top_20_tags.select("tags").display()
# top_20_tags = ordered_classes.show(20)
# top_20_tags.display()

tags
c
java
javascript
jquery
php
android
iphone
net
objective c
mysql


In [0]:
# Join the original dataframe with the filtered_counts dataframe
filtered_df = df.join(top_20_tags, "tags", "left")

# Drop the rows where count value = 1
filtered_df = filtered_df.filter(col("count").isNotNull())

# Display the filtered dataframe
filtered_df.display()

In [0]:
# Machine Learning
# Step 1: Train Test Split
train, test = filtered_df.randomSplit([0.8, 0.2], seed=20200819)
# Step 2: Initializing the transfomers
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)
label_encoder = StringIndexer(inputCol = "tags", outputCol = "label")
lr = LogisticRegression(maxIter=100)
# Step 3: Creating the pipeline
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, label_encoder, lr])
# Step 4: Fitting and transforming (predicting) using the pipeline
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
roc_auc2 = evaluator.evaluate(predictions)
accuracy2 = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())

print("Accuracy Score: {0:.4f}".format(accuracy2))
print("ROC-AUC: {0:.4f}".format(roc_auc2))

Accuracy Score: 0.8065
ROC-AUC: 0.8077


#### Model 5: NaiveBayes for top 20 classes


In [0]:
# Step 1: Train Test Split
train, test = filtered_df.randomSplit([0.8, 0.2], seed=20200819)
# Step 2: Initializing the transfomers
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)
label_encoder = StringIndexer(inputCol = "tags", outputCol = "label")

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
# Step 3: Creating the pipeline
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, label_encoder, nb])
# Step 4: Fitting and transforming (predicting) using the pipeline
pipeline_model_nb = pipeline.fit(train)
predictions_nb = pipeline_model_nb.transform(test)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
roc_auc_nb= evaluator.evaluate(predictions_nb)
accuracy_nb = predictions_nb.filter(predictions_nb.label == predictions_nb.prediction).count() / float(predictions_nb.count())

print("Accuracy Score: {0:.4f}".format(accuracy_nb))
print("ROC-AUC: {0:.4f}".format(roc_auc_nb))

Accuracy Score: 0.7903
ROC-AUC: 0.7927


#### Model 6: Logisitic Regression for top 10 classes


In [0]:
# Get the count of each class
class_counts1 = df.groupBy("tags").count()

# Order the classes based on count in descending order
top_10_tags = class_counts1.orderBy(col("count").desc()).limit(10)

# Print the order of the tags
top_10_tags.select("tags").display()


tags
c
java
javascript
jquery
php
android
iphone
net
objective c
html


In [0]:
# Join the original dataframe with the filtered_counts dataframe
filtered_df_10 = df.join(top_10_tags, "tags", "left")

# Drop the rows where count value = 1
filtered_df_10 = filtered_df_10.filter(col("count").isNotNull())

# Display the filtered dataframe
filtered_df_10.display()

In [0]:
# Machine Learning
# Step 1: Train Test Split
train, test = filtered_df_10.randomSplit([0.9, 0.1], seed=20200819)
# Step 2: Initializing the transfomers
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)
label_encoder = StringIndexer(inputCol = "tags", outputCol = "label")
lr = LogisticRegression(maxIter=100)
# Step 3: Creating the pipeline
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, label_encoder, lr])
# Step 4: Fitting and transforming (predicting) using the pipeline
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
roc_auc2 = evaluator.evaluate(predictions)
accuracy2 = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())

print("Accuracy Score: {0:.4f}".format(accuracy2))
print("ROC-AUC: {0:.4f}".format(roc_auc2))

Accuracy Score: 0.8000
ROC-AUC: 0.7999


#### Model 7: NaiveBayes for top 10 classes


In [0]:
# Step 1: Train Test Split
train, test = filtered_df_10.randomSplit([0.9, 0.1], seed=20200819)
# Step 2: Initializing the transfomers
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)
label_encoder = StringIndexer(inputCol = "tags", outputCol = "label")

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
# Step 3: Creating the pipeline
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, label_encoder, nb])
# Step 4: Fitting and transforming (predicting) using the pipeline
pipeline_model_nb = pipeline.fit(train)
predictions_nb = pipeline_model_nb.transform(test)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
roc_auc_nb= evaluator.evaluate(predictions_nb)
accuracy_nb = predictions_nb.filter(predictions_nb.label == predictions_nb.prediction).count() / float(predictions_nb.count())

print("Accuracy Score: {0:.4f}".format(accuracy_nb))
print("ROC-AUC: {0:.4f}".format(roc_auc_nb))

Accuracy Score: 0.7231
ROC-AUC: 0.7046
