In [None]:
# I checked warnings, but for the final report I prefer ignore those 
#that really does not affect the results (warnings of libraries, etc)
import warnings
warnings.simplefilter('ignore')

In [2]:
#my own functions
%load_ext autoreload
%autoreload 2

from utils.py_functions import *
from utils.cleaning_functions import *

In [3]:
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from functools import reduce
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import matplotlib.pyplot as plt
from wordcloud import WordCloud 
import pandas as pd
import re
import string

## **Create spark session and provide master as yarn-client and provide application name.**

In [4]:
# Configuration properties of Apache Spark
#sc.stop()
from pyspark import SparkConf
from pyspark.sql import SparkSession

APP_NAME = 'pyspark_python'
MASTER = 'local[*]'

conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster(MASTER)
spark = SparkSession.builder.config(conf = conf).getOrCreate()
sc = spark.sparkContext

## **LOAD DATA**

In [5]:
schema = StructType([
    StructField("marketplace",  StringType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product_id",  StringType(), True),
    StructField("product_parent", IntegerType(), True),
    StructField("product_title", StringType(), True),
    StructField("product_category", StringType(), True),
    StructField("star_rating", IntegerType(), True),
    StructField("helpful_votes", IntegerType(), True),
    StructField("total_votes", IntegerType(), True),
    StructField("vine", StringType(), True),
    StructField("verified_purchase", StringType(), True),
    StructField("review_headline", StringType(), True),
    StructField("review_body", StringType(), True),
    StructField("review_date", StringType(), True)])


df_video_games = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option("delimiter","\t")\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load('data/amazon_reviews_us_Digital_Video_Games_v1_00.tsv')

df_software = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option("delimiter","\t")\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load('data/amazon_reviews_us_Software_v1_00.tsv')

df_digital_software = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option("delimiter","\t")\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load('data/amazon_reviews_us_Digital_Software_v1_00.tsv')

## **MERGE DATA**

In [6]:
df = df_digital_software.union(df_software);
df = df.union(df_video_games);

## **ELIMINATE DUPLICATED DATA**

In [7]:
#duplicated data
df_X = df.dropDuplicates(subset= ['product_title', 'product_category', 'product_title'])

In [8]:
#pd.set_option('display.max_colwidth', 199) 
#df_X.select("review_body").toPandas().head(9)

Unnamed: 0,review_body
0,available for windows and mac ?
1,"It say 2015 but the IRS 941 form print out 2014, so you can get data out from program and go to IRS website to print out 2015"
2,I would like to first address the previous reviewer. This individual should not be taken seriously -- why?<br />1. He did not even buy the product<br />2. He has a competing product also for s...
3,Exelent
4,"If you're looking for software that will help you decide where to further invest your time and engery in discovering an era of Western Art, this two-volume set is perfect for that. It will give y..."
5,"Will not run on my system, which it seems like it should."
6,Delivered on time and installation was easy. Still working with the integration with Window Outlook 2010
7,Ismok
8,This version does not work with Windows 8.1. Very disappointing.


## **POLARITY: subjective or objective opinions**

* Define the threshold that separates the opinions that are subjective from those that are not. We will consider that for a threshold lower than 0.4 the opinion is not objective.
* ELiminate those duplicate products with subjective opinion.
* Those objective opinions would be used in `TOPIC MODELLING`

In [9]:
#POLARITY
df_X = df_X.rdd.map(lambda x: (x["review_id"], x["product_category"],  x["product_title"], x["review_body"], polarity_txt(x["review_body"])))
df_X=spark.createDataFrame(df_X, schema = ["review_id", "product_category", "product_title", "review_body", "review_body_pol"])

In [10]:
df_X.select("review_body", "review_body_pol").show().foreach(println)

+--------------------+-------------------+
|         review_body|    review_body_pol|
+--------------------+-------------------+
|available for win...|                0.4|
|It say 2015 but t...|                0.0|
|I would like to f...| 0.1168997668997669|
|             Exelent|                0.0|
|If you're looking...| 0.3615277777777777|
|Will not run on m...|                0.0|
|Delivered on time...|0.43333333333333335|
|               Ismok|                0.0|
|This version does...|              -0.78|
|Works great excel...|                0.9|
|Very happy with m...| 0.7208333333333334|
|download it sever...|0.05555555555555555|
|My son enjoyed pl...|            0.04625|
|This one of the b...| 0.7333333333333334|
|This game is a gr...|          -0.088125|
|hey its a grate g...|               -0.4|
|I was looking for...|0.11652056277056276|
|Great software, I...|                0.8|
|I loved it. it wa...|0.24000000000000005|
|I ordered the ver...|               -0.1|
+----------

AttributeError: 'NoneType' object has no attribute 'foreach'

In [None]:
#So, we eliminate them
print(df_X.toPandas().shape)
#df_not_dup = df.dropDuplicates(subset= ['product_title', 'product_category', 'product_title'])
#print(df.toPandas().shape)

## **COMPLETE MISSING**

In [None]:
#df = df.toPandas()
#df.review_body.fillna(df.product_title, inplace=True)
#df=spark.createDataFrame(df)

df.withColumn("product_title",coalesce(df.product_title,df.review_body)) 

## **GENERATE NEW FEATURES**

In [None]:
# Our list of functions to apply.
transform_functions = [
    lambda x: len(x),
    lambda x: x.count(" "),
    lambda x: x.count("."),
    lambda x: x.count("!"),
    lambda x: x.count("?"),
    lambda x: len(x) / (x.count(" ") + 1),
    lambda x: x.count(" ") / (x.count(".") + 1),
    lambda x: len(re.findall("CD|DVD", x)), # CD 
    lambda x: len(re.findall(r"\d+st|\d+th|\d+sd", x)), # th--> 4th, 5th or 1st or 2sd
    lambda x: len(re.findall("[A-Z]", x)), # number of uppercase letters
    lambda x: len(re.findall("[0-9]", x)), #numbers
    lambda x: len(re.findall("\d{4}", x)),
    lambda x: len(re.findall("\d$", x)), #end with number
    lambda x: len(re.findall("^\d", x)), #start with number
    lambda x: len(re.findall("[\w]+-[\w]+",x)), #words separated with -
    lambda x: len(re.findall("OLD VERSION|Old Version|old version",x)), #old version
]

transform_functions_len = [
    lambda x: len(x)
]

In [None]:
df_num_2 = df.toPandas()
df_num = df_num_2[['product_title']]
df_num_2 = df_num_2[['review_id']]
for func in transform_functions:
     df_num_2 = pd.concat([df_num_2, df_num['product_title'].apply(func)], axis=1)

In [None]:
df_num_2.columns = ['review_id', 'title_len', 'title_words', 'title_points',
                  'title_exc', 'title_int', 'ratio_spaces_point', 'ratio_len_points', 
                    'title_cd','title_th', 'title_upper_letters', 'title_numbers',
                    'title_years', 'end_number', 'starts_number', 'word_sep', 
                  'title_old_version']

## **CLEAN FEATURE: review_body**

## **is it an informative feature or nor?**

In [None]:
df_X = df.rdd.map(lambda x: (x["review_id"], x["product_category"],  x["product_title"], x["review_body"], polarity_txt(x["review_body"])))
df_X=spark.createDataFrame(df_X, schema = ["review_id", "product_category", "product_title", "review_body", "review_body_pol"])

In [None]:
df_X.show(5)

In [None]:
from pyspark.sql import functions as F
df_product_title = df_X.groupBy(["product_title"]).agg(F.count('product_title'))

In [None]:
df_product_title.show(5)

In [None]:
df2 = df_X.join(df_product_title, df_X.product_title == df_product_title.product_title, 'left')

In [None]:
df2.show(5)

In [None]:
df_X.toPandas().shape

In [None]:
def product_title_cleaning(df):
    #eliminate contractions I'm -> I am
    df_X = df.rdd.map(lambda x: (x["review_id"], x["product_category"],  x["product_title"], fix_abbreviation(x["review_body"])))
    df_X=spark.createDataFrame(df_X, schema = ["review_id", "product_category", "product_title", "review_body"])
    #consider only noums in the text
    df_X = df_X.rdd.map(lambda x: (x["review_id"], x["product_category"],  x["product_title"], tag_and_remove(x["review_body"])))
    df_X=spark.createDataFrame(df_X, schema = ["review_id", "product_category", "product_title", "review_body"])
    #lemmatization
    df_X = df_X.rdd.map(lambda x: (x["review_id"], x["product_category"],  x["product_title"], lemitizeWords(x["review_body"])))
    df_X=spark.createDataFrame(df_X, schema = ["review_id", "product_category", "product_title", "review_body"])

    #clean text
    df_X = df_X.rdd.map(lambda x: (x["review_id"], x["product_category"],  x["product_title"], clean_text(x["review_body"])))
    df_X=spark.createDataFrame(df_X, schema = ["review_id", "product_category", "product_title", "review_body"])
    #spelling correction
    df_X = df_X.rdd.map(lambda x: (x["review_id"], x["product_category"],  x["product_title"], spell_correction(x["review_body"])))
    df_X=spark.createDataFrame(df_X, schema = ["review_id", "product_category", "product_title", "review_body"])
    return df_X