## Data Cleaning with spark
### in this notebook the useless columns will be removed

### **PLEASE NOTE :**  
### Since this script stores the results in hadoop, execute it only once, otherwise an error will be thrown

---

### Import Libraries

In [None]:
# import libraries
import pandas as pd
import pyspark as ps
import findspark
import string

from pyspark.sql.functions import col, sum,split, regexp_replace,lit,lower
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql import SparkSession


### Initialize Spark

In [None]:
# Locate the spark installation
findspark.init()

# Initialize a SparkContext
spark_session = SparkSession.builder.appName("data_cleaning").getOrCreate()


### Connect and import data from HDFS directly into a Spark DataFrame

In [None]:
# Define schema for better manipulation

data_schema = StructType([
    StructField("Title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("authors", StringType(), True),
    StructField("image", StringType(), True),
    StructField("previewLink", StringType(), True),
    StructField("publisher", StringType(), True),
    StructField("publishedDate", StringType(), True),
    StructField("infoLink", StringType(), True),
    StructField("categories", StringType(), True),
    StructField("ratingsCount", FloatType(), True)
])

ratings_schema = StructType([
    StructField("Id", IntegerType(), True),
    StructField("Title", StringType(), True),
    StructField("Price", FloatType(), True),
    StructField("User_id", IntegerType(), True),
    StructField("profileName", StringType(), True),
    StructField("review/helpfulness", StringType(), True),
    StructField("review/score", FloatType(), True),
    StructField("review/time", IntegerType(), True),
    StructField("review/summary", StringType(), True),
    StructField("review/text", StringType(), True)
])

# Load the original data

df_data = spark_session.read.option('escape', '"').csv(
    'hdfs://localhost:9900/user/book_reviews/original_data/books_data.csv', header=True, schema=data_schema)
df_ratings = spark_session.read.option('escape', '"').csv(
    'hdfs://localhost:9900/user/book_reviews/original_data/books_rating.csv', header=True, schema=ratings_schema)

## Data Transformations
---

### - Remove useless columns

These are the columns whcih are not useful for our analysis. The original files are kept unchanged in HDFS, and the new files are stored in HDFS as well.

**Data Table:**
All the links are removed.
- image
- previewLink
- infoLink
- ratingsCount

**Rating Table:**
- id

In [None]:
# Remove image column from data
df_data = df_data.drop(df_data.image)

# Remove previewLink column from data
df_data = df_data.drop(df_data.previewLink)

# Remove infoLink column from data
df_data = df_data.drop(df_data.infoLink)

# Remove ratingsCount column from data
df_data = df_data.drop(df_data.ratingsCount)

# Show the results
df_data.show(5)

# Remove Id column from ratings data
df_ratings = df_ratings.drop(df_ratings.Id)

# Show the results
df_ratings.show(5)

### - Remove all the punctuation inside each column

This is to avoid parsing problem when the csv in read

In [None]:
# Remove illegal characters from the review
ratings_cols_to_change = ['Title', 'profileName',
                          'review/summary', 'review/text']

for column in ratings_cols_to_change:
    df_ratings = df_ratings.withColumn(
        column, regexp_replace(col(column), "\t", " "))
    df_ratings = df_ratings.withColumn(
        column, regexp_replace(col(column), "\n", " "))
    df_ratings = df_ratings.withColumn(
        column, regexp_replace(col(column), "\"", " "))
    df_ratings = df_ratings.withColumn(
        column, regexp_replace(col(column), "\\\\", " "))


# Remove illegal characters from the books data
data_cols_to_change = ['Title', 'description',
                       'authors', 'publisher', 'categories']
for column in data_cols_to_change:
    df_data = df_data.withColumn(
        column, regexp_replace(col(column), "\t", " "))
    df_data = df_data.withColumn(
        column, regexp_replace(col(column), "\n", " "))
    df_data = df_data.withColumn(
        column, regexp_replace(col(column), "\"", " "))
    df_data = df_data.withColumn(
        column, regexp_replace(col(column), "\\\\", " "))

In [None]:

# Separate the helpfulness into two columns
df_ratings = df_ratings.withColumn('N_helpful', split(col('review/helpfulness'), '/').getItem(
    0)).withColumn('Tot_votes', split(col('review/helpfulness'), '/').getItem(1))

# Remove review/helpfulness column from ratings data
df_ratings = df_ratings.drop(col('review/helpfulness'))

df_ratings.show(truncate=False)

In [None]:
# # Check if a given column contains a given character

# contains_A = df_ratings.filter(col("review/text").contains("\t")).count() > 0
# print("Does the 'name' column contain 'A'? ", contains_A)

### Store the results in hadoop

In [None]:
df_ratings.repartition(1).write.csv(
    'hdfs://localhost:9900/user/book_reviews/books_rating_cleaned', mode='overwrite', header=True, sep='\t')

df_data.repartition(1).write.csv(
    'hdfs://localhost:9900/user/book_reviews/books_data_cleaned', mode='overwrite', header=True, sep='\t')

---

## Read the new data to check soundness

In [None]:
data_df = spark_session.read.csv(
    'hdfs://localhost:9900/user/book_reviews/books_data_cleaned', header=True, inferSchema=True, sep='\t')
rating_df = spark_session.read.csv(
    'hdfs://localhost:9900/user/book_reviews/books_rating_cleaned', header=True, inferSchema=True, sep='\t')

In [None]:
data_df.show(5)
#data_df.printSchema()
print("Num values :",data_df.count())
#data_df.describe().show()

In [None]:
rating_df.show(5)
#rating_df.printSchema()
print("Num values :",rating_df.count())
#rating_df.describe().show()

rating_df.filter(rating_df['Title'].startswith('17 Contemporary Christian')).show()

## Join the Tables with Spark

In [None]:
# join the two tables on Title
joined_df = data_df.join(rating_df, on=['Title'], how='inner')

joined_df.count()

In [None]:
joined_df.show(5)

In [None]:
joined_df.write.csv('hdfs://localhost:9900/user/book_reviews/joined_tables_spark', mode='overwrite', header=True, sep='\t')

In [None]:
spark_session.stop()