#### Use machine learning -- a sentiment-analysis-style text classification method -- to train a model that predicts a post's tag from its body text.

##### STEP 0. Loading the data into dataframes

In [0]:
# Build (or reuse, via getOrCreate) the Spark session used to load the raw tables,
# and keep a handle on its SparkContext.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Table Loading").getOrCreate()
sc = spark.sparkContext

###### 0.1 Creating the Posts dataframe

In [0]:
# Posts live under the '/mnt/BDProject' mount (see the mount-storage workshop);
# the glob picks up every parquet part file in the Posts folder.
file_location = "/mnt/BDProject/ml_training/Posts/*"

posts = spark.read.parquet(file_location)


###### 0.2 Creating the posttypes dataframe

In [0]:
# Explicit schema for the PostTypes table: an integer id and its textual type name.
# Both columns are declared nullable.
from pyspark.sql.types import *

PT_schema = StructType(
    [
        StructField("id", IntegerType(), nullable=True),
        StructField("Type", StringType(), nullable=True),
    ]
)

In [0]:
# Load PostTypes.txt as comma-separated text with a header row, applying PT_schema
# instead of letting Spark infer the column types.
file_location = "/mnt/BDProject/ml_training/PostTypes.txt"

postType = (
    spark.read
    .options(header="true", sep=",")
    .schema(PT_schema)
    .csv(file_location)
)


###### 0.3 Creating the users dataframe

In [0]:
# Explicit schema for users.csv, listed in file-column order; every column is nullable.
# NOTE(review): CreationDate is read as DateType; if the CSV stores full timestamps in a
# format Spark's default date parser rejects, those values will come back null — confirm.
from pyspark.sql.types import *

users_schema = StructType([
    StructField(name, spark_type, True)
    for name, spark_type in [
        ("id", IntegerType()),
        ("Age", IntegerType()),
        ("CreationDate", DateType()),
        ("DisplayName", StringType()),
        ("DownVotes", IntegerType()),
        ("EmailHash", StringType()),
        ("Location", StringType()),
        ("Reputation", IntegerType()),
        ("UpVotes", IntegerType()),
        ("Views", IntegerType()),
        ("WebsiteUrl", StringType()),
        ("AccountId", IntegerType()),
    ]
])

In [0]:
# Load users.csv (header row, comma-separated) with the explicit users_schema.
file_location = "/mnt/BDProject/ml_training/users.csv"

users = (
    spark.read
    .options(header="true", sep=",")
    .schema(users_schema)
    .csv(file_location)
)


###### 0.4. Saving the dataframes

In [0]:
# Persist the three source tables as parquet snapshots under /tmp/project,
# replacing any previous run's output, so Step 1 can reload them directly.
posts.write.mode("overwrite").format("parquet").save("/tmp/project/posts.parquet")
postType.write.mode("overwrite").format("parquet").save("/tmp/project/PostType.parquet")
users.write.mode("overwrite").format("parquet").save("/tmp/project/user.parquet")

##### STEP 1. Join tables and filter data

###### 1.1 Prepare necessary libraries and load data

In [0]:
# Import necessary libraries and functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, translate, trim, explode, regexp_replace, col, lower, to_date

In [0]:
# Get the Spark session for the ML part of the notebook; getOrCreate() reuses an
# already-active session when there is one.
spark = SparkSession.builder.appName("ML Model").getOrCreate()
sc = spark.sparkContext

In [0]:
# Reload the parquet snapshots written in step 0.4.
# NOTE(review): the users snapshot is bound to `Users` (capital U), not `users`,
# and it is not referenced again in the visible cells.
posts = spark.read.format("parquet").load("/tmp/project/posts.parquet")
postType = spark.read.format("parquet").load("/tmp/project/PostType.parquet")
Users = spark.read.format("parquet").load("/tmp/project/user.parquet")

###### 1.2 Join the tables Posts and postTypes by their post type id

In [0]:
# Attach each post's type name: inner join of Posts.PostTypeId against PostTypes.id.
# The joined Posts/PostTypes frame is what the model is trained from.
df = posts.join(postType, on=posts.PostTypeId == postType.id, how="inner")

###### 1.3 Filter the data

In the posttypes table, there is a column called Type which indicates whether a post is a question or an answer. We only need the 'Question' entries. For these 'Question' rows, we will run the machine learning model on the 'Body' column of the 'Posts' table to tell what topic each post is about.

In [0]:
# Keep only the rows whose post type is 'Question'.
df = df.where(col("Type") == "Question")

