# First install the environment

In [1]:
# 1) First: install Java, Spark and and run a local Spark session by just running this on Google Colab:
!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null   # !apt-get --> install java
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz  # !wget  --> download file from url
!tar xf spark-3.1.2-bin-hadoop3.2.tgz  # !tar --> like unzip 
!pip install -q findspark  # !pip  --> instal a package, we cant import a library without installing it first, most libraries that we used were already installed
# This are INSTALLATION COMMANDS IN LINUX that we run in our collab space, it's similar to downloading programs an installing them on our computers
# installs Apache Spark 3.1.2, Java 8, and Findspark, a library that makes it easy for Python to find Spark


# 2) Second: set the locations where Spark and Java are installed to let know Colab where to find it.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"


# 3) Third: import spark libraries and use them
import findspark
findspark.init("spark-3.1.2-bin-hadoop3.2") # SPARK_HOME
from pyspark.sql import SparkSession
# Create the session - We need to remember to close it at the end
# The session is basically our connection to Spark layer in the Hadoop ecosystem
spark = SparkSession.builder.master("local[*]").getOrCreate()
from pyspark.sql import Row
from pyspark.sql import functions

Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:5 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:7 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Ign:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:9 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:13 http://archive.ubuntu.com/ubuntu b

## Establishing connection between "Colab" to "Google Drive"

In [2]:
# Code for connecting our google drive to this collab notebook
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [3]:
!ls '/content/drive/MyDrive/BDA/6. Big Data Project'  # Check what files we have in the path

'bd reference.ipynb'	      countries.csv
'BigData_Project (1).ipynb'   countries.ipynb
'BigData_Project (3).ipynb'  'Dashboard 1.docx'
 BigData_Project.ipynb	      Dim_Books.csv
 Book1.twb		      files
 BX-Book-Ratings.csv	      left_join_rating_uesr.csv
 BX-Books.csv		      mazal.ipynb
 BX-Users.csv		      states.csv
'Copy of Spark Sql.ipynb'     users.csv


In [4]:
# The session is basically our connection to Spark layer in the Hadoop ecosystem
spark = SparkSession.builder.appName('ops').getOrCreate()

# Read and analyse BX-Books 

In [5]:
path='/content/drive/MyDrive/BDA/6. Big Data Project'

In [6]:
df_books = spark.read.csv(path+'/BX-Books.csv', inferSchema = True, header = True,sep = ';',encoding="ISO-8859-1")

In [None]:
df_books.show(10,truncate=False)

+----------+--------------------------------------------------------------------------------------------------+--------------------+-------------------+--------------------------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+
|ISBN      |Book-Title                                                                                        |Book-Author         |Year-Of-Publication|Publisher                 |Image-URL-S                                                 |Image-URL-M                                                 |Image-URL-L                                                 |
+----------+--------------------------------------------------------------------------------------------------+--------------------+-------------------+--------------------------+------------------------------------------------------------+----------------------------------

In [None]:
df_books.printSchema()

root
 |-- ISBN: string (nullable = true)
 |-- Book-Title: string (nullable = true)
 |-- Book-Author: string (nullable = true)
 |-- Year-Of-Publication: integer (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- Image-URL-S: string (nullable = true)
 |-- Image-URL-M: string (nullable = true)
 |-- Image-URL-L: string (nullable = true)



##Change column names and values

In [7]:
#change column names in DF
df_books = df_books.withColumnRenamed("ISBN","isbn") \
        .withColumnRenamed("Book-Title","title") \
        .withColumnRenamed("Book-Author","author") \
        .withColumnRenamed("Year-Of-Publication","year") \
        .withColumnRenamed("Publisher","publisher") \
        .withColumnRenamed("Image-URL-S","image_s") \
        .withColumnRenamed("Image-URL-M","image_m") \
        .withColumnRenamed("Image-URL-L","image_l") 

In [8]:
#change names to lower
import pyspark.sql.functions as f
df_books = df_books.withColumn("title",f.lower("title")) \
          .withColumn("author",f.lower("author")) \
          .withColumn("publisher",f.lower("publisher"))       
df_books.show()
df_books.printSchema()

+----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+--------------------+
|      isbn|               title|              author|year|           publisher|             image_s|             image_m|             image_l|
+----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+--------------------+
|0195153448| classical mythology|  mark p. o. morford|2002|oxford university...|http://images.ama...|http://images.ama...|http://images.ama...|
|0002005018|        clara callan|richard bruce wright|2001|harperflamingo ca...|http://images.ama...|http://images.ama...|http://images.ama...|
|0060973129|decision in normandy|        carlo d'este|1991|     harperperennial|http://images.ama...|http://images.ama...|http://images.ama...|
|0374157065|flu: the story of...|    gina bari kolata|1999|farrar straus giroux|http://images.ama...|http://images.ama...|http://images.

##Check the field ISBN

In [9]:
from pyspark.sql.functions import desc,asc,col,trim
df_books.orderBy(desc("isbn")).show()
df_books.orderBy(asc("isbn")).show()

+----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+--------------------+
|      isbn|               title|              author|year|           publisher|             image_s|             image_m|             image_l|
+----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+--------------------+
|B0002K6K8O|the underground city|         jules verne|   0|       digireads.com|http://images.ama...|http://images.ama...|http://images.ama...|
|B0002JV9PY|the blockade runners|         jules verne|   0|       digireads.com|http://images.ama...|http://images.ama...|http://images.ama...|
|B00029DGGO|good wife strikes...|    elizabeth buchan|   0|        viking adult|http://images.ama...|http://images.ama...|http://images.ama...|
|B000234NC6|it must've been s...| jeffrey steingarten|2002|               knopf|http://images.ama...|http://images.ama...|http://images.

### preparing the field ISBN

In [10]:
# lets change all letters in isbn to upper
df_books=df_books.withColumn("isbn",f.upper(col('isbn')))  

In [11]:
# trim spaces and other special characters in isbn
from pyspark.sql.functions import regexp_replace
df_books=df_books.withColumn('isbn', regexp_replace(df_books['isbn']," ",""))
df_books=df_books.withColumn('isbn', regexp_replace(df_books['isbn'],"_",""))
df_books=df_books.withColumn('isbn', regexp_replace(df_books['isbn'],"#",""))
df_books.show()
df_books.printSchema

+----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+--------------------+
|      isbn|               title|              author|year|           publisher|             image_s|             image_m|             image_l|
+----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+--------------------+
|0195153448| classical mythology|  mark p. o. morford|2002|oxford university...|http://images.ama...|http://images.ama...|http://images.ama...|
|0002005018|        clara callan|richard bruce wright|2001|harperflamingo ca...|http://images.ama...|http://images.ama...|http://images.ama...|
|0060973129|decision in normandy|        carlo d'este|1991|     harperperennial|http://images.ama...|http://images.ama...|http://images.ama...|
|0374157065|flu: the story of...|    gina bari kolata|1999|farrar straus giroux|http://images.ama...|http://images.ama...|http://images.

<bound method DataFrame.printSchema of DataFrame[isbn: string, title: string, author: string, year: int, publisher: string, image_s: string, image_m: string, image_l: string]>

In [None]:
#check how many records we have
df_books.count()

271379

###Check nulls

There are some duplicate rows- we will remove them later after more cleaning of the data

In [None]:
# check nulls in ISBN
df_books.filter("isbn is NULL").show()

+----+-----+------+----+---------+-------+-------+-------+
|isbn|title|author|year|publisher|image_s|image_m|image_l|
+----+-----+------+----+---------+-------+-------+-------+
+----+-----+------+----+---------+-------+-------+-------+



In [12]:
# Find Count of Null, None, NaN of All DataFrame Columns
from pyspark.sql.functions import col,isnan, when, count,length
df_books.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_books.columns]).show()

+----+-----+------+----+---------+-------+-------+-------+
|isbn|title|author|year|publisher|image_s|image_m|image_l|
+----+-----+------+----+---------+-------+-------+-------+
|   0|    0|     0|   0|        0|      0|      0|      0|
+----+-----+------+----+---------+-------+-------+-------+



In [13]:
#remove ISBN null records - in case there will be in the future
df_books=df_books.na.drop(subset=["isbn"])
#check how many records we have
df_books.count()

271379

### Check length

In [None]:
#check length of value in ISBN- should be 10 or 13
df_books.createOrReplaceTempView("TAB_BOOKS")
spark.sql("select distinct isbn,length(isbn) as len_col " +
    "from TAB_BOOKS where length(isbn) > 13 or length(isbn) <10").show()


+----+-------+
|isbn|len_col|
+----+-------+
+----+-------+



In [None]:
spark.sql("select distinct length(isbn) as len_col from TAB_BOOKS").show()

+-------+
|len_col|
+-------+
|     13|
|     10|
|     11|
+-------+



In [None]:
#check records with length 11
spark.sql("select isbn  from TAB_BOOKS where length(isbn)=11").show()

+-----------+
|       isbn|
+-----------+
|0486404242	|
|34422480273|
|03857222060|
+-----------+



In [None]:
df_books.filter("length(isbn)=11").show()

+-----------+--------------------+---------------+----+------------------+--------------------+--------------------+--------------------+
|       isbn|               title|         author|year|         publisher|             image_s|             image_m|             image_l|
+-----------+--------------------+---------------+----+------------------+--------------------+--------------------+--------------------+
|0486404242	|war in kind: and ...|  stephen crane|1998|dover publications|http://images.ama...|http://images.ama...|http://images.ama...|
|34422480273|diamond age. die ...|neal stephenson|2000|          goldmann|http://images.ama...|http://images.ama...|http://images.ama...|
|03857222060|balzac and the li...|      dai sijie|2002|            anchor|http://images.ama...|http://images.ama...|http://images.ama...|
+-----------+--------------------+---------------+----+------------------+--------------------+--------------------+--------------------+



In [14]:
#cut only first 10 characters 
from pyspark.sql.functions import substring,when
df_books.filter("length(isbn)=11").select('isbn',substring("isbn", 0, 10)).show()
df_books=df_books.withColumn("len_isbn",length(df_books.isbn))
df_books=df_books.withColumn("isbn",when(df_books.len_isbn==11,substring("isbn", 0, 10)).otherwise(df_books.isbn))

+-----------+----------------------+
|       isbn|substring(isbn, 0, 10)|
+-----------+----------------------+
|0486404242	|            0486404242|
|34422480273|            3442248027|
|03857222060|            0385722206|
+-----------+----------------------+



In [None]:
df_books.select(length(col("isbn"))).distinct().show()

+------------+
|length(isbn)|
+------------+
|          13|
|          10|
+------------+



In [None]:
df_books.createOrReplaceTempView("TAB_BOOKS")
spark.sql("select distinct length(isbn) as len_col from TAB_BOOKS").show()

+-------+
|len_col|
+-------+
|     13|
|     10|
+-------+



In [None]:
#Check isbn length 13 - the first 3 characters should be 978 or 979
df_books.filter(length('isbn')==13).show(truncate=False)

+-------------+---------------------------------------+---------------+----+---------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+--------+
|isbn         |title                                  |author         |year|publisher|image_s                                                     |image_m                                                     |image_l                                                     |len_isbn|
+-------------+---------------------------------------+---------------+----+---------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+--------+
|3518365479<90|suhrkamp taschenbã?â¼cher, nr.47, frost|thomas bernhard|1972|suhrkamp |http://images.amazon.com/images/P/3518365479.01.THUMBZZZ.jpg|http://images.am

In [None]:
df_books.filter("length(isbn)=13").select('isbn',substring("isbn", 0, 3)).show()

+-------------+---------------------+
|         isbn|substring(isbn, 0, 3)|
+-------------+---------------------+
|3518365479<90|                  351|
+-------------+---------------------+



In [None]:
df_books.filter("length(isbn)=13 and substring('isbn', 0, 3)!='978' and substring('isbn', 0, 3)!='979'").show()

+-------------+--------------------+---------------+----+---------+--------------------+--------------------+--------------------+--------+
|         isbn|               title|         author|year|publisher|             image_s|             image_m|             image_l|len_isbn|
+-------------+--------------------+---------------+----+---------+--------------------+--------------------+--------------------+--------+
|3518365479<90|suhrkamp taschenb...|thomas bernhard|1972| suhrkamp|http://images.ama...|http://images.ama...|http://images.ama...|      13|
+-------------+--------------------+---------------+----+---------+--------------------+--------------------+--------------------+--------+



In [15]:
#cut only first 10 characters if first 3 characters are different from 978 or 979
df_books=df_books.withColumn("isbn",when((substring('isbn', 0, 3)!='978') & (substring('isbn', 0, 3)!='979') & (length("isbn")==13),substring("isbn", 0, 10)) \
                             .otherwise(df_books.isbn))

In [16]:
df_books=df_books.drop('len_isbn')
df_books.printSchema()

root
 |-- isbn: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- publisher: string (nullable = true)
 |-- image_s: string (nullable = true)
 |-- image_m: string (nullable = true)
 |-- image_l: string (nullable = true)



In [None]:
df_books.select(length(col("isbn"))).distinct().show()

+------------+
|length(isbn)|
+------------+
|          10|
+------------+



###Check duplicates in isbn

In [None]:
#check if there are duplicate rows
df_books.distinct().count()

271064

In [None]:
df_books.createOrReplaceTempView("TAB_BOOKS")
spark.sql("select count(isbn) as count_col from TAB_BOOKS").show()

+---------+
|count_col|
+---------+
|   271379|
+---------+



In [None]:
spark.sql("select count(distinct isbn) as count_col from TAB_BOOKS").show()

+---------+
|count_col|
+---------+
|   271063|
+---------+



