# Data cleaner
Disclaimer: To run this notebook, launch pyspark (command "pyspark --master local[*number of cores*]") from the folder containing the notebook.

In [None]:
from pyspark.sql import SparkSession
from utility_functions import *
from flashtext import KeywordProcessor

In [None]:
# Imported explicitly so this cell does not depend on the wildcard import from
# utility_functions happening to re-export `os`.
import os

# Project root: the parent of the current working directory, with Windows
# backslashes normalised to forward slashes.
directory = os.path.dirname(os.getcwd()).replace("\\", "/")

# Raw input dataset.
path = directory + "/data/datasets/dataset.json"

# Output of the first cleaning pass: merged file and directory of per-split files.
path_cleaned = directory + "/data/datasets/dataset-cleaned.json"
path_cleaned_directory = directory + "/data/datasets/dataset-cleaned"

# Output of the second pass (rare words replaced): merged file and per-split directory.
path_cleaned_unknown = directory + "/data/datasets/dataset-cleaned-no-unknown.json"
path_cleaned_unknown_directory = directory + "/data/datasets/dataset-cleaned-no-unknown"

In [None]:
# Build (or reuse) the Spark session with enlarged memory and timeout settings.
# NOTE(review): spark.driver.memory generally has to be set before the driver
# JVM starts (e.g. via `pyspark --driver-memory`); confirm it takes effect here.
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "32g")
    .config("spark.driver.memory", "32g")
    .config("spark.network.timeout", "1200s")
    .config("spark.executor.memoryOverhead", "12g")
    # Must be strictly smaller than spark.network.timeout, otherwise Spark
    # rejects the configuration; 100s keeps the default 1:12 ratio.
    .config("spark.executor.heartbeatInterval", "100s")
    # Spark forbids max-heap (-Xmx) flags in extraJavaOptions; the executor
    # heap size is already set through spark.executor.memory above.
    .config("spark.executor.extraJavaOptions", "-Xms12g")
    .getOrCreate()
)

In [None]:
# Read the raw JSON dataset into a Spark DataFrame.
dataset = spark.read.json(path)

# Randomly partition the DataFrame into ten equally weighted splits so that
# each one can be processed separately.
split_rdds = dataset.randomSplit([0.1] * 10)

In [None]:
# Build one lazily evaluated RDD of cleaned reviews per split. Each record is
# reduced to its rating ("overall") and its cleaned, lower-cased text.
cleaned_rdds = [
    df.rdd
    # Skip records without review text: calling .lower() on None would
    # otherwise fail the whole job.
    .filter(lambda obj: obj["reviewText"] is not None)
    .map(lambda obj: {"overall": obj["overall"], "reviewText": obj["reviewText"].lower()})
    # Drop reviews containing "old review".
    .filter(lambda obj: "old review" not in obj["reviewText"])
    # Apply the project's text-cleaning function.
    .map(lambda obj: {"overall": obj["overall"], "reviewText": cleaning_function_no_unknown(obj["reviewText"])})
    for df in split_rdds
]

In [None]:
# Write every cleaned split (rare words still present) to its own numbered
# location inside the cleaned-dataset directory.
for index, cleaned_rdd in enumerate(cleaned_rdds):
    split_output_path = path_cleaned_directory + "/cleaned" + str(index)
    save_rdd_to_json_file(split_output_path, cleaned_rdd)

# Build the single cleaned dataset file from the per-split outputs.
merge_files(path_cleaned_directory, path_cleaned)

In [None]:
# Read the merged cleaned dataset back into a Spark DataFrame.
dataset_cleaned = spark.read.json(path_cleaned)

# Randomly partition it into ten equally weighted splits for the next steps.
split_rdds_cleaned = dataset_cleaned.randomSplit([0.1] * 10)

In [None]:
def _unit_counts(obj):
    """Return a (token, 1) pair for every token of the record's review text."""
    prepared_text = remove_symbols_before_tokenization(obj["reviewText"])
    return [(word, 1) for word in tokenize_with_sequences(prepared_text)]


