<a href="https://colab.research.google.com/github/Datangels/Machine_Learning_with_PySpark/blob/master/pyspark_NLP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Google Colab Configuration & Creating the SparkSession Object**

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt

## **Read the Dataset**

In [0]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [0]:
dataset_not_clean = spark.read.csv('/content/drive/My Drive/pycharm_colab_training/dataset/books_sentences.csv',inferSchema=True, header=True)

## **Exploratory Data Analysis**


In [0]:
# Print the dataset shape as (rows, columns).
print((dataset_not_clean.count(), len(dataset_not_clean.columns)))
# Optional: inspect the schema and summary statistics.
# dataset_not_clean.printSchema()
# dataset_not_clean.describe().show()

(7087, 2)


## **Feature Engineering**

In [0]:
from pyspark.sql.functions import length, rand

# Keep only rows whose Sentiment is a valid binary label ('0' or '1').
text_df = dataset_not_clean.filter(dataset_not_clean.Sentiment.isin('0', '1'))
# Cast the label to float and drop the original Sentiment column.
text_df = text_df.withColumn("Label", text_df.Sentiment.cast('float')).drop('Sentiment')

# Add the character length of each review, then show 10 random rows without truncation.
text_df = text_df.withColumn('length', length(text_df['Review']))
text_df.orderBy(rand()).show(10, False)

+------------------------------------------------------------------------+-----+------+
|Review                                                                  |Label|length|
+------------------------------------------------------------------------+-----+------+
|Brokeback Mountain was boring.                                          |0.0  |30    |
|He's like,'YEAH I GOT ACNE AND I LOVE BROKEBACK MOUNTAIN '..            |1.0  |60    |
|Always knows what I want, not guy crazy, hates Harry Potter..           |0.0  |61    |
|Harry Potter -- the other two suck.                                     |0.0  |35    |
|i love da vinci code....                                                |1.0  |24    |
|I am going to start reading the Harry Potter series again because that i|1.0  |72    |
|Oh, and Brokeback Mountain is a TERRIBLE movie...                       |0.0  |49    |
|da vinci code sucks...                                                  |0.0  |22    |
|My dad's being stupid about bro

In [0]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover

tokenization = Tokenizer(inputCol='Review',outputCol='tokens')
tokenized_df = tokenization.transform(text_df)
stopword_removal = StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')
refined_text_df = stopword_removal.transform(tokenized_df)

refined_text_df.show(10)

+--------------------+-----+------+--------------------+--------------------+
|              Review|Label|length|              tokens|      refined_tokens|
+--------------------+-----+------+--------------------+--------------------+
|The Da Vinci Code...|  1.0|    39|[the, da, vinci, ...|[da, vinci, code,...|
|this was the firs...|  1.0|    72|[this, was, the, ...|[first, clive, cu...|
|i liked the Da Vi...|  1.0|    32|[i, liked, the, d...|[liked, da, vinci...|
|i liked the Da Vi...|  1.0|    32|[i, liked, the, d...|[liked, da, vinci...|
|I liked the Da Vi...|  1.0|    72|[i, liked, the, d...|[liked, da, vinci...|
|that's not even a...|  1.0|    72|[that's, not, eve...|[even, exaggerati...|
|I loved the Da Vi...|  1.0|    72|[i, loved, the, d...|[loved, da, vinci...|
|i thought da vinc...|  1.0|    57|[i, thought, da, ...|[thought, da, vin...|
|The Da Vinci Code...|  1.0|    45|[the, da, vinci, ...|[da, vinci, code,...|
|I thought the Da ...|  1.0|    51|[i, thought, the,...|[thought
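
To make the two transformations above easier to follow, here is a minimal plain-Python sketch of what they do conceptually. It is an approximation, not Spark's implementation: `Tokenizer` lowercases the text and splits on whitespace, so punctuation stays attached to words, and `StopWordsRemover` filters out common words. The `STOP_WORDS` set and the example sentence are made up for illustration; Spark's default English list is much longer.

```python
# Plain-Python approximation of Tokenizer + StopWordsRemover (not Spark's code).
# STOP_WORDS is a tiny hypothetical subset chosen for this example only.
STOP_WORDS = {"i", "the", "by", "was", "this", "a", "is"}

def tokenize(text):
    """Lowercase the text and split on whitespace; punctuation stays attached."""
    return text.lower().split()

def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Drop every token that appears in the stop-word set."""
    return [t for t in tokens if t not in stop_words]

# Made-up sentence, used only to illustrate the behaviour.
tokens = tokenize("by the way, the Da Vinci Code sucked")
refined = remove_stop_words(tokens)
print(tokens)
print(refined)
```

Because splitting is whitespace-only, a token such as `way,` keeps its trailing comma. If that matters for the model, `RegexTokenizer` from `pyspark.ml.feature` is one option to consider.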

In [0]:
from pyspark.sql.functions import col, rand, udf
from pyspark.sql.types import IntegerType

# Count the tokens that remain after stop-word removal.
len_udf = udf(lambda s: len(s), IntegerType())
refined_text_df = refined_text_df.withColumn("token_count", len_udf(col('refined_tokens')))
refined_text_df.orderBy(rand()).show(10)

+--------------------+-----+------+--------------------+--------------------+-----------+
|              Review|Label|length|              tokens|      refined_tokens|token_count|
+--------------------+-----+------+--------------------+--------------------+-----------+
|I hate Harry Potter.|  0.0|    20|[i, hate, harry, ...|[hate, harry, pot...|          3|
|I LOVE BROKEBACK ...|  1.0|    26|[i, love, brokeba...|[love, brokeback,...|          3|
|Brokeback mountai...|  1.0|    35|[brokeback, mount...|[brokeback, mount...|          3|
|by the way, the D...|  0.0|    62|[by, the, way,, t...|[way,, da, vinci,...|          7|
|I love Harry Pott...|  1.0|    21|[i, love, harry, ...|[love, harry, pot...|          3|
|I want to be here...|  1.0|    72|[i, want, to, be,...|[want, love, harr...|          7|
|I love Brokeback ...|  1.0|    29|[i, love, brokeba...|[love, brokeback,...|          3|
|Brokeback Mountai...|  0.0|    40|[brokeback, mount...|[brokeback, mount...|          4|
|I watched

In [0]:
from pyspark.ml.feature import CountVectorizer
count_vec = CountVectorizer(inputCol='refined_tokens',outputCol='features')
cv_text_df = count_vec.fit(refined_text_df).transform(refined_text_df)
cv_text_df.select(['refined_tokens','token_count','features','Label']).show(10)

+--------------------+-----------+--------------------+-----+
|      refined_tokens|token_count|            features|Label|
+--------------------+-----------+--------------------+-----+
|[da, vinci, code,...|          5|(2302,[0,1,4,43,2...|  1.0|
|[first, clive, cu...|          9|(2302,[11,51,229,...|  1.0|
|[liked, da, vinci...|          5|(2302,[0,1,4,53,3...|  1.0|
|[liked, da, vinci...|          5|(2302,[0,1,4,53,3...|  1.0|
|[liked, da, vinci...|          8|(2302,[0,1,4,53,6...|  1.0|
|[even, exaggerati...|          6|(2302,[46,229,271...|  1.0|
|[loved, da, vinci...|          8|(2302,[0,1,22,30,...|  1.0|
|[thought, da, vin...|          7|(2302,[0,1,4,228,...|  1.0|
|[da, vinci, code,...|          6|(2302,[0,1,4,33,2...|  1.0|
|[thought, da, vin...|          7|(2302,[0,1,4,223,...|  1.0|
+--------------------+-----------+--------------------+-----+
only showing top 10 rows
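
Each value in the `features` column is a sparse vector printed as `(size, [indices], [values])`, where `size` is the vocabulary size (2302 here). The sketch below shows the idea in plain Python under simplifying assumptions: the vocabulary is ordered by descending corpus frequency, and ties are broken alphabetically (the tie-breaking rule is an assumption made for this example). The three documents are made up.

```python
from collections import Counter

def fit_vocabulary(docs):
    """Map each term to an index, most frequent terms first (ties: alphabetical)."""
    counts = Counter(term for doc in docs for term in doc)
    ordered = sorted(counts, key=lambda t: (-counts[t], t))
    return {term: idx for idx, term in enumerate(ordered)}

def to_sparse(doc, vocab):
    """Return (size, indices, values) with indices in ascending order."""
    counts = Counter(vocab[t] for t in doc if t in vocab)
    indices = sorted(counts)
    return (len(vocab), indices, [float(counts[i]) for i in indices])

# Made-up token lists standing in for the refined_tokens column.
docs = [
    ["da", "vinci", "code", "awesome"],
    ["liked", "da", "vinci", "code"],
    ["love", "da", "vinci"],
]
vocab = fit_vocabulary(docs)
print(vocab)
print(to_sparse(docs[0], vocab))
```

Frequent terms receive small indices, which is consistent with indices such as 0, 1, and 4 recurring in the rows above.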



In [0]:
from pyspark.ml.feature import VectorAssembler

model_text_df = cv_text_df.select(['features','token_count','Label'])

df_assembler = VectorAssembler(inputCols=['features','token_count'],outputCol='features_vec')
model_text_df = df_assembler.transform(model_text_df)
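
`VectorAssembler` concatenates its input columns into one vector, so the 2302-dimensional count vector plus the scalar `token_count` gives a 2303-dimensional `features_vec`. A rough plain-Python sketch of that concatenation on the `(size, indices, values)` representation follows; the `assemble` helper is hypothetical and only illustrates the resulting shape.

```python
def assemble(sparse_vec, scalar):
    """Append one scalar feature to a sparse vector given as (size, indices, values)."""
    size, indices, values = sparse_vec
    if scalar == 0:
        # A zero entry does not need to be stored in a sparse vector.
        return (size + 1, list(indices), list(values))
    # The new feature takes the next free position, i.e. index == old size.
    return (size + 1, list(indices) + [size], list(values) + [float(scalar)])

# Made-up count vector with three active terms and a token_count of 5.
features = (2302, [0, 1, 4], [1.0, 1.0, 1.0])
features_vec = assemble(features, 5)
print(features_vec)
```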

## **Splitting the Dataset**

In [0]:
training_df, test_df = model_text_df.randomSplit([0.75,0.25])
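
Note that `randomSplit` assigns rows at random, so the 75/25 proportions are approximate and the split changes between runs unless a `seed` is passed, for example `randomSplit([0.75, 0.25], seed=42)`. The plain-Python sketch below illustrates the general idea of a weighted random split with a fixed seed; `random_split` is a hypothetical helper, not Spark's sampler.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by comparing a uniform draw to cumulative weights."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)  # fixed seed makes the split reproducible
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding leftover.
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(1000), [0.75, 0.25], seed=42)
print(len(train), len(test))
```

Running the function twice with the same seed yields identical splits, which makes reported metrics easier to reproduce.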

## **Build and Train Logistic Regression Model**

In [0]:
from pyspark.ml.classification import LogisticRegression
log_reg = LogisticRegression(featuresCol='features_vec',labelCol='Label').fit(training_df)
results = log_reg.evaluate(test_df).predictions
results.show(10)

+--------------------+-----------+-----+--------------------+--------------------+--------------------+----------+
|            features|token_count|Label|        features_vec|       rawPrediction|         probability|prediction|
+--------------------+-----------+-----+--------------------+--------------------+--------------------+----------+
|(2302,[0,1,4,5,64...|          6|  1.0|(2303,[0,1,4,5,64...|[-17.648497042873...|[2.16448764406048...|       1.0|
|(2302,[0,1,4,5,22...|          9|  1.0|(2303,[0,1,4,5,22...|[-8.7506228304995...|[1.58337586536483...|       1.0|
|(2302,[0,1,4,5,30...|          5|  1.0|(2303,[0,1,4,5,30...|[-20.437778527709...|[1.33041018958616...|       1.0|
|(2302,[0,1,4,5,36...|          5|  1.0|(2303,[0,1,4,5,36...|[-12.212404455274...|[4.96841628432399...|       1.0|
|(2302,[0,1,4,5,65...|          5|  1.0|(2303,[0,1,4,5,65...|[-16.296294485445...|[8.36775956994756...|       1.0|
|(2302,[0,1,4,11,4...|          7|  1.0|(2303,[0,1,4,11,4...|[-9.6682907018195..
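
For binary logistic regression, the first entry of `probability` is the logistic (sigmoid) function applied to the first entry of `rawPrediction`. The short check below uses the digits of the first raw value as printed above (the output truncates the rest), so the result is approximate.

```python
import math

def sigmoid(z):
    """Logistic function: maps a raw score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

# First rawPrediction entry of the first row, as far as the output shows it.
raw_class0 = -17.648497042873
p_class0 = sigmoid(raw_class0)   # probability of label 0
p_class1 = 1.0 - p_class0        # probability of label 1
print(p_class0, p_class1)
```

A strongly negative raw score for class 0 gives a probability near zero for that class, so the row is predicted as `1.0`.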

## **Evaluate Model on Test Data**

In [0]:
# Count the four cells of the confusion matrix on the test predictions.
true_positives = results[(results.Label == 1) & (results.prediction == 1)].count()
true_negatives = results[(results.Label == 0) & (results.prediction == 0)].count()
false_positives = results[(results.Label == 0) & (results.prediction == 1)].count()
false_negatives = results[(results.Label == 1) & (results.prediction == 0)].count()

# Recall: share of actual positives that were predicted positive.
recall = float(true_positives) / (true_positives + false_negatives)
print("Recall: " + str(recall))
# Precision: share of predicted positives that are actually positive.
precision = float(true_positives) / (true_positives + false_positives)
print("Precision: " + str(precision))
# Accuracy: share of all test rows that were classified correctly.
accuracy = float(true_positives + true_negatives) / results.count()
print("Accuracy: " + str(accuracy))

Recall: 0.9847715736040609
Precision: 0.967098703888335
Accuracy: 0.9732292247629671
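
The same metrics can be wrapped in a small reusable helper. The sketch below is plain Python, adds the F1 score, and guards against division by zero; the `confusion_metrics` name and the example counts are hypothetical and are not the confusion matrix of this notebook.

```python
def confusion_metrics(tp, tn, fp, fn):
    """Compute recall, precision, accuracy and F1 from confusion-matrix counts."""
    def safe_div(num, den):
        # Return 0.0 instead of raising when a denominator is zero.
        return num / den if den else 0.0

    recall = safe_div(tp, tp + fn)
    precision = safe_div(tp, tp + fp)
    accuracy = safe_div(tp + tn, tp + tn + fp + fn)
    f1 = safe_div(2 * precision * recall, precision + recall)
    return {"recall": recall, "precision": precision, "accuracy": accuracy, "f1": f1}

# Hypothetical counts, for illustration only.
metrics = confusion_metrics(tp=8, tn=5, fp=2, fn=1)
print(metrics)
```

In the notebook, the four counts computed from `results` could be passed to this helper directly.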