In [None]:
df_books.groupBy("isbn").count().filter("count > 1").show()

+----------+-----+
|      isbn|count|
+----------+-----+
|067174139X|    2|
|037376099X|    2|
|034540288X|    2|
|051512558X|    2|
|044021145X|    2|
|034543773X|    2|
|078688505X|    2|
|006091646X|    2|
|059043053X|    2|
|043935806X|    2|
|086611873X|    2|
|067944243X|    2|
|074900505X|    2|
|225303956X|    2|
|031205436X|    2|
|080410297X|    2|
|089107290X|    2|
|051513628X|    2|
|067165344X|    2|
|193072229X|    2|
+----------+-----+
only showing top 20 rows



In [None]:
spark.sql("select isbn from TAB_BOOKS group by isbn having count(isbn)>1 order by isbn").show()

+----------+
|      isbn|
+----------+
|000225669X|
|000648302X|
|000649840X|
|000651202X|
|002089130X|
|002411510X|
|002542730X|
|003049186X|
|006000438X|
|006016848X|
|006017143X|
|006039384X|
|006051406X|
|006059537X|
|006064589X|
|006091291X|
|006091646X|
|006091985X|
|006092909X|
|006097401X|
+----------+
only showing top 20 rows



There are duplicate ISBN- lets remove them

In [17]:
df_books = df_books.distinct()
print("Distinct count: "+str(df_books.count()))
df_books.show()

Distinct count: 271064
+----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+--------------------+
|      isbn|               title|              author|year|           publisher|             image_s|             image_m|             image_l|
+----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+--------------------+
|039456894X| impossible vacation|       spalding gray|1992|    random house inc|http://images.ama...|http://images.ama...|http://images.ama...|
|0445402040|          the hunted|      elmore leonard|1986|   warner books (mm)|http://images.ama...|http://images.ama...|http://images.ama...|
|0312145543|brewing up a stor...|         emma lathen|1996|       st martins pr|http://images.ama...|http://images.ama...|http://images.ama...|
|0816152446|the women in his ...|barbara taylor br...|1991|        thorndike pr|http://images.ama...|http://image

In [None]:
df_books.createOrReplaceTempView("TAB_BOOKS")
spark.sql("select isbn from TAB_BOOKS group by isbn having count(isbn)>1 order by isbn").show()

+----------+
|      isbn|
+----------+
|051513628X|
+----------+



There is still one duplicate isbn- lets check it

In [None]:
df_books.groupBy("isbn").count().filter("count >1").show()

+----------+-----+
|      isbn|count|
+----------+-----+
|051513628X|    2|
+----------+-----+



In [None]:
spark.sql("select * from TAB_BOOKS where isbn in (select isbn from TAB_BOOKS group by isbn having count(isbn)>1) order by isbn").show()