# For each split, tokenize every review and sum the per-token counts,
# yielding one RDD of (token, occurrences) pairs per split.
words_occurrences_rdds = [
    df.rdd.flatMap(_unit_counts).reduceByKey(lambda x, y: x + y)
    for df in split_rdds_cleaned
]

In [None]:
# Chain the per-split count RDDs into a single RDD.
first_counts, second_counts, *remaining_counts = words_occurrences_rdds
merged_rdd_occurrences = first_counts.union(second_counts)
for counts_rdd in remaining_counts:
    merged_rdd_occurrences = merged_rdd_occurrences.union(counts_rdd)

# Sum the partial counts of each word across splits and bring the final
# vocabulary with its number of occurrences back to the driver.
words_with_occurrences = merged_rdd_occurrences.reduceByKey(lambda x, y: x + y).collect()

In [None]:
# Split the vocabulary into frequent words (kept) and rare words (to be
# replaced by the '[UNKNOWN]' placeholder).

# Words occurring at most this many times are treated as unknown.
unknown_threshold = 10

count_unknown = 0
count_frequent = 0
frequent_words = []
unknown_words = []
# Walk the vocabulary in ascending order of occurrences.
for word, occurrences in sorted(words_with_occurrences, key=lambda x: x[1]):
    if occurrences <= unknown_threshold:
        count_unknown += occurrences
        unknown_words.append(word)
    else:
        count_frequent += occurrences
        frequent_words.append((word, occurrences))

# Print percentage of retained words (by total occurrences). An empty
# vocabulary reports 0.0 instead of raising ZeroDivisionError.
total_occurrences = count_frequent + count_unknown
retained_percentage = (count_frequent * 100) / total_occurrences if total_occurrences else 0.0
print(f"The percentage of retained words is {retained_percentage}")

# Record all rare words together as a single '[UNKNOWN]' entry.
frequent_words.append(('[UNKNOWN]', count_unknown))

# Create keyword processor mapping every rare word to '[UNKNOWN]' for later use.
kp = KeywordProcessor()
for word in unknown_words:
    kp.add_keyword(word, '[UNKNOWN]')

# Save the list of known words with their occurrences for later use; it is
# re-sorted so the '[UNKNOWN]' entry lands at its place by count.
# NOTE(review): unlike the dataset paths, this path is relative to the working
# directory rather than built from `directory` - confirm this is intended.
path_all_occurrences = "data/sentiment-knowledge/all-words-with-occurrences.csv"
save_list_to_csv(sorted(frequent_words, key=lambda x: x[1]), path_all_occurrences, ["word", "occurrences"])

In [None]:
# Share the keyword processor with the workers through a broadcast variable so
# it is serialized once instead of being captured separately in the closure of
# each of the per-split RDDs built below.
kp_broadcast = spark.sparkContext.broadcast(kp)

# Build one RDD per split in which every rare word of the review text is
# replaced by '[UNKNOWN]'; the rating ("overall") is carried over unchanged.
cleaned_no_unknown_rdds = [
    df.rdd.map(
        lambda obj: {
            "overall": obj["overall"],
            "reviewText": kp_broadcast.value.replace_keywords(obj["reviewText"]),
        }
    )
    for df in split_rdds_cleaned
]

In [None]:
# Write every split whose rare words were replaced by '[UNKNOWN]' to its own
# numbered location inside the no-unknown dataset directory.
for index, no_unknown_rdd in enumerate(cleaned_no_unknown_rdds):
    split_output_path = path_cleaned_unknown_directory + "/cleaned" + str(index)
    save_rdd_to_json_file(split_output_path, no_unknown_rdd)

# Build the single no-unknown dataset file from the per-split outputs.
merge_files(path_cleaned_unknown_directory, path_cleaned_unknown)