In [None]:
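# One-time setup: uncomment and run to install spaCy and its small English, French and German models.
# %pip install -U spacy
# !python -m spacy download en_core_web_sm
# !python -m spacy download fr_core_news_sm
# !python -m spacy download de_core_news_sm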

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session that runs on the local machine.
# The driver is bound to the loopback address and executors get 2g of memory.
spark = (
    SparkSession.builder
    .master("local")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

In [None]:
from ReviewClass import Review

# Register the project's helper modules with the Spark context so that tasks
# executed on this context can import them.
for helper_module in ('ReviewClass.py', 'data_cleaning.py'):
    spark.sparkContext.addPyFile(helper_module)

In [None]:
import pandas as pd

# Load the eight training shards (reviews/train-1.csv ... reviews/train-8.csv)
# and stack them into one dataframe. Reading them in a loop replaces eight
# copy-pasted read_csv calls and does not keep every shard alive under its own
# notebook-level name.
df_train = pd.concat(
    [pd.read_csv('reviews/train-{}.csv'.format(part)) for part in range(1, 9)]
)

# Rename the first column to "review_id".
# Reassigning (instead of inplace=True) avoids mutating the frame in place.
df_train = df_train.rename(columns={df_train.columns[0]: "review_id"})

In [None]:
# Turn every dataframe row into a Review object (column names become keyword
# arguments), then hand the resulting list to Spark split into 100 partitions.
reviews = [
    Review(**row_fields)
    for row_fields in df_train.to_dict(orient='records')
]

review_rdd = spark.sparkContext.parallelize(reviews, numSlices=100)

In [None]:
# Proof of concept (disabled): total review-body character count per product category
# total_character_count_per_product_category_id = review_rdd\
#     .flatMap(lambda review: [(review.product_category_id, review.ReviewBodyCharCount())])\
#     .reduceByKey(lambda count1, count2: count1 + count2)\
#     .sortByKey()\
#     .collect()

# print(total_character_count_per_product_category_id)

In [None]:
# Proof of concept (disabled): total review-body word count per product category
# total_word_count_per_product_category_id = review_rdd\
#     .flatMap(lambda review: [(review.product_category_id, review.ReviewBodyWordCount())])\
#     .reduceByKey(lambda count1, count2: count1 + count2)\
#     .sortByKey()\
#     .collect()

# print(total_word_count_per_product_category_id)

In [None]:
# Proof of concept (disabled): collect (review_id, tagged review body) pairs
# list_of_tagged_reviews = review_rdd\
#     .map(lambda review: (review.review_id, review.TaggedReviewBody()))\
#     .collect()

In [None]:
import re

def RemovePunctuation(review):
    """Strip punctuation from ``review.review_body`` in place.

    Periods and commas are turned into spaces; every other character that is
    neither a word character nor whitespace is deleted. The body is coerced to
    ``str`` first.

    Returns the same (mutated) review object.
    """
    body = str(review.review_body)

    # Periods and commas become spaces rather than being deleted.
    for separator in ('.', ','):
        body = body.replace(separator, ' ')

    # Drop whatever punctuation is left.
    review.review_body = re.sub(r'[^\w\s]', '', body)

    return review

In [None]:
import html

def RemoveASCII(review):
    """Decode HTML character references in the review headline and body.

    Despite its name, this function converts HTML entities such as ``&amp;``,
    ``&quot;`` or ``&#34;`` back to the characters they stand for. Both fields
    are coerced to ``str`` first.

    The body previously went through ``html.escape`` (the inverse operation),
    which encoded characters instead of decoding them; it now uses
    ``html.unescape`` like the headline.

    Returns the same (mutated) review object.
    """
    review.review_headline = html.unescape(str(review.review_headline))
    review.review_body = html.unescape(str(review.review_body))

    return review

In [None]:
def RemoveBreaklines(review):
    """Replace ``<br />`` markers in the headline and body with a space.

    Both fields are coerced to ``str`` first. A space (not the empty string)
    is substituted so that the words on either side of a line break are not
    fused into one token ("one<br />two" -> "one two", not "onetwo").

    Returns the same (mutated) review object.
    """
    review.review_headline = str(review.review_headline).replace('<br />', ' ')
    review.review_body = str(review.review_body).replace('<br />', ' ')

    return review

In [None]:
import re
import numpy as np

def replace_acute_accents(text, accent_map):
    """Apply every (pattern, replacement) pair of ``accent_map`` to ``text``.

    ``accent_map`` is an iterable of two-element rows. The first element of a
    row is used as a regular-expression pattern and the second as its
    replacement; rows are applied in order.

    Returns the resulting string.
    """
    result = text
    for pair in accent_map:
        accented, plain = pair
        result = re.sub(accented, plain, result)
    return result

def RemoveAccentsFromBody(review):
    """Replace acute-accented vowels in ``review.review_body`` with plain letters.

    The mapping is built as a two-column array (accented, plain) and applied
    through ``replace_acute_accents``.

    Returns the same (mutated) review object.
    """
    accented = ['á', 'Á', 'é', 'É', 'ớ', 'ó', 'Ó', 'ú', 'Ú']
    plain = ['a', 'A', 'e', 'E', 'o', 'o', 'O', 'u', 'U']

    # One row per (accented, plain) pair.
    acute_pairs = np.column_stack((accented, plain))

    review.review_body = replace_acute_accents(review.review_body, acute_pairs)

    return review

def RemoveAccentsFromHeadline(review):
    """Replace acute-accented vowels in ``review.review_headline`` with plain letters.

    The mapping is built as a two-column array (accented, plain) and applied
    through ``replace_acute_accents``.

    Returns the same (mutated) review object.
    """
    accented = ['á', 'Á', 'é', 'É', 'ớ', 'ó', 'Ó', 'ú', 'Ú']
    plain = ['a', 'A', 'e', 'E', 'o', 'o', 'O', 'u', 'U']

    # One row per (accented, plain) pair.
    acute_pairs = np.column_stack((accented, plain))

    review.review_headline = replace_acute_accents(review.review_headline, acute_pairs)

    return review

In [None]:
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

def RemoveStopwords(review):
    """Remove stop words from ``review.review_body`` in place.

    Reviews whose body has fewer than 5 words (per
    ``review.ReviewBodyWordCount()``) are returned untouched. The stop-word
    language is picked from ``review.marketplace_id``: 2 -> French,
    3 -> German, anything else -> English. The body is tokenized with
    ``word_tokenize`` and the surviving tokens are re-joined with single spaces.

    Tokens are compared case-insensitively, so capitalised stop words such as
    "The" are removed as well, and the stop-word list is turned into a set so
    each membership test is a hash lookup.

    Returns the same (mutated) review object.
    """
    if review.ReviewBodyWordCount() < 5:
        return review

    if review.marketplace_id == 2:
        language = 'french'
    elif review.marketplace_id == 3:
        language = 'german'
    else:
        language = 'english'

    stop_words = set(stopwords.words(language))

    kept_tokens = [
        token
        for token in word_tokenize(str(review.review_body))
        # lower() only for the comparison; the token keeps its original case
        if token.lower() not in stop_words
    ]

    review.review_body = " ".join(kept_tokens)

    return review

In [None]:
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize

def Stemming(review):
    """Lemmatize every token of ``review.review_body`` in place.

    Despite the name, this uses ``WordNetLemmatizer`` (lemmatization) rather
    than a stemmer. The body is coerced to ``str``, tokenized with
    ``word_tokenize``, each token is lemmatized and the results are re-joined
    with single spaces.

    A single lemmatizer instance is created per call instead of one per token.

    Returns the same (mutated) review object.
    """
    lemmatizer = WordNetLemmatizer()
    tokens = word_tokenize(str(review.review_body))

    review.review_body = " ".join(lemmatizer.lemmatize(token) for token in tokens)

    return review


In [None]:
def Lower(review):
    """Lower-case ``review.review_body`` in place (after coercing it to ``str``).

    Returns the same (mutated) review object.
    """
    body_text = str(review.review_body)
    review.review_body = body_text.lower()

    return review

In [None]:
from sklearn.feature_extraction.text import CountVectorizer
import pandas as pd
import numpy as np

def BagOfNTopWords(review, n):
    """Store the ``n`` most frequent words of the review body on the review.

    Reviews whose body has fewer than 5 words (per
    ``review.ReviewBodyWordCount()``) are returned untouched. Otherwise the
    body is vectorized with a unigram ``CountVectorizer`` and the ``n`` words
    with the highest counts are joined with spaces and written to
    ``review.bag_of_words_top_10`` (the attribute name is fixed regardless of
    ``n``).

    Counts are ranked numerically. Stacking counts and words into one NumPy
    array, as was done before, turned the counts into strings, so "9" sorted
    above "12". Ties keep the vectorizer's feature order because the sort is
    stable.

    Returns the same (mutated) review object.
    """
    if review.ReviewBodyWordCount() < 5:
        return review

    vectorizer = CountVectorizer(ngram_range=(1, 1))

    counts = vectorizer.fit_transform([review.review_body]).toarray()[0]

    # get_feature_names() was removed in scikit-learn 1.2; prefer its
    # replacement and fall back for older installations.
    if hasattr(vectorizer, "get_feature_names_out"):
        features = vectorizer.get_feature_names_out()
    else:
        features = vectorizer.get_feature_names()

    ranked = sorted(zip(counts, features), key=lambda pair: pair[0], reverse=True)

    review.bag_of_words_top_10 = " ".join(str(word) for _, word in ranked[:n])

    return review


In [None]:
def ScaledPos(review):
    """Invoke ``review.TaggedReviewBody()`` and return the review.

    NOTE(review): the value returned by ``TaggedReviewBody()`` is discarded
    here; presumably the method records its result on the review itself.
    Confirm against ReviewClass.
    """
    _ = review.TaggedReviewBody()

    return review

In [None]:
def NumOfChar(review):
    """Store ``review.ReviewBodyCharCount()`` on the review as ``num_of_char``.

    Returns the same (mutated) review object.
    """
    char_total = review.ReviewBodyCharCount()
    review.num_of_char = char_total

    return review

In [None]:
def NumOfWords(review):
    """Store ``review.ReviewBodyWordCount()`` on the review as ``num_of_words``.

    Returns the same (mutated) review object.
    """
    word_total = review.ReviewBodyWordCount()
    review.num_of_words = word_total

    return review

In [None]:
from data_cleaning import sent_tokenize

def sent_count(text):
    """Return how many sentences ``sent_tokenize`` finds in ``text``."""
    sentences = sent_tokenize(text)
    return len(sentences)

def AvgSentLength(review):
    """Store the average number of words per sentence as ``avg_sent_length``.

    The body is split into sentences with ``sent_tokenize`` and into words
    with ``word_tokenize``; the ratio of the two counts is written to
    ``review.avg_sent_length``.

    When no sentence is found (for example an empty body) the average is set
    to ``0.0`` instead of raising ``ZeroDivisionError``.

    Returns the same (mutated) review object.
    """
    sentences = sent_tokenize(review.review_body)
    words = word_tokenize(review.review_body)

    # Guard the division: a body without any sentence has no average length.
    review.avg_sent_length = len(words) / len(sentences) if sentences else 0.0

    return review

In [None]:
# Cleaning / feature-extraction pipeline. The order of the steps matters:
#   * AvgSentLength runs first, while the body still contains its sentence
#     punctuation.
#   * RemoveBreaklines and RemoveASCII run BEFORE RemovePunctuation. In the
#     opposite order the characters '<', '/', '>', '&' and ';' are stripped
#     first, so '<br />' markers and HTML entities in the body can no longer be
#     recognised and degrade into junk tokens such as 'br'.
#   * Lower runs before RemoveStopwords and Stemming so that capitalised words
#     ("The", "And") are matched against lower-case stop-word lists.
# The result is collected to the driver as a list of Review objects.
cleaned_reviews = (
    review_rdd
    # NaN is not equal to itself, so this drops reviews whose review_date is NaN.
    .filter(lambda review: review.review_date == review.review_date)
    .map(AvgSentLength)
    .map(RemoveBreaklines)
    .map(RemoveASCII)
    .map(RemovePunctuation)
    .map(RemoveAccentsFromHeadline)
    .map(RemoveAccentsFromBody)
    .map(Lower)
    .map(RemoveStopwords)
    .map(Stemming)
    .map(lambda review: BagOfNTopWords(review, 10))
    .map(ScaledPos)
    .map(NumOfChar)
    .map(NumOfWords)
    .collect()
)

In [None]:
import pandas as pd

# One row per cleaned review, one column per attribute of the Review object;
# written to disk without the dataframe index.
df = pd.DataFrame(list(map(vars, cleaned_reviews)))

df.to_csv("cleaned_reviews.csv", index=False)