+----------+--------------------+------------+----+----------+--------------------+--------------------+--------------------+
|      isbn|               title|      author|year| publisher|             image_s|             image_m|             image_l|
+----------+--------------------+------------+----+----------+--------------------+--------------------+--------------------+
|051513628X|        key of light|nora roberts|2003|jove books|http://images.ama...|http://images.ama...|http://images.ama...|
|051513628X|key of light (key...|nora roberts|2003|jove books|http://images.ama...|http://images.ama...|http://images.ama...|
+----------+--------------------+------------+----+----------+--------------------+--------------------+--------------------+



In [18]:
#delete duplicate records in isbn
df_books = df_books.dropDuplicates(["isbn"])
df_books.groupBy("isbn").count().filter("count >1").show()

+----+-----+
|isbn|count|
+----+-----+
+----+-----+



In [None]:
df_books.count()

271063

###check isbn with characters different then :0-9 ,X

In [None]:
df_books.orderBy(desc("isbn")).show()

+----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+--------------------+
|      isbn|               title|              author|year|           publisher|             image_s|             image_m|             image_l|
+----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+--------------------+
|B0002K6K8O|the underground city|         jules verne|   0|       digireads.com|http://images.ama...|http://images.ama...|http://images.ama...|
|B0002JV9PY|the blockade runners|         jules verne|   0|       digireads.com|http://images.ama...|http://images.ama...|http://images.ama...|
|B00029DGGO|good wife strikes...|    elizabeth buchan|   0|        viking adult|http://images.ama...|http://images.ama...|http://images.ama...|
|B000234NC6|it must've been s...| jeffrey steingarten|2002|               knopf|http://images.ama...|http://images.ama...|http://images.

In [None]:
# check records with invalid isbn
df_books.filter(df_books.isbn.rlike("[^0-9X]")).show(truncate=False)
#count how many records we have with invalid ISBN
df_books.filter(df_books.isbn.rlike("[^0-9X]")).count()

+----------+-----------------------------------------------------------------------+-------------------+----+------------------------------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+
|isbn      |title                                                                  |author             |year|publisher                     |image_s                                                     |image_m                                                     |image_l                                                     |
+----------+-----------------------------------------------------------------------+-------------------+----+------------------------------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+
|B000051ZUO|triangle        

114

In [None]:
df_books.count()

271063

In [19]:
# drop those columns
df_books1=df_books.where(df_books.isbn.rlike("[^0-9X]"))
df_books1.where(df_books.isbn.rlike("[^0-9X]")).show()
df_books1.count()
df_books=df_books.subtract(df_books1)
df_books.count()

+----------+--------------------+-------------------+----+--------------------+--------------------+--------------------+--------------------+
|      isbn|               title|             author|year|           publisher|             image_s|             image_m|             image_l|
+----------+--------------------+-------------------+----+--------------------+--------------------+--------------------+--------------------+
|B000051ZUO|            triangle|     sondra marshak|   0|        pocket books|http://images.ama...|http://images.ama...|http://images.ama...|
|B00008WFXL|   the da vinci code|          dan brown|   0|           doubleday|http://images.ama...|http://images.ama...|http://images.ama...|
|B00006H3BO|                1984|      george orwell|   0|   rosettabooks, llc|http://images.ama...|http://images.ama...|http://images.ama...|
|B00007FYKW|            dead air|         iain banks|   0|little, brown and...|http://images.ama...|http://images.ama...|http://images.ama...|

270949

In [20]:
df_books.orderBy(desc("isbn")).show(truncate=False)

+----------+-------------------------------------------------------------------------------------------------------------------+-------------------------+----+-----------------------------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+
|isbn      |title                                                                                                              |author                   |year|publisher                    |image_s                                                     |image_m                                                     |image_l                                                     |
+----------+-------------------------------------------------------------------------------------------------------------------+-------------------------+----+-----------------------------+------------------------------------------------------------+----

###Check whether isbn is valid

In [21]:
import sys
!pip install isbntools
!pip install isbnlib
from isbntools.app import *
from isbnlib import *
from pyspark.sql.types import StringType

from pyspark.sql.functions import udf
def validISBN(isbn):
    return is_isbn10(isbn)
x='0261665782'
print(validISBN(x))
validISBNUdf = udf(lambda z:validISBN(z),StringType())   
df_books=df_books.withColumn("isbn_valid",validISBNUdf(col("isbn")))
df_books.filter("isbn_valid=='false'").show()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting isbntools
  Downloading isbntools-4.3.28-py2.py3-none-any.whl (39 kB)
Collecting isbnlib<3.11.0,>=3.10.9
  Downloading isbnlib-3.10.10-py2.py3-none-any.whl (66 kB)
[K     |████████████████████████████████| 66 kB 2.6 MB/s 
[?25hInstalling collected packages: isbnlib, isbntools
Successfully installed isbnlib-3.10.10 isbntools-4.3.28
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
True
+----+-----+------+----+---------+-------+-------+-------+----------+
|isbn|title|author|year|publisher|image_s|image_m|image_l|isbn_valid|
+----+-----+------+----+---------+-------+-------+-------+----------+
+----+-----+------+----+---------+-------+-------+-------+----------+



In [None]:
df_books.filter("isbn_valid=='true'").show()

+----------+--------------------+------------------+----+--------------------+--------------------+--------------------+--------------------+----------+
|      isbn|               title|            author|year|           publisher|             image_s|             image_m|             image_l|isbn_valid|
+----------+--------------------+------------------+----+--------------------+--------------------+--------------------+--------------------+----------+
|0553219693|the pearls of sha...|   fayrene preston|1989|           loveswept|http://images.ama...|http://images.ama...|http://images.ama...|      true|
|0722655274|the iguanodon mys...|   william edmonds|1979|viking children's...|http://images.ama...|http://images.ama...|http://images.ama...|      true|
|0764112147|dictionary of mar...| betsy-ann toffler|2000|barrons education...|http://images.ama...|http://images.ama...|http://images.ama...|      true|
|0887307930|the leadership en...|     noel m. tichy|1997|harpercollins pub...|http

It seems that all isbns are valid!

check that after cleaning we don't have null isbns

In [137]:
df_books.filter(df_books.isbn.isNull()|isnan("isbn")).show(10)

+----+-----+------+----+---------+-------+-------+-------+--------+
|isbn|title|author|year|publisher|image_s|image_m|image_l|fix_year|
+----+-----+------+----+---------+-------+-------+-------+--------+
+----+-----+------+----+---------+-------+-------+-------+--------+



In [138]:
df_books.filter(df_books.isbn=="").show(10)

+----+-----+------+----+---------+-------+-------+-------+--------+
|isbn|title|author|year|publisher|image_s|image_m|image_l|fix_year|
+----+-----+------+----+---------+-------+-------+-------+--------+
+----+-----+------+----+---------+-------+-------+-------+--------+



## Check the field Title

Check if there are duplication in Book-Title.

In [None]:
df_books.select(count(col("title"))).show(10)

+------------+
|count(title)|
+------------+
|      270949|
+------------+



In [22]:
from pyspark.sql.functions import countDistinct
df_books.select(countDistinct(col("title"))).show()

+---------------------+
|count(DISTINCT title)|
+---------------------+
|               238937|
+---------------------+



In [23]:
df_books=df_books.withColumn('title',trim('title'))

In [None]:
df_books.createOrReplaceTempView("TAB_BOOKS")
spark.sql("select count(distinct title) from TAB_BOOKS").show()

+---------------------+
|count(DISTINCT title)|
+---------------------+
|               238933|
+---------------------+



In [None]:
spark.sql("select title from TAB_BOOKS group by title having count(*)>1").show(10)

+--------------------+
|               title|
+--------------------+
|               poppy|
|zenzele: a letter...|
|              vortex|
|play ball (scrapp...|
|          commanders|
|playing it by hea...|
|sarum: the novel ...|
|        regency buck|
| sweeter than dreams|
|have spacesuit wi...|
+--------------------+
only showing top 10 rows



In [None]:
spark.sql("select * from TAB_BOOKS where (title,year) in (select title,year from TAB_BOOKS group by title,year having count(*)>1) order by title").show(10,truncate=False)

+----------+-------------------------------------------------+-----------------------+----+-------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+----------+
|isbn      |title                                            |author                 |year|publisher                                  |image_s                                                     |image_m                                                     |image_l                                                     |isbn_valid|
+----------+-------------------------------------------------+-----------------------+----+-------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+----------+
|156619625

There are duplicate titles but differen isbn- seems ok

##Check the fields related to image

Check if there are duplicates in pictures

In [None]:
#check if there are same pictures
df_books.createOrReplaceTempView("TAB_BOOKS")
spark.sql("select count(image_s) from TAB_BOOKS").show()

+--------------+
|count(image_s)|
+--------------+
|        270949|
+--------------+



In [24]:
#first remove spaces if there are any or special chacateher from image_s
df_books=df_books.withColumn('image_s', regexp_replace(df_books['image_s']," ",""))

In [None]:
df_books.select(countDistinct(col("isbn"))).show()

+--------------------+
|count(DISTINCT isbn)|
+--------------------+
|              270949|
+--------------------+



In [None]:
df_books.createOrReplaceTempView("TAB_BOOKS")
spark.sql("select * from TAB_BOOKS where image_s in (select image_s from TAB_BOOKS group by image_s having count(*)>1) order by isbn" ).show(truncate=False)

+----+-----+------+----+---------+-------+-------+-------+----------+
|isbn|title|author|year|publisher|image_s|image_m|image_l|isbn_valid|
+----+-----+------+----+---------+-------+-------+-------+----------+
+----+-----+------+----+---------+-------+-------+-------+----------+



Check image_m,image_l

In [25]:
#first remove spaces if there are any or special chacateher from image_m and image_l
df_books=df_books.withColumn('image_m', regexp_replace(df_books['image_m']," ",""))
df_books=df_books.withColumn('image_l', regexp_replace(df_books['image_l']," ",""))

In [None]:
df_books.createOrReplaceTempView("TAB_BOOKS")
spark.sql("select * from TAB_BOOKS where image_m in (select image_m from TAB_BOOKS group by image_m having count(*)>1) order by isbn" ).show(truncate=False)
spark.sql("select * from TAB_BOOKS where image_l in (select image_l from TAB_BOOKS group by image_l having count(*)>1) order by isbn" ).show(truncate=False)

+----+-----+------+----+---------+-------+-------+-------+----------+
|isbn|title|author|year|publisher|image_s|image_m|image_l|isbn_valid|
+----+-----+------+----+---------+-------+-------+-------+----------+
+----+-----+------+----+---------+-------+-------+-------+----------+

+----+-----+------+----+---------+-------+-------+-------+----------+
|isbn|title|author|year|publisher|image_s|image_m|image_l|isbn_valid|
+----+-----+------+----+---------+-------+-------+-------+----------+
+----+-----+------+----+---------+-------+-------+-------+----------+



It seems there aren't duplications in images

##Check the field Author

Check if Author has some special characters or numbers

In [26]:
df_books=df_books.withColumn('author',trim('author'))

In [None]:
df_books.filter(df_books.author.rlike("[^a-z.-:' ]")).show(truncate=False)

+----------+----------------------------------------------------------------------------------------------------------------+----------------------------+----+----------------------------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+----------+
|isbn      |title                                                                                                           |author                      |year|publisher                   |image_s                                                     |image_m                                                     |image_l                                                     |isbn_valid|
+----------+----------------------------------------------------------------------------------------------------------------+----------------------------+----+----------------------------+----------------------------------------------

In [27]:
#check if author contains integers
df_books=df_books.withColumn('check_author',f.col("author").cast("int").isNotNull())

In [None]:
df_books.show(10)

+----------+--------------------+------------------+----+--------------------+--------------------+--------------------+--------------------+----------+------------+
|      isbn|               title|            author|year|           publisher|             image_s|             image_m|             image_l|isbn_valid|check_author|
+----------+--------------------+------------------+----+--------------------+--------------------+--------------------+--------------------+----------+------------+
|0553219693|the pearls of sha...|   fayrene preston|1989|           loveswept|http://images.ama...|http://images.ama...|http://images.ama...|      true|       false|
|0722655274|the iguanodon mys...|   william edmonds|1979|viking children's...|http://images.ama...|http://images.ama...|http://images.ama...|      true|       false|
|0764112147|dictionary of mar...| betsy-ann toffler|2000|barrons education...|http://images.ama...|http://images.ama...|http://images.ama...|      true|       false|
|088

In [None]:
df_books.filter('check_author==true').show()

+----+-----+------+----+---------+-------+-------+-------+----------+------------+
|isbn|title|author|year|publisher|image_s|image_m|image_l|isbn_valid|check_author|
+----+-----+------+----+---------+-------+-------+-------+----------+------------+
+----+-----+------+----+---------+-------+-------+-------+----------+------------+



It seems that author doesn't have any integers

In [28]:
df_books=df_books.drop("check_author")

## Check the field year

In [None]:
# check nulls in Year
df_books.filter("year is NULL").show()

+----+-----+------+----+---------+-------+-------+-------+----------+
|isbn|title|author|year|publisher|image_s|image_m|image_l|isbn_valid|
+----+-----+------+----+---------+-------+-------+-------+----------+
+----+-----+------+----+---------+-------+-------+-------+----------+



In [29]:
#replace null values if exists
df_books=df_books.withColumn("year",when(isnan("year") | col("year").isNull(), 0).otherwise(df_books.year))

In [None]:
df_books.select("year").distinct().orderBy(col("year")).show()

+----+
|year|
+----+
|   0|
|1376|
|1378|
|1806|
|1897|
|1900|
|1901|
|1902|
|1904|
|1906|
|1908|
|1909|
|1910|
|1911|
|1914|
|1917|
|1919|
|1920|
|1921|
|1922|
+----+
only showing top 20 rows



In [None]:
df_books.select("year").distinct().orderBy(col("year").desc()).show()

+----+
|year|
+----+
|2050|
|2038|
|2037|
|2030|
|2026|
|2024|
|2021|
|2020|
|2012|
|2011|
|2010|
|2008|
|2006|
|2005|
|2004|
|2003|
|2002|
|2001|
|2000|
|1999|
+----+
only showing top 20 rows



check how many records we have with future year (year>2022)

In [None]:
df_books.filter(df_books.year>2022).count()

13

check how many records we have with year==0

In [None]:
df_books.filter(df_books.year=='0').show()

+----------+--------------------+-------------------+----+--------------------+--------------------+--------------------+--------------------+----------+
|      isbn|               title|             author|year|           publisher|             image_s|             image_m|             image_l|isbn_valid|
+----------+--------------------+-------------------+----+--------------------+--------------------+--------------------+--------------------+----------+
|0261665782|      tarot handbook|     angeles arrien|   0|fairmount books l...|http://images.ama...|http://images.ama...|http://images.ama...|      true|
|2020257815|le troisieme mens...|            kristof|   0|   editions du seuil|http://images.ama...|http://images.ama...|http://images.ama...|      true|
|208070446X|fugitive albertin...|      marcel proust|   0|          flammarion|http://images.ama...|http://images.ama...|http://images.ama...|      true|
|0679853014|bears are curious...|       joyce milton|   0|random house book.

In [None]:
#check how many records we have with year=0
df_books.filter(df_books.year==0).count()

4557

In [30]:
#lets try to complete missing years according to isbn
#import sys
#!pip install isbntools
#!pip install isbnlib
#from isbntools.app import *
#from isbnlib import *

isbn = "1568490674"

meta_dict = meta(isbn, service='goob')

print(meta_dict['Authors'])
print(meta_dict['Title'])
print(meta_dict['Year'])
meta_dict
print('check method:is_isbn10(isbn10like)',is_isbn10(isbn))
print('check classify(isbn)', classify(isbn))

isbn='B12345678'
print('check is_isbn for isbn %s:' % isbn,is_isbn10(isbn))
#print('check meta for isbn %s:' % isbn,meta(isbn))


['Richard Wright']
Black Boy - (American Hunger) : A Record Of Childhood And Youth
1991
check method:is_isbn10(isbn10like) True
check classify(isbn) {'owi': '4241235160'}
check is_isbn for isbn B12345678: False


In [31]:
import isbnlib
def get_title(isbn):
    try:
        return isbnlib.meta(isbn)['Title']
        return 'Exist'
    except isbnlib.NotValidISBNError:
        return None
isbn='B123456789'
isbn_exist=get_title(isbn)
print('isbn is :',isbn,'the validity check:',isbn_exist)
isbn='1568490674'
isbn_exist=get_title(isbn)
print('isbn is :',isbn,'the validity check:',isbn_exist)
is_isbn10(isbn)

isbn is : B123456789 the validity check: None
isbn is : 1568490674 the validity check: Black Boy - (American Hunger) : A Record Of Childhood And Youth


True

In [32]:
from pyspark.sql.functions import map_values
#def findYear(isbn):
#    meta_dict = meta(isbn, service='goob')
#    return meta_dict.get('Year')

def findYear(isbn):
  try:
    meta_dict = meta(isbn, service='goob')
    return meta_dict.get('Year')
  except isbnlib.NotValidISBNError:
        return 'Unknown'  
x='2760402428'
print(findYear(x))
y=findYear(x)
print('isbn is :',x,'the year check:',y)
isbn='B123456789'
isbn_year=findYear(isbn)
print('isbn is :',isbn,'the year check:',isbn_year)


1985
isbn is : 2760402428 the year check: 1985
isbn is : B123456789 the year check: Unknown


In [33]:
#create UDF for year completion
findYearUdf = udf(lambda z:findYear(z),StringType())   
# lets complete the missing years
df_books=df_books.withColumn("fix_year",when((df_books.year==0) | (df_books.year>2022),findYearUdf(col("isbn"))).otherwise(df_books.year))

In [None]:
df_books.printSchema()

root
 |-- isbn: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- publisher: string (nullable = true)
 |-- image_s: string (nullable = true)
 |-- image_m: string (nullable = true)
 |-- image_l: string (nullable = true)
 |-- isbn_valid: string (nullable = true)
 |-- fix_year: string (nullable = true)



In [34]:
df_books.filter("year==0 or year>2022").select("isbn","title","year","fix_year").show()

+----------+--------------------+----+--------+
|      isbn|               title|year|fix_year|
+----------+--------------------+----+--------+
|0261665782|      tarot handbook|   0|    null|
|2020257815|le troisieme mens...|   0|    1991|
|208070446X|fugitive albertin...|   0|    1986|
|0679853014|bears are curious...|   0|    1998|
|0373630239|     april thirtieth|   0|    1982|
|0880298472|thurber country a...|   0|    1953|
|0434007838|        orchid thief|   0|    1999|
|0441322379|heathcliff catch ...|   0|    1986|
|0880299002|last of the mohicans|   0|    1992|
|8835945453|storia del cinema...|   0|    1998|
|2020057697|   ti-jean l'horizon|   0|    1979|
|0099479419|     ladder of years|   0|    1996|
|0575055294|          red azalea|   0|    1993|
|0002243962|girlfriend in a coma|   0|    1998|
|3548205003| mein schwarzes sofa|   0|    1985|
|0340581247|        better angel|   0|    1992|
|3442035171|morgen ist ein ne...|   0|    1978|
|2760402428|         lenfirouape|   0|  

In [None]:
##Check with Amir why can't select null, seems as if there is some mismatch of types
#Replace "unknown" for null on only fix_year column 
#df_books.filter("fix_year is null").show()
#df_books.filter((isnan(df_books.fix_year)) | (col(df_books.fix_year).isNull()))
#df_books=df_books.withColumn('fix_year',when(isnan(df_books.fix_year) | col(df_books.fix_year).isNull(),'unknown').otherwise(df_books.fix_year))

In [35]:
#update year=0 to "-1"
df_books=df_books.withColumn("year",when(df_books.year=='0',"-1").otherwise(df_books.year))

In [None]:
df_books.filter("year==(-1)").select("isbn","title","year","fix_year").show()

In [None]:
df_books.show()

+----------+--------------------+------------------+----+--------------------+--------------------+--------------------+--------------------+----------+--------+
|      isbn|               title|            author|year|           publisher|             image_s|             image_m|             image_l|isbn_valid|fix_year|
+----------+--------------------+------------------+----+--------------------+--------------------+--------------------+--------------------+----------+--------+
|0553219693|the pearls of sha...|   fayrene preston|1989|           loveswept|http://images.ama...|http://images.ama...|http://images.ama...|      true|    1989|
|0722655274|the iguanodon mys...|   william edmonds|1979|viking children's...|http://images.ama...|http://images.ama...|http://images.ama...|      true|    1979|
|0764112147|dictionary of mar...| betsy-ann toffler|2000|barrons education...|http://images.ama...|http://images.ama...|http://images.ama...|      true|    2000|
|0887307930|the leadership e

In [None]:
# check how many records we have with year greater than 2022
df_books.filter(df_books.year>2022).count()

13

In [36]:
#update year>2022 to "-1"
df_books=df_books.withColumn("year",when(df_books.year>2022,"-1").otherwise(df_books.year))

In [None]:
df_books.filter(df_books.title.startswith('playmaker')).show(truncate=False)

+----------+---------+---------------+----+--------------------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+----------+--------+
|isbn      |title    |author         |year|publisher           |image_s                                                     |image_m                                                     |image_l                                                     |isbn_valid|fix_year|
+----------+---------+---------------+----+--------------------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+----------+--------+
|0340422637|playmaker|thomas keneally|-1  |trafalgar square    |http://images.amazon.com/images/P/0340422637.01.THUMBZZZ.jpg|http://images.amazon.com/images/P/0340422637.01.MZZZZZZZ.jpg|http://ima

In [None]:
df_books.select("year").distinct().orderBy(col("year")).show()

+----+
|year|
+----+
|  -1|
|1376|
|1378|
|1806|
|1897|
|1900|
|1901|
|1902|
|1904|
|1906|
|1908|
|1909|
|1910|
|1911|
|1914|
|1917|
|1919|
|1920|
|1921|
|1922|
+----+
only showing top 20 rows



In [162]:
df_books.select("year").distinct().orderBy(col("year").desc()).show()

+----+
|year|
+----+
|2021|
|2020|
|2012|
|2011|
|2010|
|2008|
|2006|
|2005|
|2004|
|2003|
|2002|
|2001|
|2000|
|1999|
|1998|
|1997|
|1996|
|1995|
|1994|
|1993|
+----+
only showing top 20 rows



In [163]:
df_books.select("isbn","year","fix_year").show()

+----------+----+--------+
|      isbn|year|fix_year|
+----------+----+--------+
|0553219693|1989|    1989|
|0722655274|1979|    1979|
|0764112147|2000|    2000|
|0887307930|1997|    1997|
|1881273423|2002|    2002|
|8485204050|1975|    1975|
|0060560754|2004|    2004|
|014042136X|1971|    1971|
|0517123657|1995|    1995|
|0671876961|1995|    1995|
|0741416158|2003|    2003|
|0875182534|1983|    1983|
|0882707949|2000|    2000|
|0060975105|1992|    1992|
|0261665782|  -1|    null|
|0307107760|2000|    2000|
|0345420799|1998|    1998|
|0965353303|1997|    1997|
|0965521818|1998|    1998|
|1890208728|1997|    1997|
+----------+----+--------+
only showing top 20 rows



In [180]:
df_books.filter("year==(-1)").count()

4570

In [178]:
df_books.filter(df_books.year.isNull()).count()

0

In [177]:
#check how many records were left with missing year
#df_books.filter(df_books.fix_year.rlike("[0-9]")).count()
#df_books.filter("year==(-1)").select("fix_year").distinct().show(10)

It seems that all values are legal in the field year

##Check the field Publisher

In [None]:
#Check nulls
df_books.filter("publisher is NULL or publisher==''").show()

+----+-----+------+----+---------+-------+-------+-------+----------+--------+
|isbn|title|author|year|publisher|image_s|image_m|image_l|isbn_valid|fix_year|
+----+-----+------+----+---------+-------+-------+-------+----------+--------+
+----+-----+------+----+---------+-------+-------+-------+----------+--------+



In [37]:
df_books=df_books.withColumn('publisher',trim('publisher'))

In [None]:
#check values
df_books.select('publisher').orderBy(asc("publisher")).show()

+--------------------+
|           publisher|
+--------------------+
|           "corvina"|
|             "nauka"|
|  "otokar kersovani"|
|             'k' pub|
|(3 queen sq., wc1...|
|(49 poland st., w...|
|               10-18|
|               10-18|
|               10-18|
|               10-18|
|               10-18|
|               10-18|
|               10-18|
|               10-18|
|               10-18|
|               10-18|
|               10-18|
|               10-18|
|               10-18|
|               10-18|
+--------------------+
only showing top 20 rows



In [None]:
df_books.filter(df_books.publisher.rlike("^[0-9a-z '()-]")).show(truncate=False)
df_books.filter(df_books.publisher.rlike("[0-9]")).show(truncate=False)

+----------+-------------------------------------------------------------------------+------------------+----+-------------------------------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+----------+--------+
|isbn      |title                                                                    |author            |year|publisher                      |image_s                                                     |image_m                                                     |image_l                                                     |isbn_valid|fix_year|
+----------+-------------------------------------------------------------------------+------------------+----+-------------------------------+------------------------------------------------------------+------------------------------------------------------------+--------------------------------------------

In [None]:
# check whether there is publisher only with integers
df_books.filter(col("publisher").cast("int").isNotNull()).show()
df_books.filter(col("publisher").rlike("^[0-9]*$")).show()

+----------+--------------------+--------------------+----+---------+--------------------+--------------------+--------------------+----------+--------+
|      isbn|               title|              author|year|publisher|             image_s|             image_m|             image_l|isbn_valid|fix_year|
+----------+--------------------+--------------------+----+---------+--------------------+--------------------+--------------------+----------+--------+
|0967389305|restoring intimac...|         drew pinsky|1999|        3|http://images.ama...|http://images.ama...|http://images.ama...|      true|    1999|
|2264034173| un troublant retour|  patricia wentworth|2002|    37547|http://images.ama...|http://images.ama...|http://images.ama...|      true|    2002|
|2264033932|         david bowie|jã?â©rã?â´me soligny|2002|    37547|http://images.ama...|http://images.ama...|http://images.ama...|      true|    2002|
+----------+--------------------+--------------------+----+---------+-------------

In [None]:
df_books.filter(col("publisher").rlike("^[a-z]&-.()' ")).select('isbn','publisher').show(truncate=False)

+----+---------+
|isbn|publisher|
+----+---------+
+----+---------+



In [None]:
df_books.createOrReplaceTempView("TAB_BOOKS")
spark.sql("select isbn,publisher from TAB_BOOKS order by publisher desc").show()

+----------+--------------------+
|      isbn|           publisher|
+----------+--------------------+
|3215020408|      ã?â¶bv&amp;hpt|
|2264034602|   ã?â?ditions 10/18|
|3215043653|ã?sterreichischer...|
|3704601594|ã?sterreichische ...|
|392721731X|             ã?lbaum|
|284186023X|  ã?ditions michalon|
|2880250161|ã?ditions la matz...|
|2890452107|ã?ditions hurtubi...|
|2020045672|  ã?ditions du seuil|
|2020056879|  ã?ditions du seuil|
|2020002167|  ã?ditions du seuil|
|2020044595|  ã?ditions du seuil|
|2020046059|  ã?ditions du seuil|
|2020046784|  ã?ditions du seuil|
|2020049570|  ã?ditions du seuil|
|2020050609|  ã?ditions du seuil|
|2707301108| ã?ditions de minuit|
|2707302961| ã?ditions de minuit|
|2826500112| ã?ditions 24 heures|
|1891024663|    zzdap publishing|
+----------+--------------------+
only showing top 20 rows



In [38]:
df_books.printSchema()

root
 |-- isbn: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- year: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- image_s: string (nullable = true)
 |-- image_m: string (nullable = true)
 |-- image_l: string (nullable = true)
 |-- isbn_valid: string (nullable = true)
 |-- fix_year: string (nullable = true)



In [39]:
df_books=df_books.drop('isbn_valid')

In [None]:
df_books.printSchema()

root
 |-- isbn: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- year: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- image_s: string (nullable = true)
 |-- image_m: string (nullable = true)
 |-- image_l: string (nullable = true)
 |-- fix_year: string (nullable = true)



In [None]:
df_books.count()

270949

In [139]:
df_books.filter(df_books.isbn.isNull()|isnan("isbn")).count()

0

#Read and analyse BX-Book-Ratings

In [40]:
df_ratings = spark.read.csv(path+'/BX-Book-Ratings.csv', inferSchema = True, header = True,sep = ';',encoding="ISO-8859-1")

In [None]:
df_ratings.printSchema()
df_ratings.show(10)

root
 |-- "User-ID;""ISBN"": string (nullable = true)
 |-- ""Book-Rating""",,: string (nullable = true)

+--------------------+------------------+
|   "User-ID;""ISBN""|""Book-Rating""",,|
+--------------------+------------------+
|"276725;""0345451...|          ""0""",,|
|"276726;""0155061...|          ""5""",,|
|"276727;""0446520...|          ""0""",,|
|"276729;""0521656...|          ""3""",,|
|"276729;""0521795...|          ""6""",,|
|"276733;""2080674...|          ""0""",,|
|"276736;""3257224...|          ""8""",,|
|"276737;""0600570...|          ""6""",,|
|"276744;""0385501...|          ""7""",,|
|"276745;""3423105...|         ""10""",,|
+--------------------+------------------+
only showing top 10 rows



##change columns names and values

In [41]:
# first lets split the columns
from pyspark.sql.functions import split
df_ratings = df_ratings.withColumn('userid', split(df_ratings['"User-ID;""ISBN""'], ';').getItem(0)) \
        .withColumn('isbn', split(df_ratings['"User-ID;""ISBN""'], ';').getItem(1)) 
df_ratings.printSchema()
df_ratings.show(10)

root
 |-- "User-ID;""ISBN"": string (nullable = true)
 |-- ""Book-Rating""",,: string (nullable = true)
 |-- userid: string (nullable = true)
 |-- isbn: string (nullable = true)

+--------------------+------------------+-------+--------------+
|   "User-ID;""ISBN""|""Book-Rating""",,| userid|          isbn|
+--------------------+------------------+-------+--------------+
|"276725;""0345451...|          ""0""",,|"276725|""034545104X""|
|"276726;""0155061...|          ""5""",,|"276726|""0155061224""|
|"276727;""0446520...|          ""0""",,|"276727|""0446520802""|
|"276729;""0521656...|          ""3""",,|"276729|""052165615X""|
|"276729;""0521795...|          ""6""",,|"276729|""0521795028""|
|"276733;""2080674...|          ""0""",,|"276733|""2080674722""|
|"276736;""3257224...|          ""8""",,|"276736|""3257224281""|
|"276737;""0600570...|          ""6""",,|"276737|""0600570967""|
|"276744;""0385501...|          ""7""",,|"276744|""038550120X""|
|"276745;""3423105...|         ""10""",,|

In [42]:
df_ratings = df_ratings.withColumn('rating', split(df_ratings['""Book-Rating""",,'], ';').getItem(0))
df_ratings.printSchema()
df_ratings.show(10)

root
 |-- "User-ID;""ISBN"": string (nullable = true)
 |-- ""Book-Rating""",,: string (nullable = true)
 |-- userid: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: string (nullable = true)

+--------------------+------------------+-------+--------------+---------+
|   "User-ID;""ISBN""|""Book-Rating""",,| userid|          isbn|   rating|
+--------------------+------------------+-------+--------------+---------+
|"276725;""0345451...|          ""0""",,|"276725|""034545104X""| ""0""",,|
|"276726;""0155061...|          ""5""",,|"276726|""0155061224""| ""5""",,|
|"276727;""0446520...|          ""0""",,|"276727|""0446520802""| ""0""",,|
|"276729;""0521656...|          ""3""",,|"276729|""052165615X""| ""3""",,|
|"276729;""0521795...|          ""6""",,|"276729|""0521795028""| ""6""",,|
|"276733;""2080674...|          ""0""",,|"276733|""2080674722""| ""0""",,|
|"276736;""3257224...|          ""8""",,|"276736|""3257224281""| ""8""",,|
|"276737;""0600570...|          "

In [43]:
#drop columns
df_ratings=df_ratings.drop('"User-ID;""ISBN""','""Book-Rating""",,')
df_ratings.printSchema()

root
 |-- userid: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: string (nullable = true)



In [44]:
#remove all " 
df_ratings=df_ratings.withColumn('userid', regexp_replace(df_ratings['userid'],'"',"")) \
    .withColumn('isbn', regexp_replace(df_ratings['isbn'],'"',"")) \
    .withColumn('rating', regexp_replace(df_ratings['rating'],'"',""))
df_ratings.show(10)

+------+----------+------+
|userid|      isbn|rating|
+------+----------+------+
|276725|034545104X|   0,,|
|276726|0155061224|   5,,|
|276727|0446520802|   0,,|
|276729|052165615X|   3,,|
|276729|0521795028|   6,,|
|276733|2080674722|   0,,|
|276736|3257224281|   8,,|
|276737|0600570967|   6,,|
|276744|038550120X|   7,,|
|276745| 342310538|  10,,|
+------+----------+------+
only showing top 10 rows



In [45]:
#split the column rating by ,
df_ratings = df_ratings.withColumn('rating1', split(df_ratings['rating'], ',').getItem(0)) \
  .withColumn('rating2', split(df_ratings['rating'], ',').getItem(1)) \
  .withColumn('rating3', split(df_ratings['rating'], ',').getItem(2))
df_ratings.printSchema()
df_ratings.show(10)

root
 |-- userid: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- rating1: string (nullable = true)
 |-- rating2: string (nullable = true)
 |-- rating3: string (nullable = true)

+------+----------+------+-------+-------+-------+
|userid|      isbn|rating|rating1|rating2|rating3|
+------+----------+------+-------+-------+-------+
|276725|034545104X|   0,,|      0|       |       |
|276726|0155061224|   5,,|      5|       |       |
|276727|0446520802|   0,,|      0|       |       |
|276729|052165615X|   3,,|      3|       |       |
|276729|0521795028|   6,,|      6|       |       |
|276733|2080674722|   0,,|      0|       |       |
|276736|3257224281|   8,,|      8|       |       |
|276737|0600570967|   6,,|      6|       |       |
|276744|038550120X|   7,,|      7|       |       |
|276745| 342310538|  10,,|     10|       |       |
+------+----------+------+-------+-------+-------+
only showing top 10 rows



In [None]:
#check if there are any values in the fields rating2 and rating3
df_ratings.createOrReplaceTempView("TAB_RATINGS")
spark.sql("select count(rating2) from TAB_RATINGS").show()
spark.sql("select distinct rating2,rating3 from TAB_RATINGS").show()


+--------------+
|count(rating2)|
+--------------+
|       1048564|
+--------------+

+-------+-------+
|rating2|rating3|
+-------+-------+
|       |       |
|   null|   null|
|       |   null|
+-------+-------+



In [46]:
# drop columns rating2 and rating3 since they contain only nulls
df_ratings=df_ratings.drop('rating','rating2','rating3')
#rename column rating1
df_ratings=df_ratings.withColumnRenamed('rating1', 'rating')
df_ratings.printSchema()
df_ratings.show(10)

root
 |-- userid: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: string (nullable = true)

+------+----------+------+
|userid|      isbn|rating|
+------+----------+------+
|276725|034545104X|     0|
|276726|0155061224|     5|
|276727|0446520802|     0|
|276729|052165615X|     3|
|276729|0521795028|     6|
|276733|2080674722|     0|
|276736|3257224281|     8|
|276737|0600570967|     6|
|276744|038550120X|     7|
|276745| 342310538|    10|
+------+----------+------+
only showing top 10 rows



In [47]:
# Replacing problemitic characters
for i in df_ratings.columns:
  df_ratings=df_ratings.withColumn(i,regexp_replace(i,'"\"',''))
  df_ratings=df_ratings.withColumn(i,regexp_replace(i,'"?"',''))
  df_ratings=df_ratings.withColumn(i,regexp_replace(i,'"',''))
  df_ratings=df_ratings.withColumn(i,regexp_replace(i,'_',''))

df_ratings.show()

+------+----------+------+
|userid|      isbn|rating|
+------+----------+------+
|276725|034545104X|     0|
|276726|0155061224|     5|
|276727|0446520802|     0|
|276729|052165615X|     3|
|276729|0521795028|     6|
|276733|2080674722|     0|
|276736|3257224281|     8|
|276737|0600570967|     6|
|276744|038550120X|     7|
|276745| 342310538|    10|
|276746|0425115801|     0|
|276746|0449006522|     0|
|276746|0553561618|     0|
|276746|055356451X|     0|
|276746|0786013990|     0|
|276746|0786014512|     0|
|276747|0060517794|     9|
|276747|0451192001|     0|
|276747|0609801279|     0|
|276747|0671537458|     9|
+------+----------+------+
only showing top 20 rows



In [48]:
#trim
df_ratings=df_ratings.withColumn('userid', trim(df_ratings['userid'])) \
    .withColumn('isbn', trim(df_ratings['isbn'])) \
    .withColumn('rating', trim(df_ratings['rating']))
df_ratings.show(10)

+------+----------+------+
|userid|      isbn|rating|
+------+----------+------+
|276725|034545104X|     0|
|276726|0155061224|     5|
|276727|0446520802|     0|
|276729|052165615X|     3|
|276729|0521795028|     6|
|276733|2080674722|     0|
|276736|3257224281|     8|
|276737|0600570967|     6|
|276744|038550120X|     7|
|276745| 342310538|    10|
+------+----------+------+
only showing top 10 rows



In [49]:
from pyspark.sql.types import IntegerType,BooleanType,DateType
# Convert String to Integer Type
df_ratings = df_ratings.withColumn("rating",df_ratings.rating.cast(IntegerType()))
df_ratings = df_ratings.withColumn("userid",df_ratings.userid.cast(IntegerType()))

In [None]:
df_ratings.count()

1048575

##Check nulls

In [None]:
df_ratings.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_ratings.columns]).show()

+------+----+------+
|userid|isbn|rating|
+------+----+------+
|     0|   0|    10|
+------+----+------+



##check field Rating

In [None]:
df_ratings.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: integer (nullable = true)



In [None]:
df_ratings.createOrReplaceTempView("TAB_RATINGS")
spark.sql("select distinct rating from TAB_RATINGS").show()

+------+
|rating|
+------+
|  null|
|     1|
|     6|
|     3|
|     5|
|     9|
|     4|
|     8|
|     7|
|    10|
|     2|
|     0|
+------+



In [None]:
df_ratings.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: integer (nullable = true)



###check nulls

In [None]:
# check the null values
spark.sql("select count(*) total_rec from TAB_RATINGS").show()
spark.sql("select count(*) null_rec from TAB_RATINGS where rating is NULL").show()

+---------+
|total_rec|
+---------+
|  1048575|
+---------+

+--------+
|null_rec|
+--------+
|      10|
+--------+



In [50]:
#drop null records
df_ratings=df_ratings.dropna(how="any")
df_ratings.count()

1048565

### Check 0 values

In [None]:
df_ratings.filter('rating==0').count()

651328

It seems that most of the data contains no ratings!

## Check field ISBN

In [51]:
df_ratings=df_ratings.withColumn('isbn',trim('isbn'))

In [52]:
# lets change all letters in isbn to upper
df_ratings=df_ratings.withColumn("isbn",f.upper(col('isbn')))  

In [53]:
#remove invalid characters
# trim spaces and other special characters in isbn
from pyspark.sql.functions import regexp_replace
df_ratings=df_ratings.withColumn('isbn', regexp_replace(df_ratings['isbn']," ",""))
df_ratings=df_ratings.withColumn('isbn', regexp_replace(df_ratings['isbn'],"_",""))
df_ratings=df_ratings.withColumn('isbn', regexp_replace(df_ratings['isbn'],"#",""))
df_ratings=df_ratings.withColumn('isbn', regexp_replace(df_ratings['isbn'],'"."',""))
df_ratings=df_ratings.withColumn('isbn', regexp_replace(df_ratings['isbn'],'","',""))
df_ratings=df_ratings.withColumn('isbn', regexp_replace(df_ratings['isbn'],'"*"',""))
df_ratings=df_ratings.withColumn('isbn', regexp_replace(df_ratings['isbn'],'"+"',""))
df_ratings.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: integer (nullable = true)



In [None]:
df_ratings.show()

+------+----------+------+
|userid|      isbn|rating|
+------+----------+------+
|276725|034545104X|     0|
|276726|0155061224|     5|
|276727|0446520802|     0|
|276729|052165615X|     3|
|276729|0521795028|     6|
|276733|2080674722|     0|
|276736|3257224281|     8|
|276737|0600570967|     6|
|276744|038550120X|     7|
|276745| 342310538|    10|
|276746|0425115801|     0|
|276746|0449006522|     0|
|276746|0553561618|     0|
|276746|055356451X|     0|
|276746|0786013990|     0|
|276746|0786014512|     0|
|276747|0060517794|     9|
|276747|0451192001|     0|
|276747|0609801279|     0|
|276747|0671537458|     9|
+------+----------+------+
only showing top 20 rows



In [None]:
df_ratings.sort(col("isbn").asc()).show(truncate=False)

+------+-------------+------+
|userid|isbn         |rating|
+------+-------------+------+
|194500|(THEWINDMILLP|0     |
|124304|)416195113   |8     |
|238681|)440206529   |0     |
|111422|)452273056   |8     |
|100120|)553267833   |7     |
|100120|)959326839   |0     |
|66942 |*0452281903  |0     |
|157799|*0452281903  |0     |
|190925|*0515128325  |0     |
|238557|+0451197399  |0     |
|98647 |+0451197399  |0     |
|130499|,.0330486187 |6     |
|52796 |/8741060773  |9     |
|90963 |0*449002632  |7     |
|54031 |0*553*072412 |5     |
|218258|0*708880258  |0     |
|154781|0+399139745  |9     |
|143571|0-517-18725-6|4     |
|144841|0.15.602732.1|7     |
|229329|0.330241664  |10    |
+------+-------------+------+
only showing top 20 rows



