In [0]:
# import necessary libaries
from pyspark.sql.functions import *

In [0]:
# Creating Spark Session
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("ML Model")
         .getOrCreate())

sc = spark.sparkContext

####Load the ML model and the stringIdexer generated, and also the Posts file

In [0]:
posts = spark.read.parquet("/mnt/adls_test/Posts/*")
ml_model = "/mnt/adls_test/model"
stringindexer = "/mnt/adls_test/stringindexer"

##Create a User Defined Function to filter and transform the data

In [0]:
# User defined function
def predictions_udf(df, ml_model, stringindexer):
    from pyspark.sql.functions import col, regexp_replace, lower, trim
    from pyspark.ml import PipelineModel

    # Filter out empty body text
    df = df.filter("Body is not null")
    # Making sure the naming of the columns are consistent with the model
    df = df.select(col("Body").alias("text"), col("Tags"))
    # Preprocessing of the feature column
    cleaned = df.withColumn('text', regexp_replace('text', r"http\S+", "")) \
                    .withColumn('text', regexp_replace('text', r"[^a-zA-z]", " ")) \
                    .withColumn('text', regexp_replace('text', r"\s+", " ")) \
                    .withColumn('text', lower('text')) \
                    .withColumn('text', trim('text')) 

    # Load in the saved pipeline model
    model = PipelineModel.load(ml_model)

    # Making the prediction
    prediction = model.transform(df)

    predicted = prediction.select(col('text'), col('Tags'), col('prediction'))

    # Decoding the indexer
    from pyspark.ml.feature import StringIndexerModel, IndexToString

    # Load in the StringIndexer that was saved
    indexer = StringIndexerModel.load(stringindexer)

    # Initialize the IndexToString converter
    i2s = IndexToString(inputCol = 'prediction', outputCol = 'decoded', labels = indexer.labels)
    converted = i2s.transform(predicted)

    # Display the important columns
    return converted

##Run the UDF and generate the result

In [0]:
result = predictions_udf(posts,ml_model, stringindexer)

##Summarize which topics are the most popular

In [0]:
# change the column name
topics = result.withColumnRenamed('decoded', 'topic').select('topic')
# Aggregate the topics and calculate the total qty of each topic
topic_qty = topics.groupBy(col("topic")).agg(count('topic').alias('qty')).orderBy(desc('qty'))

In [0]:
topic_qty.show()

+--------------------+---+
|               topic|qty|
+--------------------+---+
|agent-based-modeling|  4|
|   memory-management|  1|
|         multi-agent|  1|
+--------------------+---+



#Save the result file to the BI folder

In [0]:
# define this function
def crt_sgl_file(result_path):
# write the result to a folder container several files
    path = "/mnt/adls_test/BI/ml_result"
    topic_qty.write.option("delimiter", ",").option("header", "true").mode("overwrite").csv(path)
# list the folder, find the csv file
    filenames = dbutils.fs.ls(path)
    name = ''
    for filename in filenames:
        if filename.name.endswith('csv'):
            org_name = filename.name

# copy the csv file to the path you want to save, in this example, we use "/mnt/adls_test/BI/ml_result.csv"
    dbutils.fs.cp(path + '/'+ org_name, result_path)
# delete the folder
    dbutils.fs.rm(path, True)
    print('single file created')

In [0]:
# run the function
result_path = "/mnt/adls_test/BI/ml_result.csv"
crt_sgl_file(result_path)

single file created
