In [None]:
# Machine preparation

!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null   # !apt-get --> install java
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz  # !wget  --> download file from url
!tar xf spark-3.1.2-bin-hadoop3.2.tgz  # !tar --> like unzip 
!pip install -q findspark  # !pip  --> instal a package, we cant import a library without installing it first, most libraries that we used were already installed


[33m0% [Working][0m            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
[33m0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Waiting for headers] [Wa[0m                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
[33m0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Waiting for headers] [Wa[0m                                                                               Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
[33m0% [Waiting for headers] [3 InRelease 14.2 kB/88.7 kB 16%] [Waiting for headers[0m                                                                               Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64

In [None]:
# Set environment variables: point Spark at the Java 8 runtime and the extracted Spark distribution
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop3.2",
})

In [None]:
# Mount Google Drive into this Colab notebook so the project files under MyDrive are reachable
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# List the raw CSV files in the project's Data folder on the mounted Drive
!ls /content/drive/MyDrive/BDA/Big_Data_Project/Data

BX-Book-Ratings.csv  BX-Books.csv  BX-Users.csv


In [None]:
import os

import findspark
# Locate Spark through the SPARK_HOME environment variable rather than a path relative to the
# current working directory, so this cell works no matter where the kernel was started.
findspark.init(os.environ["SPARK_HOME"])

from pyspark.sql import SparkSession
# local[*]: run Spark in-process, using as many worker threads as there are cores
spark = SparkSession.builder.master("local[*]").getOrCreate()

from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DecimalType, StringType

import shutil

# **Read Data**

In [None]:
# Locations of the three raw CSV files on the mounted Drive
data_path = "/content/drive/MyDrive/BDA/Big_Data_Project/Data"
file_bx_users, file_bx_books, file_bx_rating = (
    f"{data_path}/{csv_name}"
    for csv_name in ("BX-Users.csv", "BX-Books.csv", "BX-Book-Ratings.csv")
)

## Users data

### Reading

In [None]:
# Users file: ';'-separated, header row, Latin-1 encoded; let Spark infer the column types
df_bx_users = (
    spark.read
    .options(sep=";", header=True, inferSchema=True, encoding="ISO-8859-1")
    .csv(file_bx_users)
)

### Cleaning

In [None]:
# Inspect the inferred schema, then show the raw number of user rows as the cell output
df_bx_users.printSchema()
df_bx_users.count()

root
 |-- User-ID: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: string (nullable = true)



278859

In [None]:
# Rename Column
df_bx_users = df_bx_users.withColumnRenamed('User-ID', 'UserID')
df_bx_users.printSchema()

root
 |-- UserID: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: string (nullable = true)



#### Clean Age

In [None]:
# Expose the users DataFrame to Spark SQL as the view "bx_users" and list rows with a null Age.
# registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView replaces it.
df_bx_users.createOrReplaceTempView("bx_users")
spark.sql('select * from bx_users where Age is null').show()

+---------------+----------+----+
|         UserID|  Location| Age|
+---------------+----------+----+
|         275081|cernusco s|null|
|, milan, italy"|      NULL|null|
+---------------+----------+----+



In [None]:
# Keep only the user rows whose Age field is present
df_bx_users = df_bx_users.filter(F.col('Age').isNotNull())

In [None]:
# Row count after removing the null-Age rows
df_bx_users.count()

278857

In [None]:
# Re-point the "bx_users" view at the filtered DataFrame and confirm no null ages remain.
# createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
df_bx_users.createOrReplaceTempView("bx_users")
spark.sql('select * from bx_users where Age is null').show()
df_bx_users.show()

+------+--------+---+
|UserID|Location|Age|
+------+--------+---+
+------+--------+---+

+------+--------------------+----+
|UserID|            Location| Age|
+------+--------------------+----+
|     1|  nyc, new york, usa|NULL|
|     2|stockton, califor...|  18|
|     3|moscow, yukon ter...|NULL|
|     4|porto, v.n.gaia, ...|  17|
|     5|farnborough, hant...|NULL|
|     6|santa monica, cal...|  61|
|     7| washington, dc, usa|NULL|
|     8|timmins, ontario,...|NULL|
|     9|germantown, tenne...|NULL|
|    10|albacete, wiscons...|  26|
|    11|melbourne, victor...|  14|
|    12|fort bragg, calif...|NULL|
|    13|barcelona, barcel...|  26|
|    14|mediapolis, iowa,...|NULL|
|    15|calgary, alberta,...|NULL|
|    16|albuquerque, new ...|NULL|
|    17|chesapeake, virgi...|NULL|
|    18|rio de janeiro, r...|  25|
|    19|           weston, ,|  14|
|    20|langhorne, pennsy...|  19|
+------+--------------------+----+
only showing top 20 rows



In [None]:
# How many users carry the literal text "NULL" in their (still string-typed) Age column?
df_bx_users.filter(F.col('Age').contains('NULL')).count()

110761

In [None]:
# Encode the literal "NULL" age as 0 (placeholder for "unknown") and make the column an integer
age_as_int = (
    F.when(F.col("Age") == "NULL", F.lit(0))
    .otherwise(F.col("Age"))
    .cast(IntegerType())
)
df_bx_users = df_bx_users.withColumn("Age", age_as_int)
df_bx_users.printSchema()

root
 |-- UserID: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: integer (nullable = true)



In [None]:
# Inspect the first 20 rows after the "NULL" -> 0 age encoding
df_bx_users.show()


+------+--------------------+---+
|UserID|            Location|Age|
+------+--------------------+---+
|     1|  nyc, new york, usa|  0|
|     2|stockton, califor...| 18|
|     3|moscow, yukon ter...|  0|
|     4|porto, v.n.gaia, ...| 17|
|     5|farnborough, hant...|  0|
|     6|santa monica, cal...| 61|
|     7| washington, dc, usa|  0|
|     8|timmins, ontario,...|  0|
|     9|germantown, tenne...|  0|
|    10|albacete, wiscons...| 26|
|    11|melbourne, victor...| 14|
|    12|fort bragg, calif...|  0|
|    13|barcelona, barcel...| 26|
|    14|mediapolis, iowa,...|  0|
|    15|calgary, alberta,...|  0|
|    16|albuquerque, new ...|  0|
|    17|chesapeake, virgi...|  0|
|    18|rio de janeiro, r...| 25|
|    19|           weston, ,| 14|
|    20|langhorne, pennsy...| 19|
+------+--------------------+---+
only showing top 20 rows



In [None]:
# Ages above 100 are implausible: show a sample of them and count how many there are
implausible_ages = df_bx_users.filter("Age>100")
implausible_ages.show()
implausible_ages.count()

+------+--------------------+---+
|UserID|            Location|Age|
+------+--------------------+---+
|  1289|san jose, califor...|103|
|  1323|milano, lombardia...|104|
|  1579|akure, ondo/niger...|231|
|  3085|zürich, switzerla...|104|
|  3211|le mesnil saint d...|119|
|  3437|honolulu, hawaii,...|103|
|  3689|vigo, galicia, spain|104|
|  4255|genève, genève, s...|103|
|  4783|keilor park, vict...|104|
|  6524|larnaca, n/a, cyprus|104|
|  6712|78126 königsfeld,...|104|
|  8068|springfield, illi...|104|
|  8330|newcastle, thenor...|114|
|  8458|milano, lombardia...|230|
|  8655|belfast, northern...|104|
|  8782|calgary, alberta,...|239|
| 11055|washington, tyne ...|104|
| 11087|burgdorf, nieders...|104|
| 11326|johannesburg, gau...|148|
| 12692|jacksonville, flo...|151|
+------+--------------------+---+
only showing top 20 rows



366

In [None]:
# Divide ages above 100 by 10 (presumably treating them as typos with an extra digit).
# The division makes Age a double column for every row, not just the rescaled ones.
df_bx_users = df_bx_users.withColumn("Age", F.when(df_bx_users.Age > 100, df_bx_users.Age/10).otherwise(df_bx_users.Age))
# Sanity check: no ages above 100 should remain (expect an empty table)
df_bx_users.filter("Age>100").show()

+------+--------+---+
|UserID|Location|Age|
+------+--------+---+
+------+--------+---+



In [None]:
# Replace the 0 placeholders (the former "NULL" ages) with the mean of the *known* ages.
# The placeholders must be excluded from the mean: with ~110k zeros included, the average
# is dragged far below the real typical age.
known_age_avg = df_bx_users.filter(F.col("Age") > 0).agg(F.avg("Age")).first()[0]
# If no known ages exist, avg() returns None; fall back to 0, which leaves placeholders unchanged
age_avg = int(known_age_avg) if known_age_avg is not None else 0
print("Average age is " + str(age_avg))
df_bx_users = df_bx_users.withColumn("Age", F.when(df_bx_users.Age == 0, age_avg).otherwise(df_bx_users.Age))
df_bx_users.show(3)

Average age is 20
+------+--------------------+----+
|UserID|            Location| Age|
+------+--------------------+----+
|     1|  nyc, new york, usa|20.0|
|     2|stockton, califor...|18.0|
|     3|moscow, yukon ter...|20.0|
+------+--------------------+----+
only showing top 3 rows



In [None]:
# Store Age as decimal(10,0); casting the double rounds fractional ages (e.g. 23.9 -> 24)
df_bx_users = df_bx_users.withColumn('Age', F.col('Age').cast(DecimalType()))
# Spot-check a user whose age was rescaled from 239, then confirm the new type
df_bx_users.filter("UserID==8782").show()
df_bx_users.printSchema()

+------+--------------------+---+
|UserID|            Location|Age|
+------+--------------------+---+
|  8782|calgary, alberta,...| 24|
+------+--------------------+---+

root
 |-- UserID: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: decimal(10,0) (nullable = true)



In [None]:
# Sanity checks: both tables should be empty (no ages over 100, no 0 placeholders left)
for age_condition in ("Age>100", "Age==0"):
    df_bx_users.filter(age_condition).show()

+------+--------+---+
|UserID|Location|Age|
+------+--------+---+
+------+--------+---+

+------+--------+---+
|UserID|Location|Age|
+------+--------+---+
+------+--------+---+



#### Clean location

In [None]:
# Preview the free-text Location column before splitting it
df_bx_users.select(F.col('Location')).show(n=3)

+--------------------+
|            Location|
+--------------------+
|  nyc, new york, usa|
|stockton, califor...|
|moscow, yukon ter...|
+--------------------+
only showing top 3 rows



In [None]:
# Location is free text such as "nyc, new york, usa". Split on commas and trim each part;
# without the trim, every part after the first keeps the space that follows the comma
# (e.g. " usa"), so exact comparisons like UserCountry == 'n/a' silently fail.
split_location = F.split(df_bx_users.Location, ',')
df_bx_users = df_bx_users.withColumn('UserCity', F.trim(split_location.getItem(0)))
# NOTE(review): item 2 assumes exactly "city, region, country"; locations with fewer parts give
# null and ones with more parts give a region here. Consider F.element_at(split_location, -1).
df_bx_users = df_bx_users.withColumn('UserCountry', F.trim(split_location.getItem(2)))
df_bx_users = df_bx_users.drop('Location')

In [None]:
# Confirm Location has been replaced by UserCity / UserCountry
df_bx_users.printSchema()

root
 |-- UserID: string (nullable = true)
 |-- Age: decimal(10,0) (nullable = true)
 |-- UserCity: string (nullable = true)
 |-- UserCountry: string (nullable = true)



#### Clean UserCity and UserCountry

In [None]:
# Normalise the free-text city and country values
df_bx_users = (
    df_bx_users
    # Remove digits (e.g. postcodes); raw string avoids the invalid '\d' Python escape warning
    .withColumn('UserCity', F.regexp_replace('UserCity', r'\d+', ''))
    # Remove stray punctuation in one pass (same effect as four single-character replaces)
    .withColumn('UserCity', F.regexp_replace('UserCity', '[&#;,]', ''))
    # Parentheses must be escaped: unescaped they form a regex group, which matched only the
    # inner text and left a literal "()" behind
    .withColumn('UserCity', F.regexp_replace('UserCity', r'\(currently living in england\)', ''))
    # The removals above can leave leading/trailing whitespace
    .withColumn('UserCity', F.trim('UserCity'))
    # Merge common abbreviations of New York
    .withColumn('UserCity',
                F.when(F.col('UserCity').isin('nyc', 'ny'), 'new york').otherwise(F.col('UserCity')))
    .withColumn('UserCountry', F.regexp_replace('UserCountry', '[&#;,]', ''))
    .withColumn('UserCountry', F.trim('UserCountry'))
)

In [None]:
# Look for the 'n/a' placeholder in each location column
for location_col in ('UserCity', 'UserCountry'):
    df_bx_users.filter(F.col(location_col) == 'n/a').show()

+------+---+--------+------------+
|UserID|Age|UserCity| UserCountry|
+------+---+--------+------------+
|    65| 20|     n/a|   australia|
|  1373| 20|     n/a|     lesotho|
|  2065| 16|     n/a|   singapore|
|  2197| 26|     n/a|      canada|
|  2760| 23|     n/a| switzerland|
|  3096| 20|     n/a|         n/a|
|  3569| 20|     n/a|         usa|
|  4221| 20|     n/a|         usa|
|  4510| 20|     n/a|   hong kong|
|  5348| 37|     n/a|     ireland|
|  6140| 26|     n/a|      canada|
|  7115| 24|     n/a|       spain|
|  8752| 45|     n/a|      monaco|
|  8875| 20|     n/a| netherlands|
|  9596| 20|     n/a|   singapore|
| 10647| 20|     n/a|         n/a|
| 11210| 19|     n/a|   singapore|
| 11605| 24|     n/a|       italy|
| 11676| 20|     n/a|         n/a|
| 11769| 33|     n/a|            |
+------+---+--------+------------+
only showing top 20 rows

+------+---+--------+-----------+
|UserID|Age|UserCity|UserCountry|
+------+---+--------+-----------+
+------+---+--------+-----------

In [None]:
# Collapse placeholder or missing location values into a single 'other' bucket.
# Values are trimmed before comparing, so padded placeholders such as ' n/a' are caught too.
# Nulls (e.g. a location with fewer comma-separated parts than expected) are treated the same way.
for location_col in ('UserCity', 'UserCountry'):
    trimmed = F.trim(F.col(location_col))
    df_bx_users = df_bx_users.withColumn(
        location_col,
        F.when(trimmed.isNull() | trimmed.isin('n/a', ''), 'other').otherwise(F.col(location_col)),
    )

In [None]:
# Verify no 'n/a' placeholders remain (both tables should be empty)
for location_col in ('UserCity', 'UserCountry'):
    df_bx_users.filter(F.col(location_col) == 'n/a').show()

+------+---+--------+-----------+
|UserID|Age|UserCity|UserCountry|
+------+---+--------+-----------+
+------+---+--------+-----------+

+------+---+--------+-----------+
|UserID|Age|UserCity|UserCountry|
+------+---+--------+-----------+
+------+---+--------+-----------+



## Books Data

### Reading

In [None]:
# Books file: ';'-separated, header row, Latin-1 encoded; let Spark infer the column types
df_bx_books = (
    spark.read
    .options(sep=";", header=True, inferSchema=True, encoding="ISO-8859-1")
    .csv(file_bx_books)
)

In [None]:
# Preview the raw books data (first 20 rows)
df_bx_books.show()

+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|      ISBN|          Book-Title|         Book-Author|Year-Of-Publication|           Publisher|         Image-URL-S|         Image-URL-M|         Image-URL-L|
+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|http://images.ama...|http://images.ama...|http://images.ama...|
|0002005018|        Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|http://images.ama...|http://images.ama...|http://images.ama...|
|0060973129|Decision in Normandy|        Carlo D'Este|               1991|     HarperPerennial|http://images.ama...|http://images.ama...|http://images.ama...|
|0374157065|Flu: The Story of...|    Gina Bari

### Cleaning

In [None]:
# Give the book columns hyphen-free names and drop the three cover-image URL columns
df_bx_books = (
    df_bx_books
    .withColumnRenamed('Book-Title', 'BookTitle')
    .withColumnRenamed('Book-Author', 'BookAuthor')
    .withColumnRenamed('Year-Of-Publication', 'YearOfPublication')
    .drop('Image-URL-S', 'Image-URL-M', 'Image-URL-L')
)

In [None]:
# Preview the books table after renaming and dropping columns
df_bx_books.show(5)

+----------+--------------------+--------------------+-----------------+--------------------+
|      ISBN|           BookTitle|          BookAuthor|YearOfPublication|           Publisher|
+----------+--------------------+--------------------+-----------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|             2002|Oxford University...|
|0002005018|        Clara Callan|Richard Bruce Wright|             2001|HarperFlamingo Ca...|
|0060973129|Decision in Normandy|        Carlo D'Este|             1991|     HarperPerennial|
|0374157065|Flu: The Story of...|    Gina Bari Kolata|             1999|Farrar Straus Giroux|
|0393045218|The Mummies of Ur...|     E. J. W. Barber|             1999|W. W. Norton &amp...|
+----------+--------------------+--------------------+-----------------+--------------------+
only showing top 5 rows



## Rating Data

### Reading

In [None]:
# Get Columns Function: turn one raw line of the ratings file into a list of cleaned fields
def parsingInput(line):
    """Split a ';'-separated line and clean every field.

    Each field has all double quotes, commas and newline characters removed.

    Args:
        line: one raw line of the ratings file (header or data row).

    Returns:
        list[str]: the cleaned fields, in their original order.
    """
    return [
        raw_field.replace('"', '').replace(',', '').replace('\n', '')
        for raw_field in line.split(";")
    ]

# Read the ratings file with the custom line parser: the first line gives the column names,
# and every following line becomes one row of string fields.
with open(file_bx_rating, mode='r', encoding="ISO-8859-1") as file:
    columns = parsingInput(file.readline())
    # Stream the remaining lines instead of buffering them all with readlines().
    # Skip blank lines: they would parse to a single-field row and make createDataFrame fail.
    data = [parsingInput(line) for line in file if line.strip()]
# (no explicit close needed: the with-block already closed the file)

df_bx_rating = spark.createDataFrame(data, columns)


In [None]:
# Preview the parsed ratings; every column is a string here because parsingInput yields str fields
df_bx_rating.show(5)
df_bx_rating.printSchema()

+-------+----------+-----------+
|User-ID|      ISBN|Book-Rating|
+-------+----------+-----------+
| 276725|034545104X|          0|
| 276726|0155061224|          5|
| 276727|0446520802|          0|
| 276729|052165615X|          3|
| 276729|0521795028|          6|
+-------+----------+-----------+
only showing top 5 rows

root
 |-- User-ID: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- Book-Rating: string (nullable = true)



### Cleaning

In [None]:
# Rename to hyphen-free column names and make the rating numeric
df_bx_rating = (
    df_bx_rating
    .withColumnRenamed('User-ID', 'UserID')
    .withColumnRenamed('Book-Rating', 'BookRating')
)
df_bx_rating = df_bx_rating.withColumn('BookRating', F.col('BookRating').cast(IntegerType()))
df_bx_rating.printSchema()
df_bx_rating.show(5)

root
 |-- UserID: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- BookRating: integer (nullable = true)

+------+----------+----------+
|UserID|      ISBN|BookRating|
+------+----------+----------+
|276725|034545104X|         0|
|276726|0155061224|         5|
|276727|0446520802|         0|
|276729|052165615X|         3|
|276729|0521795028|         6|
+------+----------+----------+
only showing top 5 rows



# Create CSV File for Tableau

In [None]:
# Print the schema of each cleaned table before joining them
for table_name, table_df in (("Books", df_bx_books), ("Users", df_bx_users), ("Rating", df_bx_rating)):
    print(table_name)
    table_df.printSchema()

Books
root
 |-- ISBN: string (nullable = true)
 |-- BookTitle: string (nullable = true)
 |-- BookAuthor: string (nullable = true)
 |-- YearOfPublication: integer (nullable = true)
 |-- Publisher: string (nullable = true)

Users
root
 |-- UserID: string (nullable = true)
 |-- Age: decimal(10,0) (nullable = true)
 |-- UserCity: string (nullable = true)
 |-- UserCountry: string (nullable = true)

Rating
root
 |-- UserID: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- BookRating: integer (nullable = true)



In [None]:
# Preview the books table that will be joined with the ratings
df_bx_books.show(5)

+----------+--------------------+--------------------+-----------------+--------------------+
|      ISBN|           BookTitle|          BookAuthor|YearOfPublication|           Publisher|
+----------+--------------------+--------------------+-----------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|             2002|Oxford University...|
|0002005018|        Clara Callan|Richard Bruce Wright|             2001|HarperFlamingo Ca...|
|0060973129|Decision in Normandy|        Carlo D'Este|             1991|     HarperPerennial|
|0374157065|Flu: The Story of...|    Gina Bari Kolata|             1999|Farrar Straus Giroux|
|0393045218|The Mummies of Ur...|     E. J. W. Barber|             1999|W. W. Norton &amp...|
+----------+--------------------+--------------------+-----------------+--------------------+
only showing top 5 rows



In [None]:
# Preview the ratings table that will be joined with the books
df_bx_rating.show(5)

+------+----------+----------+
|UserID|      ISBN|BookRating|
+------+----------+----------+
|276725|034545104X|         0|
|276726|0155061224|         5|
|276727|0446520802|         0|
|276729|052165615X|         3|
|276729|0521795028|         6|
+------+----------+----------+
only showing top 5 rows



In [None]:
# Attach every rating to its book. Joining on the column name yields a single ISBN column
# (key first, then the books columns, then the remaining ratings columns), so there is no
# duplicate ISBN to drop by hand.
df_books_rating = df_bx_books.join(df_bx_rating, on="ISBN", how="inner")
df_books_rating.show(5)

+----------+--------------------+-------------+-----------------+--------------------+------+----------+
|      ISBN|           BookTitle|   BookAuthor|YearOfPublication|           Publisher|UserID|BookRating|
+----------+--------------------+-------------+-----------------+--------------------+------+----------+
|0002234947|Miss Hobbema Pageant|W.P. Kinsella|             1990|Harpercollins Juv...| 39608|         0|
|0002234947|Miss Hobbema Pageant|W.P. Kinsella|             1990|Harpercollins Juv...|219445|         0|
|0002234947|Miss Hobbema Pageant|W.P. Kinsella|             1990|Harpercollins Juv...|227275|         0|
|0007640617|  Energize Your Life|   Nic Rowley|             2002| Thorsons Publishers|189334|         0|
|0020444400|The voyage of the...|   C. S Lewis|             1986|             Collier|207782|         9|
+----------+--------------------+-------------+-----------------+--------------------+------+----------+
only showing top 5 rows



In [None]:
# Check the users schema (UserID is the join key for the next step)
df_bx_users.printSchema()

root
 |-- UserID: string (nullable = true)
 |-- Age: decimal(10,0) (nullable = true)
 |-- UserCity: string (nullable = true)
 |-- UserCountry: string (nullable = true)



In [None]:
# Attach user attributes to each (book, rating) row. Join on the column name and re-select
# so the column order stays: book/rating columns first, then the user attributes.
user_attrs = [col_name for col_name in df_bx_users.columns if col_name != "UserID"]
df_result = (
    df_books_rating
    .join(df_bx_users, on="UserID", how="inner")
    .select(*df_books_rating.columns, *user_attrs)
)
df_result.show(5)

+----------+--------------------+----------------+-----------------+----------------+------+----------+---+------------+-----------+
|      ISBN|           BookTitle|      BookAuthor|YearOfPublication|       Publisher|UserID|BookRating|Age|    UserCity|UserCountry|
+----------+--------------------+----------------+-----------------+----------------+------+----------+---+------------+-----------+
|0553297864| Dance While You Can|Shirley MacLaine|             1992|    Bantam Books|100010|         0| 20|eaton rapids|        usa|
|0451192095|The Border Empire...|   Ralph Compton|             1997|     Signet Book|100010|         0| 20|eaton rapids|        usa|
|0451187873|  The Killing Season|   Ralph Compton|             1996|     Signet Book|100010|         0| 20|eaton rapids|        usa|
|0671743279|       Pandora's Box|  Elizabeth Gage|             1996|    Pocket Books|100010|         0| 20|eaton rapids|        usa|
|0345396936|       Cry to Heaven|       Anne Rice|             1995|B

In [None]:
# Per-column count of null/NaN values in the final table (expect all zeros)
missing_counts = [
    F.count(F.when(F.isnan(col_name) | F.col(col_name).isNull(), col_name)).alias(col_name)
    for col_name in df_result.columns
]
df_result.select(missing_counts).show()

+----+---------+----------+-----------------+---------+------+----------+---+--------+-----------+
|ISBN|BookTitle|BookAuthor|YearOfPublication|Publisher|UserID|BookRating|Age|UserCity|UserCountry|
+----+---------+----------+-----------------+---------+------+----------+---+--------+-----------+
|   0|        0|         0|                0|        0|     0|         0|  0|       0|          0|
+----+---------+----------+-----------------+---------+------+----------+---+--------+-----------+



In [None]:
# Total number of rows in the joined books + ratings + users table
df_result.count()

940675

In [None]:
# Export the joined table as one CSV file (with header) in a "result" folder next to Data/, for Tableau.
result_directory = os.path.normpath(os.path.join(data_path, "..", "result"))

(
    df_result
    .repartition(1)                       # single output part-file; upstream work stays parallel
    .write
    .mode("overwrite")                    # replaces any previous export (no manual rmtree needed)
    .option("header", "true")
    .option("encoding", "ISO-8859-1")
    .csv(result_directory)                # built-in CSV writer; "com.databricks.spark.csv" is a legacy alias
)


In [None]:
# Inspect what Spark wrote: one part-*.csv file (from repartition(1)) plus _SUCCESS and .crc marker files
os.listdir(result_directory)

['part-00000-40a4e682-2ce1-4205-afe9-b87ef9164e36-c000.csv',
 '.part-00000-40a4e682-2ce1-4205-afe9-b87ef9164e36-c000.csv.crc',
 '_SUCCESS',
 '._SUCCESS.crc']

In [None]:
# Shut down the Spark session now that the export is done
spark.stop()