# Preprocessing Data

In this step of the project we cleaned the data. We first lowercased all words and removed punctuation, special characters, and numbers, then tokenized the text so that stop words could be removed. Next, the words were lemmatized, and words shorter than four characters were removed. The last step was to split the data into train, validation, and test sets.

### Import pyspark using Docker

In [1]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

### Start Spark Session

In [3]:
spark = SparkSession.builder.appName('cleaning').getOrCreate()

### Load Data

In [4]:
df = spark.read.json('Movies_and_TV.json.gz')

In [5]:
### View Data
df = df.select('reviewText', 'verified')
df.show(5)

+--------------------+--------+
|          reviewText|verified|
+--------------------+--------+
|really happy they...|    true|
|Having lived in W...|    true|
|Excellent look in...|   false|
|More than anythin...|    true|
|This is a great m...|    true|
+--------------------+--------+
only showing top 5 rows



In [6]:
### dropping na values
df = df.na.drop()
df.count()

8757545

### Remove punctuation & special characters & lowercase words

In this step, punctuation, special characters, and numbers were removed, and all words were lowercased.

In [7]:
### Clean Function 
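def clean_text(c):
    # c is a Spark Column; each call below returns a new Column expression
    c = lower(c)                                  # lowercase everything
    c = regexp_replace(c, r"^rt ", "")            # drop a leading "rt " marker
    c = regexp_replace(c, r"[=.]", " ")           # turn "=" and "." into spaces
    c = regexp_replace(c, r"[^a-zA-Z0-9\s]", "")  # drop remaining punctuation and special characters
    c = regexp_replace(c, "  ", " ")              # collapse double spaces
    c = regexp_replace(c, "   ", " ")             # collapse any remaining triple spaces
    c = regexp_replace(c, r"\d+", "")             # remove digits
    return c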
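
To sanity-check the cleaning rules on a single string without starting Spark, the same steps can be mirrored with Python's `re` module. This is only a sketch: `clean_text_py` is a hypothetical helper, not part of the notebook, and it deliberately removes digits before collapsing whitespace and trims the ends, so it will not leave the double space that is visible in the "wedding music" row of the Spark output.

```python
import re

def clean_text_py(text: str) -> str:
    """Pure-Python approximation of the Spark clean_text steps (hypothetical helper)."""
    text = text.lower()                       # lowercase everything
    text = re.sub(r"^rt ", "", text)          # drop a leading "rt " marker
    text = re.sub(r"[=.]", " ", text)         # turn "=" and "." into spaces
    text = re.sub(r"[^a-z0-9\s]", "", text)   # drop punctuation and special characters
    text = re.sub(r"\d+", "", text)           # remove digits
    return re.sub(r"\s+", " ", text).strip()  # collapse whitespace runs and trim

print(clean_text_py("Wedding Music (3:45) = GREAT!"))
# wedding music great
```

Collapsing with `\s+` as the final step is a design choice of this sketch; the Spark function above keeps the original order of operations.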

In [8]:
### View Clean 
df = df.withColumn("clean_text",clean_text(col('reviewText')))
df.show()

+--------------------+--------+--------------------+
|          reviewText|verified|          clean_text|
+--------------------+--------+--------------------+
|really happy they...|    true|really happy they...|
|Having lived in W...|    true|having lived in w...|
|Excellent look in...|   false|excellent look in...|
|More than anythin...|    true|more than anythin...|
|This is a great m...|    true|this is a great m...|
|This movie was in...|    true|this movie was in...|
|This is a fascina...|    true|this is a fascina...|
|This DVD appears ...|    true|this dvd appears ...|
|This movie is not...|    true|this movie is not...|
|So sorry I didn't...|    true|so sorry i didnt ...|
|Product received ...|    true|product received ...|
|Believe me when I...|    true|believe me when i...|
|This video arrive...|    true|this video arrive...|
|The Reunion of th...|   false|the reunion of th...|
|Wedding Music (3:...|   false|wedding music  ge...|
|This is truly a m...|   false|this is truly a m...|

### Tokenize 

In [9]:
from pyspark.ml.feature import Tokenizer

In [10]:
### tokenize words 
tokenizer = Tokenizer(inputCol="clean_text", outputCol="token_text")
token = tokenizer.transform(df).select('verified', 'token_text')
token.show()

+--------+--------------------+
|verified|          token_text|
+--------+--------------------+
|    true|[really, happy, t...|
|    true|[having, lived, i...|
|   false|[excellent, look,...|
|    true|[more, than, anyt...|
|    true|[this, is, a, gre...|
|    true|[this, movie, was...|
|    true|[this, is, a, fas...|
|    true|[this, dvd, appea...|
|    true|[this, movie, is,...|
|    true|[so, sorry, i, di...|
|    true|[product, receive...|
|    true|[believe, me, whe...|
|    true|[this, video, arr...|
|   false|[the, reunion, of...|
|   false|[wedding, music, ...|
|   false|[this, is, truly,...|
|   false|[it, is, an, exce...|
|    true|[i, have, a, thin...|
|   false|[this, dvd, is, u...|
|    true|[just, brought, t...|
+--------+--------------------+
only showing top 20 rows



### Remove Stop words

Stop words were removed using “StopWordsRemover” from the pyspark.ml.feature module.

In [11]:
from pyspark.ml.feature import StopWordsRemover

In [12]:
### remove stop words
remover = StopWordsRemover(inputCol='token_text', outputCol='swr_text')
swr = remover.transform(token).select('verified','swr_text')
swr.show(5)

+--------+--------------------+
|verified|            swr_text|
+--------+--------------------+
|    true|[really, happy, g...|
|    true|[lived, west, new...|
|   false|[excellent, look,...|
|    true|[anything, ive, c...|
|    true|[great, movie, mi...|
+--------+--------------------+
only showing top 5 rows
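
As a rough illustration of what this step does to one row, the filter can be written in plain Python. The `STOP_WORDS` set below is a tiny, hand-picked subset chosen for this example (an assumption, not Spark's actual default list), and `remove_stop_words` is a hypothetical helper name.

```python
# Small illustrative subset; Spark's default English list is much longer.
STOP_WORDS = {"this", "is", "a", "the", "in", "than", "more", "they", "having"}

def remove_stop_words(tokens):
    """Keep only the tokens that are not in the stop-word set."""
    return [t for t in tokens if t not in STOP_WORDS]

print(remove_stop_words(["this", "is", "a", "great", "movie"]))
# ['great', 'movie']
```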



### Lemmatization

In the lemmatization step, the Python library “nltk” was used. A function was created to reduce each word to its lemma using the “WordNetLemmatizer” from the “nltk” library, dropping any result of two characters or fewer.

In [14]:
### import nltk
import nltk
nltk.download('wordnet')

[nltk_data] Downloading package wordnet to /home/jovyan/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.


True

In [15]:
### Create Function 
from nltk.stem import WordNetLemmatizer 

# Instantiate lemmatizer object
lemmer = WordNetLemmatizer()

def lem(in_vec):
    # Lemmatize each token and keep only results longer than two characters
    out_vec = []
    for t in in_vec:
        t_lem = lemmer.lemmatize(t)
        if len(t_lem) > 2:
            out_vec.append(t_lem)
    return out_vec
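
The loop-and-filter logic can be checked without downloading WordNet by passing the lemmatizer in as a function. The sketch below does that with a toy lookup table; `lem_tokens`, `toy_lemmatize`, and the `TOY_LEMMAS` mapping are all hypothetical stand-ins for illustration and do not reproduce the full behavior of `WordNetLemmatizer`.

```python
# Toy stand-in for a real lemmatizer: maps a few inflected forms to their lemma.
TOY_LEMMAS = {"movies": "movie", "feet": "foot"}

def toy_lemmatize(token):
    """Return the lemma from the toy table, or the token unchanged."""
    return TOY_LEMMAS.get(token, token)

def lem_tokens(tokens, lemmatize, min_len=3):
    """Lemmatize each token and keep results with at least min_len characters."""
    out = []
    for t in tokens:
        lemma = lemmatize(t)
        if len(lemma) >= min_len:
            out.append(lemma)
    return out

print(lem_tokens(["movies", "is", "feet", "tv"], toy_lemmatize))
# ['movie', 'foot']
```

Here `min_len=3` matches the `len(...) > 2` condition in the notebook's function.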

In [16]:
### Use function 
from pyspark.sql.types import ArrayType, StringType
lemmer_udf = udf(lem, ArrayType(StringType()))

lem_text = swr.withColumn("lem", lemmer_udf(col("swr_text"))).select('verified', 'lem')
lem_text.show()

+--------+--------------------+
|verified|                 lem|
+--------+--------------------+
|    true|[really, happy, g...|
|    true|[lived, west, new...|
|   false|[excellent, look,...|
|    true|[anything, ive, c...|
|    true|[great, movie, mi...|
|    true|[movie, english, ...|
|    true|[fascinating, tru...|
|    true|[dvd, appears, ge...|
|    true|[movie, english, ...|
|    true|[sorry, didnt, pu...|
|    true|[product, receive...|
|    true|[believe, tell, r...|
|    true|[video, arrived, ...|
|   false|[reunion, cathedr...|
|   false|[wedding, music, ...|
|   false|[truly, moving, v...|
|   false|[excellent, exper...|
|    true|[thing, purchasin...|
|   false|[dvd, unbelievabl...|
|    true|[brought, dvd, ho...|
+--------+--------------------+
only showing top 20 rows



### Remove Short words

In the last cleaning step, we removed words shorter than four characters and joined the remaining words back into a single string.

In [17]:
### Removing short words
# Keep words of at least four characters and join them into one space-separated string
filter_length_udf = udf(lambda row: " ".join([x for x in row if len(x) >= 4]))
df2 = lem_text.withColumn('Text', filter_length_udf(col('lem'))).select('Text', 'verified')
df2.show()

+--------------------+--------+
|                Text|verified|
+--------------------+--------+
|really happy evan...|    true|
|lived west guinea...|    true|
|excellent look co...|   false|
|anything challeng...|    true|
|great movie missi...|    true|
|movie english gre...|    true|
|fascinating true ...|    true|
|appears german en...|    true|
|movie english alt...|    true|
|sorry didnt purch...|    true|
|product received ...|    true|
|believe tell rece...|    true|
|video arrived per...|    true|
|reunion cathedral...|   false|
|wedding music fre...|   false|
|truly moving vide...|   false|
|excellent experie...|   false|
|thing purchasing ...|    true|
|unbelievable punk...|   false|
|brought home rock...|    true|
+--------------------+--------+
only showing top 20 rows
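
The boundary of this filter is easy to get wrong, so here is the same expression as a plain Python function (`filter_short` is a hypothetical name used only for this example). Words of exactly four characters, such as "true", are kept; three-character words, such as "dvd" and "new", are dropped.

```python
def filter_short(tokens, min_len=4):
    """Join the tokens that have at least min_len characters into one string."""
    return " ".join(t for t in tokens if len(t) >= min_len)

print(filter_short(["dvd", "appears", "german", "true", "new"]))
# appears german true
```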



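### Add an index to split data

An index was added to the dataset because, when we used the random split function in pyspark, the text column turned into null values. As a workaround, we created an index and split the data on it. Note that monotonically_increasing_id() guarantees unique, increasing IDs but not consecutive ones when the data spans several partitions; the row counts in the split step are consistent with consecutive IDs here.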

In [18]:
df2 = df2.withColumn('index', monotonically_increasing_id())
df2.show(10)

+--------------------+--------+-----+
|                Text|verified|index|
+--------------------+--------+-----+
|really happy evan...|    true|    0|
|lived west guinea...|    true|    1|
|excellent look co...|   false|    2|
|anything challeng...|    true|    3|
|great movie missi...|    true|    4|
|movie english gre...|    true|    5|
|fascinating true ...|    true|    6|
|appears german en...|    true|    7|
|movie english alt...|    true|    8|
|sorry didnt purch...|    true|    9|
+--------------------+--------+-----+
only showing top 10 rows
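
Splitting on fixed index thresholds only works when the generated IDs are consecutive. Spark's documentation describes the current implementation of `monotonically_increasing_id()` as placing the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The sketch below mimics that layout in plain Python (`mono_id` is a hypothetical helper, not a Spark function) to show where gaps would appear if the data had more than one partition.

```python
def mono_id(partition_index: int, row_in_partition: int) -> int:
    """Mimic the documented ID layout: partition in the high bits, row number in the low 33 bits."""
    return (partition_index << 33) + row_in_partition

# Rows in the first partition get consecutive IDs starting at 0 ...
print([mono_id(0, r) for r in range(3)])  # [0, 1, 2]
# ... but the first row of a second partition jumps far ahead.
print(mono_id(1, 0))                      # 8589934592
```

With a single partition the IDs run 0, 1, 2, and so on, which is what the table above shows.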



### Split Data to Train, Val, Test

We split the data into roughly 80% train and 20% validation, holding out the single last row as the test set.

In [19]:
### 80/20 split
print('Train:', 8757545 * 0.80)
print('Val:', 8757545 * 0.20)

Train: 7006036.0
Val: 1751509.0


In [20]:
### Train set
train = df2.filter(df2.index <= 7006036)
train.count()

7006037

In [21]:
### Validation set
val = df2.filter((df2.index > 7006036) & (df2.index < 8757544))
val.count()

1751507

In [22]:
### Test example
test = df2.filter(df2.index == 8757544)
test.count()

1
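
The three counts above follow directly from the index thresholds. The sketch below recomputes them with integer arithmetic, assuming the index runs consecutively from 0 to `n_rows - 1`; `split_counts` is a hypothetical helper written for this check and is not part of the notebook.

```python
def split_counts(n_rows: int, train_pct: int = 80):
    """Row counts implied by: train = index <= cut, val = cut < index < last, test = last row."""
    last_train = n_rows * train_pct // 100   # inclusive upper index of the train set
    test_index = n_rows - 1                  # the single held-out row
    train = last_train + 1                   # indices 0 .. last_train
    val = test_index - last_train - 1        # indices strictly between the two
    return {"train": train, "val": val, "test": 1}

counts = split_counts(8757545)
print(counts)
# {'train': 7006037, 'val': 1751507, 'test': 1}
```

The three counts sum to 8,757,545, so every row lands in exactly one set.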

### Export Data

The last step was to export the data for the next three steps in the project. 

In [23]:
### Export validation set
val.write.option("header", "true").csv('clean_val')

In [24]:
### Export train set
train.write.option("header", "true").csv('clean_train')

In [25]:
### Export test set
test.write.option("header", "true").csv('clean_test')
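
Each `write...csv(...)` call produces a directory of `part-*.csv` files rather than a single file, and with the header option enabled every part file carries its own header row. A later step that does not use Spark could read such a directory back with the standard library, roughly as sketched below; `read_spark_csv_dir` is a hypothetical helper, and loading everything into a list is only sensible for small outputs such as `clean_test`.

```python
import csv
import glob
import os

def read_spark_csv_dir(path):
    """Read every part-*.csv file in a Spark output directory into a list of dicts."""
    rows = []
    for part in sorted(glob.glob(os.path.join(path, "part-*.csv"))):
        with open(part, newline="", encoding="utf-8") as f:
            # DictReader consumes the per-file header written by Spark
            rows.extend(csv.DictReader(f))
    return rows

# Example usage (assumes the export cell above has already run):
# rows = read_spark_csv_dir("clean_test")
```

Note that rerunning an export cell fails if the target directory already exists, unless a save mode such as `.mode('overwrite')` is set on the writer.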