In [0]:
# Formatting the 'Body' and 'Tags' columns for machine learning training.
df = (
    df
    # Strip HTML tags from the body, leaving only its text content.
    .withColumn("Body", regexp_replace(col("Body"), r"<.*?>", ""))
    # Make a list of the tags. Tags presumably look like '<a><b>' (confirm against the data):
    # turning '<' and '>' into spaces gives ' a  b ', i.e. TWO spaces between tags, so
    # splitting on a single " " produced empty-string tags (['a', '', 'b']) that later
    # became their own label class. Splitting on whitespace runs avoids that.
    .withColumn("Tags", split(trim(translate(col("Tags"), "<>", " ")), r"\s+"))
    # Keep only the calendar date part of the post's creation time.
    .withColumn("Date", to_date("CreationDate"))
)

###### 1.4 Create a checkpoint: save a dataframe containing only the Body, Tags, and Date columns we need

In [0]:
# Project down to the columns needed later: the body (renamed to 'text'), the tag list and the date.
df = df.select(df["Body"].alias("text"), "Tags", "Date")

In [0]:
# One row per (post, tag): explode() turns the Tags array into individual rows, so each
# post's text and date are duplicated once for every tag it carries.
df = df.select(col("text"), explode(col("Tags")).alias("tags"), "Date")

In [0]:
# Checkpoint the exploded (text, tags, Date) dataframe to parquet, replacing any
# previous output, in case the cluster gets terminated.
# NOTE(review): unlike the "/tmp/project/..." snapshots, this path is
# "/tmp/project.df.parquet" — possibly meant "/tmp/project/df.parquet"; confirm
# nothing else reads this exact path before changing it.
df.write.mode('overwrite').format("parquet").save("/tmp/project.df.parquet")

In [0]:
# Mark df for in-memory caching because later cells reuse it; cache() is lazy, so the
# count() action is what actually materializes the cache (and shows the row count).
df = df.cache()
df.count()

##### STEP 2. Based on the above dataframe, prepare data for machine learning

###### 2.1. Text Cleaning Preprocessing

`pyspark.sql.functions.regexp_replace` (together with `lower` and `trim`) is used to process the text:

1. Remove URLs
2. Remove special characters
3. Substitute multiple spaces with a single space
4. Lowercase all text
5. Trim the leading/trailing whitespaces

In [0]:
# Preprocessing the text column (steps 1-5 listed above).
cleaned = (
    df
    # 1. Remove URLs.
    .withColumn('text', regexp_replace('text', r"http\S+", ""))
    # 2. Replace every non-letter with a space. The class must be [^a-zA-Z]: the previous
    #    [^a-zA-z] contained the range A-z, which also spans [ \ ] ^ _ and the backtick,
    #    so those characters were wrongly kept.
    .withColumn('text', regexp_replace('text', r"[^a-zA-Z]", " "))
    # 3. Collapse runs of whitespace into a single space.
    .withColumn('text', regexp_replace('text', r"\s+", " "))
    # 4. Lowercase.
    .withColumn('text', lower('text'))
    # 5. Trim leading/trailing whitespace.
    .withColumn('text', trim('text'))
)


##### STEP 3. Machine Learning Model Training

###### 3.1 Feature Transformer

3.1.1 Tokenizer

In [0]:
from pyspark.ml.feature import Tokenizer

# Split each cleaned 'text' value on whitespace into the array column 'tokens'.
tokenizer = Tokenizer(outputCol="tokens", inputCol="text")
tokenized = tokenizer.transform(dataset=cleaned)

3.1.2 Stopword Removal

In [0]:
from pyspark.ml.feature import StopWordsRemover

# Drop stop words (Spark's default list) from 'tokens', writing the result to 'filtered'.
stopword_remover = StopWordsRemover(outputCol="filtered", inputCol="tokens")
stopword = stopword_remover.transform(dataset=tokenized)

3.1.3 CountVectorizer (TF - Term Frequency)

CountVectorizer: converts a collection of text documents into vectors of token counts.

vocabSize=2**16: the maximum vocabulary size -- only the top 65,536 terms (ordered by term frequency across the corpus) are kept.

In [0]:
from pyspark.ml.feature import CountVectorizer

# Learn a vocabulary (capped at 2**16 terms) from the 'filtered' tokens, then turn
# each row's tokens into a term-count vector in column 'cv'.
cv = CountVectorizer(inputCol="filtered", outputCol='cv', vocabSize=2**16)
cv_model = cv.fit(dataset=stopword)
text_cv = cv_model.transform(dataset=stopword)

3.1.4 TF-IDF Vectorization

In [0]:
from pyspark.ml.feature import HashingTF, IDF

# Re-weight the term counts by inverse document frequency into 'features'.
# minDocFreq=5: per the Spark IDF docs, terms appearing in fewer than 5 documents are
# filtered out by getting an IDF weight of 0 (they are not removed from the vector).
idf = IDF(minDocFreq=5, inputCol='cv', outputCol="features")
idf_model = idf.fit(dataset=text_cv)
text_idf = idf_model.transform(dataset=text_cv)

###### 3.2 Label Encoding

In [0]:
from pyspark.ml.feature import StringIndexer

