# **Big Data Project**

**Yair Barel  •  Shlomi Kiko  •  Ori Valdman**

***
***
***

### **Install packages and import libraries**

In [None]:
!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

import findspark
findspark.init("spark-3.1.2-bin-hadoop3.2")

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

from pyspark.sql import Row
from pyspark.sql import functions
from pyspark.sql.functions import *
import pyspark

In [None]:
from google.colab import drive
drive.mount('/content/drive')

### **Rating Table**

In [109]:
# import csv
df_rating = spark.read.option("delimiter", ";").option("quote","").csv('/content/drive/MyDrive/BDA/BigData/BX-Book-Ratings.csv',inferSchema = True, header = True)

# clean and rename the columns titles
df_rating=df_rating.withColumnRenamed('"User-ID','user_id')
df_rating=df_rating.withColumnRenamed('""ISBN""','isbn')
df_rating=df_rating.withColumnRenamed('""Book-Rating""",,','rating')

# clean the rows
df_rating = df_rating.withColumn('user_id', regexp_replace('user_id', '[^A-Z0-9_]', ''))
df_rating = df_rating.withColumn('isbn', regexp_replace('isbn', '[^A-Z0-9_]', ''))
df_rating = df_rating.withColumn('rating', regexp_replace('rating', '[^A-Z0-9_]', ''))

# replace empty values with null
df_rating = df_rating.withColumn('isbn', when(col('isbn') == '', None).otherwise(col('isbn')))

# drop all null from the table 
df_rating = df_rating.na.drop()

# drop rows with defective ISBN values
df_rating = df_rating.filter('(isbn rlike "^[0-9]+$") or (isbn rlike "^[0-9]+X$")')
df_rating = df_rating.filter(length(col("isbn")) == 10)

# trim trail and leading blank spaces
df_rating = df_rating.withColumn('user_id', trim(df_rating.user_id))
df_rating = df_rating.withColumn('isbn', trim(df_rating.isbn))
df_rating = df_rating.withColumn('rating', trim(df_rating.rating))

# convert data type to INT
df_rating = df_rating.withColumn("user_id", df_rating.user_id.cast('int'))
df_rating = df_rating.withColumn("rating", df_rating.rating.cast('int'))

# remove duplicate rows (found 4 duplicates)
df_rating = df_rating.distinct()

In [110]:
df_rating.show(10,truncate = False)
df_rating.count()

+-------+----------+------+
|user_id|isbn      |rating|
+-------+----------+------+
|276875 |8806155873|0     |
|277042 |0446611212|8     |
|277187 |038528859X|0     |
|277195 |006251170X|0     |
|277427 |042514755X|0     |
|277427 |0915943263|0     |
|277427 |0930014464|0     |
|277478 |034066049X|0     |
|277478 |0440234743|0     |
|277478 |0860685179|0     |
+-------+----------+------+
only showing top 10 rows



1038353

In [111]:
# export df_rating into csv
df_rating.repartition(1).write.csv("/content/drive/MyDrive/CSV/df_rating.csv", header = True)

### **Books Table**

In [112]:
# import csv
df_book = spark.read.option("delimiter", ";").csv('/content/drive/MyDrive/BDA/BigData/BX-Books.csv',inferSchema = True, header = True)

# drop irrelevant columns
df_book = df_book.drop("Image-URL-S","Image-URL-M","Image-URL-L")

# change the columns name
df_book = df_book.withColumnRenamed("ISBN","isbn") \
      .withColumnRenamed("Book-Title","book_title") \
      .withColumnRenamed("Book-Author","book_author") \
      .withColumnRenamed("Year-Of-Publication","publication_year") \
      .withColumnRenamed("Publisher","publisher")

# drop rows with defective ISBN values
df_book = df_book.filter('(isbn rlike "^[0-9]+$") or (isbn rlike "^[0-9]+X$")')
df_book = df_book.filter(length(col("isbn")) == 10)

# publication_year - replace defective values into null
df_book = df_book.withColumn('publication_year', when(col('publication_year') == '', None).otherwise(col('publication_year')))
df_book = df_book.withColumn('publication_year', when(length('publication_year') != 4, None).otherwise(col('publication_year')))
df_book = df_book.withColumn('publication_year', when(col('publication_year') > 2022, None).otherwise(col('publication_year')))

# publication_year - filter numbers only and null
df_book = df_book.na.fill({"publication_year": -1})

# drop rows that have both NULL in book_author and numbers in book_title
df_book = df_book.filter('(book_title not rlike "^[0-9]+$") and (book_author is not null)')

