# **Text Classification**
## **BIG DATA PYSPARK** 2021
### **Final Project**
### **Artur Avagyan**

In [0]:
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [0]:
# Load every column of the registered table `tert_data_csv` into a DataFrame.
df = spark.table("tert_data_csv")

In [0]:
# Print the column names and data types of the loaded DataFrame.
df.printSchema()

In [0]:
# Report the size of the dataset: number of rows (a Spark job) and number of columns.
n_rows = df.count()
n_columns = len(df.columns)
print('Count of rows: ', n_rows, '\nCount of columns: ', n_columns)

In [0]:
# Preview the first 10 rows.
df.show(10)

In [0]:
# List the distinct raw category values to spot inconsistent spellings.
(
    df.select("Category")
    .distinct()
    .show()
)

In [0]:
# Category values differ only by stray leading/trailing spaces. Trimming
# normalizes every category at once instead of enumerating each
# "space + name" / "name + space" variant by hand. Values without
# surrounding spaces and nulls are left unchanged.
df = df.withColumn("Category", trim(df.Category))

# Correct the misspelled month name; all other values pass through unchanged.
df = df.withColumn('Month', when(df.Month == 'Febuary', 'February').otherwise(df.Month))

In [0]:
# Re-check the distinct category values after the cleanup step.
(
    df.select("Category")
    .distinct()
    .show()
)

In [0]:
# Number of articles in each category.
category_counts = df.groupBy('Category').count()
category_counts.show()

In [0]:
# Count nulls for every column in ONE aggregation instead of launching a
# separate filter+count job per column. `count` skips nulls, so counting
# a value that is only non-null where the column IS null yields the number
# of null entries.
# The loop variable is deliberately not called `col`: that name would
# overwrite pyspark.sql.functions.col brought in by the star import.
null_counts = df.select(
  [count(when(df[column_name].isNull(), 1)).alias(column_name)
   for column_name in df.columns]
).first()
for column_name in df.columns:
  print(column_name, '  with null values: ', null_counts[column_name])

In [0]:
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
import plotly.figure_factory as ff
from plotly.subplots import make_subplots
def _add_count_bars(figure, counts, row_idx, col_idx, horizontal=False):
  """Add one bar trace of article counts to a subplot cell.

  figure: the plotly figure created by make_subplots.
  counts: pandas Series (label -> number of articles), already sorted in
    the order the bars should appear.
  row_idx, col_idx: 1-based position of the target subplot.
  horizontal: when True the labels go on the y axis and bars grow to the right.
  """
  if horizontal:
    axes = dict(x=counts, y=counts.index, orientation='h')
  else:
    axes = dict(x=counts.index, y=counts, orientation='v')
  figure.add_trace(go.Bar(name='',
                          showlegend=False,
                          text=counts,
                          textposition='outside',
                          **axes),
                   row=row_idx, col=col_idx)


# Bring the whole DataFrame to the driver as pandas for plotting.
tert_data=df.toPandas()
fig=make_subplots(rows=2,cols=2,
                subplot_titles=("Article distribution by categories",
                                "Article distribution by month",
                                "Article distribution by weekday",
                                "Article distribution by hour"))

# Categories: horizontal bars, smallest count first.
_add_count_bars(fig, tert_data.Category.value_counts().sort_values(ascending=True),
                1, 1, horizontal=True)

# Ordered categoricals make sort_index follow calendar order rather than
# alphabetical order, and keep zero-count entries in the chart.
tert_data['Month'] = pd.Categorical(tert_data['Month'],
                            categories=['January','February','March','April','May','June',
                                        'July','August','September','October','November','December'],ordered=True)
_add_count_bars(fig, tert_data.Month.value_counts().sort_index(ascending=True), 1, 2)

tert_data['Weekday'] = pd.Categorical(tert_data['Weekday'],
                            categories=['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'],ordered=True)
_add_count_bars(fig, tert_data.Weekday.value_counts().sort_index(ascending=True), 2, 1)

