In [0]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session used throughout the notebook.
spark = SparkSession.builder.appName("ML Model").getOrCreate()

# Low-level SparkContext handle, exposed for convenience.
sc = spark.sparkContext

In [0]:
# Source posts: every Parquet file under the Posts folder.
posts = spark.read.parquet("/mnt/SultanAlmalki/Posts/*")

# Locations of the saved ML artifacts consumed by predictions_udf.
ml_model = "/mnt/SultanAlmalki/model"               # loaded with PipelineModel.load
stringindexer = "/mnt/SultanAlmalki/stringindexer"  # loaded with StringIndexerModel.load




In [0]:
def predictions_udf(df, ml_model, stringindexer):
    """Predict a topic for every post and decode the predicted label index.

    Despite the name, this is a plain Python function over Spark DataFrames,
    not a registered Spark UDF.

    Args:
        df: DataFrame with (at least) ``Body`` and ``Tags`` columns.
        ml_model: path of a saved ``PipelineModel`` used for scoring.
        stringindexer: path of a saved ``StringIndexerModel`` whose ``labels``
            map prediction indices back to label strings.

    Returns:
        DataFrame with columns ``text`` (cleaned body), ``Tags``,
        ``prediction`` and ``decoded`` (label string for ``prediction``).
    """
    from pyspark.sql.functions import col, regexp_replace, lower, trim
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import StringIndexerModel, IndexToString

    # Drop posts without a body and rename Body -> text so the column names
    # match what the saved pipeline model expects.
    posts_text = (df.filter("Body is not null")
                    .select(col("Body").alias("text"), col("Tags")))

    # Clean the text: strip URLs, replace everything except ASCII letters with
    # a space (A-Z, not A-z: the A-z range also keeps [ \ ] ^ _ and backtick),
    # collapse runs of whitespace, lowercase and trim.
    cleaned = (posts_text
               .withColumn("text", regexp_replace("text", r"http\S+", ""))
               .withColumn("text", regexp_replace("text", r"[^a-zA-Z]", " "))
               .withColumn("text", regexp_replace("text", r"\s+", " "))
               .withColumn("text", lower("text"))
               .withColumn("text", trim("text")))

    # Score the cleaned text; scoring the raw frame would silently skip the
    # preprocessing above.
    model = PipelineModel.load(ml_model)
    predicted = model.transform(cleaned).select(col("text"), col("Tags"), col("prediction"))

    # Map each numeric prediction back to its label string using the labels
    # stored in the saved StringIndexer.
    indexer = StringIndexerModel.load(stringindexer)
    i2s = IndexToString(inputCol="prediction", outputCol="decoded",
                        labels=indexer.labels)
    return i2s.transform(predicted)

In [0]:
result = predictions_udf(df=posts, ml_model=ml_model, stringindexer=stringindexer)


In [0]:
# Keep only the decoded label, exposed under the column name "topic".
topics = result.select(col("decoded").alias("topic"))

# Count posts per topic, most frequent first.
topic_qty = (
    topics
    .groupBy("topic")
    .agg(count("topic").alias("qty"))
    .orderBy(col("qty").desc())
)

In [0]:
def crt_sgl_file(result_path, df=None, tmp_path="/mnt/SultanAlmalki/BI/ml_result"):
    """Export a DataFrame to ``result_path`` as one CSV file with a header row.

    Spark writes CSV output as a folder of part files. To get a single file,
    the data is coalesced to one partition and written to the scratch folder
    ``tmp_path``. The resulting ``part-*.csv`` file is then copied to
    ``result_path``, and the scratch folder is deleted.

    Args:
        result_path: destination path of the single CSV file.
        df: DataFrame to export; defaults to the notebook-level ``topic_qty``.
        tmp_path: scratch folder for the intermediate Spark write. It is
            overwritten at the start and removed at the end.

    Raises:
        FileNotFoundError: if the Spark write produced no ``part-*.csv`` file.
    """
    if df is None:
        df = topic_qty  # original behaviour: export the topic counts

    # coalesce(1) -> exactly one part file; with several part files only one
    # of them (i.e. only part of the data) would end up being copied.
    (df.coalesce(1)
       .write.option("delimiter", ",").option("header", "true")
       .mode("overwrite").csv(tmp_path))

    # Spark may also write marker/checksum side files; select the data file only.
    csv_parts = [entry.name for entry in dbutils.fs.ls(tmp_path)
                 if entry.name.startswith("part-") and entry.name.endswith(".csv")]
    if not csv_parts:
        raise FileNotFoundError("no part-*.csv file found in " + tmp_path)

    dbutils.fs.cp(tmp_path + "/" + csv_parts[0], result_path)
    dbutils.fs.rm(tmp_path, True)  # True = delete the folder recursively
    print('single file created')