In [None]:
df_ratings.sort(col("isbn").desc()).show(truncate=False)

+------+-------------+------+
|userid|isbn         |rating|
+------+-------------+------+
|187517|Ô½CROSOFT    |7     |
|86103 |´3499128624  |8     |
|85250 |§423350229   |0     |
|191707|`3502103682  |0     |
|78773 |`3502103682  |0     |
|11676 |ZR903CX0003  |1     |
|222073|ZR903CX0003  |0     |
|224108|Z380703475   |8     |
|174791|YOUTELLEM,AND|0     |
|48100 |Y99697115    |8     |
|49180 |XXXXXXXXXXXXX|0     |
|135172|XXXXXXXXXX   |0     |
|134243|X903145730   |0     |
|28586 |X439361760   |5     |
|208392|X113780760   |7     |
|11676 |X000000000   |10    |
|19355 |X000000000   |0     |
|136363|X000000000   |0     |
|78591 |WEAREWITNESSE|0     |
|199538|VG3862004    |0     |
+------+-------------+------+
only showing top 20 rows



In [None]:
# check records with invalid isbn
df_ratings.filter(df_ratings.isbn.rlike("[^0-9X]")).show(truncate=False)

+------+-------------+------+
|userid|isbn         |rating|
+------+-------------+------+
|276762|B0000BLD7X   |0     |
|276762|N3453124715  |4     |
|276884|B158991965   |6     |
|276929|2.02.032126.2|0     |
|276929|2.264.03602.8|0     |
|276959|680ISBN359623|6     |
|277305|O6712345670  |7     |
|278054|88O6166255   |8     |
|278418|0451E64100   |0     |
|278491|01420.01740  |10    |
|278559|DITISEENSOORT|0     |
|183   |100940/86    |9     |
|183   |10622/86     |0     |
|183   |10745/85     |0     |
|183   |10756/85     |0     |
|183   |127420/98    |8     |
|183   |13440/86     |7     |
|183   |15019/87     |7     |
|183   |16560/87     |10    |
|183   |16785/87     |7     |
+------+-------------+------+
only showing top 20 rows



