## Model Prediction

### Import packages and load data

In [0]:
# Import packages
from pyspark.sql.functions import udf, col, concat, lit, monotonically_increasing_id, regexp_replace, lower, explode, split, collect_list, array_contains, instr
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, Normalizer, CountVectorizer, StringIndexer, StringIndexerModel, IDFModel
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.sql.types import ArrayType, StringType
from nltk.stem import PorterStemmer
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import IndexToString, StringIndexer
from pyspark.ml.feature import CountVectorizerModel

# Previously saved pipeline stages: label indexer, count vectoriser, IDF and classifier
lables = StringIndexerModel.load('/mnt/bd-project/Models/stringindexer')
counterVec = CountVectorizerModel.load('/mnt/bd-project/Models/cv_model')
idf_model = IDFModel.load('/mnt/bd-project/Models/tfidf')
model = LogisticRegressionModel.load('/mnt/bd-project/Models/lr_model')

# Posts to score, read from every entry under the landing folder
posts = spark.read.parquet('/mnt/bd-project/Landing/Posts/*')

### Data Preprocessing

In [0]:
# Data cleaning: build a single normalised "text" column from Title + Body.
# The substitutions are applied in this exact order; each one works on the
# output of the previous step.
text_cleanup_steps = [
    (r"\W+", " "),      # every run of non-word characters becomes one space
    (r"\b\w\b", ""),    # remove stand-alone single word characters
    ("_", " "),         # "_" counts as a word character, so it is handled on its own
    (r"\s+", " "),      # collapse the whitespace left behind by the steps above
]

df = posts.select('id', 'Body', 'Title') \
    .withColumn("text", concat(col("Title"), lit(" "), col("Body")))

for pattern, replacement in text_cleanup_steps:
    df = df.withColumn("text", regexp_replace("text", pattern, replacement))

# Lower-case last, then drop rows that ended up without any text
# (concat yields null as soon as Title or Body is null).
df = df.withColumn("text", lower("text")) \
    .filter(col("text").isNotNull())

In [0]:
# Tokenization: turn the cleaned "text" string into a "tokens" array
tokenizer = Tokenizer().setInputCol("text").setOutputCol("tokens")
tokenized = tokenizer.transform(df)

# Removing stop words with the remover's default word list.
# Note: despite its name, the "stop_words" column holds the tokens that REMAIN
# after the stop words have been removed.
stopword_remover = StopWordsRemover().setInputCol("tokens").setOutputCol("stop_words")
stopword = stopword_remover.transform(tokenized)

# Stemming
def _stem_tokens(words):
    """Return the Porter stem of every token in ``words``.

    A single PorterStemmer is created per call (i.e. per row) and reused for
    all tokens of that row, instead of constructing a new stemmer for every
    individual word.

    Args:
        words: list of token strings, or None for a null array.

    Returns:
        List of stemmed tokens in the same order, or None when ``words`` is None.
    """
    if words is None:
        return None
    stemmer = PorterStemmer()
    return [stemmer.stem(word) for word in words]


stemmer_func = udf(_stem_tokens, ArrayType(StringType()))
stemmed = stopword.withColumn("stemmed", stemmer_func(col("stop_words")))

# Removing additional stop words (highly frequent words based on EDA).
# The entries are stems (e.g. 'tri', 'thank') because this filter runs on the
# "stemmed" column.
custom_stop_words = [
    'code', 'use', 'pre', 'get', 'want', 'like',
    'thank', 'tri', 'work', 'way', 'need',
]

custom_remover = StopWordsRemover(stopWords=custom_stop_words) \
    .setInputCol("stemmed") \
    .setOutputCol("filtered")
filtered = custom_remover.transform(stemmed)

# Count vectorizer: apply the loaded CountVectorizerModel
text_cv = counterVec.transform(filtered)

# Performing TF-IDF with the loaded IDF model
final = idf_model.transform(text_cv)

### Label prediction

In [0]:
# Predicting labels of new posts.
# cache() marks the feature frame for caching and hands back the same DataFrame,
# so `final` does not need to be rebound.
final.cache()
predictions = model.transform(final)

