#**Big Data Project - BookCrossing** 📚

###**Preparing work environment**

In [None]:
!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null 
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark 

Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:5 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:6 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Ign:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:9 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:11 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/u

In [None]:
import os

# Tell PySpark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop3.2",
})

In [None]:
import os

import findspark

# Resolve Spark from SPARK_HOME (set in the previous cell) instead of a path that is
# relative to the current working directory, so both settings always agree.
findspark.init(os.environ["SPARK_HOME"])

# pyspark is imported after findspark.init(), which makes it importable.
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import DecimalType, IntegerType, StringType

# Single local Spark session shared by the whole notebook.
spark = SparkSession.builder.master("local[*]").getOrCreate()

Connecting Google Drive to the Colab notebook

In [None]:
# Mount Google Drive at /content/drive so the input CSV files can be read from it.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


**Read Data**

In [None]:
# Folder on the mounted drive holding the three BookCrossing CSV exports.
data_path = "/content/drive/MyDrive/Big Data/Big Data Project"
Bx_Users = f"{data_path}/BX-Users.csv"
Bx_Books = f"{data_path}/BX-Books.csv"
Bx_Ratings = f"{data_path}/BX-Book-Ratings.csv"

**Creating Data Frames for each data file**\
(including countries file for data cleansing in the location column of users' file)

In [None]:
# Both dimension files use the same CSV dialect: ';' separator, header row, Latin-1.
csv_options = dict(sep=";", header=True, inferSchema=True, encoding="ISO-8859-1")

Dim_Users = spark.read.csv(Bx_Users, **csv_options)
Dim_Books = spark.read.csv(Bx_Books, **csv_options)

Fact_Ratings - the ratings file contains several separator characters between fields; these must be cleaned up before the data is imported.

In [None]:
def parsingInput(line):
    """Split one ';'-delimited line into a list of cleaned field strings.

    Double quotes, commas and newline characters are removed from every field.
    """
    return [
        raw.replace('"', '').replace(',', '').replace('\n', '')
        for raw in line.split(";")
    ]

# Parse the header line for the column names, then every remaining line as a data row.
# The `with` block closes the file on exit, so no explicit close() is needed.
with open(Bx_Ratings, mode='r', encoding="ISO-8859-1") as file:
  columns = parsingInput(file.readline())
  # Skip blank lines: they would produce a one-field row that does not fit the header.
  fileds_data = [parsingInput(line) for line in file if line.strip()]

Fact_Ratings = spark.createDataFrame(fileds_data, columns)

###**Data Cleansing**

**Dim_Users**

In [None]:
# Inspect the column names and types Spark inferred for the users file.
Dim_Users.printSchema()

root
 |-- User-ID: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: string (nullable = true)



In [None]:
# Preview the first rows of the raw users data.
Dim_Users.show()

+-------+--------------------+----+
|User-ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|NULL|
|      2|stockton, califor...|  18|
|      3|moscow, yukon ter...|NULL|
|      4|porto, v.n.gaia, ...|  17|
|      5|farnborough, hant...|NULL|
|      6|santa monica, cal...|  61|
|      7| washington, dc, usa|NULL|
|      8|timmins, ontario,...|NULL|
|      9|germantown, tenne...|NULL|
|     10|albacete, wiscons...|  26|
|     11|melbourne, victor...|  14|
|     12|fort bragg, calif...|NULL|
|     13|barcelona, barcel...|  26|
|     14|mediapolis, iowa,...|NULL|
|     15|calgary, alberta,...|NULL|
|     16|albuquerque, new ...|NULL|
|     17|chesapeake, virgi...|NULL|
|     18|rio de janeiro, r...|  25|
|     19|           weston, ,|  14|
|     20|langhorne, pennsy...|  19|
+-------+--------------------+----+
only showing top 20 rows



In [None]:
# Number of rows in the users data before cleansing.
Dim_Users.count()

278859

In [None]:
# Rename the key column to a hyphen-free name.
Dim_Users=Dim_Users.withColumnRenamed('User-ID','UserID')

Converting dataframe columns to lowercase

In [None]:
# Lower-case every column in a single projection. Aliasing each expression back to
# its own name keeps column names and order, without relying on the column name
# Spark auto-generates for lower(...). lower() yields strings, so numeric columns
# must be cast back afterwards.
Dim_Users = Dim_Users.select([f.lower(Dim_Users[c]).alias(c) for c in Dim_Users.columns])

Dim_Users.show()

+------+--------------------+----+
|UserID|            Location| Age|
+------+--------------------+----+
|     1|  nyc, new york, usa|null|
|     2|stockton, califor...|  18|
|     3|moscow, yukon ter...|null|
|     4|porto, v.n.gaia, ...|  17|
|     5|farnborough, hant...|null|
|     6|santa monica, cal...|  61|
|     7| washington, dc, usa|null|
|     8|timmins, ontario,...|null|
|     9|germantown, tenne...|null|
|    10|albacete, wiscons...|  26|
|    11|melbourne, victor...|  14|
|    12|fort bragg, calif...|null|
|    13|barcelona, barcel...|  26|
|    14|mediapolis, iowa,...|null|
|    15|calgary, alberta,...|null|
|    16|albuquerque, new ...|null|
|    17|chesapeake, virgi...|null|
|    18|rio de janeiro, r...|  25|
|    19|           weston, ,|  14|
|    20|langhorne, pennsy...|  19|
+------+--------------------+----+
only showing top 20 rows



Converting datatypes

In [None]:
# Restore numeric types: UserID as integer, Age as a default-precision decimal.
Dim_Users = (
    Dim_Users
    .withColumn('UserID', f.col('UserID').cast(IntegerType()))
    .withColumn('Age', f.col('Age').cast(DecimalType()))
)

Dim_Users.dtypes

[('UserID', 'int'), ('Location', 'string'), ('Age', 'decimal(10,0)')]

**Location column** - splitting the column to three columns: city, region, & country

In [None]:
# Split Location on commas and keep the first three parts as city, region and country.
split_location = f.split(f.col('Location'), ',')
Dim_Users = (
    Dim_Users
    .withColumn('UserCity', split_location.getItem(0))
    .withColumn('UserRegion', split_location.getItem(1))
    .withColumn('UserCountry', split_location.getItem(2))
    .drop('Location')
)

Cleaning punctuation in location columns

In [None]:
# Raw string: the pattern is full of regex escapes (\!, \(, \d, ...) that are not
# valid Python string escapes and trigger invalid-escape warnings in a normal
# string literal. The pattern value itself is unchanged.
# It removes one leading/trailing space, listed punctuation characters and digits.
punc_regex = r'''(^ |\!|\(|\)|-|\[|\]|\{|\}|;|:|,|'|"|\<|\>|\.|\/|\?|@|�|#|\$|%|\^|\&|\*|_|~|\d| $)'''

for location_col in ('UserCity', 'UserRegion', 'UserCountry'):
  Dim_Users = Dim_Users.withColumn(location_col, f.regexp_replace(location_col, punc_regex, ''))

Finding cells with a null value and empty cells in all columns

In [None]:
def missing_value(c):
  """Build the condition that flags a cell of column `c` as missing.

  A cell counts as missing when it contains 'None' or 'NULL', is '' or ' ',
  is null, or is NaN.
  """
  # NOTE(review): the data was lower-cased earlier, so the case-sensitive
  # 'None'/'NULL' checks presumably never match - confirm.
  cell = f.col(c)
  return (cell.contains('None')
          | cell.contains('NULL')
          | (cell == '')
          | (cell == ' ')
          | cell.isNull()
          | f.isnan(c))

# One count of missing cells per column.
Dim_Users.select([f.count(f.when(missing_value(c), c)).alias(c) for c in Dim_Users.columns]).show()

+------+------+--------+----------+-----------+
|UserID|   Age|UserCity|UserRegion|UserCountry|
+------+------+--------+----------+-----------+
|     1|110763|     327|      4022|       4641|
+------+------+--------+----------+-----------+



Age column - handling unusable values for analysis


In [None]:
# Summary statistics for Age, used to spot implausible values (see min/max).
Dim_Users.describe(["Age"]).show()

+-------+-----------------+
|summary|              Age|
+-------+-----------------+
|  count|           168096|
|   mean|          34.7514|
| stddev|14.42809738245543|
|    min|                0|
|    max|              244|
+-------+-----------------+



Transforming the age column's unusable values

In [None]:
# Ages above 100, below 10, or missing are replaced by the sentinel value -1.
age = f.col("Age")
unusable_age = (age > 100) | (age < 10) | age.isNull()
Dim_Users = Dim_Users.withColumn("Age", f.when(unusable_age, -1).otherwise(age))

Verifying transformation

In [None]:
# createOrReplaceTempView replaces the deprecated registerTempTable and can be re-run safely.
Dim_Users.createOrReplaceTempView("Users")
# Count rows that still hold an unusable age: null, above 100, or below 10 other than
# the -1 sentinel. The previous "... and Age>-1" form could never report a null age,
# because a null never satisfies "Age>-1". The expected result is 0.
spark.sql('select * from Users where Age is null or Age > 100 or (Age < 10 and Age <> -1)').count()

0

In [None]:
# Re-count missing cells per column after the Age transformation.
# A cell counts as missing when it contains 'None'/'NULL', is '' or ' ', is null, or is NaN.
Dim_Users.select([f.count(f.when(f.col(c).contains('None') |\
                                 f.col(c).contains('NULL') | \
                                 (f.col(c) == '' ) | \
                                 (f.col(c) == ' ' ) | \
                                 f.col(c).isNull() | \
                                 f.isnan(c), c)).alias(c) for c in Dim_Users.columns]).show()

+------+---+--------+----------+-----------+
|UserID|Age|UserCity|UserRegion|UserCountry|
+------+---+--------+----------+-----------+
|     1|  0|     327|      4022|       4641|
+------+---+--------+----------+-----------+



The result shows a single row with a null in the UserID column - finding it

In [None]:
# Show the rows whose UserID is missing, using the same criteria as the counts above.
user_id = f.col('UserID')
missing_user_id = (
    user_id.contains('None')
    | user_id.contains('NULL')
    | (user_id == '')
    | (user_id == ' ')
    | user_id.isNull()
    | f.isnan('UserID')
)
Dim_Users.filter(missing_user_id).show()

+------+---+--------+----------+-----------+
|UserID|Age|UserCity|UserRegion|UserCountry|
+------+---+--------+----------+-----------+
|  null| -1|    null|      null|       null|
+------+---+--------+----------+-----------+



Since the row contains no data, we drop it

In [None]:
# Keep only the rows that have a UserID.
Dim_Users=Dim_Users.filter(f.col('UserID').isNotNull())

**UserCity column** - handling unusable values for analysis

In [None]:
# Preview users ordered by city name, descending, to eyeball unusual values.
Dim_Users.sort(Dim_Users.UserCity.desc()).show()

+------+---+--------+---------------+-----------+
|UserID|Age|UserCity|     UserRegion|UserCountry|
+------+---+--------+---------------+-----------+
|184164| 30|   ýzmýr|             na|     turkey|
|268024| 27|   ýzmir|    connecticut|     turkey|
|278595| -1|   ýzmir|             na|     turkey|
|106234| 20|   ýzmir|             na|     turkey|
|188910| -1|   ýzmir|             na|     turkey|
|  6162| 25|   ýzmir|             na|     turkey|
| 19675| -1|ýstanbul|             na|     turkey|
| 82759| 25|ýstanbul|       ýstanbul|     turkey|
| 42853| -1|ýstanbul|             uk|     turkey|
|274464| 25|ýstanbul|       ýstanbul|     turkey|
| 14246| -1|ýstanbul|             na|     turkey|
|125152| -1|ýstanbul|             na|     turkey|
| 36095| -1|ýstanbul|             na|     turkey|
| 79393| -1|ýstanbul|        ontario|     turkey|
| 36670| -1|ýstanbul| marmara region|     turkey|
|218117| 25|ýstanbul|             na|     turkey|
|221792| -1|ýstanbul|        marmara|     turkey|


In [None]:
# A city of exactly 'na' is treated as missing and relabelled 'unknown'.
Dim_Users = Dim_Users.withColumn("UserCity", f.when(Dim_Users.UserCity=='na','unknown').otherwise(Dim_Users.UserCity))

In [None]:
# Blank cities become 'unknown'. Nulls are included too: they are just as unusable,
# and the former like("") verification could not see them.
city = f.col("UserCity")
city_missing = city.isNull() | city.isin('', ' ')
Dim_Users = Dim_Users.withColumn("UserCity", f.when(city_missing, 'unknown').otherwise(city))

# Verification: no blank or null city may remain (expected 0).
Dim_Users.filter(city_missing).count()

0

**UserRegion column** - handling unusable values for analysis

In [None]:
# A region of exactly 'na' is treated as missing and relabelled 'unknown'.
Dim_Users = Dim_Users.withColumn("UserRegion", f.when(Dim_Users.UserRegion=='na','unknown').otherwise(Dim_Users.UserRegion))

In [None]:
# Blank regions become 'unknown'. Nulls (a Location without a second comma-separated
# part) are included too; the former like("") verification could not see them.
region = f.col("UserRegion")
region_missing = region.isNull() | region.isin('', ' ')
Dim_Users = Dim_Users.withColumn("UserRegion", f.when(region_missing, 'unknown').otherwise(region))

# Verification: no blank or null region may remain (expected 0).
Dim_Users.filter(region_missing).count()

0

**UserCountry column**

Downloading "countries" file from GITHUB

In [None]:
! wget "https://raw.githubusercontent.com/lukes/ISO-3166-Countries-with-Regional-Codes/master/all/all.csv"

--2022-04-11 15:18:00--  https://raw.githubusercontent.com/lukes/ISO-3166-Countries-with-Regional-Codes/master/all/all.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 20759 (20K) [text/plain]
Saving to: ‘all.csv’


2022-04-11 15:18:00 (103 MB/s) - ‘all.csv’ saved [20759/20759]



In [None]:
# The ISO-3166 list is UTF-8 encoded; reading it as ISO-8859-1 garbles accented
# names (e.g. "Åland Islands" was displayed as "Ãland Islands").
countries = spark.read.csv('/content/all.csv', header=True, inferSchema=True, encoding="UTF-8")
countries.show()

+-------------------+-------+-------+------------+-------------+--------+--------------------+-------------------+-----------+---------------+------------------------+
|               name|alpha-2|alpha-3|country-code|   iso_3166-2|  region|          sub-region|intermediate-region|region-code|sub-region-code|intermediate-region-code|
+-------------------+-------+-------+------------+-------------+--------+--------------------+-------------------+-----------+---------------+------------------------+
|        Afghanistan|     AF|    AFG|           4|ISO 3166-2:AF|    Asia|       Southern Asia|               null|        142|             34|                    null|
|     Ãland Islands|     AX|    ALA|         248|ISO 3166-2:AX|  Europe|     Northern Europe|               null|        150|            154|                    null|
|            Albania|     AL|    ALB|           8|ISO 3166-2:AL|  Europe|     Southern Europe|               null|        150|             39|                  

In [None]:
# Keep only the country name and its two ISO codes; the other columns are not needed.
countries = countries.select('name', 'alpha-2', 'alpha-3')
countries.show()

+-------------------+-------+-------+
|               name|alpha-2|alpha-3|
+-------------------+-------+-------+
|        Afghanistan|     AF|    AFG|
|     Ãland Islands|     AX|    ALA|
|            Albania|     AL|    ALB|
|            Algeria|     DZ|    DZA|
|     American Samoa|     AS|    ASM|
|            Andorra|     AD|    AND|
|             Angola|     AO|    AGO|
|           Anguilla|     AI|    AIA|
|         Antarctica|     AQ|    ATA|
|Antigua and Barbuda|     AG|    ATG|
|          Argentina|     AR|    ARG|
|            Armenia|     AM|    ARM|
|              Aruba|     AW|    ABW|
|          Australia|     AU|    AUS|
|            Austria|     AT|    AUT|
|         Azerbaijan|     AZ|    AZE|
|            Bahamas|     BS|    BHS|
|            Bahrain|     BH|    BHR|
|         Bangladesh|     BD|    BGD|
|           Barbados|     BB|    BRB|
+-------------------+-------+-------+
only showing top 20 rows



Converting countries' data frame columns to lowercase

In [None]:
# Lower-case every column in a single projection. Aliasing each expression back to
# its own name keeps column names and order, without relying on the column name
# Spark auto-generates for lower(...).
countries = countries.select([f.lower(countries[c]).alias(c) for c in countries.columns])

countries.show()

+-------+-------+-------------------+
|alpha-2|alpha-3|               name|
+-------+-------+-------------------+
|     af|    afg|        afghanistan|
|     ax|    ala|     ãland islands|
|     al|    alb|            albania|
|     dz|    dza|            algeria|
|     as|    asm|     american samoa|
|     ad|    and|            andorra|
|     ao|    ago|             angola|
|     ai|    aia|           anguilla|
|     aq|    ata|         antarctica|
|     ag|    atg|antigua and barbuda|
|     ar|    arg|          argentina|
|     am|    arm|            armenia|
|     aw|    abw|              aruba|
|     au|    aus|          australia|
|     at|    aut|            austria|
|     az|    aze|         azerbaijan|
|     bs|    bhs|            bahamas|
|     bh|    bhr|            bahrain|
|     bd|    bgd|         bangladesh|
|     bb|    brb|           barbados|
+-------+-------+-------------------+
only showing top 20 rows



Generating lists from countries' dataframe columns

In [None]:
# Collect each reference column to the driver as a list of Rows.
countryname, alpha2, alpha3 = (
    countries.select(column_name).collect()
    for column_name in ('name', 'alpha-2', 'alpha-3')
)

Generating a dictionary whose keys are the correct country names (from the countries file) and whose values are the matching country spellings found in Dim_Users

In [None]:
# Only the distinct spellings matter for the mapping, so collect those instead of
# one Row per user.
usercountries = Dim_Users.select('UserCountry').distinct().collect()

# Lookup tables built once from the reference lists (first occurrence wins, as with
# list.index), replacing repeated linear scans of the Row lists.
known_names = {name_row[0] for name_row in countryname}
name_by_alpha2 = {}
name_by_alpha3 = {}
for name_row, alpha2_row, alpha3_row in zip(countryname, alpha2, alpha3):
  name_by_alpha2.setdefault(alpha2_row[0], name_row[0])
  name_by_alpha3.setdefault(alpha3_row[0], name_row[0])

# canonical country name (or 'unknown') -> set of spellings found in Dim_Users
newCountries = {}
for country_row in usercountries:
  spelling = country_row[0]
  if spelling in known_names:
    continue  # already a canonical name, nothing to map

  if spelling == 'na':
    # 'na' is treated as a missing value for city and region; without this guard it
    # would match the alpha-2 code 'na' and be mapped to Namibia.
    # NOTE(review): presumably 'na' comes from "n/a" after punctuation stripping - confirm.
    selectedCountry = 'unknown'
  elif spelling in name_by_alpha2:
    selectedCountry = name_by_alpha2[spelling]
  elif spelling in name_by_alpha3:
    selectedCountry = name_by_alpha3[spelling]
  else:
    selectedCountry = 'unknown'

  newCountries.setdefault(selectedCountry, set()).add(spelling)


newCountries.keys()

dict_keys(['united states of america', 'unknown', 'namibia', 'germany', 'canada', 'panama', 'china', 'ukraine', 'ghana', 'azerbaijan', 'united kingdom of great britain and northern ireland', 'puerto rico', 'malaysia', 'indonesia', 'tunisia', 'kazakhstan', 'new zealand', 'india', 'bosnia and herzegovina', 'israel', 'netherlands', 'holy see'])

Updating UserCountry column with newCountries dictionary

In [None]:
# Build one CASE expression: null -> 'unknown', then one branch per canonical
# country matching any of its recorded spellings; anything else is left unchanged.
user_country = f.col('UserCountry')
condition = f.when(user_country.isNull(), 'unknown')

for country, variations in newCountries.items():
  condition = condition.when(user_country.isin(list(variations)), country)

Dim_Users = Dim_Users.withColumn('UserCountry', condition.otherwise(user_country))


Dim_Users.show()


+------+---+--------------+---------------+--------------------+
|UserID|Age|      UserCity|     UserRegion|         UserCountry|
+------+---+--------------+---------------+--------------------+
|     1| -1|           nyc|       new york|united states of ...|
|     2| 18|      stockton|     california|united states of ...|
|     3| -1|        moscow|yukon territory|             unknown|
|     4| 17|         porto|         vngaia|            portugal|
|     5| -1|   farnborough|          hants|             unknown|
|     6| 61|  santa monica|     california|united states of ...|
|     7| -1|    washington|             dc|united states of ...|
|     8| -1|       timmins|        ontario|              canada|
|     9| -1|    germantown|      tennessee|united states of ...|
|    10| 26|      albacete|      wisconsin|               spain|
|    11| 14|     melbourne|       victoria|           australia|
|    12| -1|    fort bragg|     california|united states of ...|
|    13| 26|     barcelon

Checking duplicates

In [None]:
# Group on every column and flag (1/0) the groups that occur more than once, then
# join the flag back onto the rows.
# NOTE(review): the inner join uses plain equality on every column, so rows
# containing nulls presumably drop out of `dup` - confirm.
dup = Dim_Users.join(
    Dim_Users.groupBy(Dim_Users.columns).agg((f.count("*")>1).cast(IntegerType()).alias("Duplicate_indicator")),
    on=Dim_Users.columns,
    how="inner")

# Sum of the flags = number of rows that belong to a duplicated group.
dup.select(f.sum("Duplicate_indicator")).show()

+------------------------+
|sum(Duplicate_indicator)|
+------------------------+
|                       0|
+------------------------+



**Dim_Books**

In [None]:
# Inspect the column names and types Spark inferred for the books file.
Dim_Books.printSchema()

root
 |-- ISBN: string (nullable = true)
 |-- Book-Title: string (nullable = true)
 |-- Book-Author: string (nullable = true)
 |-- Year-Of-Publication: integer (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- Image-URL-S: string (nullable = true)
 |-- Image-URL-M: string (nullable = true)
 |-- Image-URL-L: string (nullable = true)



In [None]:
# Number of rows in the books data before cleansing.
Dim_Books.count()

271379

In [None]:
# Replace the hyphenated column names with hyphen-free ones.
for old_name, new_name in (
    ('Book-Title', 'BookTitle'),
    ('Book-Author', 'BookAuthor'),
    ('Year-Of-Publication', 'YearOfPublication'),
):
  Dim_Books = Dim_Books.withColumnRenamed(old_name, new_name)

Dropping irrelevant columns for analysis

In [None]:
# The three image-URL columns are not used in the analysis.
Dim_Books = Dim_Books.drop('Image-URL-S','Image-URL-M','Image-URL-L')

Converting dataframe columns to lowercase

In [None]:
# Lower-case every column in a single projection. Aliasing each expression back to
# its own name keeps column names and order, without relying on the column name
# Spark auto-generates for lower(...). lower() yields strings, so
# YearOfPublication must be cast back afterwards.
Dim_Books = Dim_Books.select([f.lower(Dim_Books[c]).alias(c) for c in Dim_Books.columns])

Converting datatype to the Year-Of-Publication column, since it was changed when we converted to lowercase

In [None]:
# Cast the publication year back to an integer.
Dim_Books = Dim_Books.withColumn('YearOfPublication', Dim_Books['YearOfPublication'].cast(IntegerType()))

Cleaning punctuation in columns with a string datatype (excluding ISBN)

In [None]:
# Raw string: the pattern is full of regex escapes (\!, \(, \$, ...) that are not
# valid Python string escapes and trigger invalid-escape warnings in a normal
# string literal.
# Unlike the location pattern, digits (\d) are NOT stripped here: this step cleans
# punctuation, and removing digits erases numeric titles and truncates names that
# contain numbers.
# NOTE(review): presumably the digit removal was carried over from the location
# cleaning - confirm.
punc_regex = r'''(^ |\!|\(|\)|-|\[|\]|\{|\}|;|:|,|'|"|\<|\>|\.|\/|\?|@|�|#|\$|%|\^|\&|\*|_|~| $)'''

for text_col in ('BookTitle', 'BookAuthor', 'Publisher'):
  Dim_Books = Dim_Books.withColumn(text_col, f.regexp_replace(text_col, punc_regex, ''))

In [None]:
# Preview the books data after renaming, lower-casing and punctuation cleaning.
Dim_Books.show()

+----------+--------------------+--------------------+-----------------+--------------------+
|      ISBN|           BookTitle|          BookAuthor|YearOfPublication|           Publisher|
+----------+--------------------+--------------------+-----------------+--------------------+
|0195153448| classical mythology|    mark p o morford|             2002|oxford university...|
|0002005018|        clara callan|richard bruce wright|             2001|harperflamingo ca...|
|0060973129|decision in normandy|         carlo deste|             1991|     harperperennial|
|0374157065|flu the story of ...|    gina bari kolata|             1999|farrar straus giroux|
|0393045218|the mummies of ur...|        e j w barber|             1999|w w norton amp co...|
|0399135782|the kitchen gods ...|             amy tan|             1991|    putnam pub group|
|0425176428|what if the world...|       robert cowley|             2000|berkley publishin...|
|0671870432|     pleading guilty|         scott turow|      

Finding cells with a null value and empty  cells

In [None]:
def _blank_or_null(c):
  """Condition flagging a cell of column `c` as missing: it contains 'None' or
  'NULL', is '' or ' ', is null, or is NaN."""
  cell = f.col(c)
  sentinel_text = cell.contains('None') | cell.contains('NULL')
  blank = (cell == '') | (cell == ' ')
  return sentinel_text | blank | cell.isNull() | f.isnan(c)

# One count of missing cells per books column.
missing_counts = [f.count(f.when(_blank_or_null(c), c)).alias(c) for c in Dim_Books.columns]
Dim_Books.select(missing_counts).show()

+----+---------+----------+-----------------+---------+
|ISBN|BookTitle|BookAuthor|YearOfPublication|Publisher|
+----+---------+----------+-----------------+---------+
|   0|       50|         0|                0|       56|
+----+---------+----------+-----------------+---------+



Book-Title column - handling empty cells

In [None]:
# Titles that are empty or a single space are relabelled 'unknown'.
title = f.col("BookTitle")
title_is_blank = title.isin('', ' ')
Dim_Books = Dim_Books.withColumn("BookTitle", f.when(title_is_blank, 'unknown').otherwise(title))

# Verification: no blank title may remain (expected 0).
Dim_Books.filter(title_is_blank).count()

0

Publisher column - handling empty cells

In [None]:
# Publishers that are empty or a single space are relabelled 'unknown'.
publisher = f.col("Publisher")
publisher_is_blank = publisher.isin('', ' ')
Dim_Books = Dim_Books.withColumn("Publisher", f.when(publisher_is_blank, 'unknown').otherwise(publisher))

# Verification: no blank publisher may remain (expected 0).
Dim_Books.filter(publisher_is_blank).count()

0

Year-Of-Publication - handling unusable values for analysis

In [None]:
# Summary statistics for the publication year, used to spot implausible values (see min/max).
Dim_Books.describe(["YearOfPublication"]).show()

+-------+------------------+
|summary| YearOfPublication|
+-------+------------------+
|  count|            271379|
|   mean|1959.7560496574902|
| stddev| 258.0113625638109|
|    min|                 0|
|    max|              2050|
+-------+------------------+



Transforming the Year-Of-Publication column's unusable values

In [None]:
# Years after 2022 or equal to 0 are unusable and are replaced by the placeholder 1900.
year = Dim_Books['YearOfPublication']
unusable_year = (year > 2022) | (year == 0)
Dim_Books = Dim_Books.withColumn('YearOfPublication', f.when(unusable_year, 1900).otherwise(year))

In [None]:
# Preview the books data after the publication-year transformation.
Dim_Books.show()

+----------+--------------------+--------------------+-----------------+--------------------+
|      ISBN|           BookTitle|          BookAuthor|YearOfPublication|           Publisher|
+----------+--------------------+--------------------+-----------------+--------------------+
|0195153448| classical mythology|    mark p o morford|             2002|oxford university...|
|0002005018|        clara callan|richard bruce wright|             2001|harperflamingo ca...|
|0060973129|decision in normandy|         carlo deste|             1991|     harperperennial|
|0374157065|flu the story of ...|    gina bari kolata|             1999|farrar straus giroux|
|0393045218|the mummies of ur...|        e j w barber|             1999|w w norton amp co...|
|0399135782|the kitchen gods ...|             amy tan|             1991|    putnam pub group|
|0425176428|what if the world...|       robert cowley|             2000|berkley publishin...|
|0671870432|     pleading guilty|         scott turow|      

Checking duplicates

In [None]:
# Group on every column and flag (1/0) the groups that occur more than once, then
# join the flag back onto the rows.
# NOTE(review): the inner join uses plain equality on every column, so rows
# containing nulls presumably drop out of `dup` - confirm.
dup = Dim_Books.join(
    Dim_Books.groupBy(Dim_Books.columns).agg((f.count("*")>1).cast(IntegerType()).alias("Duplicate_indicator")),
    on=Dim_Books.columns,
    how="inner")

# Sum of the flags = number of rows that belong to a duplicated group.
dup.select(f.sum("Duplicate_indicator")).show()

+------------------------+
|sum(Duplicate_indicator)|
+------------------------+
|                     626|
+------------------------+



In [None]:
# Attach the Duplicate_indicator flag (1 = row belongs to a group of identical rows)
# to Dim_Books itself. This adds a column, so the cell is not safe to run twice.
Dim_Books=Dim_Books.join(
    Dim_Books.groupBy(Dim_Books.columns).agg((f.count("*")>1).cast(IntegerType()).alias("Duplicate_indicator")),
    on=Dim_Books.columns,
    how="inner")

# NOTE(review): duplicates are only flagged here, never removed. Filtering on
# Duplicate_indicator == 0 would discard every copy of a duplicated row, including
# the first one; dropDuplicates() is presumably what is wanted - confirm.

In [None]:
# Number of rows flagged as belonging to a duplicated group.
Dim_Books.filter(Dim_Books['Duplicate_indicator']==1).count()

626

In [None]:
# Number of rows that are not flagged as duplicates.
Dim_Books.filter(Dim_Books['Duplicate_indicator']==0).count()

**Fact_Ratings**

In [None]:
# Inspect the ratings schema; every column is a string after the manual parsing.
Fact_Ratings.printSchema()

root
 |-- User-ID: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- Book-Rating: string (nullable = true)



In [None]:
# Cast the rating from string to integer.
Fact_Ratings = Fact_Ratings.withColumn('Book-Rating', Fact_Ratings['Book-Rating'].cast(IntegerType()))

In [None]:
# Number of rating rows loaded.
Fact_Ratings.count()

1048575

In [None]:
# Preview the first rating rows.
Fact_Ratings.show()

+-------+----------+-----------+
|User-ID|      ISBN|Book-Rating|
+-------+----------+-----------+
| 276725|034545104X|          0|
| 276726|0155061224|          5|
| 276727|0446520802|          0|
| 276729|052165615X|          3|
| 276729|0521795028|          6|
| 276733|2080674722|          0|
| 276736|3257224281|          8|
| 276737|0600570967|          6|
| 276744|038550120X|          7|
| 276745| 342310538|         10|
| 276746|0425115801|          0|
| 276746|0449006522|          0|
| 276746|0553561618|          0|
| 276746|055356451X|          0|
| 276746|0786013990|          0|
| 276746|0786014512|          0|
| 276747|0060517794|          9|
| 276747|0451192001|          0|
| 276747|0609801279|          0|
| 276747|0671537458|          9|
+-------+----------+-----------+
only showing top 20 rows



Finding cells with a null value

In [None]:
# Count the null cells in each ratings column.
null_counts = []
for c in Fact_Ratings.columns:
  null_counts.append(f.count(f.when(Fact_Ratings[c].isNull(), c)).alias(c))

Fact_Ratings.select(null_counts).show()

+-------+----+-----------+
|User-ID|ISBN|Book-Rating|
+-------+----+-----------+
|      0|   0|          0|
+-------+----+-----------+



Finding empty cells

In [None]:
# Print, per column, how many cells hold an empty string.
for c in Fact_Ratings.columns:
  is_empty = Fact_Ratings[c].like("")
  EC = Fact_Ratings.filter(is_empty).count()
  print(f"{c} {EC}")

User-ID 0
ISBN 0
Book-Rating 0


##**Top ten Books**
(excluding books that were rated 10 times or fewer)

In [None]:
#creating BookNames dict
# Map ISBN -> title. The title column was renamed from 'Book-Title' to 'BookTitle'
# during cleansing, so looking up the old name fails on a fresh run. Only the two
# needed columns are collected to the driver.
BookNames = {row['ISBN']: row['BookTitle'] for row in Dim_Books.select('ISBN', 'BookTitle').collect()}

In [None]:
# Creating 2 Lists from "BookNames" Dictionary
# Turn the ISBN -> title dictionary into (ISBN, title) pairs, one per book.
listOfTuples = list(BookNames.items())

# Column names for the lookup DataFrame.
columns = ['ISBN', 'Book-Title']

# Build the DataFrame and preview it.
df_BookNames = spark.createDataFrame(listOfTuples, columns)
df_BookNames.show()

+----------+--------------------+
|      ISBN|          Book-Title|
+----------+--------------------+
|0195153448| classical mythology|
|0002005018|        clara callan|
|0060973129|decision in normandy|
|0374157065|flu the story of ...|
|0393045218|the mummies of ur...|
|0399135782|the kitchen gods ...|
|0425176428|what if the world...|
|0671870432|     pleading guilty|
|0679425608|under the black f...|
|074322678x|where youll find ...|
|0771074670|nights below stat...|
|080652121x|hitlers secret ba...|
|0887841740|  the middle stories|
|1552041778|            jane doe|
|1558746218|a second chicken ...|
|1567407781|the witchfinder a...|
|1575663937|more cunning than...|
|1881320189|goodbye to the bu...|
|0440234743|       the testament|
|0452264464|beloved plume con...|
+----------+--------------------+
only showing top 20 rows



In [None]:
# Computing average rating for each ISBN(Book ID)
# ISBNs in the book table were lower-cased during cleansing, while the ratings kept
# their original casing (e.g. a trailing 'X'). Normalise both sides so those books
# are not silently lost in the join.
ratings_by_isbn = Fact_Ratings.withColumn('ISBN', f.lower(Fact_Ratings['ISBN']))
book_names = df_BookNames.withColumn('ISBN', f.lower(df_BookNames['ISBN']))

# Rating count and average rating per ISBN, computed in a single aggregation.
# NOTE(review): ratings of 0 are averaged like any other score; if 0 means
# "no explicit rating" they should presumably be excluded - confirm.
averagesAndCounts = ratings_by_isbn.groupBy('ISBN').agg(
    f.count('*').alias('count'),
    f.avg('Book-Rating').alias('avg(Book-Rating)'),
)

# Keep only books rated more than 10 times.
popularAveragesAndCounts = averagesAndCounts.filter('count > 10')

# Attach the titles and order by average rating, best first.
BooksAvgRatings = popularAveragesAndCounts.join(book_names, 'ISBN').orderBy(popularAveragesAndCounts['avg(Book-Rating)'].desc())

BooksAvgRatings.show(10)

+----------+-----+-----------------+--------------------+
|      ISBN|count| avg(Book-Rating)|          Book-Title|
+----------+-----+-----------------+--------------------+
|0091842050|   11|9.181818181818182|the blue day book...|
|0316779059|   11|9.090909090909092|the baby book eve...|
|8478886451|   13|8.384615384615385|harry potter y el...|
|0836213122|   16|            8.375|theres treasure e...|
|0394823370|   12|8.166666666666666|           the lorax|
|0060294698|   14|8.142857142857142|    allamerican girl|
|1577780728|   14|8.071428571428571|jesus freaks dc t...|
|0615116426|   15|8.066666666666666|marching through ...|
|3551551936|   11|              8.0|harry potter und ...|
|1844262553|   49|7.979591836734694|                free|
+----------+-----+-----------------+--------------------+
only showing top 10 rows



###**Generating CSV files for each dataframe**

Deleting any files in the "folderPath" to ensure the folder is empty

In [None]:
# Destination folder for the exported tables (only the path is defined here).
# NOTE(review): this root (".../MyDrive/TCBDA/Big Data/...") differs from data_path
# used for the inputs (".../MyDrive/Big Data/...") - confirm it is intentional.
folderPath = '/content/drive/MyDrive/TCBDA/Big Data/Big Data Project/results'

In [None]:
import glob
import os

# Pattern for the entries to delete: names in folderPath that contain a dot.
filesInFolder = folderPath + '/*.*'

fileList = glob.glob(filesInFolder)

for filePath in fileList:
    try:
        os.remove(filePath)
    except OSError as err:
        # Catch only file-system errors and report the reason; a bare except would
        # also swallow KeyboardInterrupt and hide why the deletion failed.
        print("Error while deleting file : ", filePath, "-", err)


# Re-list so fileList reflects whatever could not be removed.
fileList = glob.glob(filesInFolder)

In [None]:
# Export each table as a single CSV part file with a header row.
# NOTE(review): all four tables are appended into the same folder, so the part
# files can presumably only be told apart by their content - confirm this is intended.
for table in (Fact_Ratings, Dim_Users, Dim_Books, BooksAvgRatings):
  table.coalesce(1).write.mode("append").option("header", "true").csv(folderPath)

In [None]:
#spark.stop()