In [None]:
#count how many records we have with invalid ISBN
df_ratings.filter(df_ratings.isbn.rlike("[^0-9X]")).count()

2147

In [54]:
#remove characters which are different than 0-9 and X
df_ratings=df_ratings.withColumn('isbn', regexp_replace(df_ratings['isbn'],"[^0-9X]",""))
df_ratings.filter(df_ratings.isbn.rlike("[^0-9X]")).count()

0

In [None]:
df_ratings.sort(col("isbn").desc()).show(truncate=False)

+------+-------------+------+
|userid|isbn         |rating|
+------+-------------+------+
|49180 |XXXXXXXXXXXXX|0     |
|135172|XXXXXXXXXX   |0     |
|134243|X903145730   |0     |
|28586 |X439361760   |5     |
|153621|X15          |0     |
|171276|X15          |0     |
|171276|X14          |0     |
|153621|X14          |0     |
|153621|X13          |0     |
|171276|X13          |0     |
|153621|X12          |0     |
|171276|X12          |0     |
|208392|X113780760   |7     |
|153621|X1           |0     |
|171276|X1           |0     |
|136363|X000000000   |0     |
|11676 |X000000000   |10    |
|19355 |X000000000   |0     |
|95672 |9999999999999|5     |
|33429 |9999999999   |5     |
+------+-------------+------+
only showing top 20 rows



In [None]:
#check the length of isbn
df_ratings.select(length('isbn')).distinct().show()

+------------+
|length(isbn)|
+------------+
|          12|
|           1|
|          13|
|           6|
|           3|
|           5|
|           9|
|           4|
|           8|
|           7|
|          10|
|          11|
|           2|
|           0|
+------------+



In [55]:
df_ratings=df_ratings.withColumn('len_isbn',length('isbn'))
df_ratings.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- len_isbn: integer (nullable = true)



In [None]:
df_ratings.filter('len_isbn<10').orderBy(col('len_isbn').asc()).show()

+------+----+------+--------+
|userid|isbn|rating|len_isbn|
+------+----+------+--------+
|174791|    |     0|       0|
|235070|    |     0|       0|
|174791|    |     0|       0|
|177952|    |    10|       0|
|184672|    |     0|       0|
|153628|    |     5|       0|
|187517|    |     7|       0|
|164082|    |     0|       0|
|194500|    |     0|       0|
|170139|    |     0|       0|
|172742|    |     0|       0|
|172888|    |     8|       0|
|174081|    |     5|       0|
|221552|    |     0|       0|
|174848|    |     9|       0|
|221552|    |     3|       0|
|157787|    |     0|       0|
|224108|    |     8|       0|
|228021|    |    10|       0|
|224108|    |     0|       0|
+------+----+------+--------+
only showing top 20 rows



In [56]:
#drop rows where len_isbn=0
df_ratings=df_ratings.filter('len_isbn>0')
df_ratings.count()

1048512

In [None]:
df_ratings.filter('len_isbn<10').orderBy(col('len_isbn').asc()).show()

+------+----+------+--------+
|userid|isbn|rating|len_isbn|
+------+----+------+--------+
|212167|   2|     0|       1|
|171276|  X1|     0|       2|
|153621|  X1|     0|       2|
|171276| X15|     0|       3|
|166785| 611|     0|       3|
|153621| X14|     0|       3|
|171276| X14|     0|       3|
|249018| 155|     5|       3|
|153621| X13|     0|       3|
|153621| X15|     0|       3|
|153621| X12|     0|       3|
|171276| X12|     0|       3|
|171276| X13|     0|       3|
| 86153| 001|     8|       3|
|212167|2074|     0|       4|
|222207|0001|     7|       4|
|212167|7112|     0|       4|
|222683|1227|     4|       4|
|243243|0000|     7|       4|
|226370|0001|     0|       4|
+------+----+------+--------+
only showing top 20 rows



In [None]:
#check rows with length=7
df_ratings.filter('len_isbn==7').orderBy(col('len_isbn').asc()).show()

+------+-------+------+--------+
|userid|   isbn|rating|len_isbn|
+------+-------+------+--------+
|144232|8699308|    10|       7|
|149608|6552323|     8|       7|
|156492|7289493|     0|       7|
|157431|5814227|     0|       7|
|159506|000064X|     9|       7|
|160998|0055339|     0|       7|
|163140|1409386|     0|       7|
|164533|0000257|     0|       7|
|164533|00003X9|     7|       7|
|164533|000054X|     6|       7|
|164533|0000920|     9|       7|
|164533|00013X9|     6|       7|
|167326|00005X7|     7|       7|
|168064|0000982|     0|       7|
|169663|5021880|     0|       7|
|171118|0000544|     8|       7|
|171118|00005X0|     0|       7|
|171118|00005X2|     8|       7|
|171118|00005X4|     0|       7|
|171703|0000085|     0|       7|
+------+-------+------+--------+
only showing top 20 rows



In [57]:
from pyspark.sql.functions import lpad
df_ratings.filter('len_isbn==5').select(lpad(df_ratings.isbn, 10, '0')).show()

+-----------------+
|lpad(isbn, 10, 0)|
+-----------------+
|       0000000008|
|       0000000001|
|       0000000007|
|       000000000X|
|       0000000005|
|       0000000005|
|       0000000007|
|       0000000009|
|       0000000009|
|       0000000006|
|       0000000012|
|       0000000000|
|       0000000000|
|       0000000005|
|       0000000007|
|       0000000008|
|       0000041052|
|       0000041184|
|       0000048019|
|       0000012345|
+-----------------+
only showing top 20 rows



In [None]:
#check how many records we have with len_isbn smaller than 10
df_ratings.filter('len_isbn<10').count()

6303

In [58]:
#pad those isbns with length smaller than 10 with 0
df_ratings=df_ratings.withColumn("isbn",when(df_ratings.len_isbn<10,lpad(df_ratings.isbn, 10, '0')).otherwise(df_ratings.isbn))

In [None]:
#check isbns with length 11 and 12
df_ratings.filter('len_isbn>10 and len_isbn<13').show()
df_ratings.filter('len_isbn>10 and len_isbn<13').count()

+------+------------+------+--------+
|userid|        isbn|rating|len_isbn|
+------+------------+------+--------+
|276875|014366020444|     8|      12|
|276875| 88741800047|     8|      11|
|276929| 20202006935|     8|      11|
|276929| 20203119888|     8|      11|
|276929| 22660861003|     6|      11|
|277187| 44901766125|     0|      11|
|277196|044920597545|     0|      12|
|277196|044923894625|     0|      12|
|277339| 44901668125|     7|      11|
|277351| 35181333217|     0|      11|
|277359|071009505506|     8|      12|
|277400| 88043077730|     0|      11|
|277716|671772465095|     0|      12|
|277984|780451162182|     0|      12|
|277984|780451524201|    10|      12|
|278026|780446525800|    10|      12|
|278313|084226244270|     0|      12|
|278418| 05008601814|     0|      11|
|278418| 22578962886|     0|      11|
|278418| 44901687095|     0|      11|
+------+------------+------+--------+
only showing top 20 rows



2061