# NOTE(review): if Hour is loaded as text it can never match the integer
# categories below and the chart would be empty; coerce to numbers first.
# Already-numeric values are unaffected.
tert_data['Hour'] = pd.Categorical(pd.to_numeric(tert_data['Hour'], errors='coerce'),
                            categories=list(range(24)),ordered=True)
_add_count_bars(fig, tert_data.Hour.value_counts().sort_index(ascending=True), 2, 2)

fig.update_layout(title="Some statistics from tert.am",
                  width=1650, height=950)
fig.show()

In [0]:
# df=df.withColumn("Year",df.Year.cast('integer'))\
#   .withColumn("Day",df.Day.cast('integer'))\
#   .withColumn("Hour",df.Hour.cast('integer'))

In [0]:
# Keep only the article text and its category for the classification task.
text_columns = ['Article', 'Category']
df_only_text = df.select(*text_columns)
df_only_text.show(15)

In [0]:
# Collapse the categories into a binary target: the two politics-related
# categories become 'Քաղաքական', everything else (including null categories)
# becomes 'Ոչ քաղաքական'.
# The condition is built from df_only_text's own column rather than from the
# parent `df`, so the expression does not depend on another DataFrame.
is_political = df_only_text['Category'].isin('Քաղաքականություն', 'Իրադարձային')
df_only_text = df_only_text.withColumn(
    "Category",
    when(is_political, 'Քաղաքական').otherwise('Ոչ քաղաքական'))

## Data preprocessing and ML models

In [0]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,IDF, StringIndexer

# Tokenizer lower-cases the text and splits it on whitespace.
tokenizer=Tokenizer(inputCol='Article',outputCol='mytokens')

# Hand-collected Armenian stop words (raw list; normalized below).
_raw_arm_stop_words=["այդ","այլ","այն","այս","դու","դուք","եմ","չեմ","երևի",
            "են","ենք","ես","եք","է","էի","էին","էինք","եթե","դա",
            "չեն","չենք","չես","չեք","չի","չէի","չէին","չէինք",
            "էիր","էիք","էր","ըստ","թ.","թե","ի","ին","իսկ","ինչ","դրա",
            "իր","կամ","համար","հետ","հետո","մենք","մեջ","մասին","մեր",
            "մի","նա","նաև","նրա","նրանք","որ","որը","չի","դեպքում","ու",
            "որոնք","որպես","ում","պիտի","վրա","և","եւ","կարող","սակայն",
            "ահա", "անգամ", "գեթ", "գոնե", "իբր", "թող", "լոկ","որովհետև",
            "խո", "խոմ", "հենց", "հո", "մանավանդ","իրոք որ", "թերևս", "թեկուզ",
            "կարծես", "միգուցե", "միթե", "մինչև","անգամ", "նույնիսկ",
            "անկասկած", "անպատճառ", "անտարակույս","իսկապես", "միայն"]

# Normalize the raw list so every entry can actually match a token:
#  * tokens never contain whitespace, so multi-word entries are split into
#    their individual words;
#  * the list itself contains both "և" and "եւ", so any word written with
#    the ligature "և" is also added in its "եւ" spelling;
#  * duplicates are dropped while keeping the original order.
arm_stop_words=[]
for _entry in _raw_arm_stop_words:
  for _word in _entry.split():
    for _variant in (_word, _word.replace("և", "եւ")):
      if _variant not in arm_stop_words:
        arm_stop_words.append(_variant)

# Remaining feature stages: drop stop words, count terms, then re-weight by IDF.
stopwords_remover=StopWordsRemover(inputCol='mytokens',outputCol='filtered_tokens',stopWords=arm_stop_words)
vectorizer=CountVectorizer(inputCol='filtered_tokens',outputCol='rawFeatures')
idf=IDF(inputCol='rawFeatures',outputCol='vectorizedFeatures')


# Fit the label indexer on the full dataset so both splits share one
# Category -> numeric label mapping; `labels` lists the category names in
# index order (position 0 is label 0.0, and so on).
labelEncoder = StringIndexer(inputCol='Category',outputCol='label').fit(df_only_text)
labelEncoder.labels

