## STEP 0. Loading the data into dataframes

In [0]:
from pyspark.sql import SparkSession

# Build the notebook's SparkSession under the application name "Table Loading"
# (getOrCreate() hands back an already-running session when there is one)
spark = SparkSession.builder.appName("Table Loading").getOrCreate()

# Keep a handle on the session's underlying SparkContext
sc = spark.sparkContext

#### 0.1 Creating the `Posts` dataframe

In [0]:
# Posts live as Parquet files under the '/mnt/deBDProject' mount (set up in the mount storage workshop)
file_location = "/mnt/deBDProject/ml_training/Posts/*"

# Load every file matched by the glob into a single DataFrame
posts = spark.read.parquet(file_location)

display(posts)

#### 0.2 Creating the `posttypes` dataframe

In [0]:
# Creating the schema for posttypes table
from pyspark.sql.types import *

# Schema of the posttypes table: two nullable columns
PT_schema = (
    StructType()
    .add("id", IntegerType(), True)    # numeric post-type id, nullable
    .add("Type", StringType(), True)   # post-type name, nullable
)

In [0]:
# Build the posttypes dataframe from the comma-separated PostTypes.txt file
file_location = "/mnt/deBDProject/ml_training/PostTypes.txt"

# First line of the file is a header; columns are typed by PT_schema instead of being inferred
post_type_reader = spark.read.options(header="true", sep=",").schema(PT_schema)
postType = post_type_reader.csv(file_location)

display(postType)

#### 0.3 Creating the `users` dataframe

In [0]:
# Creating the schema for the users table
from pyspark.sql.types import *

# Schema of the users table, listed as (column name, Spark type) pairs in file order.
# Every column is declared nullable (third StructField argument = True).
_users_columns = [
    ("id", IntegerType()),
    ("Age", IntegerType()),
    ("CreationDate", DateType()),
    ("DisplayName", StringType()),
    ("DownVotes", IntegerType()),
    ("EmailHash", StringType()),
    ("Location", StringType()),
    ("Reputation", IntegerType()),
    ("UpVotes", IntegerType()),
    ("Views", IntegerType()),
    ("WebsiteUrl", StringType()),
    ("AccountId", IntegerType()),
]
users_schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _users_columns]
)

In [0]:
# Build the users dataframe from the comma-separated users.csv file
file_location = "/mnt/deBDProject/ml_training/users.csv"

# First line of the file is a header; columns are typed by users_schema instead of being inferred
users_reader = spark.read.options(header="true", sep=",").schema(users_schema)
users = users_reader.csv(file_location)

display(users)

#### 0.4. Saving the dataframes for easy retrieval

In [0]:
# Save the 3 tables as Parquet under /tmp/project/ so later steps can reload them quickly.
# mode("overwrite") makes this cell re-runnable: the default save mode raises an error
# when the target path already exists, which breaks a second "Run All".
posts.write.mode("overwrite").parquet("/tmp/project/posts.parquet")
postType.write.mode("overwrite").parquet("/tmp/project/PostType.parquet")
users.write.mode("overwrite").parquet("/tmp/project/user.parquet")

In [0]:
# List the contents of /tmp/project/ to confirm the three tables were written
project_dir_listing = dbutils.fs.ls("/tmp/project/")
display(project_dir_listing)

## STEP 1. Join tables and filter data

#### 1.1 Prepare necessary libraries and load data


In [0]:
# Import necessary libraries and functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, translate, trim, explode, regexp_replace, col, lower

In [0]:
# Obtain the SparkSession for the modelling part of the notebook (app name "ML Model").
# getOrCreate() returns the session that is already running when one exists.
session_builder = SparkSession.builder.appName("ML Model")
spark = session_builder.getOrCreate()

# SparkContext that backs the session
sc = spark.sparkContext

In [0]:
# Reload the three tables from the Parquet copies saved under /tmp/project/ in step 0.4.
# NOTE(review): the users table is bound to `Users` (capital U) here, whereas step 0.3
# used `users`; nothing in the visible cells reads `Users` afterwards.
posts = spark.read.parquet("/tmp/project/posts.parquet")
postType = spark.read.parquet("/tmp/project/PostType.parquet")
Users = spark.read.parquet("/tmp/project/user.parquet")

#### 1.2 Join the tables Posts and postTypes by their post type id

In [0]:
# Attach the post-type name to every post: inner join where posts.PostTypeId equals postType.id
post_type_match = posts.PostTypeId == postType.id
df = posts.join(postType, on=post_type_match, how="inner")

display(df)

#### 1.3 Filter the data

In [0]:
# Keep only the posts whose 'Type' is "Question"
is_question = col("Type") == "Question"
df = df.where(is_question)
display(df)

In [0]:
# 'Body': drop anything that looks like an HTML tag (non-greedy match from '<' to the next '>')
body_plain_text = regexp_replace(df.Body, r'<.*?>', '')

# 'Tags': turn a string such as "<a><b>" into an array of tag names by translating the
# angle brackets away, trimming the result and splitting on single spaces
tag_array = split(trim(translate(col("Tags"), "<>", " ")), " ")