In [59]:
# in case len_isbn is greater than 10 and less than 13 , cut 10 right characters
#df_ratings.filter(df_ratings.len_isbn==11).withColumn("isbn_new",when((df_ratings.len_isbn>10) & (df_ratings.len_isbn<13),substring("isbn", -10, 10)) \
#                      .otherwise(df_ratings.isbn)).show()
df_ratings=df_ratings.withColumn("isbn",when((df_ratings.len_isbn>10) & (df_ratings.len_isbn<13),substring("isbn", -10, 10)) \
                      .otherwise(df_ratings.isbn))

In [None]:
#check the length of isbn
df_ratings.select(length(col("isbn"))).distinct().show()

+------------+
|length(isbn)|
+------------+
|          13|
|          10|
+------------+



In [None]:
#check the length of isbn
df_ratings.filter(length(col("isbn"))==13).show()
df_ratings.filter(length(col("isbn"))==13).count()

+------+-------------+------+--------+
|userid|         isbn|rating|len_isbn|
+------+-------------+------+--------+
|277074|9782922145441|     5|      13|
|277516|0358159521888|     7|      13|
|278418|0978043904583|     0|      13|
|278637|9782264014184|     0|      13|
|278675|9782253172109|     9|      13|
|278691|9788401499883|     0|      13|
|   460|9770390107900|     8|      13|
|   626|9783499191411|     0|      13|
|   705|9788831774956|     0|      13|
|   709|9780590962735|     9|      13|
|   709|9781562477547|     9|      13|
|  1799|9771129085155|     0|      13|
|  3017|9788826313818|     0|      13|
|  3145|0345255585225|     0|      13|
|  3224|9788532219022|     0|      13|
|  3402|9788882902933|     8|      13|
|  3743|8022264121170|     0|      13|
|  4033|8014366020000|     9|      13|
|  4334|0630892372099|     0|      13|
|  4334|0630892372330|     6|      13|
+------+-------------+------+--------+
only showing top 20 rows



1076

In [60]:
#cut only first 10 characters if first 3 characters are different from 978 or 979
df_ratings=df_ratings.withColumn("isbn",when((substring('isbn', 0, 3)!='978') & (substring('isbn', 0, 3)!='979') & (length("isbn")==13),substring("isbn", 0, 10)) \
                             .otherwise(df_ratings.isbn))
df_ratings.filter(length(col("isbn"))==13).show()
df_ratings.filter(length(col("isbn"))==13).count()

+------+-------------+------+--------+
|userid|         isbn|rating|len_isbn|
+------+-------------+------+--------+
|277074|9782922145441|     5|      13|
|278637|9782264014184|     0|      13|
|278675|9782253172109|     9|      13|
|278691|9788401499883|     0|      13|
|   626|9783499191411|     0|      13|
|   705|9788831774956|     0|      13|
|   709|9780590962735|     9|      13|
|   709|9781562477547|     9|      13|
|  3017|9788826313818|     0|      13|
|  3224|9788532219022|     0|      13|
|  3402|9788882902933|     8|      13|
|  4814|9783453148239|     0|      13|
|  5201|9782709623810|     7|      13|
|  7155|9783426650752|     8|      13|
|  7155|9783596247509|     7|      13|
|  9856|9780393029291|     0|      13|
| 10963|9780140315387|     6|      13|
| 11198|9783548224916|     4|      13|
| 11676|9780340796146|    10|      13|
| 11676|9780380977253|     0|      13|
+------+-------------+------+--------+
only showing top 20 rows



571

In [None]:
df_ratings.orderBy(col('isbn').asc()).show()

+------+----------+------+--------+
|userid|      isbn|rating|len_isbn|
+------+----------+------+--------+
|  8094|0000000000|     0|      11|
| 56039|0000000000|     0|       4|
| 56039|0000000000|     7|       4|
|  3915|0000000000|     0|       4|
| 85772|0000000000|     7|      13|
|179624|0000000000|     0|       9|
| 11676|0000000000|     8|      11|
|214319|0000000000|     0|       8|
| 55488|0000000000|     0|       9|
|183958|0000000000|     8|      13|
|236645|0000000000|     0|       9|
|243243|0000000000|     7|       4|
|164533|0000000000|     7|       5|
|207650|0000000000|     0|      12|
|178899|0000000000|     0|       5|
|218187|0000000000|     9|       8|
|149452|0000000000|     1|       9|
|224450|0000000000|     0|       9|
| 50321|0000000000|     0|      12|
|234803|0000000000|     0|      13|
+------+----------+------+--------+
only showing top 20 rows



In [61]:
#drop all rows where isbn='0000000000'
df_ratings=df_ratings.filter("isbn!='0000000000'")

In [None]:
df_ratings.orderBy(col('isbn').asc()).show()

+------+----------+------+--------+
|userid|      isbn|rating|len_isbn|
+------+----------+------+--------+
| 40204|0000000001|     5|       6|
|  1903|0000000001|     5|       5|
|226370|0000000001|     0|       4|
|142454|0000000001|     0|       5|
|142454|0000000001|     0|       5|
|111741|0000000001|     0|       6|
|244277|0000000001|     0|       4|
| 40943|0000000001|     0|       4|
|222207|0000000001|     7|       4|
| 86153|0000000001|     8|       3|
|192093|0000000001|     9|       4|
| 58551|0000000001|     7|       4|
|216467|0000000002|     9|       6|
|212167|0000000002|     0|       1|
| 76865|0000000002|     0|       6|
| 74287|0000000002|     5|       6|
| 81977|0000000002|    10|       6|
| 74287|0000000003|    10|       5|
| 74287|0000000003|    10|       6|
|142454|0000000003|     0|       5|
+------+----------+------+--------+
only showing top 20 rows



In [None]:
df_ratings.count()

1048451

##Check duplicates

In [62]:
#check duplicates
df_ratings.distinct().count()

1048428

It seems that there are some duplicate records

In [63]:
#remove the duplicates
df_ratings = df_ratings.distinct()
print("Distinct count: "+str(df_ratings.count()))
df_ratings.show()

Distinct count: 1048428
+------+----------+------+--------+
|userid|      isbn|rating|len_isbn|
+------+----------+------+--------+
|276847|3442442796|     0|      10|
|277249|8846200209|     0|      10|
|277272|9722906283|     7|      10|
|277427|0553581929|     9|      10|
|277427|0553801430|     0|      10|
|277427|0836213076|    10|      10|
|277478|0582275164|     0|      10|
|277517|0452282152|     8|      10|
|277639|0380680157|     0|      10|
|277639|0394171349|     0|      10|
|277639|0590568809|     0|      10|
|277710|0553582763|     6|      10|
|277992|0855642440|     6|      10|
|278012|9025414532|     7|      10|
|278026|038533558X|     9|      10|
|278188|0821717138|     0|      10|
|278218|014017513X|     9|      10|
|278221|0804106304|     9|      10|
|278411|1551669293|     0|      10|
|278418|006020883X|     0|      10|
+------+----------+------+--------+
only showing top 20 rows



##check field userid

In [64]:
df_ratings.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- len_isbn: integer (nullable = true)



In [65]:
#replace null values if exists
df_ratings=df_ratings.withColumn("userid",when(isnan("userid") | col("userid").isNull(), 0).otherwise(df_ratings.userid))

In [66]:
df_ratings=df_ratings.drop('len_isbn')

In [None]:
df_ratings.count()

1048428

#Read and analyse BX-Users

In [67]:
df_users = spark.read.csv(path+'/BX-Users.csv', inferSchema=True, header= True, sep = ';' ,encoding="ISO-8859-1")
df_users.show()
df_users.printSchema()
df_users.count()

+-------+--------------------+----+
|User-ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|NULL|
|      2|stockton, califor...|  18|
|      3|moscow, yukon ter...|NULL|
|      4|porto, v.n.gaia, ...|  17|
|      5|farnborough, hant...|NULL|
|      6|santa monica, cal...|  61|
|      7| washington, dc, usa|NULL|
|      8|timmins, ontario,...|NULL|
|      9|germantown, tenne...|NULL|
|     10|albacete, wiscons...|  26|
|     11|melbourne, victor...|  14|
|     12|fort bragg, calif...|NULL|
|     13|barcelona, barcel...|  26|
|     14|mediapolis, iowa,...|NULL|
|     15|calgary, alberta,...|NULL|
|     16|albuquerque, new ...|NULL|
|     17|chesapeake, virgi...|NULL|
|     18|rio de janeiro, r...|  25|
|     19|           weston, ,|  14|
|     20|langhorne, pennsy...|  19|
+-------+--------------------+----+
only showing top 20 rows

root
 |-- User-ID: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: string (nullable

278859

In [70]:
#define first the dataframe structure and then read the file
from pyspark.sql.types import (StructField, IntegerType,
                               StringType, StructType)
data_schema = [StructField('Userid', IntegerType(), True), 
               StructField('Location', StringType(), True),
               StructField('Age', IntegerType(), True)]
final_struct = StructType(fields=data_schema) 

In [71]:
df_users = spark.read.load(path+'//BX-Users.csv', format="csv", header="true", sep=';',encoding="ISO-8859-1", schema=final_struct)
df_users.show()
df_users.printSchema()

+------+--------------------+----+
|Userid|            Location| Age|
+------+--------------------+----+
|     1|  nyc, new york, usa|null|
|     2|stockton, califor...|  18|
|     3|moscow, yukon ter...|null|
|     4|porto, v.n.gaia, ...|  17|
|     5|farnborough, hant...|null|
|     6|santa monica, cal...|  61|
|     7| washington, dc, usa|null|
|     8|timmins, ontario,...|null|
|     9|germantown, tenne...|null|
|    10|albacete, wiscons...|  26|
|    11|melbourne, victor...|  14|
|    12|fort bragg, calif...|null|
|    13|barcelona, barcel...|  26|
|    14|mediapolis, iowa,...|null|
|    15|calgary, alberta,...|null|
|    16|albuquerque, new ...|null|
|    17|chesapeake, virgi...|null|
|    18|rio de janeiro, r...|  25|
|    19|           weston, ,|  14|
|    20|langhorne, pennsy...|  19|
+------+--------------------+----+
only showing top 20 rows

root
 |-- Userid: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: integer (nullable = true)



In [72]:
from pyspark.sql.functions import split,initcap, lower,upper,length,col,regexp_replace,asc,desc,trim,when
# split location to 4 columns
df_users1=df_users.withColumn("col1", split(("Location"), ",").getItem(0)).withColumn("col2", split(("Location"), ",") \
.getItem(1)).withColumn("col3", split(("Location"), ",").getItem(2)).withColumn("col4", split(("Location"), ",")\
.getItem(3))
df_users1.show()
#we can see country is in col 3, and some usa states in col2,drop col4 - it is all null
df_users2=df_users1.withColumnRenamed('col3', 'country').withColumnRenamed('col2', 'state').withColumnRenamed('col1', 'city').drop("col4")
df_users2.show()


+------+--------------------+----+--------------+----------------+---------------+----+
|Userid|            Location| Age|          col1|            col2|           col3|col4|
+------+--------------------+----+--------------+----------------+---------------+----+
|     1|  nyc, new york, usa|null|           nyc|        new york|            usa|null|
|     2|stockton, califor...|  18|      stockton|      california|            usa|null|
|     3|moscow, yukon ter...|null|        moscow| yukon territory|         russia|null|
|     4|porto, v.n.gaia, ...|  17|         porto|        v.n.gaia|       portugal|null|
|     5|farnborough, hant...|null|   farnborough|           hants| united kingdom|null|
|     6|santa monica, cal...|  61|  santa monica|      california|            usa|null|
|     7| washington, dc, usa|null|    washington|              dc|            usa|null|
|     8|timmins, ontario,...|null|       timmins|         ontario|         canada|null|
|     9|germantown, tenne...|nul

##Check Nulls

In [None]:
df_users2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_users2.columns]).show()

+------+--------+------+----+-----+-------+
|Userid|Location|   Age|city|state|country|
+------+--------+------+----+-----+-------+
|     1|       0|110763|   0|    3|      4|
+------+--------+------+----+-----+-------+



##Check the filed age

In [None]:
df_users2.select("Age").distinct().orderBy(asc("Age")).show()

+----+
| Age|
+----+
|null|
|   0|
|   1|
|   2|
|   3|
|   4|
|   5|
|   6|
|   7|
|   8|
|   9|
|  10|
|  11|
|  12|
|  13|
|  14|
|  15|
|  16|
|  17|
|  18|
+----+
only showing top 20 rows



In [73]:
#change null or 0 values to -1
df_users2=df_users2.withColumn("Age",when(col("Age").isNull() | (col("Age")==0),'-1').otherwise(df_users2.Age))

In [None]:
df_users2.printSchema()

root
 |-- Userid: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)



In [74]:
# Change the field Age to integer
df_users2=df_users2.withColumn("Age",df_users2.Age.cast("int"))
df_users2.printSchema()

root
 |-- Userid: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)



In [None]:
#check the min and max values in age
df_users2.describe(['Age']).show()

+-------+------------------+
|summary|               Age|
+-------+------------------+
|  count|            278859|
|   mean|20.549446135860777|
| stddev|20.774559646451085|
|    min|                -1|
|    max|               244|
+-------+------------------+



In [None]:
df_users2.select("Age").distinct().orderBy(asc("Age")).show()

+---+
|Age|
+---+
| -1|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
only showing top 20 rows



In [None]:
df_users2.select("Age").distinct().orderBy(desc("Age")).show()