In [0]:
# Append the numeric 'label' column produced by the fitted StringIndexer.
df_only_text = labelEncoder.transform(df_only_text)
df_only_text.show(n=5)

In [0]:
# 75% / 25% train-test split with a fixed seed.
split_weights = (0.75, 0.25)
trainDF, testDF = df_only_text.randomSplit(split_weights, seed=42)
trainDF.show(5)

## Text classification models

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
# Logistic regression on the TF-IDF vectors, chained after the shared
# text-preprocessing stages.
lr = LogisticRegression(labelCol='label', featuresCol='vectorizedFeatures')
lr_stages = [tokenizer, stopwords_remover, vectorizer, idf, lr]
pipeline = Pipeline(stages=lr_stages)

# Fit on the training split, then score the held-out split.
lr_model = pipeline.fit(trainDF)
predictions = lr_model.transform(testDF)
predictions.columns

In [0]:
# Show the class probabilities next to the true category/label and the
# predicted label for the first 10 rows of the predictions.
predictions.select( 'probability', 'Category','label','prediction').show(10)

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Overall accuracy of the current `predictions` DataFrame.
evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    predictionCol='prediction',
    labelCol='label',
)
accuracy = evaluator.evaluate(predictions)
accuracy

In [0]:
from pyspark.mllib.evaluation import MulticlassMetrics
# MulticlassMetrics reads each record as (prediction, label), in that order.
# Passing (label, prediction) swaps precision with recall and transposes the
# confusion matrix, so the columns are selected explicitly in the right order.
lr_metric=MulticlassMetrics(predictions.select('prediction','label').rdd)
print('Accuracy: --> ',lr_metric.accuracy)
print('Precision for label 0: --> ',lr_metric.precision(0.0))
print('Precision for label 1: --> ',lr_metric.precision(1.0))
print('Recall for label 0: --> ',lr_metric.recall(0.0))
print('Recall for label 1: --> ',lr_metric.recall(1.0))
print('F1Score for label 0: --> ',lr_metric.fMeasure(0.0))
print('F1Score for label 1: --> ',lr_metric.fMeasure(1.0))
print('Confusion Matrix: -->\n',lr_metric.confusionMatrix().toArray())

from sklearn.metrics import classification_report
# Collect labels and predictions in a single pass so both columns come from
# the same materialized result (and only one Spark job runs).
lr_report_pdf=predictions.select('label','prediction').toPandas()
print('\nClassification Report\n',classification_report(lr_report_pdf['label'],
                            lr_report_pdf['prediction'],
                            target_names=labelEncoder.labels))

In [0]:
from pyspark.sql.types import StringType
# One-row DataFrame holding a sample article. Each row is a 1-tuple matching
# the single 'Article' column; a stray StringType() instance was previously
# passed as a second data field of the row, which is not how a column type
# is declared.
example_article=spark.createDataFrame([
  ('''
  Ռամոսի հետ պայմանագիր կնքելու ցանկություն ունեն անգլիական «Մանչեսթեր Սիթին» ու «Մանչեսթեր Յունայթեդը» եւ ֆրանսիական ՊՍԺ-ն, հաղորդել էր իսպանական AS-ը։
  Սերխիո Ռամոսը 2005 թվականին «Սեւիլիայից» է տեղափոխվել «Ռեալ», որի կազմում անցկացրել է 671 հանդիպում, խփել 101 գոլ, կատարել 40 գոլային փոխանցում, դարձել Իսպանիայի հնգակի չեմպիոն, կրկնակի գավաթակիր, քառակի սուպերգավաթակիր, Չեմպիոնների լիգայի քառակի գավաթակիր, Եվրոպայի եռակի սուպերգավաթակիր, աշխարհի քառակի ակումբային չեմպիոն։
  ''',)
],['Article'])

pred_example=lr_model.transform(example_article)

