In [1]:
import os
import pyspark.sql.types as typ
import pyspark.sql.functions as F

In [2]:
# load modules
from pyspark.sql import SparkSession
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import VectorAssembler 
from pyspark.mllib.linalg import Vectors
from pyspark.sql.functions import col 
from pyspark.mllib.evaluation import MulticlassMetrics

import os

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("data preprocessing") \
    .config("spark.executor.memory", '8g') \
    .config('spark.executor.cores', '4') \
    .config('spark.cores.max', '4') \
    .config("spark.driver.memory",'8g') \
    .getOrCreate()

sc = spark.sparkContext

In [4]:
from pyspark.ml import Pipeline 
from pyspark.ml.feature import CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover
from pyspark.sql.functions import col, udf,regexp_replace,isnull
from pyspark.sql.types import StringType,IntegerType
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [9]:
# read in data
data_file = "uci-news-aggregator.csv"
news_data = spark.read.csv(data_file, inferSchema=True, header = True)

In [10]:
news_data.show()

+---+--------------------+--------------------+--------------------+--------+--------------------+--------------------+-------------+
| ID|               TITLE|                 URL|           PUBLISHER|CATEGORY|               STORY|            HOSTNAME|    TIMESTAMP|
+---+--------------------+--------------------+--------------------+--------+--------------------+--------------------+-------------+
|  1|Fed official says...|http://www.latime...|   Los Angeles Times|       b|ddUyU0VZz0BRneMio...|     www.latimes.com|1394470370698|
|  2|Fed's Charles Plo...|http://www.livemi...|            Livemint|       b|ddUyU0VZz0BRneMio...|    www.livemint.com|1394470371207|
|  3|US open: Stocks f...|http://www.ifamag...|        IFA Magazine|       b|ddUyU0VZz0BRneMio...| www.ifamagazine.com|1394470371550|
|  4|Fed risks falling...|http://www.ifamag...|        IFA Magazine|       b|ddUyU0VZz0BRneMio...| www.ifamagazine.com|1394470371793|
|  5|Fed's Plosser: Na...|http://www.moneyn...|           Mone

In [12]:
news_data.count()

422937

In [13]:
title_category = news_data.select("TITLE", "CATEGORY")

In [14]:
title_category.show()

+--------------------+--------+
|               TITLE|CATEGORY|
+--------------------+--------+
|Fed official says...|       b|
|Fed's Charles Plo...|       b|
|US open: Stocks f...|       b|
|Fed risks falling...|       b|
|Fed's Plosser: Na...|       b|
|Plosser: Fed May ...|       b|
|Fed's Plosser: Ta...|       b|
|Fed's Plosser exp...|       b|
|US jobs growth la...|       b|
|ECB unlikely to e...|       b|
|ECB unlikely to e...|       b|
|EU's half-baked b...|       b|
|Europe reaches cr...|       b|
|ECB FOCUS-Stronge...|       b|
|EU aims for deal ...|       b|
|Forex - Pound dro...|       b|
|Noyer Says Strong...|       b|
|EU Week Ahead Mar...|       b|
|ECB member Noyer ...|       b|
|Euro Anxieties Wa...|       b|
+--------------------+--------+
only showing top 20 rows



In [16]:
def null_value_count(df):
    null_columns_counts = []
    numRows = df.count()
    for k in df.columns:
        nullRows = df.where(col(k).isNull()).count()
    if(nullRows > 0):
        temp = k,nullRows
        null_columns_counts.append(temp)
        
    return(null_columns_counts)

In [17]:
null_columns_count_list = null_value_count(title_category)

In [18]:
spark.createDataFrame(null_columns_count_list, ['Column_With_Null_Value' , 'Null_Values_Count']).show()

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|              CATEGORY|              516|
+----------------------+-----------------+



In [19]:
title_category = title_category.dropna()

In [20]:
title_category.count()

422421

In [21]:
title_category.select("Category").distinct().count()

265

In [22]:
title_category.groupBy("Category").count().orderBy(col("count").desc()).show(truncate = False)

