NOTE:
This is a directly exported ipynb version of the databricks notebook. 
 To check out the .dbc version, visit 
this [link](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5765572329082963/819669162567595/1062157151406693/latest.html)

Import required libraries

In [0]:
from pyspark.sql.functions import col, udf, regexp_replace, isnull
from pyspark.sql.types import StringType,IntegerType

from pyspark.ml.feature import StringIndexer, RegexTokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Load the data

In [0]:
# File location and type
file_location = "/FileStore/tables/newsCorpora.csv"

df = spark.read.csv(file_location, 
                    header=False,
                    sep='\t')

In [0]:
df.show()

Renaming the columns

In [0]:
old_cols = df.schema.names
new_cols = ['ID', 'TITLE', 'URL', 'PUBLISHER', 'CATEGORY', 'STORY', 'HOSTNAME', 'TIMESTAMP']

from functools import reduce
df = reduce(lambda df, idx: df.withColumnRenamed(old_cols[idx], new_cols[idx]), range(len(old_cols)), df)

In [0]:
df.show(5)

In [0]:
print(f'Total number of rows: {df.count()}')

Subsetting a new dataframe with dependent variable (category) and independent variable (title)

In [0]:
news_df = df.select("TITLE", "CATEGORY")

Missing values

In [0]:
news_df.where(col("TITLE").isNull()).show()

In [0]:
news_df.where(col("CATEGORY").isNull()).show()

In [0]:
news_df.groupBy('CATEGORY').count().orderBy(col('count').desc()).show()

In [0]:
news_data = news_df.where(col("CATEGORY").isNotNull())

In [0]:
null_values = news_df.where(col("CATEGORY").isNull())

In [0]:
null_values.select('TITLE').show(truncate=False)

We can see, it is not actually Null, our csv reader was not able to split the rows properly. Thus, we go ahead and split the title into required columns.

In [0]:
null_values_pd = null_values.toPandas()


In [0]:
null_values_pd.TITLE.values[0]

In [0]:
null_values_pd.TITLE.values[0].split('\t')

In [0]:
categories = [null_values_pd.TITLE.values[i].split('\t')[3] for i in range(len(null_values_pd))]
categories

In [0]:
null_values_pd.loc[:, 'CATEGORY'] = categories

In [0]:
null_values_pd

Unnamed: 0,TITLE,CATEGORY
0,Love & Hip-Hop' Star Benzino Shot By Nephew En...,e
1,Peaches has died. We are beyond pain.\thttp://...,e
2,I don't know why I'm a sex symbol' says smould...,e
3,Hunger Games' top winner at MTV Movie Awards\t...,e
4,"A few years ago, scientists calculated that be...",t
5,I never thought I'd be in love” says Angelina ...,e
6,The Best Reactions To The Supposed Video of So...,e
7,The Fault In Our Stars' to release in India on...,e
8,"More Japanese Xbox One Games In Development, A...",t


In [0]:
imputed_sdf = spark.createDataFrame(null_values_pd)
imputed_sdf.show()

Merging the DataFrames

In [0]:
news_data_imp = news_data.union(imputed_sdf)
news_data_imp.groupBy('CATEGORY').count().orderBy(col('count').desc()).show()

In [0]:
news_data_imp.select('TITLE').show(10, truncate=False)

Data Cleaning
- Regex
- Tokenizing
- Removing stop words

In [0]:
news_data_imp = news_data_imp.withColumn('clean_title', regexp_replace(col('TITLE'), r'\'s'  , ''))
news_data_imp = news_data_imp.withColumn('clean_title', regexp_replace(col('clean_title'), '[^a-zA-Z ]'  , ''))
news_data_imp.show(truncate=False)

In [0]:
re_tokenizer = RegexTokenizer(inputCol='clean_title', outputCol='tokens')
tokenized_data = re_tokenizer.transform(news_data_imp)

In [0]:
tokenized_data.select('TITLE','tokens').show(truncate=False)

In [0]:
sw_remover = StopWordsRemover(inputCol='tokens', outputCol='no_stop_words')
sw_tokenized_data = sw_remover.transform(tokenized_data)

In [0]:
sw_tokenized_data.select('TITLE', 'no_stop_words').show(truncate=False)

In [0]:
indexer = StringIndexer(inputCol="CATEGORY", outputCol="category_index")
indexed_data = indexer.fit(sw_tokenized_data).transform(sw_tokenized_data)

In [0]:
indexed_data.groupBy('CATEGORY', 'category_index').count().orderBy(col('count').desc()).show()

#### Vectorizing the text data

Count Vectorizer

In [0]:
cv = CountVectorizer(inputCol='no_stop_words', outputCol='count_vec_features')
cv_model = cv.fit(indexed_data)
cv_features = cv_model.transform(indexed_data)

In [0]:
cv_features.select('count_vec_features').show(2)

In [0]:
train_cv, test_cv = cv_features.randomSplit([0.75, 0.25],seed=9)

In [0]:
naive_bayes_cv = NaiveBayes(modelType='multinomial', labelCol='category_index', featuresCol='count_vec_features')
naive_bayes_cv_model = naive_bayes_cv.fit(train_cv)
predictions_cv = naive_bayes_cv_model.transform(test_cv)

In [0]:
model_evaluator_cv = MulticlassClassificationEvaluator(labelCol='category_index', predictionCol='prediction', metricName='accuracy')
accuracy_cv = model_evaluator_cv.evaluate(predictions_cv)

In [0]:
print(f"Accuracy using NB Classifier for count vectorized features = {round(accuracy_cv * 100, 3)}%")


TF-IDF Vectorizer

In [0]:
hashingTF = HashingTF(inputCol='no_stop_words', outputCol='tf_features')
tf_features_data = hashingTF.transform(indexed_data)

In [0]:
idf = IDF(inputCol="tf_features", outputCol="tfidf_features")
idfModel = idf.fit(tf_features_data)
tfidf_features_data = idfModel.transform(tf_features_data)

In [0]:
train_tfidf, test_tfidf = tfidf_features_data.randomSplit([0.75, 0.25], seed=9)

In [0]:
naive_bayes_tfidf = NaiveBayes(modelType="multinomial", labelCol="category_index", featuresCol="tfidf_features")
naive_bayes_tfidf_model = naive_bayes_tfidf.fit(train_tfidf)
predictions_tfidf = naive_bayes_tfidf_model.transform(test_tfidf)

In [0]:
model_evaluator_tfidf = MulticlassClassificationEvaluator(labelCol="category_index", predictionCol="prediction", metricName="accuracy")
accuracy_tfidf = model_evaluator_tfidf.evaluate(predictions_tfidf)

In [0]:
print(f"Accuracy using NB Classifier for tf-idf vectorized features = {round(accuracy_tfidf * 100, 3)}%")