# Translate the numeric prediction back to a category name using the fitted
# indexer's own label order (labels[0] is index 0.0, labels[1] is index 1.0)
# instead of hard-coding which class received index 0.
label_names=labelEncoder.labels
pred_example.select('Article','prediction')\
  .withColumn("prediction", when(pred_example.prediction == 0.0,label_names[0])\
                               .otherwise(label_names[1])).show()


### Other Classification models

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree (depth limited to 10) on the same TF-IDF features.
dt = DecisionTreeClassifier(maxDepth=10, labelCol='label', featuresCol='vectorizedFeatures')
dt_stages = [tokenizer, stopwords_remover, vectorizer, idf, dt]
pipeline = Pipeline(stages=dt_stages)

# Fit on the training split, then score the held-out split.
dt_model = pipeline.fit(trainDF)
predictions = dt_model.transform(testDF)

# Accuracy of the decision-tree predictions.
evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    predictionCol='prediction',
    labelCol='label',
)
accuracy = evaluator.evaluate(predictions)
accuracy

In [0]:
# MulticlassMetrics reads each record as (prediction, label), in that order.
# Passing (label, prediction) swaps precision with recall and transposes the
# confusion matrix, so the columns are selected explicitly in the right order.
dt_metric=MulticlassMetrics(predictions.select('prediction','label').rdd)
print('Accuracy: --> ',dt_metric.accuracy)
print('Precision for label 0: --> ',dt_metric.precision(0.0))
print('Precision for label 1: --> ',dt_metric.precision(1.0))
print('Recall for label 0: --> ',dt_metric.recall(0.0))
print('Recall for label 1: --> ',dt_metric.recall(1.0))
print('F1Score for label 0: --> ',dt_metric.fMeasure(0.0))
print('F1Score for label 1: --> ',dt_metric.fMeasure(1.0))
print('Confusion Matrix: -->\n',dt_metric.confusionMatrix().toArray())

# Collect labels and predictions in a single pass so both columns come from
# the same materialized result (and only one Spark job runs).
dt_report_pdf=predictions.select('label','prediction').toPandas()
print('\nClassification Report\n',classification_report(dt_report_pdf['label'],
                            dt_report_pdf['prediction'],
                            target_names=labelEncoder.labels))

In [0]:
from pyspark.ml.classification import RandomForestClassifier
# Random forest (100 trees, depth limited to 10) on the same TF-IDF features.
rf = RandomForestClassifier(
    numTrees=100,
    maxDepth=10,
    labelCol='label',
    featuresCol='vectorizedFeatures',
)
rf_stages = [tokenizer, stopwords_remover, vectorizer, idf, rf]
pipeline = Pipeline(stages=rf_stages)

# Fit on the training split, then score the held-out split.
rf_model = pipeline.fit(trainDF)
predictions = rf_model.transform(testDF)

# Accuracy of the random-forest predictions.
evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    predictionCol='prediction',
    labelCol='label',
)
accuracy = evaluator.evaluate(predictions)
accuracy

In [0]:
# MulticlassMetrics reads each record as (prediction, label), in that order.
# Passing (label, prediction) swaps precision with recall and transposes the
# confusion matrix, so the columns are selected explicitly in the right order.
rf_metric=MulticlassMetrics(predictions.select('prediction','label').rdd)
print('Accuracy: --> ',rf_metric.accuracy)
print('Precision for label 0: --> ',rf_metric.precision(0.0))
print('Precision for label 1: --> ',rf_metric.precision(1.0))
print('Recall for label 0: --> ',rf_metric.recall(0.0))
print('Recall for label 1: --> ',rf_metric.recall(1.0))
print('F1Score for label 0: --> ',rf_metric.fMeasure(0.0))
print('F1Score for label 1: --> ',rf_metric.fMeasure(1.0))
print('Confusion Matrix: -->\n',rf_metric.confusionMatrix().toArray())

