#### News articles belong to different categories such as sports, business, etc. This project uses Spark's machine-learning infrastructure to predict the category of an article. The notebook first trains a model on the training set, tests it on a held-out test set, and finally evaluates its performance on a separate set of previously unseen articles.


#### ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------



### Import Packages and Create SparkSession

In [1]:
# Import packages
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, Tokenizer, RegexTokenizer, StopWordsRemover, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create (or reuse) the Spark SQL session that the cells below refer to as `spark`
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Bare expression: shows the session as this cell's output
spark

### Load Files

In [2]:
# Load the article text files into a single DataFrame (one row per file)

dir_names = ["data/business/*", "data/movies/*", "data/politics/*", "data/science/*", "data/sports/*"]
labels = ["business", "movies", "politics", "science", "sports"]
schema = StructType([
    StructField('file_name', StringType(), True),
    StructField('text', StringType(), True),
    StructField('category', StringType(), True),
])
# Start from an empty frame so every directory can simply be union-ed in
articles_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

# dir_names and labels are parallel lists: the i-th directory holds the i-th category
for dir_name, label in zip(dir_names, labels):
    # wholeTextFiles yields one (path, content) pair per file
    articles = spark.sparkContext.wholeTextFiles(dir_name)
    # `label=label` binds the category when the lambda is created. A plain closure
    # over the loop variable is looked up only when the lambda is serialised/run,
    # which makes correctness depend on Spark's evaluation timing.
    articles_data = articles.map(
        lambda item, label=label: (item[0].split('/')[-1], item[1], label)
    )
    # Pass the explicit schema: inferring it from column names alone requires
    # sampling a row and fails when a directory yields no files.
    articles_df = articles_df.union(spark.createDataFrame(articles_data, schema))

articles_df.distinct().show(5)
print("Total Data (files) :", articles_df.count())


+--------------------+--------------------+--------+
|           file_name|                text|category|
+--------------------+--------------------+--------+
|ny_business_artic...|DAVOS, Switzerlan...|business|
|ny_science_articl...|North America was...| science|
|ny_business_artic...|Spotify is finall...|business|
|ny_movies_article...|Even at a time of...|  movies|
|ny_politics_artic...|A Democratic grou...|politics|
+--------------------+--------------------+--------+
only showing top 5 rows

Total Data (files) : 400


### Clean and format data

In [3]:
# Label, tokenize and clean the article text

from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover

# Index the string "category" column into a numeric "label" column
categoryIndexer = StringIndexer(inputCol="category", outputCol="label")
labeledData = categoryIndexer.fit(articles_df).transform(articles_df)

# Tokenize "text" into "words" using the non-word-character pattern
wordTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
tokenizedData = wordTokenizer.transform(labeledData)

# Drop stop words from "words", producing the "filtered" column
stopWordFilter = StopWordsRemover(inputCol="words", outputCol="filtered")
cleanData = stopWordFilter.transform(tokenizedData)

cleanData.show(2)