+---+
|Age|
+---+
|244|
|239|
|237|
|231|
|230|
|229|
|228|
|226|
|223|
|220|
|219|
|212|
|210|
|209|
|208|
|207|
|204|
|201|
|200|
|199|
+---+
only showing top 20 rows



In [None]:
df_users2.filter("Age>120").orderBy(asc("Age")).show()

+------+--------------------+---+--------------------+-----------+--------------------+
|Userid|            Location|Age|                city|      state|             country|
+------+--------------------+---+--------------------+-----------+--------------------+
|221392|tallinn, n/a, est...|123|             tallinn|        n/a|             estonia|
| 27718|blublub city, n/a...|123|        blublub city|        n/a|              belize|
| 85847|shanghai, texas, ...|123|            shanghai|      texas|               china|
|245085|succ, new jersey,...|123|                succ| new jersey|                 usa|
| 17903|antiqua, n/a, ant...|123|             antiqua|        n/a| antigua and barbuda|
| 83883|danang, danang, v...|123|              danang|     danang|             vietnam|
| 73699|zaragoza, n/a, spain|123|            zaragoza|        n/a|               spain|
|146285|ariege, montsegur...|123|              ariege|  montsegur|             rosello|
| 86721|santiago de compo...|124

In [None]:
#check how many recrds we have with age >120
df_users2.filter("Age>120").count()

78

In [75]:
#update age>120 to -1
df_users2=df_users2.withColumn("Age",when(col("Age")>120,'-1').otherwise(df_users2.Age))

In [None]:
#check now the values in age
df_users2.describe(['Age']).show()

+-------+------------------+
|summary|               Age|
+-------+------------------+
|  count|            278859|
|   mean|20.501389591155387|
| stddev| 20.61475144869394|
|    min|                -1|
|    max|                99|
+-------+------------------+



##add field with category of ages

In [76]:
from pyspark.sql.functions import count,desc
df_users2=df_users2.withColumn("Age_cat", when(df_users2.Age.between(1,12), "Children (<=12)") \
.when(df_users2.Age.between(13,20), "Youngs (13-20)").when(df_users2.Age.between(21,40),"Young Adults (21-40)")\
.when(df_users2.Age.between(41,70),"Adults (41-70)").when(df_users2.Age.between(71,120),"Seniours (70+)") \
.otherwise("Unknow"))
df_users2.show()
df_users2.groupBy('Age_cat').agg(count('Age_cat').alias("count")).sort(desc("count")).show()

+------+--------------------+---+--------------+----------------+---------------+--------------------+
|Userid|            Location|Age|          city|           state|        country|             Age_cat|
+------+--------------------+---+--------------+----------------+---------------+--------------------+
|     1|  nyc, new york, usa| -1|           nyc|        new york|            usa|              Unknow|
|     2|stockton, califor...| 18|      stockton|      california|            usa|      Youngs (13-20)|
|     3|moscow, yukon ter...| -1|        moscow| yukon territory|         russia|              Unknow|
|     4|porto, v.n.gaia, ...| 17|         porto|        v.n.gaia|       portugal|      Youngs (13-20)|
|     5|farnborough, hant...| -1|   farnborough|           hants| united kingdom|              Unknow|
|     6|santa monica, cal...| 61|  santa monica|      california|            usa|      Adults (41-70)|
|     7| washington, dc, usa| -1|    washington|              dc|        

## Check field User

In [77]:
df_users2.printSchema()

root
 |-- Userid: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- Age_cat: string (nullable = false)



In [None]:
df_users2.count()

278859

In [None]:
df_users2.select("Userid").distinct().orderBy(asc("Userid")).show()

+------+
|Userid|
+------+
|  null|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
+------+
only showing top 20 rows



In [None]:
#check how many null recrds
df_users2.filter(df_users2.Userid.isNull()).count()

1

In [78]:
#drop all null users
df_users2=df_users2.filter(df_users2.Userid.isNotNull())
df_users2.count()

278858

In [None]:
df_users2.select("Userid").distinct().orderBy(asc("Userid")).show()

+------+
|Userid|
+------+
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
|    20|
+------+
only showing top 20 rows



In [None]:
df_users2.select("Userid").distinct().orderBy(desc("Userid")).show()

+------+
|Userid|
+------+
|278858|
|278857|
|278856|
|278855|
|278854|
|278853|
|278852|
|278851|
|278850|
|278849|
|278848|
|278847|
|278846|
|278845|
|278844|
|278843|
|278842|
|278841|
|278840|
|278839|
+------+
only showing top 20 rows



In [None]:
df_users2.printSchema()
df_users2.count()

root
 |-- Userid: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- Age_cat: string (nullable = false)



278858

In [None]:
#Check duplicate records
df_users2.select("Userid").distinct().count()

278858

It seems there aren't any duplicate records

##Check field country

clean country ,states and city from space and other signs

In [79]:
from pyspark.sql.functions import lower,initcap,trim,regexp_replace,col

df_users2=df_users2.withColumn("country", trim(col("country")))
df_users2=df_users2.withColumn("country",regexp_replace("country","[0-9~;:'./?<>|@#$%^&*()]", ""))
df_users2=df_users2.withColumn("state", trim(col("state")))
df_users2=df_users2.withColumn("state",regexp_replace("state","[0-9~;:'./?<>|@#$%^&*()]", ""))

#first lower all digits and then init cap
df_users2=df_users2.withColumn("country",lower("country")).withColumn("state",lower("state"))
df_users2=df_users2.withColumn("country",initcap("country")).withColumn("state",initcap("state"))
 #patches changes in order to fit closed list of countris 

df_users2=df_users2.withColumn('country', regexp_replace('country', 'Usa', 'USA'))\
.withColumn('country', regexp_replace('country', 'Us', 'USA'))\
.withColumn('country', regexp_replace('country', 'Uk', 'UK'))\
.withColumn('country', regexp_replace('country', 'United Kingdom', 'UK'))\
.withColumn('country', regexp_replace('country', 'England', 'UK'))\
.withColumn('country', regexp_replace('country', 'Scotland', 'UK'))\
.withColumn('country', regexp_replace('country', 'Wales', 'UK'))
df_users2.show(truncate=False)

+------+--------------------------------------+---+--------------+---------------+---------+--------------------+
|Userid|Location                              |Age|city          |state          |country  |Age_cat             |
+------+--------------------------------------+---+--------------+---------------+---------+--------------------+
|1     |nyc, new york, usa                    |-1 |nyc           |New York       |USA      |Unknow              |
|2     |stockton, california, usa             |18 |stockton      |California     |USA      |Youngs (13-20)      |
|3     |moscow, yukon territory, russia       |-1 |moscow        |Yukon Territory|Russia   |Unknow              |
|4     |porto, v.n.gaia, portugal             |17 |porto         |Vngaia         |Portugal |Youngs (13-20)      |
|5     |farnborough, hants, united kingdom    |-1 |farnborough   |Hants          |UK       |Unknow              |
|6     |santa monica, california, usa         |61 |santa monica  |California     |USA   

In [80]:
#show most freq countries and states
from pyspark.sql.functions import count,desc
df_users2.groupBy('country').agg(count('country').alias("count_cnt")).filter(col('count_cnt') > 100).sort(desc("count_cnt")).show(300)
df_users2.groupBy('state').agg(count('state').alias("count_st")).filter(col('count_st') > 100).sort(desc("count_st")).show(300)
#USA 139196
#Null 4634

+--------------+---------+
|       country|count_cnt|
+--------------+---------+
|           USA|   139196|
|        Canada|    21558|
|            UK|    18587|
|       Germany|    17052|
|         Spain|    13206|
|     Australia|    11725|
|         Italy|    11246|
|              |     4634|
|        France|     3474|
|      Portugal|     3371|
|   New Zealand|     3094|
|   Netherlands|     3044|
|   Switzerland|     1757|
|        Brazil|     1677|
|         China|     1469|
|        Sweden|     1452|
|         India|     1282|
|       Austria|     1147|
|      Malaysia|     1102|
|     Argentina|     1081|
|       Finland|      941|
|     Singapore|      926|
|       Denmark|      852|
|       Belgium|      820|
|        Mexico|      812|
|       Ireland|      755|
|   Philippines|      719|
|        Turkey|      495|
|        Poland|      458|
|      Pakistan|      435|
|        Greece|      425|
|          Iran|      407|
|       Romania|      363|
|         Chile|      355|
|

load countries and us states data files and make it a list

In [81]:
country = spark.read.csv(path+'/countries.csv', inferSchema=True, header= True)
country.show(300)
country.printSchema()
cc=country.select("ContryName").collect()
list_country=[i[0] for i in cc]
list_country
state = spark.read.csv(path+'/states.csv', inferSchema=True, header= True)
state.show()
ss=state.select("state").collect()
list_state=[i[0] for i in ss]
list_state

+--------------------+
|          ContryName|
+--------------------+
|         Afghanistan|
|       Aland Islands|
|             Albania|
|             Algeria|
|      American Samoa|
|             Andorra|
|              Angola|
|            Anguilla|
|          Antarctica|
| Antigua and Barbuda|
|           Argentina|
|             Armenia|
|               Aruba|
|           Australia|
|             Austria|
|          Azerbaijan|
|             Bahamas|
|             Bahrain|
|          Bangladesh|
|            Barbados|
|             Belarus|
|             Belgium|
|              Belize|
|               Benin|
|             Bermuda|
|              Bhutan|
|             Bolivia|
|             Bonaire|
|Bosnia and Herzeg...|
|            Botswana|
|       Bouvet Island|
|              Brazil|
|British Indian Oc...|
|   Brunei Darussalam|
|            Bulgaria|
|        Burkina Faso|
|             Burundi|
|       Côte d'Ivoire|
|            Cambodia|
|            Cameroon|
|          

['Alaska',
 'Alabama',
 'Arkansas',
 'Arizona',
 'California',
 'Colorado',
 'Connecticut',
 'District of Columbia',
 'Delaware',
 'Florida',
 'Georgia',
 'Hawaii',
 'Iowa',
 'Idaho',
 'Illinois',
 'Indiana',
 'Kansas',
 'Kentucky',
 'Louisiana',
 'Massachusetts',
 'Maryland',
 'Maine',
 'Michigan',
 'Minnesota',
 'Missouri',
 'Mississippi',
 'Montana',
 'North Carolina',
 'North Dakota',
 'Nebraska',
 'New Hampshire',
 'New Jersey',
 'New Mexico',
 'Nevada',
 'New York',
 'Ohio',
 'Oklahoma',
 'Oregon',
 'Pennsylvania',
 'Puerto Rico',
 'Rhode Island',
 'South Carolina',
 'South Dakota',
 'Tennessee',
 'Texas',
 'Utah',
 'Virginia',
 'Vermont',
 'Washington',
 'Wisconsin',
 'West Virginia',
 'Wyoming']

detrmine a valid country and a valid us state

In [82]:
df_user2=df_users2.withColumn("valid_country",  when(df_users2.country.isin(list_country), 1).otherwise(0))
df_user2.groupBy('valid_country').agg(count('valid_country').alias("count")).sort(desc("count")).show()
# 6952 not valid country
df_user2.groupBy('country').agg(count('country').alias("count")).sort(desc("count")).show()
#139196 valied
#show group by of unvalid countries
df_user2.filter(df_user2.valid_country==0).groupBy('country').agg(count('country').alias("count")).sort(desc("count")).show(100)
# 4634 blank

+-------------+------+
|valid_country| count|
+-------------+------+
|            1|271907|
|            0|  6951|
+-------------+------+

+-----------+------+
|    country| count|
+-----------+------+
|        USA|139196|
|     Canada| 21558|
|         UK| 18587|
|    Germany| 17052|
|      Spain| 13206|
|  Australia| 11725|
|      Italy| 11246|
|           |  4634|
|     France|  3474|
|   Portugal|  3371|
|New Zealand|  3094|
|Netherlands|  3044|
|Switzerland|  1757|
|     Brazil|  1677|
|      China|  1469|
|     Sweden|  1452|
|      India|  1282|
|    Austria|  1147|
|   Malaysia|  1102|
|  Argentina|  1081|
+-----------+------+
only showing top 20 rows

+--------------------+-----+
|             country|count|
+--------------------+-----+
|                    | 4634|
|          Yugoslavia|  174|
|                  Na|   91|
|              España|   69|
|          California|   64|
|Bosnia And Herzeg...|   58|
|             UKraine|   41|
|               Texas|   39|
|           

In [83]:
# replace blank countries with USA if state column is in list state
df_user3=df_user2.withColumn("country",  when(((df_user2.country =="") & (df_user2.country.isin(list_state))), "USA").otherwise(df_user2.country))


In [84]:
# replace blank countries with state value if state column is in list country
df_user3=df_user3.withColumn("country",  when(((df_user3.country =="") & (df_user3.state.isin(list_country))),df_user3.state).otherwise(df_user3.country))

In [85]:
# replace blank countries with USA value if state column is in list state
df_user3=df_user3.withColumn("country",  when(((df_user3.country =="") & (df_user3.state.isin(list_state))), "USA").otherwise(df_user3.country))

In [86]:
# finaly serach again if country is in country list, if not put unknow.
df_user3=df_user3.withColumn("country",  when(df_user3.country.isin(list_country), df_user3.country).otherwise("Unknown"))

In [87]:
#update the valid country field after all tranformations
df_user3=df_user3.withColumn("valid_country",  when(df_user3.country.isin(list_country), 1).otherwise(0))

In [None]:
df_user3.groupBy('valid_country').agg(count('valid_country').alias("count")).sort(desc("count")).show()
df_user3.groupBy('country').agg(count('country').alias("count")).sort(desc("count")).show(100)
#USA 139830
#6256 unknown