# Map each tag string to a numeric 'label' index (by default the most frequent tag gets 0.0).
label_encoder = StringIndexer(outputCol="label", inputCol="tags")
le_model = label_encoder.fit(dataset=text_idf)
final = le_model.transform(dataset=text_idf)

###### 3.3 Model Training

In [0]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression on the default 'features' -> 'label' columns, at most 100 iterations.
# NOTE(review): the model is fit and scored on the same dataframe, so metrics computed
# from `predictions` describe training-set fit, not generalization.
lr = LogisticRegression(maxIter=100)
lr_model = lr.fit(dataset=final)
predictions = lr_model.transform(dataset=final)

###### 3.4 Model Evaluation

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the predictions from 3.3 ('label' vs 'prediction'); these are training-set predictions.
# MulticlassClassificationEvaluator's default metric is weighted F1. ROC-AUC is a
# binary-classification metric and is not what this evaluator computed, so the previous
# "ROC-AUC" label was wrong. The metric is now named explicitly.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
# Accuracy = fraction of rows whose predicted label index equals the true one
# (same value as counting matches / total, but in a single pass).
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})

print("Accuracy Score: {0:.4f}".format(accuracy))
print("F1 Score: {0:.4f}".format(f1_score))

###### 3.5 Create a Pipeline

In [0]:
# End-to-end version of Steps 1-3 as a single Spark ML Pipeline, trained on a 90/10 split.
# Import every name this cell uses so it does not rely on imports from earlier cells
# (CountVectorizer and to_date were previously missing here).
from pyspark.sql.functions import split, translate, trim, explode, regexp_replace, col, lower, to_date
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Preparing the data
# Step 1: Creating the joined table
df = posts.join(postType, posts.PostTypeId == postType.id)
# Step 2: Selecting only Question posts
df = df.filter(col("Type") == "Question")
# Step 3: Formatting the raw data: strip HTML from the body and turn the tag string into a list.
# Split tags on whitespace runs: mapping '<'/'>' to spaces leaves two spaces between tags,
# and splitting on a single " " produced empty-string tags.
df = (df.withColumn('Body', regexp_replace(df.Body, r'<.*?>', ''))
      .withColumn("Tags", split(trim(translate(col("Tags"), "<>", " ")), r"\s+"))
      .withColumn("Date", to_date("CreationDate"))
)
# Step 4: Selecting the columns
df = df.select(col("Body").alias("text"), col("Tags"), col("Date"))
# Step 5: Getting the tags (one row per post/tag pair)
df = df.select("text", explode("Tags").alias("tags"), col("Date"))
# Step 6: Clean the text. The letter class is [^a-zA-Z]; the old [^a-zA-z] also kept
# the characters [ \ ] ^ _ and backtick, which fall inside the range A-z.
cleaned = (df.withColumn('text', regexp_replace('text', r"http\S+", ""))
             .withColumn('text', regexp_replace('text', r"[^a-zA-Z]", " "))
             .withColumn('text', regexp_replace('text', r"\s+", " "))
             .withColumn('text', lower('text'))
             .withColumn('text', trim('text')))

# Machine Learning
# Step 1: Train/test split (fixed seed so the split is reproducible)
train, test = cleaned.randomSplit([0.9, 0.1], seed=20200819)
# Step 2: Initializing the transformers
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)
# The indexer is fit on `train` only. A rare tag that lands only in `test` would make
# transforming `test` fail under the default handleInvalid="error"; "keep" puts such
# tags in an extra bucket at index numLabels instead.
label_encoder = StringIndexer(inputCol="tags", outputCol="label", handleInvalid="keep")
lr = LogisticRegression(maxIter=100)
# Step 3: Creating the pipeline
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, label_encoder, lr])
# Step 4: Fit on train, then transform (predict) the held-out test set
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)
# Step 5: Held-out accuracy (the 3.4 metrics were computed on the training data)
test_evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
print("Test Accuracy: {0:.4f}".format(test_evaluator.evaluate(predictions)))

##### STEP 4. Save the Model file to Azure storage

In [0]:
# Save the fitted pipeline (featurizers + logistic regression) to the /mnt/BDProject mount.
# write().overwrite() lets this cell be re-run; a plain save() fails once the path exists.
pipeline_model.write().overwrite().save('/mnt/BDProject/modelU')

# Save the String Indexer used to decode the label indices (needed later to map predictions
# back to tags). It must be the indexer fitted INSIDE the pipeline (stage 4, fit on `train`),
# not `le_model` from 3.2 (fit on all rows): indices are assigned by tag frequency, so the
# two can order tags differently and would decode the pipeline's predictions to wrong tags.
pipeline_model.stages[4].write().overwrite().save('/mnt/BDProject/stringindexerU')

In [0]:
# List the files written for the saved pipeline model, as a table.
model_dir_listing = dbutils.fs.ls("/mnt/BDProject/modelU")
display(model_dir_listing)