- This file is used for classifying all the articles and to see the result before we create arxiv_classifying.py

In [1]:
import sys
import pyspark
import time
from pyspark.sql import SparkSession 
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, split, when, count
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, LinearSVC, NaiveBayes
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.feature import StringIndexer, RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import concat_ws
from delta import DeltaTable, configure_spark_with_delta_pip

In [2]:
def Create_ML_pipline(ML_model = "NB"):
  # Convert the main_category column to numeric using StringIndexer
  labelIndexer = StringIndexer(inputCol="main_category", outputCol="label")

  # Define the regular expression tokenizer
  regexTokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W")

  # Define the stop words remover
  stopWordsRemover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")

  # Define the TF-IDF Vectorizer
  countVectorizer = CountVectorizer(inputCol="filtered_tokens", outputCol="vectorize_features")
  idf = IDF(inputCol="vectorize_features", outputCol="features")

  if ML_model == 'LR': # Create logistic regression classifier     
    ML_Model = LogisticRegression(featuresCol="features", labelCol="label", maxIter=100)
  elif ML_model == 'RF': # Create a Random Forest classifier
    ML_Model = RandomForestClassifier(numTrees=100, maxDepth=5, labelCol="label", featuresCol="features")
  elif ML_model == 'NB': # Create a Naive Bayes classifier
    ML_Model = NaiveBayes(smoothing=1.0, modelType="multinomial", labelCol="label", featuresCol="features")

  # Define the Pipeline
  pipeline = Pipeline(stages=[labelIndexer, regexTokenizer, stopWordsRemover, countVectorizer, idf, ML_Model])

  return pipeline



In [3]:
# the final result for prediction will be saved in the path as a Delta table
delta_table_path = "hdfs:///Dat500_Group09/spark_result/final_result/arxiv_meta" 

# Set the configuration properties for Delta tables
builder = pyspark.sql.SparkSession.builder.appName("Arxiv_Classification_baseline") \
    .master('yarn') \
    .config("spark.sql.shuffle.partitions", 24)\
    .config('spark.executor.instances', 8)\
    .config("spark.executor.memory", "2g")\
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
    .config("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true")\
    .config("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true") 
        #.config('spark.driver.memory', "8g")\ take the default from the configuration file we set it 8gb

    
spark = configure_spark_with_delta_pip(builder).getOrCreate()


:: loading settings :: url = jar:file:/home/ubuntu/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c0e8b289-2d7c-4460-b5e9-841cf4c7f429;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.3.0 in central
	found io.delta#delta-storage;2.3.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 226ms :: artifacts dl 9ms
	:: modules in use:
	io.delta#delta-core_2.12;2.3.0 from central in [default]
	io.delta#delta-storage;2.3.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0

23/04/26 12:33:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/26 12:33:42 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
23/04/26 12:34:25 WARN Client: Same path resource file:///home/ubuntu/.ivy2/jars/io.delta_delta-core_2.12-2.3.0.jar added multiple times to distributed cache.
23/04/26 12:34:25 WARN Client: Same path resource file:///home/ubuntu/.ivy2/jars/io.delta_delta-storage-2.3.0.jar added multiple times to distributed cache.
23/04/26 12:34:25 WARN Client: Same path resource file:///home/ubuntu/.ivy2/jars/org.antlr_antlr4-runtime-4.8.jar added multiple times to distributed cache.


In [4]:
from pyspark import SparkConf

conf = SparkConf()
config_map = conf.getAll()
for key, value in config_map:
    print(f"{key}: {value}")

