In [43]:
import findspark
findspark.init()
import pyspark as ps
from pyspark.sql import SparkSession
spark=SparkSession.builder.getOrCreate()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
df=spark.sql("select 'spark' as hello")
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



# Loading Dataset

In [2]:
data = spark.read.csv("helloj1.csv",header=True,inferSchema=True)
data.show()


+--------------------+------+
|             Reviews|labels|
+--------------------+------+
|Wow... Loved this...|     1|
|  Crust is not good.|     0|
|Not tasty and the...|     0|
|Stopped by during...|     1|
|The selection on ...|     1|
|Now I am getting ...|     0|
|Honeslty it didn'...|     0|
|The potatoes were...|     0|
|The fries were gr...|     1|
|      A great touch.|     1|
|Service was very ...|     1|
|  Would not go back.|     0|
|The cashier had n...|     0|
|I tried the Cape ...|     1|
|I was disgusted b...|     0|
|I was shocked bec...|     0|
| Highly recommended.|     1|
|Waitress was a li...|     0|
|This place is not...|     0|
|did not like at all.|     0|
+--------------------+------+
only showing top 20 rows



# Tokenize Reviews Text into Words

In [3]:
from pyspark.ml.feature import Tokenizer,RegexTokenizer
from pyspark.sql.functions import col,udf 
from pyspark.sql.types import IntegerType

In [4]:
tokenizer=Tokenizer(inputCol="Reviews",outputCol="words")
regexTokenizer=RegexTokenizer(inputCol="Reviews",outputCol="words",pattern="\\W")

countTokens=udf(lambda w: len(w), IntegerType())



In [5]:
tokenized=tokenizer.transform(data)

In [6]:
tokenized.show()

