In [None]:
#Name - Pranav Girish Sankhe
#Name - Srivatsa Manjunath Hegde

In [1]:
# importing all the libraries needed.
# initializing spark
import findspark
findspark.init()
from pyspark.sql import SQLContext
from pyspark import SparkContext
# Reuse the running SparkContext when one already exists: calling SparkContext()
# a second time (e.g. when this cell is re-run) raises an error, whereas
# getOrCreate() makes the cell safe to execute repeatedly.
sc = SparkContext.getOrCreate()
# SQLContext wrapper bound to the context above.
sqlContext = SQLContext(sc)



In [2]:
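# NOTE: this binds the pyspark module to the name `spark`, but the next cell rebinds `spark` to the SparkSession, so the module alias is never used.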
import pyspark as spark 

In [3]:
from pyspark.sql import SparkSession 
# Build the SparkSession used for all DataFrame work in this notebook
# (getOrCreate returns the existing session if there is one).
# NOTE(review): 'spark.some.config.option' looks like a placeholder key - confirm it is needed.
spark = (
    SparkSession.builder
    .appName('Python Spark Basic example')
    .config('spark.some.config.option', 'some-value')
    .getOrCreate()
)

In [4]:
# Load data.csv into a DataFrame, treating the first line as the column header.
df = spark.read.csv(path="data.csv", header='true')

In [5]:
# Sanity checks on the loaded data: print the column schema,
# then the Python class of the object returned by the CSV reader.
df.printSchema()
print(df.__class__)

root
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)

<class 'pyspark.sql.dataframe.DataFrame'>


In [6]:
# Display the first row as a Row object to see an untruncated Descript value.
df.head()

Row(Category='sports', Descript='If you’ve never had your heart broken or your nerves thoroughly frayed by a sporting event, you know someone who has. As a young journalist at the Los Angeles Times in the 1990s, George Dohrmann would field late-night phone calls from rabid fans. He wondered, being a fan himself, why the depth of their devotion mystified him. “My blindness to the root causes of their thinking and conduct was also ignorance about myself,” he writes in his new book, “Superfans: Into the Heart of Obsessive Sports Fandom.” Dohrmann, who spent time as an investigative reporter at Sports Illustrated and is now a senior editor at The Athletic, spoke to everyday fans, academics and scientists about what it is that drives our vicarious competitive mania. Below, he talks about whether kids should be fans, his admiration for the soccer player Clint Dempsey and more.When did you first get the idea to write this book?It’s long been a curiosity of mine, why people were so intense abo

In [7]:
# Tabular preview of the first row (long cell values are truncated by show).
df.show(n=1)

+--------+--------------------+
|Category|            Descript|
+--------+--------------------+
|  sports|If you’ve never h...|
+--------+--------------------+
only showing top 1 row



In [8]:
from pyspark.sql.functions import col

# Number of rows per Category, largest group first.
(
    df.groupBy('Category')
    .count()
    .orderBy("count", ascending=False)
    .show()
)

+---------+-----+
| Category|count|
+---------+-----+
| business|   29|
|education|   28|
| politics|   19|
|   sports|   16|
+---------+-----+



In [9]:
# Number of rows per distinct Descript text, most repeated first;
# counts above 1 reveal duplicated articles.
(
    df.groupBy("Descript")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

+--------------------+-----+
|            Descript|count|
+--------------------+-----+
|EDUCATED A Memoir...|    2|
|THE NEWCOMERS Fin...|    2|
|MILAN — In the pr...|    1|
|With an important...|    1|
|H. Wayne Huizenga...|    1|
|WASHINGTON — The ...|    1|
|Since the Meredit...|    1|
|In your opinion, ...|    1|
|With its faux-mar...|    1|
|A bucket list is ...|    1|
|This fall, the Tr...|    1|
|New York Universi...|    1|
|It is arguably th...|    1|
|THE GOLDEN PASSPO...|    1|
|GANGNEUNG, South ...|    1|
|United States pro...|    1|
|GUADALAJARA, Mexi...|    1|
|On Wednesday, tho...|    1|
|Good Wednesday. H...|    1|
|The admissions pr...|    1|
+--------------------+-----+
only showing top 20 rows



In [10]:
# starting actual ML using PySpark for the data.csv 
# defining Pipeline stages

# reference: https://spark.apache.org/docs/2.1.0/ml-features.html 
# referred this official documentation for syntax, accepted argument and function use.

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Stage 1 - tokenizer: turn the "Descript" text into a "words" array using the \W regex pattern.
regexTokenizer = RegexTokenizer(
    inputCol="Descript",
    outputCol="words",
    pattern="\\W",
)

# Stage 2 - stop-word filter: drop very frequent words that carry no signal for the bag of words.
# NOTE(review): this list is installed as the remover's complete stop-word list, not appended to
# a default one - confirm that is intended given the name `add_stopwords`.
add_stopwords = [
    "a", "an", "as", "amp", "be", "but", "by", "c", "http",
    "https", "he", "her", "I", "to", "rt", "t", "the",
]
stopwordsRemover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=add_stopwords,
)

# Stage 3 - bag of words: term counts over the filtered tokens, with a vocabulary capped at
# 10000 terms and a minimum document frequency of 5.
countVectors = CountVectorizer(
    inputCol="filtered",
    outputCol="features",
    vocabSize=10000,
    minDF=5,
)

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# reference: https://spark.apache.org/docs/2.1.0/ml-pipeline.html
# referred this official documentation for syntax, accepted argument and function use.

# Stage 4 - label indexer: map each Category string to a numeric "label" column.
label_stringIdx = StringIndexer(inputCol="Category", outputCol="label")

# Chain tokenizer -> stop-word filter -> count vectorizer -> label indexer.
pipeline = Pipeline(stages=[
    regexTokenizer,
    stopwordsRemover,
    countVectors,
    label_stringIdx,
])

# Fit the pipeline on the complete DataFrame and append the derived columns.
# NOTE(review): the fit runs on all of `df`, i.e. before the train/test split in the
# next cell, so the fitted stages also see rows that later end up in the test set.
pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)

# Peek at the first 15 transformed rows.
dataset.show(n=15)

+--------+--------------------+--------------------+--------------------+--------------------+-----+
|Category|            Descript|               words|            filtered|            features|label|
+--------+--------------------+--------------------+--------------------+--------------------+-----+
|  sports|If you’ve never h...|[if, you, ve, nev...|[if, you, ve, nev...|(2520,[0,1,2,3,4,...|  3.0|
|  sports|Since the Meredit...|[since, the, mere...|[since, meredith,...|(2520,[0,1,2,3,4,...|  3.0|
|  sports|New York Universi...|[new, york, unive...|[new, york, unive...|(2520,[0,1,2,3,4,...|  3.0|
|  sports|Between living al...|[between, living,...|[between, living,...|(2520,[0,1,2,3,4,...|  3.0|
|  sports|Emitting a high-d...|[emitting, a, hig...|[emitting, high, ...|(2520,[0,1,2,3,5,...|  3.0|
|  sports|There were more n...|[there, were, mor...|[there, were, mor...|(2520,[0,1,2,3,4,...|  3.0|
|  sports|WASHINGTON — In r...|[washington, in, ...|[washington, in, ...|(2520,[0,1,2,3,4,.

In [12]:
# Randomly partition the transformed rows with 0.7 / 0.3 weights.
# The fixed seed keeps the partition repeatable.
training, test = dataset.randomSplit(weights=[0.7, 0.3], seed=100)

# Report how many rows landed on each side of the split.
training_rows = training.count()
test_rows = test.count()
print("Training Data Size: {}".format(training_rows))
print("Test Data Size: {}".format(test_rows))

Training Data Size: 61
Test Data Size: 31


In [None]:
# function to evaluate accuracy 

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def evaluate(predictions_df=None, metric_name="accuracy"):
    """Score a predictions DataFrame with a multiclass evaluator.

    Args:
        predictions_df: DataFrame containing "label" and "prediction" columns.
            When omitted, the notebook-level ``predictions`` variable is used,
            so existing ``evaluate()`` calls keep working.
        metric_name: Metric name handed to the evaluator. Defaults to
            "accuracy" because every caller prints the result as
            "Accuracy is: ..."; without an explicit metric name the evaluator
            reports F1 instead.

    Returns:
        The value returned by ``MulticlassClassificationEvaluator.evaluate``.
    """
    if predictions_df is None:
        # Looked up in the notebook namespace at call time, i.e. the output of
        # the most recently executed model cell.
        predictions_df = predictions
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label",
        predictionCol="prediction",
        metricName=metric_name,
    )
    return evaluator.evaluate(predictions_df)

In [22]:
# Logistic Regression classifier.
# Reference: https://spark.apache.org/docs/2.1.0/ml-classification-regression.html

# Train on the training split, then score the held-out test split.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(training)
predictions = lrModel.transform(test)

# Show up to 15 test rows that were predicted as class 0, ordered by the
# probability column in descending order.
print("Printing a part of the resultant table")
lr_display_columns = ["Descript", "Category", "probability", "label", "prediction"]
(
    predictions
    .filter(predictions.prediction == 0)
    .select(*lr_display_columns)
    .orderBy("probability", ascending=False)
    .show(15, truncate=30)
)

# evaluate() reads the `predictions` variable assigned above.
lr_score = evaluate()
print("Accuracy is: {}".format(lr_score))

Printing a part of the resultant table
+------------------------------+--------+------------------------------+-----+----------+
|                      Descript|Category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|The investigation into Russ...|business|[0.9104856472580957,0.03093...|  0.0|       0.0|
|Within hours of announcing ...|business|[0.7565440038754787,0.19558...|  0.0|       0.0|
|In July, David J. Pecker, t...|business|[0.6287520504679609,0.07481...|  0.0|       0.0|
|SAN JUAN, P.R. — The messag...|business|[0.527104032198107,0.384314...|  0.0|       0.0|
|WASHINGTON — Few places hav...|politics|[0.4979607796561227,0.32447...|  2.0|       0.0|
|United States prosecutors h...|  sports|[0.47884750219749006,0.1688...|  3.0|       0.0|
|WASHINGTON — In recent mont...|  sports|[0.4557180541422677,0.11525...|  3.0|       0.0|
|The most surprising thing a...|business|[0.44561041544771984

In [25]:
# Naive Bayes classifier.
# Reference: https://spark.apache.org/docs/2.1.0/ml-classification-regression.html#naive-bayes

from pyspark.ml.classification import NaiveBayes

# Train on the training split, then score the held-out test split.
nb = NaiveBayes(smoothing=1)
model = nb.fit(training)
predictions = model.transform(test)

# Show up to 15 test rows that were predicted as class 0, ordered by the
# probability column in descending order.
print("Printing a part of the resultant table")
nb_display_columns = ["Descript", "Category", "probability", "label", "prediction"]
(
    predictions
    .filter(predictions.prediction == 0)
    .select(*nb_display_columns)
    .orderBy("probability", ascending=False)
    .show(15, truncate=30)
)

# evaluate() reads the `predictions` variable assigned above.
nb_score = evaluate()
print("Accuracy is: {}".format(nb_score))

Printing a part of the resultant table
+------------------------------+--------+------------------------------+-----+----------+
|                      Descript|Category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|WASHINGTON — Few places hav...|politics|[1.0,1.140444494655158E-63,...|  2.0|       0.0|
|The most surprising thing a...|business|[1.0,1.6795386469208958E-77...|  0.0|       0.0|
|SAN JUAN, P.R. — The messag...|business|[1.0,2.056758764889174E-93,...|  0.0|       0.0|
|I didn’t mean to do it. It ...|business|[1.0,4.799941982738162E-121...|  0.0|       0.0|
|Within hours of announcing ...|business|[1.0,1.883509811154267E-135...|  0.0|       0.0|
|Every few weeks, an email a...|politics|[1.0,3.734264647499227E-139...|  2.0|       0.0|
|This fall, the Trump admini...|business|[1.0,6.0691876890222755E-14...|  0.0|       0.0|
|H. Wayne Huizenga, the entr...|business|[1.0,1.0289677732528

In [27]:
# Random Forest classifier.
# Reference: https://spark.apache.org/docs/2.1.0/ml-classification-regression.html#random-forest-classifier

from pyspark.ml.classification import RandomForestClassifier

# A random forest is a randomised learner; pinning the seed (same value as the
# train/test split) makes the trained model and the score below repeatable
# across kernel restarts.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=100,
    maxDepth=6,
    maxBins=32,
    seed=100,
)

# Train on the training split, then score the held-out test split.
rfModel = rf.fit(training)
predictions = rfModel.transform(test)

# Show up to 15 test rows that were predicted as class 0, ordered by the
# probability column in descending order.
print("Printing a part of the resultant table")
(
    predictions
    .filter(predictions['prediction'] == 0)
    .select("Descript", "Category", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=15, truncate=30)
)

# evaluate() reads the `predictions` variable assigned above.
print("Accuracy is: {}".format(evaluate()))

Printing a part of the resultant table
+------------------------------+--------+------------------------------+-----+----------+
|                      Descript|Category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|The investigation into Russ...|business|[0.4587742153394832,0.21153...|  0.0|       0.0|
|H. Wayne Huizenga, the entr...|business|[0.43846124708624706,0.1811...|  0.0|       0.0|
|SAN JUAN, P.R. — The messag...|business|[0.43686789719941893,0.2899...|  0.0|       0.0|
|In July, David J. Pecker, t...|business|[0.4346132705014285,0.17830...|  0.0|       0.0|
|PLAINVILLE, Conn. — When Wi...|politics|[0.4207154709792683,0.15753...|  2.0|       0.0|
|Within hours of announcing ...|business|[0.4030883560883561,0.37318...|  0.0|       0.0|
|The most surprising thing a...|business|[0.39240909090909093,0.2755...|  0.0|       0.0|
|This fall, the Trump admini...|business|[0.33861808736872673