# filter out special character from columns
df_book = df_book.withColumn('book_title', regexp_replace('book_title', '[$&+,:;=?@#|<>^*()%!�"]', ''))
df_book = df_book.withColumn('book_author', regexp_replace('book_author', '[$&+,:;=?@#|<>^*()%!�"]', ''))
df_book = df_book.withColumn('publisher', regexp_replace('publisher', '[$&+,:;=?@#|<>^*()%!�"]', ''))

# trim trail and leading blank spaces
df_book = df_book.withColumn('isbn', trim(df_book.isbn))
df_book = df_book.withColumn('book_title', trim(df_book.book_title))
df_book = df_book.withColumn('book_author', trim(df_book.book_author))
df_book = df_book.withColumn('publication_year', trim(df_book.publication_year))
df_book = df_book.withColumn('publisher', trim(df_book.publisher))

# convert data type to INT
df_book = df_book.withColumn("publication_year", df_book.publication_year.cast('int'))

# lowercase all values
df_book.createOrReplaceTempView('df_book')
df_book = spark.table('df_book').withColumn('book_title', lower(col('book_title')))

df_book.createOrReplaceTempView('df_book')
df_book = spark.table('df_book').withColumn('book_author', lower(col('book_author')))

df_book.createOrReplaceTempView('df_book')
df_book = spark.table('df_book').withColumn('publisher', lower(col('publisher')))

# convert null-like values into NULL
Dict = {'n/a':None,'NULL':None,'':None,'None':None,' ':None,'null':None,'nan':None,'none':None}
df_book = df_book.replace(Dict,subset=['book_title','book_author','publisher'])

# convert NULL to unknown
df_book = df_book.na.fill({"book_title": "unknown", "book_author": "unknown", "publisher": "unknown"})

In [113]:
df_book.show(10,truncate = False)
df_book.count()

+----------+-------------------------------------------------------------------------------------------------+--------------------+----------------+------------------------+
|isbn      |book_title                                                                                       |book_author         |publication_year|publisher               |
+----------+-------------------------------------------------------------------------------------------------+--------------------+----------------+------------------------+
|0195153448|classical mythology                                                                              |mark p. o. morford  |2002            |oxford university press |
|0002005018|clara callan                                                                                     |richard bruce wright|2001            |harperflamingo canada   |
|0060973129|decision in normandy                                                                             |carlo d'este        

270813

In [114]:
# export df_book into csv
df_book.repartition(1).write.csv("/content/drive/MyDrive/CSV/df_book.csv", header = True)

### **Users Table**

In [115]:
# import csv
df_user = spark.read.option("delimiter", ";").csv('/content/drive/MyDrive/BDA/BigData/BX-Users.csv',inferSchema = True, header = True)

# split the Location column
df_user = df_user.withColumn('city', split(df_user['Location'], ',').getItem(0)) \
                 .withColumn('state', split(df_user['Location'], ',').getItem(1)) \
                 .withColumn('country', split(df_user['Location'], ',').getItem(2)).drop('Location')

# rename columns
df_user=df_user.withColumnRenamed('User-ID','user_id')
df_user=df_user.withColumnRenamed('Age','age')

# filter irrelevant values from Age column into NULL
df_user = df_user.withColumn('age', when(col('age') < 10, None).otherwise(col('age')))
df_user = df_user.withColumn('age', when(col('age') > 120, None).otherwise(col('age')))
df_user = df_user.withColumn('age', when(col('age') == 'NULL', None).otherwise(col('age')))

# turn NULL into -1
df_user = df_user.na.fill({"age": -1})

# filter out non-digits characters from Age column
df_user = df_user.filter(~df_user.user_id.rlike('\D+'))

# filter out special characters
df_user = df_user.withColumn('city', regexp_replace('city', '[$&+,:;=?@#|<>^*()%!�"]', ''))
df_user = df_user.withColumn('state', regexp_replace('state', '[$&+,:;=?@#|<>^*()%!�"]', ''))
df_user = df_user.withColumn('country', regexp_replace('country', '[$&+,:;=?@#|<>^*()%!�"]', ''))

# lowercase all values
df_user.createOrReplaceTempView('df_user')
df_user = spark.table('df_user').withColumn('city', lower(col('city')))

df_user.createOrReplaceTempView('df_user')
df_user = spark.table('df_user').withColumn('state', lower(col('state')))

df_user.createOrReplaceTempView('df_user')
df_user = spark.table('df_user').withColumn('country', lower(col('country')))

# trim trail and leading blank spaces
df_user = df_user.withColumn('user_id', trim(df_user.user_id))
df_user = df_user.withColumn('age', trim(df_user.age))
df_user = df_user.withColumn('city', trim(df_user.city))
df_user = df_user.withColumn('state', trim(df_user.state))
df_user = df_user.withColumn('country', trim(df_user.country))