+--------------------+------+
|Category            |count |
+--------------------+------+
|e                   |152127|
|b                   |115935|
|t                   |108237|
|m                   |45616 |
|Us Magazine         |31    |
|Contactmusic.com    |20    |
|GossipCop           |20    |
|CBS News            |12    |
|Complex.com         |12    |
|HipHopDX            |11    |
|The Hollywood Gossip|11    |
|We Got This Covered |10    |
|HeadlinePlanet.com  |10    |
|Gamepur             |8     |
|WorstPreviews.com   |7     |
|TooFab.com          |7     |
|Consequence of Sound|7     |
|Wetpaint            |7     |
|The Escapist        |6     |
|NBC Bay Area        |5     |
+--------------------+------+
only showing top 20 rows



In [25]:
title_category.groupBy("TITLE").count().orderBy(col("count").desc()).show(truncate=False)

+----------------------------------------------------------------------------------+-----+
|TITLE                                                                             |count|
+----------------------------------------------------------------------------------+-----+
|The article requested cannot be found! Please refresh your browser or go back  ...|145  |
|Business Highlights                                                               |59   |
|Posted by Parvez Jabri                                                            |59   |
|Posted by Imaduddin                                                               |53   |
|Posted by Shoaib-ur-Rehman Siddiqui                                               |52   |
|(click the phrases to see a list)                                                 |51   |
|Business Wire                                                                     |41   |
|PR Newswire                                                                       |38   |

In [26]:
title_category = title_category.withColumn("only_str", regexp_replace(col('TITLE'), '\d+', ''))

In [27]:
title_category.select("TITLE", "only_str").show(truncate = False)

+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|TITLE                                                                      |only_str                                                                   |
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|Fed official says weak data caused by weather, should not slow taper       |Fed official says weak data caused by weather, should not slow taper       |
|Fed's Charles Plosser sees high bar for change in pace of tapering         |Fed's Charles Plosser sees high bar for change in pace of tapering         |
|US open: Stocks fall after Fed official hints at accelerated tapering      |US open: Stocks fall after Fed official hints at accelerated tapering      |
|Fed risks falling 'behind the curve', Charles Plosser says                 

In [29]:
regex_tokenizer = RegexTokenizer(inputCol="only_str", outputCol="words", pattern="\\W")
raw_words = regex_tokenizer.transform(title_category)

In [30]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
words_df = remover.transform(raw_words)

In [34]:
indexer = StringIndexer(inputCol="CATEGORY", outputCol="categoryIndex")
feature_data = indexer.fit(words_df).transform(words_df)

In [35]:
feature_data.select("CATEGORY","categoryIndex").show()

+--------+-------------+
|CATEGORY|categoryIndex|
+--------+-------------+
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
|       b|          1.0|
+--------+-------------+
only showing top 20 rows



In [37]:
cv = CountVectorizer(inputCol="filtered", outputCol="features")
model = cv.fit(feature_data)
countVectorizer_feateures = model.transform(feature_data)

In [39]:
(trainingData, testData) = countVectorizer_feateures.randomSplit([0.8, 0.2],seed = 11)

In [41]:
nb = NaiveBayes(modelType="multinomial",labelCol="categoryIndex", featuresCol="features")
nbModel = nb.fit(trainingData)
nb_predictions = nbModel.transform(testData)

In [42]:
nb_predictions.select("prediction", "categoryIndex", "features").show(5)

+----------+-------------+--------------------+
|prediction|categoryIndex|            features|
+----------+-------------+--------------------+
|       0.0|          0.0|(49043,[540,620,1...|
|       0.0|         74.0|(49043,[74,399,57...|
|       0.0|         20.0|(49043,[74,113,39...|
|       0.0|         23.0|(49043,[53,2046,2...|
|       0.0|          0.0|(49043,[6,21,22,5...|
+----------+-------------+--------------------+
only showing top 5 rows



In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="accuracy")
nb_accuracy = evaluator.evaluate(nb_predictions)
print("Accuracy of NaiveBayes is = %g"% (nb_accuracy))
print("Test Error of NaiveBayes = %g " % (1.0 - nb_accuracy))