+--------------------+--------------------+--------+-----+--------------------+--------------------+
|           file_name|                text|category|label|               words|            filtered|
+--------------------+--------------------+--------+-----+--------------------+--------------------+
|ny_business_artic...|Good Wednesday. H...|business|  1.0|[good, wednesday,...|[good, wednesday,...|
|ny_business_artic...|The German carmak...|business|  1.0|[the, german, car...|[german, carmaker...|
+--------------------+--------------------+--------+-----+--------------------+--------------------+
only showing top 2 rows



### Features Extraction

In [4]:
# Build TF-IDF features from the filtered tokens

from pyspark.ml.feature import HashingTF, IDF

# Term frequency: hash the filtered tokens into a 1000-dimensional vector
hashingTermFreq = HashingTF(numFeatures=1000, inputCol="filtered", outputCol="tfFeatures")
tfFeaturedData = hashingTermFreq.transform(cleanData)

# Inverse document frequency: fit on the term-frequency vectors, then rescale them
idf = IDF(inputCol="tfFeatures", outputCol="features")
invDocFreqModel = idf.fit(tfFeaturedData)
rescaledIdfData = invDocFreqModel.transform(tfFeaturedData)

# Preview only the columns relevant to feature extraction
previewColumns = ["category", "label", "filtered", "tfFeatures", "features"]
rescaledIdfData.select(*previewColumns).show(2)

+--------+-----+--------------------+--------------------+--------------------+
|category|label|            filtered|          tfFeatures|            features|
+--------+-----+--------------------+--------------------+--------------------+
|business|  1.0|[good, wednesday,...|(1000,[1,2,3,5,6,...|(1000,[1,2,3,5,6,...|
|business|  1.0|[german, carmaker...|(1000,[3,6,10,23,...|(1000,[3,6,10,23,...|
+--------+-----+--------------------+--------------------+--------------------+
only showing top 2 rows



### Data Partition

In [5]:
# Split into train and test sets (70% / 30%).
# Beware: randomSplit may not preserve the original row order.
# A fixed seed keeps the partition (and therefore every metric computed from it)
# reproducible across kernel restarts.
# NOTE(review): the IDF model behind `rescaledIdfData` was fitted on all rows
# before this split, so the test rows have already influenced the feature weights.
SPLIT_SEED = 42
train, test = rescaledIdfData.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
print("Training Data (files) ",train.count())
print("Test Data (files) ",test.count())

Training Data (files)  283
Test Data (files)  117


### Train and Predict partitioned data using Random Forest Classification

In [6]:
# TRAIN and TEST DATA USING RANDOM_FOREST_CLASSIFICATION

from pyspark.ml.classification import RandomForestClassifier

# Create the RF model. Random forests are stochastic, so the seed is fixed
# to make the trained model and its accuracy reproducible.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed=42)
# Train model
rfModel = rf.fit(train)

# Predict test data
rfPredictions = rfModel.transform(test)

print("TEST RESULT:")
# Show the rows predicted as class 0, ordered by the probability column (descending)
rfPredictions.filter(rfPredictions['prediction'] == 0) \
    .select("category", "file_name", "text", "features", "label", "prediction", "probability") \
    .orderBy("probability", ascending=False) \
    .show(10)

# Evaluate the prediction result; labelCol is spelled out rather than left implicit
mulClassEvl = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = mulClassEvl.evaluate(rfPredictions)
print("Accuracy- %g" % accuracy)
print("Test Error- %g" % (1.0 - accuracy))


TEST RESULT:
+--------+--------------------+--------------------+--------------------+-----+----------+--------------------+
|category|           file_name|                text|            features|label|prediction|         probability|
+--------+--------------------+--------------------+--------------------+-----+----------+--------------------+
|politics|ny_politics_artic...|WASHINGTON — Pres...|(1000,[1,3,7,8,9,...|  0.0|       0.0|[0.78171980855110...|
|politics|ny_politics_artic...|WASHINGTON — For ...|(1000,[2,4,6,13,1...|  0.0|       0.0|[0.68643056261657...|
|politics|ny_politics_artic...|WASHINGTON — Repr...|(1000,[4,5,7,12,1...|  0.0|       0.0|[0.67677694707072...|
|politics|ny_politics_artic...|WASHINGTON — The ...|(1000,[0,3,4,6,8,...|  0.0|       0.0|[0.64902220341435...|
|politics|ny_politics_artic...|WASHINGTON — Sena...|(1000,[0,1,4,6,9,...|  0.0|       0.0|[0.63167517006392...|
|politics|ny_politics_artic...|Update, March 14,...|(1000,[3,4,9,19,2...|  0.0|       0.0|[

## EVALUATING RANDOM FOREST CLASSIFICATION MODEL ON UNKNOWN DATA (not test data)


In [7]:
# LOAD NEW FILES (unseen articles, used only for evaluation)

new_dir_names = ["new_data/business/*", "new_data/movies/*", "new_data/politics/*", "new_data/science/*", "new_data/sports/*"]
labels = ["business", "movies", "politics", "science", "sports"]
schema = StructType([
    StructField('file_name', StringType(), True),
    StructField('text', StringType(), True),
    StructField('category', StringType(), True),
])
new_articles_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

# new_dir_names and labels are parallel lists
for new_dir_name, label in zip(new_dir_names, labels):
    new_articles = spark.sparkContext.wholeTextFiles(new_dir_name)
    # Bind the category at lambda creation instead of closing over the loop variable
    new_articles_data = new_articles.map(
        lambda item, label=label: (item[0].split('/')[-1], item[1], label)
    )
    # Explicit schema avoids row-sampling inference (which fails on an empty directory)
    new_articles_df = new_articles_df.union(spark.createDataFrame(new_articles_data, schema))

print("Total new files loaded (only for testing) :", new_articles_df.count())
print()

# Label, clean, and extract features.
# The indexer is fitted on the ORIGINAL articles so categories map to the labels
# the model was trained with.
# NOTE(review): this re-fit is assumed to reproduce the mapping used at training time - confirm.
labeledTestData = StringIndexer(inputCol="category", outputCol="label").fit(articles_df).transform(new_articles_df)
tokenizedTestData = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W").transform(labeledTestData)
clnTestData = StopWordsRemover(inputCol="words", outputCol="filtered").transform(tokenizedTestData)
# Reuse the transformer and the fitted IDF model from the feature-extraction cell
# rather than fitting a fresh IDF, so the features match the training features.
featuredTestData = hashingTermFreq.transform(clnTestData)
rescaledTestData = invDocFreqModel.transform(featuredTestData)

# Predict the labels
testPrediction = rfModel.transform(rescaledTestData)

print("TEST RESULT:")
# Show the rows predicted as class 0, ordered by the probability column (descending)
testPrediction.filter(testPrediction['prediction'] == 0) \
    .select("category", "file_name", "text", "features", "label", "prediction", "probability") \
    .orderBy("probability", ascending=False) \
    .show(10)

# Evaluate accuracy
mulClassEvl = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = mulClassEvl.evaluate(testPrediction)
print("Accuracy- %g" % accuracy)
print("Test Error- %g" % (1.0 - accuracy))


Total new files loaded (only for testing) : 100

TEST RESULT:
+--------+--------------------+--------------------+--------------------+-----+----------+--------------------+
|category|           file_name|                text|            features|label|prediction|         probability|
+--------+--------------------+--------------------+--------------------+-----+----------+--------------------+
|politics|ny_politics_artic...|LIMA, Peru — As P...|(1000,[1,3,5,11,1...|  0.0|       0.0|[0.78805006569284...|
|politics|ny_politics_artic...|WASHINGTON — With...|(1000,[1,4,5,7,9,...|  0.0|       0.0|[0.66155175880286...|
|politics|ny_politics_artic...|WASHINGTON — The ...|(1000,[0,1,4,5,6,...|  0.0|       0.0|[0.61061653168505...|
|politics|ny_politics_artic...|WASHINGTON — The ...|(1000,[1,6,7,10,1...|  0.0|       0.0|[0.59713316590150...|
|politics|ny_politics_artic...|WASHINGTON — Pres...|(1000,[3,4,5,6,7,...|  0.0|       0.0|[0.53992632368327...|
|politics|ny_politics_artic...|WASHINGTON 

### Train and Predict partitioned data using Logistic Regression

In [8]:
# TRAIN AND TEST DATA USING LOGISTIC_REGRESSION

# Fit a logistic regression on the training split, then score the test split.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Estimator -> fitted model -> predictions
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(train)
lrPredictions = lrModel.transform(test)

print("TEST RESULT:")
# Rows predicted as class 0, ordered by the probability column (descending)
shownColumns = ["category", "file_name", "text", "features", "label", "prediction", "probability"]
predictedClassZero = lrPredictions.filter(lrPredictions['prediction'] == 0)
predictedClassZero.select(*shownColumns).orderBy("probability", ascending=False).show(10)

# Accuracy on the test split and the complementary error rate
mulClassEvl = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
accuracy = mulClassEvl.evaluate(lrPredictions)
print("Accuracy- %g" % accuracy)
print("Test Error- %g" % (1.0 - accuracy))

TEST RESULT:
+--------+--------------------+--------------------+--------------------+-----+----------+--------------------+
|category|           file_name|                text|            features|label|prediction|         probability|
+--------+--------------------+--------------------+--------------------+-----+----------+--------------------+
|politics|ny_politics_artic...|• President Trump...|(1000,[0,1,3,4,7,...|  0.0|       0.0|[0.99985646975422...|
|politics|ny_politics_artic...|WASHINGTON — The ...|(1000,[1,3,12,19,...|  0.0|       0.0|[0.98788680063328...|
|politics|ny_politics_artic...|WASHINGTON — The ...|(1000,[3,4,6,7,10...|  0.0|       0.0|[0.94510932410313...|
|politics|ny_politics_artic...|WASHINGTON — Pres...|(1000,[1,3,7,8,9,...|  0.0|       0.0|[0.92211608608754...|
|politics|ny_politics_artic...|WASHINGTON — Sena...|(1000,[0,1,4,6,9,...|  0.0|       0.0|[0.88301017166012...|
|politics|ny_politics_artic...|Update, March 14,...|(1000,[3,4,9,19,2...|  0.0|       0.0|[

## EVALUATING LOGISTIC REGRESSION MODEL ON UNKNOWN DATA (not test data)

In [9]:
# LOAD NEW FILES (unseen articles, used only for evaluation)

new_dir_names = ["new_data/business/*", "new_data/movies/*", "new_data/politics/*", "new_data/science/*", "new_data/sports/*"]
labels = ["business", "movies", "politics", "science", "sports"]
schema = StructType([
    StructField('file_name', StringType(), True),
    StructField('text', StringType(), True),
    StructField('category', StringType(), True),
])
new_articles_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

# new_dir_names and labels are parallel lists
for new_dir_name, label in zip(new_dir_names, labels):
    new_articles = spark.sparkContext.wholeTextFiles(new_dir_name)
    # Bind the category at lambda creation instead of closing over the loop variable
    new_articles_data = new_articles.map(
        lambda item, label=label: (item[0].split('/')[-1], item[1], label)
    )
    # Explicit schema avoids row-sampling inference (which fails on an empty directory)
    new_articles_df = new_articles_df.union(spark.createDataFrame(new_articles_data, schema))

print("Total new files loaded (only for testing) :", new_articles_df.count())
print()

# Label, clean, and extract features.
# The indexer is fitted on the ORIGINAL articles so categories map to the labels
# the model was trained with.
# NOTE(review): this re-fit is assumed to reproduce the mapping used at training time - confirm.
labeledTestData2 = StringIndexer(inputCol="category", outputCol="label").fit(articles_df).transform(new_articles_df)
tokenizedTestData2 = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W").transform(labeledTestData2)
clnTestData2 = StopWordsRemover(inputCol="words", outputCol="filtered").transform(tokenizedTestData2)
# Reuse the transformer and the fitted IDF model from the feature-extraction cell
# rather than fitting a fresh IDF, so the features match the training features.
featuredTestData2 = hashingTermFreq.transform(clnTestData2)
rescaledTestData2 = invDocFreqModel.transform(featuredTestData2)

# Predict labels
testPrediction = lrModel.transform(rescaledTestData2)

print("TEST RESULT:")
# Show the rows predicted as class 0, ordered by the probability column (descending)
testPrediction.filter(testPrediction['prediction'] == 0) \
    .select("category", "file_name", "text", "features", "label", "prediction", "probability") \
    .orderBy("probability", ascending=False) \
    .show(10)

# Evaluate accuracy
mulClassEvl = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = mulClassEvl.evaluate(testPrediction)
print("Accuracy- %g" % accuracy)
print("Test Error- %g" % (1.0 - accuracy))


Total new files loaded (only for testing) : 100

TEST RESULT:
+--------+--------------------+--------------------+--------------------+-----+----------+--------------------+
|category|           file_name|                text|            features|label|prediction|         probability|
+--------+--------------------+--------------------+--------------------+-----+----------+--------------------+
|politics|ny_politics_artic...|WASHINGTON — The ...|(1000,[1,6,7,10,1...|  0.0|       0.0|[0.98255504585349...|
|politics|ny_politics_artic...|WASHINGTON — Pres...|(1000,[3,4,5,6,7,...|  0.0|       0.0|[0.97165103767754...|
|politics|ny_politics_artic...|BLOOMINGTON, Ind....|(1000,[0,1,2,3,4,...|  0.0|       0.0|[0.95672635635010...|
|politics|ny_politics_artic...|LIMA, Peru — As P...|(1000,[1,3,5,11,1...|  0.0|       0.0|[0.88470932343682...|
|politics|ny_politics_artic...|WASHINGTON — With...|(1000,[1,4,5,7,9,...|  0.0|       0.0|[0.87240393340621...|
|politics|ny_politics_artic...|WASHINGTON 

## Repeat the above process using a Pipeline (with Random Forest Classification)

In [10]:
# APPLICATION OF PIPELINE (with Random Forest Classification)

# Define all the transformations
labelIndexer = StringIndexer(inputCol="category", outputCol="label")
regTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
stopWordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="itfFeatures", numFeatures=1000)
idf = IDF(inputCol="itfFeatures", outputCol="features", minDocFreq=5)

# Chain all the transformations into one pipeline
pipeline = Pipeline(stages=[labelIndexer, regTokenizer, stopWordsRemover, hashingTF, idf])

# Split the RAW articles first and fit the pipeline on the training part only.
# Fitting on the full dataset would let test documents influence the fitted stages
# (the IDF weights and the label index) and so inflate the reported accuracy.
# The seed makes the split reproducible.
rawTrain, rawTest = articles_df.randomSplit([0.7, 0.3], seed=42)
pipelineModel = pipeline.fit(rawTrain)
train = pipelineModel.transform(rawTrain)
test = pipelineModel.transform(rawTest)
# Full featurised dataset; kept so the `data` name stays available for inspection
data = pipelineModel.transform(articles_df)

# Create the Random Forest model (seeded, since training is stochastic)
rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed=42)
# Train the model
rfModel = rf.fit(train)
# Predict the test data
rfPredictions = rfModel.transform(test)
print("TEST RESULT:")
# Show the rows predicted as class 0, ordered by the probability column (descending)
rfPredictions.filter(rfPredictions['prediction'] == 0) \
    .select("category", "file_name", "text", "features", "label", "prediction", "probability") \
    .orderBy("probability", ascending=False) \
    .show(10)

# Evaluate the performance of the model
mulClassEvl = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = mulClassEvl.evaluate(rfPredictions)
print("Accuracy- %g" % accuracy)
print("Test Error- %g" % (1.0 - accuracy))

TEST RESULT:
+--------+--------------------+--------------------+--------------------+-----+----------+--------------------+
|category|           file_name|                text|            features|label|prediction|         probability|
+--------+--------------------+--------------------+--------------------+-----+----------+--------------------+
|politics|ny_politics_artic...|WASHINGTON — When...|(1000,[1,9,13,16,...|  0.0|       0.0|[0.66803400508571...|
|politics|ny_politics_artic...|WASHINGTON — The ...|(1000,[0,3,4,6,8,...|  0.0|       0.0|[0.65384135411594...|
|politics|ny_politics_artic...|WASHINGTON — In a...|(1000,[0,1,3,7,13...|  0.0|       0.0|[0.64296410178251...|
|politics|ny_politics_artic...|WASHINGTON — Pres...|(1000,[0,1,3,5,6,...|  0.0|       0.0|[0.62832395566357...|
|politics|ny_politics_artic...|WASHINGTON — The ...|(1000,[1,3,12,19,...|  0.0|       0.0|[0.59525478052635...|
|politics|ny_politics_artic...|WASHINGTON — Phil...|(1000,[1,4,5,7,13...|  0.0|       0.0|[

## Repeat the above process using a Pipeline (with Logistic Regression)

In [11]:
# APPLICATION OF PIPELINE (with Logistic Regression)

# Define all the transformations
labelIndexer = StringIndexer(inputCol="category", outputCol="label")
regTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
stopWordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="itfFeatures", numFeatures=1000)
idf = IDF(inputCol="itfFeatures", outputCol="features", minDocFreq=5)

# Chain all the transformations into one pipeline
pipeline = Pipeline(stages=[labelIndexer, regTokenizer, stopWordsRemover, hashingTF, idf])

# Split the RAW articles first and fit the pipeline on the training part only.
# Fitting on the full dataset would let test documents influence the fitted stages
# (the IDF weights and the label index) and so inflate the reported accuracy.
# The seed makes the split reproducible.
rawTrain, rawTest = articles_df.randomSplit([0.7, 0.3], seed=42)
pipelineModel = pipeline.fit(rawTrain)
train = pipelineModel.transform(rawTrain)
test = pipelineModel.transform(rawTest)
# Full featurised dataset; kept so the `data` name stays available for inspection
data = pipelineModel.transform(articles_df)

# Create the logistic regression model
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
# Train model
lrModel = lr.fit(train)
# Predict the test data
lrPredictions = lrModel.transform(test)
print("TEST RESULT:")
# Show the rows predicted as class 0, ordered by the probability column (descending)
lrPredictions.filter(lrPredictions['prediction'] == 0) \
    .select("category", "file_name", "text", "features", "label", "prediction", "probability") \
    .orderBy("probability", ascending=False) \
    .show(10)

# Evaluate the performance of the model
mulClassEvl = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = mulClassEvl.evaluate(lrPredictions)
print("Accuracy- %g" % accuracy)
print("Test Error- %g" % (1.0 - accuracy))

TEST RESULT:
+--------+--------------------+--------------------+--------------------+-----+----------+--------------------+
|category|           file_name|                text|            features|label|prediction|         probability|
+--------+--------------------+--------------------+--------------------+-----+----------+--------------------+
|politics|ny_politics_artic...|WASHINGTON — Fift...|(1000,[0,1,3,4,6,...|  0.0|       0.0|[0.94852034162383...|
|politics|ny_politics_artic...|WASHINGTON — Pres...|(1000,[1,3,7,8,9,...|  0.0|       0.0|[0.94761729681854...|
|politics|ny_politics_artic...|WASHINGTON — From...|(1000,[0,3,4,5,6,...|  0.0|       0.0|[0.94406212285865...|
|politics|ny_politics_artic...|WASHINGTON — Few ...|(1000,[0,1,3,4,6,...|  0.0|       0.0|[0.93880856642434...|
|politics|ny_politics_artic...|Update, March 14,...|(1000,[3,4,9,19,2...|  0.0|       0.0|[0.93855943813865...|
|politics|ny_politics_artic...|WASHINGTON — Pres...|(1000,[0,2,3,4,6,...|  0.0|       0.0|[

In [181]:
# Stop the Spark session created in the first cell; cells above must be re-run from the top afterwards
spark.stop()

### References:

https://creativedata.atlassian.net/wiki/spaces/SAP/pages/83237142/Pyspark+-+Tutorial+based+on+Titanic+Dataset<br>
https://www.tutorialkart.com/apache-spark/spark-mllib-tf-idf/<br>