+--------------------+------+--------------------+
|             Reviews|labels|               words|
+--------------------+------+--------------------+
|Wow... Loved this...|     1|[wow..., loved, t...|
|  Crust is not good.|     0|[crust, is, not, ...|
|Not tasty and the...|     0|[not, tasty, and,...|
|Stopped by during...|     1|[stopped, by, dur...|
|The selection on ...|     1|[the, selection, ...|
|Now I am getting ...|     0|[now, i, am, gett...|
|Honeslty it didn'...|     0|[honeslty, it, di...|
|The potatoes were...|     0|[the, potatoes, w...|
|The fries were gr...|     1|[the, fries, were...|
|      A great touch.|     1|  [a, great, touch.]|
|Service was very ...|     1|[service, was, ve...|
|  Would not go back.|     0|[would, not, go, ...|
|The cashier had n...|     0|[the, cashier, ha...|
|I tried the Cape ...|     1|[i, tried, the, c...|
|I was disgusted b...|     0|[i, was, disguste...|
|I was shocked bec...|     0|[i, was, shocked,...|
| Highly recommended.|     1|[h

# Transform the reviews text data into numeric features 

In [7]:
from pyspark.ml.feature import HashingTF

In [8]:

hashing = HashingTF(inputCol="words", outputCol="hashedValues", numFeatures=pow(2,4))
hashed_df = hashing.transform(tokenized)
hashed_df.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+
|Reviews                                                                                                        |labels|words                                                                                                                                 |hashedValues                                                                                     |
+---------------------------------------------------------------------------------------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------

In [9]:
hashed_df.show()

+--------------------+------+--------------------+--------------------+
|             Reviews|labels|               words|        hashedValues|
+--------------------+------+--------------------+--------------------+
|Wow... Loved this...|     1|[wow..., loved, t...|(16,[3,6,13],[1.0...|
|  Crust is not good.|     0|[crust, is, not, ...|(16,[7,9,10,13],[...|
|Not tasty and the...|     0|[not, tasty, and,...|(16,[1,3,6,11,13]...|
|Stopped by during...|     1|[stopped, by, dur...|(16,[0,1,2,6,7,8,...|
|The selection on ...|     1|[the, selection, ...|(16,[1,2,3,5,8,10...|
|Now I am getting ...|     0|[now, i, am, gett...|(16,[0,1,3,10,11,...|
|Honeslty it didn'...|     0|[honeslty, it, di...|(16,[0,1,4,6,10,1...|
|The potatoes were...|     0|[the, potatoes, w...|(16,[0,1,2,3,4,6,...|
|The fries were gr...|     1|[the, fries, were...|(16,[1,2,11,14],[...|
|      A great touch.|     1|  [a, great, touch.]|(16,[3,14],[2.0,1...|
|Service was very ...|     1|[service, was, ve...|(16,[6,8,11,12

In [10]:
hashed_df.printSchema

<bound method DataFrame.printSchema of DataFrame[Reviews: string, labels: string, words: array<string>, hashedValues: vector]>

# Converting Labelled Column to Numerical Column

In [11]:
from pyspark.sql.types import IntegerType
data_df = hashed_df.withColumn("labels", hashed_df["labels"].cast(IntegerType()))

In [12]:
data_df.printSchema

<bound method DataFrame.printSchema of DataFrame[Reviews: string, labels: int, words: array<string>, hashedValues: vector]>

# Dropping Nulls

In [13]:
data_df=data_df.na.drop()

# Spliiting Data into Train and Test

In [14]:
final_data=data_df.select("hashedValues","labels")
train_data,test_data=final_data.randomSplit([0.7,0.3])

In [15]:
final_data.show()

+--------------------+------+
|        hashedValues|labels|
+--------------------+------+
|(16,[3,6,13],[1.0...|     1|
|(16,[7,9,10,13],[...|     0|
|(16,[1,3,6,11,13]...|     0|
|(16,[0,1,2,6,7,8,...|     1|
|(16,[1,2,3,5,8,10...|     1|
|(16,[0,1,3,10,11,...|     0|
|(16,[0,1,4,6,10,1...|     0|
|(16,[0,1,2,3,4,6,...|     0|
|(16,[1,2,11,14],[...|     1|
|(16,[3,14],[2.0,1...|     1|
|(16,[6,8,11,12],[...|     1|
|(16,[3,13,14,15],...|     0|
|(16,[0,1,3,6,7,8,...|     0|
|(16,[1,2,10,11,12...|     1|
|(16,[0,3,11,12,13...|     0|
|(16,[5,8,11,12,15...|     0|
|      (16,[1],[2.0])|     1|
|(16,[3,6,7,8,11,1...|     0|
|(16,[6,9,10,11,12...|     0|
|(16,[2,10,11,13,1...|     0|
+--------------------+------+
only showing top 20 rows



In [16]:
spark.read.format('jdbc')

<pyspark.sql.readwriter.DataFrameReader at 0x1e3ce2d26d0>

# Logistic Regression

In [17]:
from pyspark.ml.classification import LogisticRegression

In [18]:

model=LogisticRegression(featuresCol="hashedValues",labelCol="labels",maxIter=100, regParam=0.8, elasticNetParam=0.8)
model=model.fit(train_data)
model

LogisticRegressionModel: uid=LogisticRegression_e6f4d5e79f6f, numClasses=2, numFeatures=16

# Predcitions

In [24]:
test_prediction = model.transform(test_data)

test_prediction.show()

+--------------------+------+--------------------+--------------------+----------+
|        hashedValues|labels|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|(16,[0,1,2,3,4,5,...|     0|[-0.0321195494221...|[0.49197080291970...|       1.0|
|(16,[0,1,2,3,4,5,...|     0|[-0.0321195494221...|[0.49197080291970...|       1.0|
|(16,[0,1,2,3,4,5,...|     0|[-0.0321195494221...|[0.49197080291970...|       1.0|
|(16,[0,1,2,3,4,5,...|     0|[-0.0321195494221...|[0.49197080291970...|       1.0|
|(16,[0,1,2,3,4,5,...|     1|[-0.0321195494221...|[0.49197080291970...|       1.0|
|(16,[0,1,2,3,4,5,...|     0|[-0.0321195494221...|[0.49197080291970...|       1.0|
|(16,[0,1,2,3,4,5,...|     0|[-0.0321195494221...|[0.49197080291970...|       1.0|
|(16,[0,1,2,3,4,5,...|     0|[-0.0321195494221...|[0.49197080291970...|       1.0|
|(16,[0,1,2,3,4,5,...|     0|[-0.0321195494221...|[0.49197080291970...|       1.0|
|(16

# Area Under ROC 

In [27]:
results = test_prediction.select('probability', 'labels')

In [29]:
results_collect = results.collect()

In [33]:
results_list = [(float(i[0][0]), 1.0-float(i[1])) for i in results_collect]
scoreAndLabels = sc.parallelize(results_list)

In [41]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [42]:
metrics = BinaryClassificationMetrics(scoreAndLabels)
print("AUC-value: " + str(round(metrics.areaUnderROC,4)))

AUC-value: 0.5