+-------------+------+
|valid_country| count|
+-------------+------+
|            1|272602|
|            0|  6256|
+-------------+------+

+--------------------+------+
|             country| count|
+--------------------+------+
|                 USA|139830|
|              Canada| 21558|
|                  UK| 18587|
|             Germany| 17054|
|               Spain| 13209|
|           Australia| 11725|
|               Italy| 11251|
|             Unknown|  6256|
|              France|  3476|
|            Portugal|  3372|
|         New Zealand|  3095|
|         Netherlands|  3045|
|         Switzerland|  1758|
|              Brazil|  1677|
|               China|  1471|
|              Sweden|  1454|
|               India|  1284|
|             Austria|  1147|
|            Malaysia|  1102|
|           Argentina|  1081|
|             Finland|   942|
|           Singapore|   928|
|             Denmark|   854|
|             Belgium|   820|
|              Mexico|   812|
|             Ireland

In [None]:
df_user3.printSchema()

root
 |-- Userid: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- Age_cat: string (nullable = false)
 |-- valid_country: integer (nullable = false)



In [88]:
df_user4=df_user3.drop("Location","valid_country")
df_user4.show()

+------+---+--------------+---------------+---------+--------------------+
|Userid|Age|          city|          state|  country|             Age_cat|
+------+---+--------------+---------------+---------+--------------------+
|     1| -1|           nyc|       New York|      USA|              Unknow|
|     2| 18|      stockton|     California|      USA|      Youngs (13-20)|
|     3| -1|        moscow|Yukon Territory|   Russia|              Unknow|
|     4| 17|         porto|         Vngaia| Portugal|      Youngs (13-20)|
|     5| -1|   farnborough|          Hants|       UK|              Unknow|
|     6| 61|  santa monica|     California|      USA|      Adults (41-70)|
|     7| -1|    washington|             Dc|      USA|              Unknow|
|     8| -1|       timmins|        Ontario|   Canada|              Unknow|
|     9| -1|    germantown|      Tennessee|      USA|              Unknow|
|    10| 26|      albacete|      Wisconsin|    Spain|Young Adults (21-40)|
|    11| 14|     melbourn

In [None]:
#Check if there are any nulls left
df_user4.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_user4.columns]).show()

+------+---+----+-----+-------+-------+
|Userid|Age|city|state|country|Age_cat|
+------+---+----+-----+-------+-------+
|     0|  0|   0|    2|      0|      0|
+------+---+----+-----+-------+-------+



In [89]:
#replace null values in state and city
df_users4=df_user4.withColumn("state",when((col("state").isNull())|(isnan(col("state"))) | (col("state").contains('null')),'unknown').otherwise(df_user4.state))
df_users4=df_user4.withColumn("city",when((col("city").isNull())|(isnan(col("state"))),'unknown').otherwise(df_user4.city))

In [90]:
#Check if there are any nulls left
df_user4.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_user4.columns]).show()

+------+---+----+-----+-------+-------+
|Userid|Age|city|state|country|Age_cat|
+------+---+----+-----+-------+-------+
|     0|  0|   0|    2|      0|      0|
+------+---+----+-----+-------+-------+



In [None]:
#check the null records
df_users4.filter(col("state").isNull()).show()

+------+---+-------------+-----+-------+--------------------+
|Userid|Age|         city|state|country|             Age_cat|
+------+---+-------------+-----+-------+--------------------+
|134377| 30|lawrenceville| null|Unknown|Young Adults (21-40)|
|275081| -1|   cernusco s| null|Unknown|              Unknow|
+------+---+-------------+-----+-------+--------------------+



#Check with Dana/Amir why null value isn't replaced

#check joins between df_books and df_ratings

In [91]:
print('num of records in ratings before join:',df_ratings.count())
print('num of records in users before join:',df_users.count())
print('num of records in books before join:',df_books.count())

num of records in ratings before join: 1048428
num of records in users before join: 278859
num of records in books before join: 270949


In [92]:
df_books.printSchema()

root
 |-- isbn: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- year: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- image_s: string (nullable = true)
 |-- image_m: string (nullable = true)
 |-- image_l: string (nullable = true)
 |-- fix_year: string (nullable = true)



In [93]:
df_books1=df_books.select(col("isbn").alias("isbn_books"),"title","author","year","publisher")
df_books1.printSchema()

root
 |-- isbn_books: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- year: string (nullable = true)
 |-- publisher: string (nullable = true)



In [94]:
df_rating_booksl=df_ratings.join(df_books1,df_ratings.isbn ==  df_books1.isbn_books,"left")

In [None]:
df_rating_booksl.show(10,truncate=False)

+------+----------+------+----------+--------------------+-------------+----+----------------------------+
|userid|isbn      |rating|isbn_books|title               |author       |year|publisher                   |
+------+----------+------+----------+--------------------+-------------+----+----------------------------+
|224536|0000258269|7     |null      |null                |null         |null|null                        |
|227275|0002234947|0     |0002234947|miss hobbema pageant|w.p. kinsella|1990|harpercollins juvenile books|
|219445|0002234947|0     |0002234947|miss hobbema pageant|w.p. kinsella|1990|harpercollins juvenile books|
|39608 |0002234947|0     |0002234947|miss hobbema pageant|w.p. kinsella|1990|harpercollins juvenile books|
|86033 |0002261537|7     |null      |null                |null         |null|null                        |
|170184|0004244117|10    |null      |null                |null         |null|null                        |
|52584 |0004516306|0     |null      |

In [None]:
df_rating_booksl.count()

1048428

In [None]:
df_rating_booksl.filter("isbn_books is null").count()

It seems that there are some books with ratings that are missing in df_books- lets change them to unknown member

In [201]:
#add new column "isbn_ukm" including unknown member to df_books
df_books1=df_books.select("isbn","title","author","year","publisher","fix_year")
df_books1.printSchema()

root
 |-- isbn: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- year: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- fix_year: string (nullable = true)



In [202]:
columns = ["isbn","title","author","year","publisher","fix_year"]

values = [("9999","unknown member","unknown member","-1","unknown member","-1")]
newRow = spark.createDataFrame(values,columns)

df_books1 = df_books1.union(newRow)

In [203]:
newRow.show()

+----+--------------+--------------+----+--------------+--------+
|isbn|         title|        author|year|     publisher|fix_year|
+----+--------------+--------------+----+--------------+--------+
|9999|unknown member|unknown member|  -1|unknown member|      -1|
+----+--------------+--------------+----+--------------+--------+



In [209]:
df_books1.filter("isbn='9999'").show()

+----+--------------+--------------+----+--------------+--------+
|isbn|         title|        author|year|     publisher|fix_year|
+----+--------------+--------------+----+--------------+--------+
|9999|unknown member|unknown member|  -1|unknown member|      -1|
+----+--------------+--------------+----+--------------+--------+



In [210]:
df_books1.count()

270950

In [97]:
#create a list of the isbn's which are unknown
df_isbn_ukm=df_rating_booksl.filter(df_rating_booksl.isbn_books.isNull()).select(df_rating_booksl.isbn).collect()
list_isbn_ukm=list_country=[i[0] for i in df_isbn_ukm]

In [98]:
#add new column "isbn_ukm" including unknown member to df_ratings
df_ratings1=df_ratings.withColumn('isbn_ukm',when(df_ratings.isbn.isin(list_isbn_ukm),"9999").otherwise(df_ratings.isbn))

In [None]:
df_ratings1.show()

In [140]:
df_books1.filter(df_books.isbn.isNull()|isnan("isbn")).show(10)

+----+-----+------+----+---------+--------+
|isbn|title|author|year|publisher|fix_year|
+----+-----+------+----+---------+--------+
+----+-----+------+----+---------+--------+



#check join between df_users4 and df_ratings 

In [None]:
df_users4.printSchema()

In [99]:
df_users5=df_users4.select(col("Userid").alias("userid_users"))
df_users5.printSchema()

root
 |-- userid_users: integer (nullable = true)



In [100]:
df_rating_usersl=df_ratings.join(df_users5,df_ratings.userid ==  df_users5.userid_users,"left")
df_rating_usersl.printSchema()
df_rating_usersl.show(10,truncate=False)

root
 |-- userid: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- userid_users: integer (nullable = true)

+------+----------+------+------------+
|userid|isbn      |rating|userid_users|
+------+----------+------+------------+
|276847|3442442796|0     |276847      |
|277249|8846200209|0     |277249      |
|277272|9722906283|7     |277272      |
|277427|0553581929|9     |277427      |
|277427|0553801430|0     |277427      |
|277427|0836213076|10    |277427      |
|277478|0582275164|0     |277478      |
|277517|0452282152|8     |277517      |
|277639|0380680157|0     |277639      |
|277639|0394171349|0     |277639      |
+------+----------+------+------------+
only showing top 10 rows



In [None]:
#check if there are users which appear in ratings but don't exist in users file
df_rating_usersl.filter("userid_users is null").show()

It seems all users in df_ratings appear in df_users4 

#Download df_books1 to csv

In [None]:
df_books1.printSchema()

root
 |-- isbn: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- year: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- fix_year: string (nullable = true)



In [145]:
#change type isbn and year
df_books2=df_books1.withColumn("year",df_books1.year.cast("int"))
df_books2.printSchema()

root
 |-- isbn: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- publisher: string (nullable = true)
 |-- fix_year: string (nullable = true)



In [146]:
df_books2.filter(df_books2.isbn.isNull()|isnan("isbn")).show(10)

+----+-----+------+----+---------+--------+
|isbn|title|author|year|publisher|fix_year|
+----+-----+------+----+---------+--------+
+----+-----+------+----+---------+--------+



In [148]:
#before downloading to csv remove ; characters from title, author, publisher
df_books2=df_books2.withColumn("title",regexp_replace("title","[;]", ""))
df_books2=df_books2.withColumn("author",regexp_replace("author","[;]", ""))
df_books2=df_books2.withColumn("publisher",regexp_replace("publisher","[;]", ""))

In [149]:
df_books2.printSchema()

root
 |-- isbn: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- publisher: string (nullable = true)
 |-- fix_year: string (nullable = true)



In [150]:
#remove the field fix_year since it might be the reason that we can not download it to csv
df_books3=df_books2.select("isbn","title","author","year","publisher")

In [160]:
df_books3.filter(df_books3.isbn.isNull()|isnan("isbn")).count()

0

In [None]:
# Export the new clean DataFrame to csv file
#df_books3.coalesce(1).write.option("header",True).csv(path+"/Dim_Books.csv")

In [156]:
# Coalesce and save the data in CSV format
df_books3.coalesce(1).write.csv('content/drive/MyDrive/BDA/books', mode='overwrite', sep=';', header=True)

In [157]:
# Look at the output of the command using the shell command `ls`
!ls "content/drive/MyDrive/BDA/books"

part-00000-211e092d-9dfc-4f35-9fe7-a6fe8a4158ba-c000.csv  _SUCCESS


In [158]:
!mv content/drive/MyDrive/BDA/books/part-00000*.csv "content/drive/MyDrive/BDA/final_books.csv"

In [159]:
# Download the file via notebook tools
from google.colab import files
files.download('content/drive/MyDrive/BDA/final_books.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

#Download df_users4 to csv

In [105]:
df_users4.printSchema()

root
 |-- Userid: integer (nullable = true)
 |-- Age: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- Age_cat: string (nullable = false)



In [106]:
#change types  of age
df_users5=df_users4.withColumn("Age",df_users4.Age.cast("int"))
df_users5.printSchema()

root
 |-- Userid: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- Age_cat: string (nullable = false)



In [107]:
#before downloading to csv remove ; characters from city, state, country
df_users5=df_users5.withColumn("city",regexp_replace("city","[;]", ""))
df_users5=df_users5.withColumn("state",regexp_replace("state","[;]", ""))
df_users5=df_users5.withColumn("country",regexp_replace("country","[;]", ""))

In [116]:
# Coalesce and save the data in CSV format
df_users5.coalesce(1).write.csv('content/drive/MyDrive/BDA/users', mode='overwrite', sep=';', header=True)


In [117]:
# Look at the output of the command using the shell command `ls`
!ls "content/drive/MyDrive/BDA/users"

part-00000-ff2d3f73-010d-479c-949f-f327cc82bf31-c000.csv  _SUCCESS


In [118]:
!mv content/drive/MyDrive/BDA/users/part-00000*.csv "content/drive/MyDrive/BDA/final_users.csv"

In [120]:
# Download the file via notebook tools
from google.colab import files
files.download('content/drive/MyDrive/BDA/final_users.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

#Download df_ratings1 to csv

In [147]:
df_ratings1.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- isbn_ukm: string (nullable = true)



In [215]:
from pyspark.sql.functions import avg
df_ratings1.filter("rating>0").agg(avg(col("rating"))).show()

+-----------------+
|      avg(rating)|
+-----------------+
|7.601831896551724|
+-----------------+



In [121]:
# Coalesce and save the data in CSV format
df_ratings1.coalesce(1).write.csv('content/drive/MyDrive/BDA/ratings', mode='overwrite', sep=';', header=True)

In [122]:
# Look at the output of the command using the shell command `ls`
!ls "content/drive/MyDrive/BDA/ratings"

part-00000-186ce357-cc43-479d-9b3c-8d1ce3f2a8f4-c000.csv  _SUCCESS


In [123]:
!mv content/drive/MyDrive/BDA/ratings/part-00000*.csv "content/drive/MyDrive/BDA/final_ratings.csv"

In [131]:
# Download the file via notebook tools
from google.colab import files
files.download('content/drive/MyDrive/BDA/final_ratings.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Stop Session

In [216]:
spark.stop()