In [0]:
# install java libs and spark.
! apt-get install openjdk-8-jdk-headless -qq > /dev/null
! wget -q https://www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
! tar xf spark-2.4.4-bin-hadoop2.7.tgz
! pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

### Mount Data

In [0]:
# Mount Google Drive at /content/drive; the notebook reads its input from
# and writes its output to paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


### Install pyspark

In [0]:
pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 61kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 47.2MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=a2d10033347b350c913f0cbe12e916dc8f60fd7c1d4a3ab1f5751fa9f129c0f7
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

### Start Spark Session

In [0]:
# Application name passed to the SparkSession builder.
APP_NAME = "EDA1"
# Spark master URL passed to the SparkSession builder ("local[*]": local mode).
SPARK_URL = "local[*]"

In [0]:
### Create the Spark session, or reuse the one that is already running
spark = (
    SparkSession.builder
    .appName(APP_NAME)
    .master(SPARK_URL)
    .getOrCreate()
)

### Load data

In [0]:
# Read the gzipped JSON file from Drive and keep only the three columns
# that the rest of the notebook works with.
df = (
    spark.read
    .json('/content/drive/My Drive/Movies_and_TV.json.gz')
    .select('overall', 'reviewText', 'summary')
)

In [0]:
### Preview the first five rows of the loaded data
df.show(5)

+-------+--------------------+--------------------+
|overall|          reviewText|             summary|
+-------+--------------------+--------------------+
|    5.0|really happy they...|               great|
|    5.0|Having lived in W...|Realistic and Acc...|
|    5.0|Excellent look in...|         Peace Child|
|    5.0|More than anythin...|Culturally releva...|
|    4.0|This is a great m...|Good Movie! Great...|
+-------+--------------------+--------------------+
only showing top 5 rows



### Remove null values

In [0]:
### Drop every row that has a null in any column, then count what is left
df = df.na.drop(how="any")
df.count()

8755633

### Remove punctuation and special characters, and lowercase the text

In [0]:
### Clean Function
def clean_text(c):
  """Build a Column expression that normalises the text in column ``c``.

  Steps, in order:
    1. lowercase the text;
    2. strip a leading "rt " marker (presumably inherited from a tweet
       cleaner -- TODO confirm it is still wanted for reviews);
    3. turn "=" and "." into spaces so the words around them stay separate;
    4. delete every character that is neither an ASCII letter nor whitespace
       (this removes digits and the remaining punctuation);
    5. collapse each run of whitespace into one space and trim both ends.

  Digits are removed *before* whitespace is collapsed: the other order leaves
  a double space wherever a standalone number stood, which later becomes an
  empty token. Patterns are raw strings so the backslashes reach the regex
  engine unchanged instead of relying on Python passing unknown escapes
  through.

  Args:
    c: a Spark Column (string type) to clean.

  Returns:
    A Spark Column expression producing the cleaned text.
  """
  c = lower(c)
  c = regexp_replace(c, r"^rt ", "")
  c = regexp_replace(c, r"[=.]", " ")
  c = regexp_replace(c, r"[^a-zA-Z\s]", "")
  c = regexp_replace(c, r"\s+", " ")
  return trim(c)

In [0]:
### Add the cleaned review text as column "clean_text" and preview the result
df = df.withColumn(
    "clean_text",
    clean_text(col('reviewText')),
)
df.show()

+-------+--------------------+--------------------+--------------------+
|overall|          reviewText|             summary|          clean_text|
+-------+--------------------+--------------------+--------------------+
|    5.0|really happy they...|               great|really happy they...|
|    5.0|Having lived in W...|Realistic and Acc...|having lived in w...|
|    5.0|Excellent look in...|         Peace Child|excellent look in...|
|    5.0|More than anythin...|Culturally releva...|more than anythin...|
|    4.0|This is a great m...|Good Movie! Great...|this is a great m...|
|    5.0|This movie was in...|           Great....|this movie was in...|
|    5.0|This is a fascina...|A remarkable true...|this is a fascina...|
|    1.0|This DVD appears ...|     Peace Child DVD|this dvd appears ...|
|    1.0|This movie is not...|      Not in English|this movie is not...|
|    5.0|So sorry I didn't...|            Amazing!|so sorry i didnt ...|
|    5.0|Product received ...|A Reunion by Cath...|

### Tokenize

In [0]:
from pyspark.ml.feature import Tokenizer

In [0]:
# Split the cleaned text into tokens; keep only the rating and the token list.
tokenizer = Tokenizer(
    inputCol="clean_text",
    outputCol="token_text",
)
token = (
    tokenizer
    .transform(df)
    .select('overall', 'token_text')
)
token.show()

