In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Setting up spark

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
!wget -q https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz

In [4]:
!apt-get install file

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following additional packages will be installed:
  libmagic-mgc libmagic1
The following NEW packages will be installed:
  file libmagic-mgc libmagic1
0 upgraded, 3 newly installed, 0 to remove and 37 not upgraded.
Need to get 275 kB of archives.
After this operation, 5,297 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu bionic-updates/main amd64 libmagic-mgc amd64 1:5.32-2ubuntu0.4 [184 kB]
Get:2 http://archive.ubuntu.com/ubuntu bionic-updates/main amd64 libmagic1 amd64 1:5.32-2ubuntu0.4 [68.6 kB]
Get:3 http://archive.ubuntu.com/ubuntu bionic-updates/main amd64 file amd64 1:5.32-2ubuntu0.4 [22.1 kB]
Fetched 275 kB in 1s (493 kB/s)
Selecting previously unselected package libmagic-mgc.
(Reading database ... 155565 files and directories currently installed.)
Preparing to unpack .../libmagic-mgc_1%3a5.32-2ubuntu0.4_amd64.deb ...
Unpacking libmagic-mgc (1:5.32

In [5]:
!ls

drive  sample_data  spark-3.2.0-bin-hadoop3.2.tgz


In [6]:
!file spark-3.2.0-bin-hadoop3.2.tgz

spark-3.2.0-bin-hadoop3.2.tgz: gzip compressed data, from Unix


In [7]:
!tar xf spark-3.2.0-bin-hadoop3.2.tgz

In [8]:
!pip install -q findspark

In [9]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

In [10]:
import findspark
findspark.init()

In [11]:
findspark.find()

'/content/spark-3.2.0-bin-hadoop3.2'

#### Code

In [12]:
from pyspark.sql import SparkSession
import pyspark

spark = SparkSession.builder.master("local[4]").appName("Colab").config('spark.ui.port', '4050').getOrCreate()

In [13]:
sc = spark.sparkContext
sc.getConf().getAll()

[('spark.app.name', 'Colab'),
 ('spark.driver.host', '6db2e71ff6fe'),
 ('spark.master', 'local[4]'),
 ('spark.executor.id', 'driver'),
 ('spark.sql.warehouse.dir', 'file:/content/spark-warehouse'),
 ('spark.driver.port', '41307'),
 ('spark.ui.port', '4050'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1639188694967'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.startTime', '1639188693150')]

In [14]:
spark

In [15]:
from pyspark.sql.types import StructType, IntegerType, StringType
schema = StructType().add("basemodel_review",StringType(),True).add("review",StringType(),True).add("result",IntegerType(),True)
df = spark.read.schema(schema).csv("/content/drive/My Drive/DB/Review/reviews_processed_final/review1.csv", sep=",", multiLine=True, header="true")
df = df.select("review","result")

df.printSchema()
df.show(5)

root
 |-- review: string (nullable = true)
 |-- result: integer (nullable = true)

+--------------------+------+
|              review|result|
+--------------------+------+
|"""apparently pri...|     1|
|"""store pretty g...|     1|
|"""call wvm recom...|     1|
|"""ive stay many ...|     0|
|"""food alway gre...|     1|
+--------------------+------+
only showing top 5 rows



In [None]:
df.count()

In [None]:
df.select("result").distinct().show()

In [None]:
df = df.select("review","result")
df.show(5)

In [None]:
#Balancing Data
positive_reviews = df.filter(df.result == 1).limit(335748)
negative_reviews = df.filter(df.result==0).limit(335748)

In [None]:
print(positive_reviews.count())
print(negative_reviews.count())

In [None]:
df_main = positive_reviews.union(negative_reviews)
df_main.printSchema()
df_main.show(5)

In [None]:
df_main.count()

In [None]:
(train_set, val_set, test_set) = df.randomSplit([0.98, 0.00, 0.02], seed = 2000)

In [24]:
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline


tokenizer = Tokenizer(inputCol="review", outputCol="words")
cv = CountVectorizer(inputCol="words", outputCol="features")  
label_stringIdx = StringIndexer(inputCol = "result", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, cv, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
test_df = pipelineFit.transform(test_set)
train_df.show(5)

+------+------+------+--------------------+-----+
|review|result| words|            features|label|
+------+------+------+--------------------+-----+
|  """"|     0|[""""]|(262144,[44772],[...|  1.0|
|  """"|     0|[""""]|(262144,[44772],[...|  1.0|
|  """"|     0|[""""]|(262144,[44772],[...|  1.0|
|  """"|     0|[""""]|(262144,[44772],[...|  1.0|
|  """"|     0|[""""]|(262144,[44772],[...|  1.0|
+------+------+------+--------------------+-----+
only showing top 5 rows



In [25]:
train_df = train_df.select('features','label')
test_df = test_df.select('features','label')
train_df.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(262144,[44772],[...|  1.0|
|(262144,[44772],[...|  1.0|
|(262144,[44772],[...|  1.0|
|(262144,[44772],[...|  1.0|
|(262144,[44772],[...|  1.0|
+--------------------+-----+
only showing top 5 rows



In [26]:
from pyspark.ml.classification import LogisticRegression
import time

start = time.time()
lr = LogisticRegression(maxIter=10)
lrModel = lr.fit(train_df)
end = time.time()

In [43]:
from pyspark.mllib.evaluation import MulticlassMetrics

train_predictions = lrModel.transform(train_df)

results1 = train_predictions.select(['prediction', 'label'])
predictionAndLabels1=results1.rdd
metrics1 = MulticlassMetrics(predictionAndLabels1)
cm1=metrics1.confusionMatrix().toArray()


train_accuracy=(cm1[0][0]+cm1[1][1])/cm1.sum()
print('train_accuracy : ', train_accuracy)



train_accuracy :  0.9469394084383184


In [44]:
predictions = lrModel.transform(test_df) 
test_accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(test_df.count())
print('test_accuracy : ',test_accuracy)

test_accuracy :  0.8913098427784237


In [45]:
from pyspark.mllib.evaluation import MulticlassMetrics
results = predictions.select(['prediction', 'label'])
predictionAndLabels=results.rdd
metrics = MulticlassMetrics(predictionAndLabels)



In [46]:
cm=metrics.confusionMatrix().toArray()
accuracy=(cm[0][0]+cm[1][1])/cm.sum()
precision=(cm[0][0])/(cm[0][0]+cm[1][0])
recall=(cm[0][0])/(cm[0][0]+cm[0][1])

F1_score = (2*(precision*recall))/(precision+recall)

In [47]:
print('F1_score : ',F1_score)

F1_score :  0.9179764993132916