df = df.withColumn('Body', body_plain_text).withColumn("Tags", tag_array)

display(df)

In [0]:
# Keep only "Question" rows. This repeats the filter already applied at the start of
# section 1.3, so it leaves the rows unchanged.
questions_only = col("Type") == "Question"
df = df.where(questions_only)
display(df)

#### 1.4 Create a checkpoint: save to file a dataframe containing only the `Body` and `Tags` columns we need.

In [0]:
# Narrow the frame to the two columns the model needs: 'Body' (exposed as "text") and 'Tags'
text_column = col("Body").alias("text")
tags_column = col("Tags")
df = df.select(text_column, tags_column)

In [0]:
# One output row per (post, tag) pair: explode() repeats the post text once for
# every element of its 'Tags' array and exposes the element as "tags"
single_tag = explode("Tags").alias("tags")
df = df.select("text", single_tag)
display(df)

In [0]:
# Save the dataframe as a checkpoint (in case the cluster gets terminated).
# mode("overwrite") keeps the cell re-runnable; the default save mode raises an error
# when the target path already exists.
# NOTE(review): the other tables live under "/tmp/project/"; "/tmp/project.df.parquet"
# looks like a typo for "/tmp/project/df.parquet" -- path left unchanged, please confirm.
df.write.mode("overwrite").parquet("/tmp/project.df.parquet")

In [0]:
# Mark df for caching so repeated use does not recompute it; cache() returns the same
# DataFrame, and the count() action is what actually runs the job (and shows the row count)
df.cache().count()

## STEP 2. Based on the above dataframe, prepare data for machine learning

#### 2.1. Text Cleaning Preprocessing


In [0]:
# Text cleaning, applied to the 'text' column in this order:
#   1. remove URLs (anything starting with "http" up to the next whitespace)
#   2. replace every non-letter character with a space
#   3. collapse runs of whitespace into a single space
#   4. lowercase
#   5. trim leading/trailing whitespace
# Fix: the character class was written "[^a-zA-z]"; the range "A-z" also covers the
# ASCII characters between 'Z' and 'a' ([ \ ] ^ _ `), so those were never removed.
cleaned = (
    df.withColumn('text', regexp_replace('text', r"http\S+", ""))
      .withColumn('text', regexp_replace('text', r"[^a-zA-Z]", " "))
      .withColumn('text', regexp_replace('text', r"\s+", " "))
      .withColumn('text', lower('text'))
      .withColumn('text', trim('text'))
)



## STEP 3. Machine Learning Model Training



#### 3.1 Feature Transformer

##### 3.1.1 Tokenizer

In [0]:
# Tokenizing the text data in the 'text' column using the Tokenizer
from pyspark.ml.feature import Tokenizer

# Tokenizer: reads the "text" column and writes the list of tokens to "tokens"
tokenizer = Tokenizer().setInputCol("text").setOutputCol("tokens")

# Apply it to the cleaned text
tokenized = tokenizer.transform(cleaned)

display(tokenized)


##### 3.1.2 Stopword Removal

In [0]:
# 3.1.2 Stopword Removal
from pyspark.ml.feature import StopWordsRemover

# StopWordsRemover: reads "tokens" and writes the tokens that are not stop words to "filtered"
stopword_remover = StopWordsRemover().setInputCol("tokens").setOutputCol("filtered")

# Apply it to the tokenized frame
stopword = stopword_remover.transform(tokenized)

display(stopword)


##### 3.1.3 CountVectorizer (TF - Term Frequency)

In [0]:
# 3.1.3 CountVectorizer (TF - Term Frequency)
# TF - Term Frequency: helps count how many times each word appears in text data and puts those counts into a new column.
from pyspark.ml.feature import CountVectorizer

# CountVectorizer: turns the "filtered" token lists into term-count vectors in column 'cv',
# with the vocabulary capped at 2**16 terms
cv = CountVectorizer(inputCol="filtered", outputCol='cv', vocabSize=2**16)

# Fitting learns the vocabulary from 'stopword'
cv_model = cv.fit(stopword)

# Transforming adds the term-frequency vector for every row
text_cv = cv_model.transform(stopword)

display(text_cv)


##### 3.1.4 TF-IDF Vectorization

In [0]:
# 3.1.4 TF-IDF Vectorization
from pyspark.ml.feature import HashingTF, IDF

# IDF re-weights the raw term counts in 'cv' by how informative each term is across the
# corpus and writes the result to "features".
# minDocFreq=5: terms that appear in fewer than 5 documents are given no weight.
idf = IDF(minDocFreq=5, inputCol='cv', outputCol="features")

# Fitting computes the document frequencies from 'text_cv'
idf_model = idf.fit(text_cv)

# Transforming adds the TF-IDF vector for every row
text_idf = idf_model.transform(text_cv)

display(text_idf)


#### 3.2 Label Encoding

In [0]:
# 3.2 Label Encoding
from pyspark.ml.feature import StringIndexer