spark.executor.memory: 2g
spark.yarn.dist.jars: file:///home/ubuntu/.ivy2/jars/io.delta_delta-core_2.12-2.3.0.jar,file:///home/ubuntu/.ivy2/jars/io.delta_delta-storage-2.3.0.jar,file:///home/ubuntu/.ivy2/jars/org.antlr_antlr4-runtime-4.8.jar
spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite: true
spark.sql.shuffle.partitions: 24
spark.submit.pyFiles: /home/ubuntu/.ivy2/jars/io.delta_delta-core_2.12-2.3.0.jar,/home/ubuntu/.ivy2/jars/io.delta_delta-storage-2.3.0.jar,/home/ubuntu/.ivy2/jars/org.antlr_antlr4-runtime-4.8.jar
spark.databricks.delta.properties.defaults.autoOptimize.autoCompact: true
spark.executor.instances: 8
spark.jars.packages: io.delta:delta-core_2.12:2.3.0
spark.master: yarn
spark.app.name: Arxiv_Classification_baseline
spark.ui.proxyBase: /proxy/application_1679580022279_0242
spark.sql.extensions: io.delta.sql.DeltaSparkSessionExtension
spark.driver.memory: 8g
spark.yarn.isPython: true
spark.submit.deployMode: client
spark.yarn.dist.pyFiles: file:///

In [5]:
spark = SparkSession.builder.appName("MyApp").getOrCreate()
aqe_enabled = spark.conf.get("spark.sql.adaptive.enabled")
print(f"Adaptive Query Execution is {'enabled' if aqe_enabled == 'true' else 'disabled'}")


23/04/26 12:39:10 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
Adaptive Query Execution is enabled


In [6]:
# Reading from delta table Training & Testing data
delta_training_file = "hdfs:///Dat500_Group09/spark_result/training"
delta_testing_file = "hdfs:///Dat500_Group09/spark_result/testing/test1"

trainingData = spark.read.format("delta").load(delta_training_file)
    
trainingData.show(3)

                                                                                

23/04/26 12:40:26 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Stage 9:>                                                          (0 + 1) / 1]

+---------+--------------------+--------------------+--------------------+-----------+------------+-------------+--------------------+
|       id|                text|               title|            abstract|authers_num|article_date|main_category|          main_topic|
+---------+--------------------+--------------------+--------------------+-----------+------------+-------------+--------------------+
|0704.0003|The evolution of ...|The evolution of ...| The evolution of...|          1|  2007-04-01|      Physics|     General Physics|
|0704.0004|A determinant of ...|A determinant of ...| We show that a d...|          1|  2007-04-01|  Mathematics|       Combinatorics|
|0704.0005|From dyadic $\\La...|From dyadic $\\La...| In this paper we...|          2|  2007-04-01|  Mathematics|Classical Analysi...|
+---------+--------------------+--------------------+--------------------+-----------+------------+-------------+--------------------+
only showing top 3 rows



                                                                                

In [7]:
testData = spark.read.format("delta").load(delta_testing_file)
testData.show(3)

[Stage 19:>                                                         (0 + 1) / 1]

+---------+--------------------+--------------------+--------------------+-----------+------------+-------------+--------------------+
|       id|                text|               title|            abstract|authers_num|article_date|main_category|          main_topic|
+---------+--------------------+--------------------+--------------------+-----------+------------+-------------+--------------------+
|1003.2608|Dissipative Trans...|Dissipative Trans...| After almost hal...|          4|  2010-03-01|      Physics|       Quantum Gases|
|1003.2609|Spin dynamics in ...|Spin dynamics in ...| The doping depen...|          5|  2010-03-01|      Physics|   Superconductivity|
|1003.2610|Carbon fibre tips...|Carbon fibre tips...| We report the fa...|          2|  2010-03-01|      Physics|Mesoscale and Nan...|
+---------+--------------------+--------------------+--------------------+-----------+------------+-------------+--------------------+
only showing top 3 rows



                                                                                

In [8]:
pipeline = Create_ML_pipline()

In [9]:

print("="*100)
print("Training Data size: ", trainingData.count())
print("Testing Data size: ", testData.count())
print("="*100)



                                                                                

Training Data size:  1672474




Testing Data size:  1043714


                                                                                

In [10]:
# Fit the model
ML_model = pipeline.fit(trainingData)

                                                                                

23/04/26 12:46:07 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB




23/04/26 12:48:41 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB


                                                                                

23/04/26 12:48:45 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB




23/04/26 12:50:19 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB


                                                                                