# Collect labels and predictions in a single pass so both columns come from
# the same materialized result (and only one Spark job runs).
rf_report_pdf=predictions.select('label','prediction').toPandas()
print('\nClassification Report\n',classification_report(rf_report_pdf['label'],
                            rf_report_pdf['prediction'],
                            target_names=labelEncoder.labels))

In [0]:
from pyspark.ml.classification import NaiveBayes
# Sweep the Naive Bayes smoothing parameter and plot accuracy for each value.
# The grid is defined once so the values that are trained on and the values
# on the plot's x axis cannot drift apart.
alphas = np.arange(0.1, 1.1, 0.1)

# The evaluator does not depend on the smoothing value, so build it once
# outside the loop.
evaluator=MulticlassClassificationEvaluator(labelCol='label',
                                           predictionCol='prediction',
                                           metricName='accuracy')

# NOTE(review): accuracy is measured on testDF, so choosing alpha from this
# curve tunes the model on the test split; consider a separate validation
# split or cross-validation.
scores=[]
for alpha in alphas:
  # float() hands the estimator a plain Python float rather than a numpy scalar.
  nb = NaiveBayes(featuresCol='vectorizedFeatures',labelCol='label',smoothing=float(alpha), modelType="multinomial")
  pipeline=Pipeline(stages=[tokenizer,stopwords_remover,vectorizer,idf,nb])
  nb_model=pipeline.fit(trainDF)
  predictions=nb_model.transform(testDF)
  accuracy=evaluator.evaluate(predictions)
  scores.append(accuracy)

fig = px.line(x=alphas, y=scores)
fig.update_layout(title='Accuracy with different alphas',
                  width=500, height=500,
                  xaxis_title="Alpha",
                  yaxis_title="Accuracy"
                  )
fig.show()

In [0]:
# Final multinomial Naive Bayes model with smoothing fixed at 0.9.
nb = NaiveBayes(featuresCol='vectorizedFeatures',labelCol='label',smoothing=0.9, modelType="multinomial")
pipeline=Pipeline(stages=[tokenizer,stopwords_remover,vectorizer,idf,nb])
nb_model=pipeline.fit(trainDF)
predictions=nb_model.transform(testDF)

# MulticlassMetrics reads each record as (prediction, label), in that order.
# Passing (label, prediction) swaps precision with recall and transposes the
# confusion matrix, so the columns are selected explicitly in the right order.
nb_metric=MulticlassMetrics(predictions.select('prediction','label').rdd)
print('Accuracy: --> ',nb_metric.accuracy)
print('Precision for label 0: --> ',nb_metric.precision(0.0))
print('Precision for label 1: --> ',nb_metric.precision(1.0))
print('Recall for label 0: --> ',nb_metric.recall(0.0))
print('Recall for label 1: --> ',nb_metric.recall(1.0))
print('F1Score for label 0: --> ',nb_metric.fMeasure(0.0))
print('F1Score for label 1: --> ',nb_metric.fMeasure(1.0))
print('Confusion Matrix: -->\n',nb_metric.confusionMatrix().toArray())

# Collect labels and predictions in a single pass so both columns come from
# the same materialized result (and only one Spark job runs).
nb_report_pdf=predictions.select('label','prediction').toPandas()
print('\nClassification Report\n',classification_report(nb_report_pdf['label'],
                            nb_report_pdf['prediction'],
                            target_names=labelEncoder.labels))

## Best Regards

<h4 align="left">Artur Avagyan</h4>
    <ul>
    <li>Actuarial supervisor at Central Bank of Armenia</li>
    <li>Student from YSU (Data Science for Business Master's Degree Program)</li>
    <li>E-mail:   avagyan.artur97@gmail.com</li>
    <li>LinkedIn: <a href="https://www.linkedin.com/in/artur-avagyan-0a16311b3">Artur Avagyan</a></li>
    <li>GitHub:   <a href="https://github.com/artur-avagyan">Artur Avagyan</a></li>
    <li>Phone:    +37498641815</li>
    </ul>

<h3 align="center">YSU DSB 2020-2021</h3>