In [0]:
@udf(ArrayType(StringType()))
def extract_labels(probabilities, threshold=0.2):
    """Return the indices of all classes whose probability exceeds ``threshold``.

    The UDF is declared as returning an array of strings, so each index is
    converted to ``str`` explicitly instead of relying on an implicit
    int-to-string conversion of the returned values.

    Args:
        probabilities: iterable of per-class probabilities for one row; the
            position of each value is the class index.
        threshold: a class is kept only when its probability is strictly
            greater than this value. Defaults to 0.2, which is what applies
            when the UDF is called with the probability column alone.

    Returns:
        List of class indices as strings, in ascending index order. The list
        is empty when no probability exceeds the threshold.
    """
    return [str(index) for index, prob in enumerate(probabilities) if prob > threshold]

# Attach, per row, the class indices whose probability clears the UDF's default threshold
output = predictions.withColumn(
    "predicted_labels",
    extract_labels(col("probability")),
)

In [0]:
# The loaded StringIndexerModel supplies the index -> tag mapping
indexer = lables

# One row per (post, predicted label index). The indices arrive as strings
# from the UDF, so cast them to integers before decoding.
exploded = output.withColumn("value", explode("predicted_labels"))
numeric = exploded.withColumn("indexed_value", col("value").cast("integer"))

i2s = IndexToString(inputCol="indexed_value", outputCol="temp_decoder", labels=indexer.labels)
temp_decoded = i2s.transform(numeric)

# Collect the decoded tags per post. The grouping/join key is the post id
# rather than the Body text: different posts can share the same body, which
# would merge their label lists, and joining on long text is needlessly costly.
# NOTE(review): assumes `id` identifies a single post - confirm against the source data.
temp = temp_decoded.groupBy("id").agg(collect_list("temp_decoder").alias("labels_decoder"))

# Inner join (unchanged): posts with an empty predicted_labels array produce no
# exploded rows and are therefore absent from `temp` and from the result.
output = output.join(temp, on="id", how="inner")

# Decode the single top prediction into its tag
i2s = IndexToString(inputCol="prediction", outputCol="tag", labels=indexer.labels)
output = i2s.transform(output)

output = output.select('id', 'prediction', 'tag', 'predicted_labels', 'labels_decoder')

In [0]:
# Render the decoded predictions (id, top prediction, tag and multi-label columns) in the notebook
display(output)

In [0]:

# Columns that go into the exported CSV
save = output.select('id', 'prediction', 'tag')


# define this function
def crt_sgl_file(result_path):
    """Export the notebook-level ``save`` DataFrame as one CSV file.

    Spark writes a folder of part files, so the data is first written to a
    temporary folder as a single partition, the resulting CSV part file is
    copied to ``result_path``, and the temporary folder is removed.

    Args:
        result_path: destination path of the single CSV file.

    Raises:
        FileNotFoundError: if no CSV part file is found in the temporary
            folder after the write.
    """
    # Temporary folder Spark writes its part files into
    path = "/mnt/bd-project/Predictions/Temp_Parq/"

    # coalesce(1) forces a single partition and therefore a single part file.
    # Without it Spark writes one CSV per partition and copying just one of
    # them would silently drop the rest of the rows.
    save.coalesce(1).write.option("delimiter", ",").option("header", "true").mode("overwrite").csv(path)

    try:
        # list the folder and keep only the CSV part file(s)
        csv_names = [entry.name for entry in dbutils.fs.ls(path) if entry.name.endswith('csv')]
        if not csv_names:
            raise FileNotFoundError("no CSV part file was written to " + path)

        # copy the CSV part file to the requested destination
        # (`path` already ends with a slash, so no extra separator is added)
        dbutils.fs.cp(path + csv_names[-1], result_path)
    finally:
        # delete the temporary folder whether or not the copy succeeded
        dbutils.fs.rm(path, True)

    print('single file created')

In [0]:
from pyspark.sql.functions import col, concat_ws
from datetime import datetime

# Timestamp of this run, used to give every export its own file name
run_started = datetime.now()
timestamp = run_started.strftime("%Y-%m-%d_%H-%M-%S")

# Destination of the single CSV file, then run the export
result_path = "/mnt/bd-project/Predictions/predictions_{}.csv".format(timestamp)
crt_sgl_file(result_path)