In [18]:
# future improvement
# save the machine learning model
#pipelinePath = 'hdfs:///Dat500_Group09/result/ML_model'
#ML_model.write().overwrite().save(pipelinePath)


# load the saved machine learning model
#from pyspark.ml import PipelineModel
#savedPipelineModel = PipelineModel.load(pipelinePath)

                                                                                

23/04/21 04:13:20 WARN TaskSetManager: Stage 229 contains a task of very large size (4849 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

23/04/21 04:13:22 WARN TaskSetManager: Stage 233 contains a task of very large size (4184 KiB). The maximum recommended task size is 1000 KiB.
23/04/21 04:13:23 WARN TaskSetManager: Stage 237 contains a task of very large size (16725 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [11]:
# Make predictions on the testing data
df_Prediction = ML_model.transform(testData)

In [12]:
# Print the dataframe with the original main_category and the predicted one
df_Prediction = df_Prediction.select("id", "main_category", "label", "prediction")
df_Prediction.show(3)

23/04/26 12:52:01 WARN DAGScheduler: Broadcasting large task binary with size 22.7 MiB


[Stage 54:>                                                         (0 + 1) / 1]

+----------+----------------+-----+----------+
|        id|   main_category|label|prediction|
+----------+----------------+-----+----------+
|2205.05485|     Mathematics|  1.0|       1.0|
|2205.05487|Computer Science|  2.0|       2.0|
|2205.05488|Computer Science|  2.0|       5.0|
+----------+----------------+-----+----------+
only showing top 3 rows



                                                                                

In [13]:
# Evaluate the model using the accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(df_Prediction)
print("accuracy = %g" % accuracy)

23/04/26 12:52:23 WARN DAGScheduler: Broadcasting large task binary with size 22.7 MiB




accuracy = 0.850805


                                                                                

In [14]:
# check if the Delta table exists  
# DeltaTable.isDeltaTable(spark, "spark-warehouse/table1") # True 
#DeltaTable
if DeltaTable.isDeltaTable(spark, delta_table_path):
    print("update delta table")
    deltaTable = DeltaTable.forPath(spark, delta_table_path)
    #"target.id = updates.id and target.main_category = updates.main_category") \
    deltaTable.alias("target") \
        .merge(
        source = df_Prediction.alias("updates"),
        condition = "target.id = updates.id") \
        .whenMatchedUpdate( set = 
        {
            "label": "updates.label",
            "prediction": "updates.prediction"     
        }) \
        .whenNotMatchedInsert(values =
        {
            "id": "updates.id",
            "main_category": "updates.main_category",
            "label": "updates.label",
            "prediction": "updates.prediction"        
        }) \
        .execute()
else: # file not exists
    print("Create delta table first time")
    df_Prediction.write.format("delta").save(delta_table_path)
    #df_Prediction.write.format("delta").partitionBy("main_category").save(delta_table_path)

update delta table


                                                                                

23/04/26 12:54:03 WARN DAGScheduler: Broadcasting large task binary with size 22.7 MiB




23/04/26 12:55:15 WARN DAGScheduler: Broadcasting large task binary with size 22.7 MiB


                                                                                

In [15]:
df = spark.read.format("delta").load(delta_table_path)
df.show(10)



+---------+-------------+-----+----------+
|       id|main_category|label|prediction|
+---------+-------------+-----+----------+
|0704.0011|  Mathematics|  1.0|       1.0|
|0704.0033|      Physics|  0.0|       3.0|
|0704.0037|      Physics|  0.0|       0.0|
|0704.0072|      Physics|  0.0|       1.0|
|0704.0079|  Mathematics|  1.0|       1.0|
|0704.0107|      Physics|  0.0|       3.0|
|0704.0111|  Mathematics|  1.0|       1.0|
|0704.0173|      Physics|  0.0|       0.0|
|0704.0185|      Physics|  0.0|       0.0|
|0704.0194|      Physics|  0.0|       0.0|
+---------+-------------+-----+----------+
only showing top 10 rows



                                                                                

In [16]:
df.count()

                                                                                

1043714