# convert data type to INT
df_user = df_user.withColumn("age", df_user.age.cast('int'))
df_user = df_user.withColumn("user_id", df_user.user_id.cast('int'))

In [116]:
# clean the Country column
df_user.createOrReplaceTempView('df_user')
df_user_temp = spark.sql("SELECT *, CASE " +
"WHEN country IN ('us','usa','u.s','u.s.a','united sates','united stated','united states of america','virginia','washington','west virginia','texas','ohio','south carolina','new york','california','dc','colorado','oregon','pennsylvania','florida','hawaii','illinois','indiana','massachusetts','missouri','minnesota','new jersey') THEN 'united states' " +
"WHEN country IN ('uk','scotland','wales','england') THEN 'united kingdom' " +
"WHEN country IN ('victoria','western australia') THEN 'australia' " +
"WHEN country IN ('catalunya','catalonia','espaa') THEN 'spain' " +
"WHEN country IN ('british columbia','ontario') THEN 'canada' " +
"WHEN country IN ('u.a.e') THEN 'united arab emirates' " +
"WHEN country IN ('italia','l`italia') THEN 'italy' " +
"WHEN country IN ('czech republic') THEN 'czechia' " +
"WHEN country IN ('deutschland') THEN 'germany' " +
"WHEN country IN ('la france') THEN 'france' " +
"WHEN country IN ('urugua') THEN 'uruguay' " +
"WHEN country IN ('yugoslavia') THEN null " +
"ELSE country END AS country_temp FROM df_user").drop('country')

# import the Countries CSV
df_countries = df_rating = spark.read.csv('/content/drive/MyDrive/BDA/BigData/Countries.csv', inferSchema = True, header = True)

# Rename df_countries title
df_countries=df_countries.withColumnRenamed('name','countries')

# lowercase df_countries values
df_countries.createOrReplaceTempView('df_countries')
df_countries = spark.table('df_countries').withColumn('countries', lower(col('countries')))

df_countries.createOrReplaceTempView('df_countries')
df_countries = spark.table('df_countries').withColumn('region', lower(col('region')))

# join (df_countries) table with (df_user_temp) table
df_join = df_user_temp.join(df_countries,df_user_temp.country_temp ==  df_countries.countries,"inner")

# remove old country column and rename the new one
df_join = df_join.withColumnRenamed('countries','country').drop('country_temp')

# convert to unknown sting column that has numbers
df_join = df_join.withColumn('city', regexp_replace('city', '[0-9]', 'unknown'))
df_join = df_join.withColumn('state', regexp_replace('state', '[0-9]', 'unknown'))

# convert to unknown special character only
df_join.createOrReplaceTempView('df_join')
df_join = spark.sql("SELECT user_id,age,state,country,region, CASE WHEN city NOT REGEXP '^.*[a-zA-Z].*$' THEN 'unknown' ELSE city END AS city FROM df_join")

df_join.createOrReplaceTempView('df_join')
df_join = spark.sql("SELECT user_id,age,city,country,region, CASE WHEN state NOT REGEXP '^.*[a-zA-Z].*$' THEN 'unknown' ELSE state END AS state FROM df_join")

# convert empty values and null-string into NULL
Dict = {'n/a':None,'NULL':None,'':None,' ':None,'None':None,'none':None,'null':None}
df_join = df_join.replace(Dict,subset=['state','city'])

# convert NULL to unknown
df_join = df_join.na.fill({"city": "unknown", "state": "unknown"})

# reorganize the column order
df_join = df_join.select('user_id',"age","city","state","country","region")

In [117]:
# assign data frame back to df_user
df_user = df_join

In [118]:
df_user.show(10,truncate = False)
df_user.count()

+-------+---+------------+---------------+--------------+-------------+
|user_id|age|city        |state          |country       |region       |
+-------+---+------------+---------------+--------------+-------------+
|1      |-1 |nyc         |new york       |united states |north america|
|2      |18 |stockton    |california     |united states |north america|
|3      |-1 |moscow      |yukon territory|russia        |central asia |
|4      |17 |porto       |v.n.gaia       |portugal      |europe       |
|5      |-1 |farnborough |hants          |united kingdom|europe       |
|6      |61 |santa monica|california     |united states |north america|
|7      |-1 |washington  |dc             |united states |north america|
|8      |-1 |timmins     |ontario        |canada        |north america|
|9      |-1 |germantown  |tennessee      |united states |north america|
|10     |26 |albacete    |wisconsin      |spain         |europe       |
+-------+---+------------+---------------+--------------+-------

272762

In [119]:
# export df_user into csv
df_user.repartition(1).write.csv("/content/drive/MyDrive/CSV/df_user.csv", header = True)

### **End Session**

In [120]:
spark.stop()