In [0]:
# Export the aggregated topic counts as one CSV file for downstream BI use.
result_path = "/mnt/SultanAlmalki/BI/ml_result.csv"
crt_sgl_file(result_path=result_path)

single file created


In [0]:
# NOTE(review): rebuilds the same prediction DataFrame as the earlier cell and
# nothing below reads `result`; this cell looks redundant.
result = predictions_udf(df=posts, ml_model=ml_model, stringindexer=stringindexer)


Category:
Group the predicted topics into broader categories and total the number of posts in each category.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum as sum_func

# getOrCreate() reuses the session already running in this notebook if there is one.
spark = SparkSession.builder.appName("Topic Categorization").getOrCreate()

# Read back the single-file topic counts exported above (header row, inferred types).
csv_reader = spark.read.option("header", "true").option("inferSchema", "true")
ml_result_df = csv_reader.csv("/mnt/SultanAlmalki/BI/ml_result.csv")



In [0]:
# Ordered (topics, category) rules: the first rule whose topic list contains the
# value wins; anything that matches no rule is labelled "Other".
_TOPIC_CATEGORY_RULES = [
    (("c#", "java", "javascript", "c++", "python", "objective-c", "ruby", "c"), "Programming Languages"),
    (("hibernate", "jquery", "linq"), "Frameworks and Libraries"),
    (("php", "asp.net", "ruby-on-rails", ".net", "kohana", "drupal", "cakephp", "silverlight"), "Web Development"),
    (("android", "iphone", "ios", "cocoa-touch", "blackberry", "windows-phone-7"), "Mobile Development"),
    (("mysql", "sql", "ms-access", "sql-server-2008", "sql-server", "sqlite", "database"), "Databases"),
    (("wpf", "winforms", "winapi", "forms"), "Desktop Development"),
    (("hadoop",), "Big Data"),
    (("heap", "algorithm"), "Data Structures and Algorithms"),
    (("pdf",), "File Formats"),
    (("matlab",), "Scientific Computing"),
    (("gps",), "Location and Mapping"),
    (("mvvm",), "Design Patterns"),
    (("visual-studio", "eclipse", "xcode"), "IDEs and Tools"),
    (("categories", "properties", "regex", "performance", "warnings"), "General"),
    (("reportingservices-2005", "reporting-services"), "Reporting"),
    (("plot",), "Data Visualization"),
    (("audio",), "Multimedia"),
]


def categorize_topic(topic):
    """Build a CASE-WHEN column expression mapping a topic to its category.

    Args:
        topic: Spark Column holding topic strings.

    Returns:
        Column whose value is the category of the first matching rule in
        ``_TOPIC_CATEGORY_RULES``, or "Other" when no rule matches.
    """
    case_expr = None
    for members, category in _TOPIC_CATEGORY_RULES:
        # Single-topic rules compare with ==, multi-topic rules use isin().
        if len(members) == 1:
            condition = topic == members[0]
        else:
            condition = topic.isin(*members)
        if case_expr is None:
            case_expr = when(condition, category)
        else:
            case_expr = case_expr.when(condition, category)
    return case_expr.otherwise("Other")

# Attach a category to every topic row, then sum the per-topic counts per category.
categorized_df = ml_result_df.withColumn("category", categorize_topic(col("topic")))
category_totals = (
    categorized_df
    .groupBy("category")
    .agg(sum_func("qty").alias("total_qty"))
    .orderBy(col("total_qty").desc())
)



In [0]:
# Write the category totals to mounted storage. coalesce(1) merges the data into
# one partition so Spark emits a single part file. Spark still creates a
# *directory* at output_path, holding that part file plus its own marker files,
# rather than a standalone .csv file.
output_path = "/mnt/SultanAlmalki/BI/categorized_results.csv"
category_totals_single = category_totals.coalesce(1)
category_totals_single.write.option("header", "true").mode("overwrite").csv(output_path)