+-------+--------------------+
|overall|          token_text|
+-------+--------------------+
|    5.0|[really, happy, t...|
|    5.0|[having, lived, i...|
|    5.0|[excellent, look,...|
|    5.0|[more, than, anyt...|
|    4.0|[this, is, a, gre...|
|    5.0|[this, movie, was...|
|    5.0|[this, is, a, fas...|
|    1.0|[this, dvd, appea...|
|    1.0|[this, movie, is,...|
|    5.0|[so, sorry, i, di...|
|    5.0|[product, receive...|
|    5.0|[believe, me, whe...|
|    5.0|[this, video, arr...|
|    5.0|[the, reunion, of...|
|    5.0|[wedding, music, ...|
|    5.0|[this, is, truly,...|
|    4.0|[it, is, an, exce...|
|    5.0|[i, have, a, thin...|
|    5.0|[this, dvd, is, u...|
|    5.0|[just, brought, t...|
+-------+--------------------+
only showing top 20 rows



### Remove stopwords

In [0]:
from pyspark.ml.feature import StopWordsRemover


In [0]:
### Remove stop words from the token lists
remover = StopWordsRemover(
    inputCol='token_text',
    outputCol='swr_text',
)
swr = (
    remover
    .transform(token)
    .select('overall', 'swr_text')
)
swr.show(5)

+-------+--------------------+
|overall|            swr_text|
+-------+--------------------+
|    5.0|[really, happy, g...|
|    5.0|[lived, west, new...|
|    5.0|[excellent, look,...|
|    5.0|[anything, ive, c...|
|    4.0|[great, movie, mi...|
+-------+--------------------+
only showing top 5 rows



### Lemmatization

In [0]:
import nltk
# Download the WordNet corpus; the lemmatization step that follows
# uses nltk's WordNetLemmatizer.
nltk.download('wordnet')

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.


True

In [0]:
from nltk.stem import WordNetLemmatizer 

# Instantiate the lemmatizer. Despite its name, `stemmer` is a
# WordNetLemmatizer, not a stemmer; stem() below refers to it by this name.
stemmer = WordNetLemmatizer()

def stem(in_vec):
    """Lemmatize every token in ``in_vec`` and drop the short results.

    Each token is passed through the module-level ``stemmer`` (a
    lemmatizer, despite the name). A lemma is kept only when it is longer
    than two characters; input order is preserved.

    Args:
        in_vec: iterable of token strings.

    Returns:
        A list of the lemmas that have more than two characters.
    """
    lemmas = (stemmer.lemmatize(token) for token in in_vec)
    return [lemma for lemma in lemmas if len(lemma) > 2]

In [0]:
from pyspark.sql.types import *
# Wrap stem() as a Spark UDF that returns an array of strings.
stemmer_udf = udf(
    lambda tokens: stem(tokens),
    ArrayType(StringType()),
)

# Build a new DataFrame whose lem_text column holds the lemmatized tokens.
lem_text = (
    swr
    .withColumn("lem_text", stemmer_udf(col("swr_text")))
    .select('overall', 'lem_text')
)
lem_text.show()

+-------+--------------------+
|overall|            lem_text|
+-------+--------------------+
|    5.0|[really, happy, g...|
|    5.0|[lived, west, new...|
|    5.0|[excellent, look,...|
|    5.0|[anything, ive, c...|
|    4.0|[great, movie, mi...|
|    5.0|[movie, english, ...|
|    5.0|[fascinating, tru...|
|    1.0|[dvd, appears, ge...|
|    1.0|[movie, english, ...|
|    5.0|[sorry, didnt, pu...|
|    5.0|[product, receive...|
|    5.0|[believe, tell, r...|
|    5.0|[video, arrived, ...|
|    5.0|[reunion, cathedr...|
|    5.0|[wedding, music, ...|
|    5.0|[truly, moving, v...|
|    4.0|[excellent, exper...|
|    5.0|[thing, purchasin...|
|    5.0|[dvd, unbelievabl...|
|    5.0|[brought, dvd, ho...|
+-------+--------------------+
only showing top 20 rows



### Removing short words

In [0]:
### Keep only tokens of at least four characters and join them into one string
filter_length_udf = udf(
    lambda tokens: " ".join(word for word in tokens if len(word) >= 4)
)
df2 = (
    lem_text
    .withColumn('words', filter_length_udf(col('lem_text')))
    .select('overall', 'words')
)
df2.show()

+-------+--------------------+
|overall|               words|
+-------+--------------------+
|    5.0|really happy evan...|
|    5.0|lived west guinea...|
|    5.0|excellent look co...|
|    5.0|anything challeng...|
|    4.0|great movie missi...|
|    5.0|movie english gre...|
|    5.0|fascinating true ...|
|    1.0|appears german en...|
|    1.0|movie english alt...|
|    5.0|sorry didnt purch...|
|    5.0|product received ...|
|    5.0|believe tell rece...|
|    5.0|video arrived per...|
|    5.0|reunion cathedral...|
|    5.0|wedding music fre...|
|    5.0|truly moving vide...|
|    4.0|excellent experie...|
|    5.0|thing purchasing ...|
|    5.0|unbelievable punk...|
|    5.0|brought home rock...|
+-------+--------------------+
only showing top 20 rows



### Export Data

In [0]:
# Spark writes a directory of part files at this path, not a single CSV file.
# mode("overwrite") replaces the output of an earlier run; with the default
# save mode the write raises an error when the path already exists, so
# re-running the notebook would fail on this cell.
df2.write.mode("overwrite").option("header", "true").csv('/content/drive/My Drive/clean_text.csv')