In [1]:
# Mount Google Drive at /content/drive so later cells can use files stored there.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Install the headless OpenJDK 8 JDK; -qq and the redirect to /dev/null silence apt's output.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
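#Download the Apache Spark 3.2.0 (Hadoop 3.2 build) archive into the Drive Resources folder (currently commented out; the next cell extracts the archive from that folder)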
#!wget -q https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz -P drive/MyDrive/Colab_Notebooks/Amazon_reviews/Resources

In [4]:
# Extract the Spark archive stored on Drive into the current working directory.
!tar xf drive/MyDrive/Colab_Notebooks/Amazon_reviews/Resources/spark-3.2.0-bin-hadoop3.2.tgz

In [5]:
!pip install -q findspark

In [6]:
# Set the environment variables that point at the Java and Spark installations.
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.2.0-bin-hadoop3.2",
})

In [7]:
# Initialize PySpark via findspark (run before the pyspark imports in the following cells).
import findspark
findspark.init()

In [8]:
from pyspark.sql import SparkSession

# Get or create the Spark session: "local" master, application name "Colab",
# Spark UI port set to 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)
# Leave the session as the cell's last expression so the notebook displays it.
spark

In [9]:
import pyspark
#from pyspark.sql.functions import isnan, when, count, col
import pyspark.sql.functions as F
from pyspark.sql.functions import col, trim, lower, regexp_replace

In [10]:
import numpy as np
import pandas as pd

In [11]:
# Read the preprocessed review data (JSON) from Drive into a Spark DataFrame
# and print its schema.
filePath = "drive/MyDrive/Colab_Notebooks/Amazon_reviews/preprocess_data"
df = spark.read.json(filePath)
df.printSchema()

root
 |-- categories: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- vote: double (nullable = true)



In [12]:
# Preview the first rows of the loaded DataFrame.
df.show()

+----------+-------+--------------------+--------------------+--------------------+--------+----+
|categories|overall|          reviewText|        reviewerName|             summary|verified|vote|
+----------+-------+--------------------+--------------------+--------------------+--------+----+
|  Software|    5.0|As someone who ha...|   Loves Books in MD|Learn Adobe Photo...|   false|null|
|  Software|    4.0|I've been running...|           Mindcrime|Great product, bu...|   false|14.0|
|  Software|    1.0|December 13, 2008...|         James Smith|Amazon, PBJWORLD ...|   false| 5.0|
|  Software|    2.0|I have been a Qui...|     Lance_big_daddy|Intuit has lost i...|   false|31.0|
|  Software|    1.0|This is by far th...|              Deimos|        Garbage.....|   false|null|
|  Software|    1.0|I have had Acroni...|                Mark|After image is ma...|   false|null|
|  Software|    1.0|I've used Turbota...|        Torrey Pines|Very buggy this year|   false|null|
|  Software|    3.0|

In [13]:
def sparkShape(dataFrame):
    """Return a pandas-style ``(rows, columns)`` tuple for a DataFrame.

    The row count is obtained from ``dataFrame.count()`` and the column
    count is the length of ``dataFrame.columns``.
    """
    n_rows = dataFrame.count()
    n_cols = len(dataFrame.columns)
    return n_rows, n_cols

In [14]:
# Attach sparkShape to the Spark DataFrame class so every DataFrame gains a
# shape() method. Unlike pandas' .shape attribute it must be called.
pyspark.sql.dataframe.DataFrame.shape = sparkShape
print(df.shape())

(67691433, 7)


In [15]:
# Re-check the schema before cleaning the text columns.
df.printSchema()

root
 |-- categories: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- vote: double (nullable = true)



In [16]:
def removePunctuation(column):
    """Strip punctuation from a text column, lower-case it and trim it.

    Every character that is not whitespace, an ASCII letter or a digit is
    removed; whitespace inside the text is kept. The result is lower-cased
    and passed through ``trim``.

    Args:
        column: Column expression holding the text to clean.

    Returns:
        The cleaned column expression, aliased as ``'stopped'``.
    """
    # Raw string: in a plain literal "\s" is an invalid escape sequence and
    # makes Python emit a warning. The regex pattern itself is unchanged.
    without_punctuation = regexp_replace(column, r'[^\sa-zA-Z0-9]', '')
    return trim(lower(without_punctuation)).alias('stopped')

In [17]:
# Add a cleaned copy of the review text produced by removePunctuation.
df = df.withColumn(
    "reviewText_cleaned",
    removePunctuation(col("reviewText")),
)

In [18]:
# Spot-check the cleaned review text.
df.select('reviewText_cleaned').show()

+--------------------+
|  reviewText_cleaned|
+--------------------+
|as someone who ha...|
|ive been running ...|
|december 13 2008\...|
|i have been a qui...|
|this is by far th...|
|i have had acroni...|
|ive used turbotax...|
|the software on i...|
|easy to install a...|
|these comments ar...|
|this is the first...|
|here is a gem of ...|
|for anyone who ha...|
|f you like will f...|
|like many have sa...|
|i honestly havent...|
|when i originally...|
|ive been using ka...|
|it is what it is ...|
|i have been a big...|
+--------------------+
only showing top 20 rows



In [19]:
# Add a cleaned copy of the review summary produced by removePunctuation.
df = df.withColumn(
    "summary_cleaned",
    removePunctuation(col("summary")),
)

In [20]:
# Spot-check the cleaned summaries.
df.select('summary_cleaned').show()

+--------------------+
|     summary_cleaned|
+--------------------+
|learn adobe photo...|
|great product but...|
|amazon pbjworld  ...|
|intuit has lost i...|
|             garbage|
|after image is ma...|
|very buggy this year|
|the software is g...|
|easy to install r...|
|buggy as crp unle...|
|         easy to use|
|clean easy to use...|
|a great place to ...|
|funny but i was e...|
|it is a shame the...|
|       wordperfect 5|
|great internet pr...|
|    solid protection|
|its fine but noth...|
|classic video aud...|
+--------------------+
only showing top 20 rows



In [21]:
# Verify that both cleaned columns are now part of the schema.
df.printSchema()

root
 |-- categories: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- vote: double (nullable = true)
 |-- reviewText_cleaned: string (nullable = true)
 |-- summary_cleaned: string (nullable = true)



In [22]:
# Drop the raw text columns; their cleaned counterparts were added above.
df = df.drop("reviewText","summary")

In [23]:
# Confirm the raw text columns are gone.
df.printSchema()

root
 |-- categories: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- vote: double (nullable = true)
 |-- reviewText_cleaned: string (nullable = true)
 |-- summary_cleaned: string (nullable = true)



In [24]:
# Drop the reviewer name column as well.
df = df.drop("reviewerName")

In [25]:
# The full dataset is large, so keep only a random sample of roughly 1%
# of the rows (fixed seed of 42).
df_sample = df.sample(fraction=0.01, seed=42)

In [26]:
#df.write.format('json').mode('overwrite').save("drive/MyDrive/Colab_Notebooks/Amazon_reviews/cleaned_data")

In [27]:
# Save the sampled rows to Drive as JSON, overwriting any existing output.
df_sample.write.mode('overwrite').json("drive/MyDrive/Colab_Notebooks/Amazon_reviews/sample_data")