# StringIndexer: maps every distinct value of "tags" to a numeric index stored in "label"
label_encoder = StringIndexer().setInputCol("tags").setOutputCol("label")

# Fitting learns the tag -> index mapping from 'text_idf'
le_model = label_encoder.fit(text_idf)

# Transforming adds the numeric "label" column
final = le_model.transform(text_idf)

display(final)


#### 3.3 Model Training

In [0]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression classifier, limited to 100 optimisation iterations
# (uses the default "features" / "label" columns produced above)
lr = LogisticRegression().setMaxIter(100)

# Train on the fully prepared frame
lr_model = lr.fit(final)

# Score the same frame the model was trained on (no hold-out set at this stage)
predictions = lr_model.transform(final)

display(predictions)


#### 3.4 Model Evaluation

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluate the predictions with explicitly named metrics.
# Fix: MulticlassClassificationEvaluator does not compute ROC AUC. With no metricName it
# returns the F1 score, so the number previously printed as "ROC-AUC" was really F1.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# F1 score (the evaluator's default metric, now requested by name)
f1_score = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

# Backward-compatible alias for any later cell that still reads `roc_auc`.
# The value is the F1 score, NOT a ROC AUC.
roc_auc = f1_score

# Accuracy = correctly predicted rows / all rows, computed by the evaluator in one pass
# instead of two separate count() jobs
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})

# NOTE: in section 3.3 `predictions` is produced on the same data the model was fitted on,
# so these are training-set scores.
print("Accuracy Score: {0:.4f}".format(accuracy))
print("F1 Score: {0:.4f}".format(f1_score))

#### 3.5 Create a Pipeline

In [0]:
# Importing all the libraries used by this cell.
# Fix: CountVectorizer is used below but was missing from this import list, so the cell
# only worked when the import from section 3.1.3 was still in the kernel.
from pyspark.sql.functions import split, translate, trim, explode, regexp_replace, col, lower
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Preparing the data
# Step 1: Creating the joined table (inner join on the post type id)
df = posts.join(postType, posts.PostTypeId == postType.id)
# Step 2: Selecting only Question posts
df = df.filter(col("Type") == "Question")
# Step 3: Formatting the raw data: strip HTML tags from 'Body', turn 'Tags' into an array
df = (df.withColumn('Body', regexp_replace(df.Body, r'<.*?>', ''))
      .withColumn("Tags", split(trim(translate(col("Tags"), "<>", " ")), " "))
)
# Step 4: Selecting the columns
df = df.select(col("Body").alias("text"), col("Tags"))
# Step 5: Getting the tags (one row per post/tag pair)
df = df.select("text", explode("Tags").alias("tags"))
# Step 6: Clean the text: remove URLs, replace non-letters with spaces, collapse whitespace,
# lowercase, trim.
# Fix: the character class was "[^a-zA-z]"; the range "A-z" also covers [ \ ] ^ _ `,
# so those characters were never removed.
cleaned = (
    df.withColumn('text', regexp_replace('text', r"http\S+", ""))
      .withColumn('text', regexp_replace('text', r"[^a-zA-Z]", " "))
      .withColumn('text', regexp_replace('text', r"\s+", " "))
      .withColumn('text', lower('text'))
      .withColumn('text', trim('text'))
)

# Machine Learning
# Step 1: Train Test Split (90% / 10%, fixed seed for reproducibility)
train, test = cleaned.randomSplit([0.9, 0.1], seed=20200819)
# Step 2: Initializing the transformers
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)
# NOTE(review): the indexer is fitted on `train` only; with the default handleInvalid
# setting, a tag that occurs only in `test` would make transform() fail -- consider
# handleInvalid="skip" if that happens.
label_encoder = StringIndexer(inputCol = "tags", outputCol = "label")
lr = LogisticRegression(maxIter=100)
# Step 3: Creating the pipeline
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, label_encoder, lr])
# Step 4: Fitting on the training split and predicting on the held-out test split
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

## STEP 4. Save the Model file to Azure storage

In [0]:
from pyspark.ml.feature import StringIndexerModel

# Save the fitted pipeline to the /mnt/deBDProject directory (your mount name may differ).
# write().overwrite() keeps the cell re-runnable; a plain save() fails if the path exists.
pipeline_model.write().overwrite().save('/mnt/deBDProject/model')

# Save the String Indexer needed to decode predicted label indices back to tags
# (used in the future Sentiment Analysis).
# Fix: the previous code saved `le_model`, which was fitted on the full dataset in
# section 3.2. The pipeline fitted its own indexer on `train`, so the two tag -> index
# mappings may differ. Save the indexer that belongs to the pipeline instead.
label_indexer_model = next(
    stage for stage in pipeline_model.stages if isinstance(stage, StringIndexerModel)
)
label_indexer_model.write().overwrite().save('/mnt/deBDProject/stringindexer')

In [0]:
# List the saved model directory to confirm the pipeline was written
model_dir_listing = dbutils.fs.ls("/mnt/deBDProject/model")
display(model_dir_listing)