---
# **Book-Crossing Data Cleaning**
---

In [1]:
# Install Java, Spark
!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

# Start Spark seesion
import findspark
findspark.init("spark-3.3.1-bin-hadoop3")
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
from pyspark.sql import Row
from pyspark.sql import functions


# Establishing connection to GD:
from google.colab import drive
drive.mount('/content/drive')

[33m0% [Working][0m            Hit:1 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
[33m0% [Connecting to archive.ubuntu.com (91.189.91.39)] [Connecting to security.ub[0m                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
                                                                               Hit:3 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
                                                                               Hit:4 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
                                                                               Hit:5 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
[33m                                                                               0% [Waiting for headers] [Waiting for headers] [Waiting for headers][0m[33m0% [1 InRel

In [2]:
# Importing libraries:

from pyspark.sql.types import (StructField, IntegerType, StringType, StructType)
import pyspark.sql.functions as F
from pyspark.sql.functions import (when, trim)
from pyspark.sql.functions import (countDistinct, avg, length, split, upper)
from pyspark.sql.functions import format_number
from pyspark.sql.functions import (col, round, substring)
from pyspark.sql.functions import (desc, asc, ceil)
from pyspark.sql.functions import (regexp_replace, udf)
from pyspark.rdd import RDD
from zmq.constants import NULL
import string

## **BOOKS DATASET**

In [None]:
# Install isbnlib:

!pip install isbnlib
import isbnlib as ISBN

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting isbnlib
  Downloading isbnlib-3.10.12-py2.py3-none-any.whl (66 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m66.5/66.5 KB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: isbnlib
Successfully installed isbnlib-3.10.12


In [None]:
# Defining a schema:

data_schema = StructType([StructField('ISBN', StringType(), True), 
               StructField('Book-Title', StringType(), True),
               StructField('Book-Author', StringType(), True),
               StructField('Year-Of-Publication', IntegerType(), True),
               StructField('Publisher', StringType(), True),
               StructField('Image-URL-S', StringType(), True),
               StructField('Image-URL-M', StringType(), True),
               StructField('Image-URL-L', StringType(), True)])



# Load Books dataset into a dataframe:

df_books = spark.read.options(delimiter=";", encoding='windows-1252', header=True).csv('/content/drive/MyDrive/Spark/ProjectBookCrossing/BX-Books.csv', schema = data_schema)


In [None]:
# Viewing df_books:

df_books.sample(fraction=0.1).show(10,truncate=False)

+----------+-----------------------------------------------------------------------------+-----------------+-------------------+---------------------------------------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+
|ISBN      |Book-Title                                                                   |Book-Author      |Year-Of-Publication|Publisher                              |Image-URL-S                                                 |Image-URL-M                                                 |Image-URL-L                                                 |
+----------+-----------------------------------------------------------------------------+-----------------+-------------------+---------------------------------------+------------------------------------------------------------+------------------------------------------------------------+------

In [None]:
# Dropping unnecessary columns:

df_books = df_books.drop('Image-URL-S', 'Image-URL-M', 'Image-URL-L')
df_books.printSchema()
print('Total number of rows: {}'.format(df_books.count()))

root
 |-- ISBN: string (nullable = true)
 |-- Book-Title: string (nullable = true)
 |-- Book-Author: string (nullable = true)
 |-- Year-Of-Publication: integer (nullable = true)
 |-- Publisher: string (nullable = true)

Total number of rows: 271379


---
**ISBN Column**

---

The International Standard Book Number (ISBN) is a unique numeric commercial book identifier. An ISBN is assigned to each separate edition and variation (except reprintings) of a publication. The ISBN is 10-digits long if assigned before 2007, and 13-digits long if assigned on or after 1 January 2007. ISBN characters are either 0-9 digits or the letter 'X'.

In [None]:
# Uppercase column ISBN:

df_books = df_books.withColumn('ISBN', upper(col('ISBN')))

# Trimming column ISBN:

df_books = df_books.withColumn('ISBN', trim(col('ISBN')))

In [None]:
# Checking for nulls or empty spaces:

NumNull = df_books.filter(col('ISBN').isNull()).count()
NumSpace = df_books.filter(col('ISBN')=='').count()

print('There are {} Nulls and {} empty spaces in ISBN column'.format(NumNull, NumSpace))

There are 0 Nulls and 0 empty spaces in ISBN column


In [None]:
# Checking if there are characters in 'ISBN' that are not digits or the letter 'X':

InvalidChar = df_books.filter(col('ISBN').rlike("[^0-9X]")).count()
print("There are {} ISBN's that include invalid characters.".format(InvalidChar))
df_books.select(col('ISBN')).filter(col('ISBN').rlike("[^0-9X]")).show(truncate=False)

There are 118 ISBN's that include invalid characters.
+----------+
|ISBN      |
+----------+
|B00009ANY9|
|B0000A2U93|
|B0000633PU|
|B00007FYKO|
|B00009APKU|
|B00008NRHQ|
|B0000DAPP1|
|B000069F44|
|B00005NCS7|
|B00007MAM9|
|B00006CRTE|
|B00007MF56|
|B0000523SS|
|B0001GMSV2|
|B00005Q8R2|
|B000051WXP|
|B00009NDAN|
|B00001U0CP|
|B00005U7YK|
|B000234N76|
+----------+
only showing top 20 rows



In [None]:
# Removing all invalid ISBN characters:

df_books = df_books.withColumn('ISBN', F.regexp_replace(col('ISBN'), '[^0-9X]', ''))

Since this dataset dates back to 2004, we expect ISBN's to be 10-digits long.

In [None]:
# Checking if there are 'ISBN's in different length than expected length of 10:

InvalidLength = df_books.filter(length(col('ISBN')) != 10).count()
print("There are {} ISBN's that are invalid length.".format(InvalidLength))
df_books.filter(length(col('ISBN')) != 10).show(truncate=False)

There are 117 ISBN's that are invalid length.
+--------+------------------------------------------------------------------------------+-----------------------+-------------------+-------------------------+
|ISBN    |Book-Title                                                                    |Book-Author            |Year-Of-Publication|Publisher                |
+--------+------------------------------------------------------------------------------+-----------------------+-------------------+-------------------------+
|000099  |Cane River                                                                    |Lalita Tademy          |2001               |Warner Books             |
|0000293 |Carmilla                                                                      |Joseph Sheridan Le Fanu|0                  |Soft Editions Ltd        |
|0000633 |The Story of Aladdin and the Wonderful Lamp                                   |S. Lane Poole          |0                  |Renaissance eBooks   

To validate ISBN's, we are using the isbnlib library that provides methods and functions to validate, clean and transform ISBN strings.

In [None]:
def validate_isbn(x):

  '''The function checks if the string evaluates as a 13-digits ISBN (in which case, transforms it to its equivalent 10-digits form) 
  or a 10-digits ISBN form, and returns the validated ISBN or Null if not validated.'''

  try: 
      lst = ISBN.get_isbnlike(x)
      for i in lst:
        if ISBN.is_isbn13(i) is True:
          return ISBN.to_isbn10(i)
        elif ISBN.is_isbn10(i) is True:
          return i
        else:
          return None

  except:
    return None

# Defining a udf and transforming the ISBN column:

isbnUDF = udf(lambda x: validate_isbn(x))
df_books = df_books.withColumn('ISBN', isbnUDF(col('ISBN')))

In [None]:
# How many ISBN's haven't been validated? 

NullISBN = df_books.filter(col('ISBN').isNull()).count()
print("There are {} invalid ISBN's found and transformed to Nulls.".format(NullISBN))

There are 114 invalid ISBN's found and transformed to Nulls.


We can transform Null values per all the other columns to 'Unknown' and still use the partial row data for analysis.
However, Null ISBN values are useless as this column will be used as a key to connect to other tables. Therefore, all ISBN values shouldn't be nulls or duplicates.


In [None]:
# Dropping invalid (null) ISBN:

df_books = df_books.dropna(subset=['ISBN'])

In [None]:
# Checking for 'ISBN' duplicates:
NumDup = df_books.groupBy(['ISBN', 'Book-Title', 'Book-Author', 'Year-Of-Publication','Publisher']).count().filter('count>1').count()
print('There are {} duplicate enteries.'.format(NumDup))
df_books.groupBy(['ISBN', 'Book-Title', 'Book-Author', 'Year-Of-Publication','Publisher']).count().filter('count>1').sort(desc('count')).show(5, truncate=False)

There are 315 duplicate enteries.
+----------+----------------------------------------------------+---------------------+-------------------+--------------------+-----+
|ISBN      |Book-Title                                          |Book-Author          |Year-Of-Publication|Publisher           |count|
+----------+----------------------------------------------------+---------------------+-------------------+--------------------+-----+
|038542471X|The Client                                          |John Grisham         |1993               |Doubleday Books     |2    |
|039543095X|Cultural Literacy: What Every American Needs to Know|E. D. Hirsch Jr.     |1987               |Houghton Mifflin    |2    |
|044040665X|Beezus and Ramona                                   |Beverly Cleary       |1994               |Yearling Books      |2    |
|077832012X|Decoy                                               |Jasmine Cresswell    |2004               |Mira Books          |2    |
|002089130X|On Death 

In [None]:
# Drop duplicate books:

df_books = df_books.dropDuplicates(subset=['ISBN'])

In [None]:
# As preparation for relating the Ratings data with Books data later, we'll add a row which is unknown per all fields:

newRow = spark.createDataFrame([('Unknown', 'Unknown', 'Unknown', -1, 'Unknown')], ['ISBN', 'Book-Title', 'Book-Author', 'Year-Of-Publication', 'Publisher'])
df_books = df_books.union(newRow)

---
**Book-Title Column**

---

In [None]:
# Looking for punctuation marks:

print('Total rows containing punctuation marks: {}'. format(df_books.filter(col('Book-Title').rlike('[!"#$%&()*+-./:;<=>?@_`{|}~]')).count()))
df_books.filter(col('Book-Title').rlike('[!"#$%&()*+-./:;<=>?@_`{|}~  ]')).sample(fraction=0.1).show(10, truncate=False)

Total rows containing punctuation marks: 156838
+----------+------------------------------------------------------------------+---------------------+-------------------+------------------------+
|ISBN      |Book-Title                                                        |Book-Author          |Year-Of-Publication|Publisher               |
+----------+------------------------------------------------------------------+---------------------+-------------------+------------------------+
|0001382381|Huck Scarry's Steam Train Journey                                 |Huck Scarry          |1979               |HarperCollins Publishers|
|0001711253|The Big Honey Hunt                                                |Stan Berenstein      |1942               |HarperCollins Publishers|
|0001900277|Glue (First Facts - First Skills)                                 |Harriet Hains        |1989               |HarperCollins Publishers|
|0002165368|Arthur C. Clarke's mysterious world                       

We'll look more closely at certain invalid text patterns and punctuation marks to figure out the best way to handle them.

In [None]:
# Viewing Titles with '&amp;' expressions:

print('Total rows containing "&amp;" ', df_books.select('Book-Title').filter(df_books['Book-Title'].contains('&amp;')).count())
df_books.select('Book-Title').filter(df_books['Book-Title'].contains('&amp;')).sample(fraction=0.1).show(2,truncate=False)

Total rows containing "&amp;"  4882
+---------------------------------------------+
|Book-Title                                   |
+---------------------------------------------+
|Frommer's Portable Tampa &amp; St. Petersburg|
|Arts &amp; ideas                             |
+---------------------------------------------+
only showing top 2 rows



In [None]:
# Viewing Titles with ';:' expressions:

print('Total rows containing ";:" ', df_books.select('Book-Title').filter(df_books['Book-Title'].contains(';:')).count())
df_books.select('Book-Title').filter(df_books['Book-Title'].contains(';:')).sample(fraction=0.1).show(2, truncate=False)

Total rows containing ";:"  195
+----------------------------------------------------------------+
|Book-Title                                                      |
+----------------------------------------------------------------+
|The judgement ;: And, In the penal colony (Penguin 60s classics)|
|Modern language classroom techniques;: A handbook               |
+----------------------------------------------------------------+
only showing top 2 rows



In [None]:
# Viewing Titles with ';' expressions:

print('Total rows containing " ; " ', df_books.select('Book-Title').filter(df_books['Book-Title'].contains(' ; ')).count())
df_books.select('Book-Title').filter(df_books['Book-Title'].contains(' ; ')).sample(fraction=0.1).show(2, truncate=False)

Total rows containing " ; "  385
+---------------------------------------------------------------+
|Book-Title                                                     |
+---------------------------------------------------------------+
|Once: Poems (Harvest Book ; Hb 337)                            |
|The Shadow of the Torturer (His the Book of the New Sun ; V. 1)|
+---------------------------------------------------------------+
only showing top 2 rows



In [None]:
# Viewing Titles with '"' expressions:

print('Total rows containing " ', df_books.select('Book-Title').filter(df_books['Book-Title'].contains('"')).count())
df_books.select('Book-Title').filter(df_books['Book-Title'].contains('"')).sample(fraction=0.1).show(2, truncate=False)

Total rows containing "  831
+-----------------------------------------------+
|Book-Title                                     |
+-----------------------------------------------+
|"Unsinkable": The Full Story of the RMS Titanic|
|Tolkien's Art: "a Mythology for England"       |
+-----------------------------------------------+
only showing top 2 rows



In [None]:
# Viewing Titles with '  ' expressions:

print('Total rows containing double space ', df_books.select('Book-Title').filter(df_books['Book-Title'].contains('  ')).count())
df_books.select('Book-Title').filter(df_books['Book-Title'].contains('  ')).sample(fraction=0.1).show(2, truncate=False)

Total rows containing double space  2245
+------------------------------------------------+
|Book-Title                                      |
+------------------------------------------------+
|Thoroughbred #36:  Without Wonder (Thoroughbred)|
|Free Again  (Phantom Stallion #5)               |
+------------------------------------------------+
only showing top 2 rows



In [None]:
# Viewing Titles with '&lt;i>' expressions:

print('Total rows containing &lt;i>', df_books.select('Book-Title').filter(df_books['Book-Title'].contains('&lt;i>')).count())
df_books.select('Book-Title').filter(df_books['Book-Title'].contains('&lt;i>')).show(2, truncate=False)

Total rows containing &lt;i> 15
+-------------------------------------------------------------------------------------------------------------+
|Book-Title                                                                                                   |
+-------------------------------------------------------------------------------------------------------------+
|Starting Your Marriage Right &lt;i>what You Need To Know In The Early Years To Make It Last A Lifetime&lt;/i>|
|Sisters: Book Two Of The River Of Freedom Series &lt;i>a Novel&lt;/i>                                        |
+-------------------------------------------------------------------------------------------------------------+
only showing top 2 rows



In [None]:
# Viewing Titles with '&lt;/i>' expressions:

print('Total rows containing &lt;/i>', df_books.select('Book-Title').filter(df_books['Book-Title'].contains('&lt;/i>')).count())
df_books.select('Book-Title').filter(df_books['Book-Title'].contains('&lt;/i>')).show(2, truncate=False)

Total rows containing &lt;/i> 15
+-------------------------------------------------------------------------------------------------------------+
|Book-Title                                                                                                   |
+-------------------------------------------------------------------------------------------------------------+
|Starting Your Marriage Right &lt;i>what You Need To Know In The Early Years To Make It Last A Lifetime&lt;/i>|
|Sisters: Book Two Of The River Of Freedom Series &lt;i>a Novel&lt;/i>                                        |
+-------------------------------------------------------------------------------------------------------------+
only showing top 2 rows



In [None]:
# Viewing Titles with '`' expressions:

print('Total rows containing `', df_books.select('Book-Title').filter(df_books['Book-Title'].contains('`')).count())
df_books.select('Book-Title').filter(df_books['Book-Title'].contains('`')).show(2, truncate=False)

Total rows containing ` 11
+--------------------------------------------------------------------------------------+
|Book-Title                                                                            |
+--------------------------------------------------------------------------------------+
|Escape from Intimacy : Untangling the ``Love'' Addictions: Sex, Romance, Relationships|
|The Coldest March: Scott`s Fatal Antarctic Expedition                                 |
+--------------------------------------------------------------------------------------+
only showing top 2 rows



In [None]:
# Cleaning expressions in 'Book-Title':

df_books = df_books.withColumn('Book-Title', F.regexp_replace('Book-Title', '&amp;', '&'))
df_books = df_books.withColumn('Book-Title', F.regexp_replace('Book-Title', ';:', ':'))
df_books = df_books.withColumn('Book-Title', F.regexp_replace('Book-Title', ';', ':'))
df_books = df_books.withColumn('Book-Title', F.regexp_replace('Book-Title', '"', "'"))
df_books = df_books.withColumn('Book-Title', F.regexp_replace('Book-Title', '&lt;i>', ""))
df_books = df_books.withColumn('Book-Title', F.regexp_replace('Book-Title', '&lt;/i>', ""))
df_books = df_books.withColumn('Book-Title', F.regexp_replace('Book-Title', '`', "'"))

while (df_books.select('Book-Title').filter(df_books['Book-Title'].contains('  ')).count() > 0):
  df_books = df_books.withColumn('Book-Title', F.regexp_replace('Book-Title', '  ', ' '))

df_books = df_books.withColumn('Book-Title', trim(col('Book-Title')))
df_books = df_books.withColumn('Book-Title', upper(col('Book-Title')))

---
**Book-Author Column**

---

In [None]:
# Looking for punctuation marks:

print('Total rows containing punctuation marks: {}'.format(df_books.filter(col('Book-Author').rlike('[!"#$%&()*+-./:;<=>?@_`{|}~]')).count()))
df_books.filter(col('Book-Author').rlike('[!"#$%&()*+-./:;<=>?@_`{|}~  ]')).show(10, truncate=False)

Total rows containing punctuation marks: 40771
+----------+--------------------------------------------------------------+-----------------------------+-------------------+---------------------------+
|ISBN      |Book-Title                                                    |Book-Author                  |Year-Of-Publication|Publisher                  |
+----------+--------------------------------------------------------------+-----------------------------+-------------------+---------------------------+
|0000913154|THE WAY THINGS WORK: AN ILLUSTRATED ENCYCLOPEDIA OF TECHNOLOGY|C. van Amerongen (translator)|1967               |Simon &amp; Schuster       |
|0001046438|LIAR                                                          |Stephen Fry                  |0                  |Harpercollins Uk           |
|0001046934|THE PRIME OF MISS JEAN BRODIE                                 |Muriel Spark                 |1999               |Trafalgar Square Publishing|
|0001047213|THE FIGHTING MAN 

In [None]:
# Viewing Authors with '&amp;' expressions:

print('Total rows containing "&amp;" ', df_books.select('Book-Author').filter(df_books['Book-Author'].contains('&amp;')).count())
df_books.select('Book-Author').filter(df_books['Book-Author'].contains('&amp;')).sample(fraction=0.1).show(2, truncate=False)

Total rows containing "&amp;"  270
+----------------------------+
|Book-Author                 |
+----------------------------+
|Mary-Kate &amp; Ashley Olsen|
|Mary-Kate &amp; Ashley Olsen|
+----------------------------+
only showing top 2 rows



In [None]:
# Viewing Authors with ';' expressions:

print('Total rows containing ";" ', df_books.select('Book-Author').filter(df_books['Book-Author'].contains(';')).count())
df_books.select('Book-Author').filter(df_books['Book-Author'].contains(';')).sample(fraction=0.1).show(2, truncate=False)

Total rows containing ";"  280
+---------------------------------------+
|Book-Author                            |
+---------------------------------------+
|Lerza&amp;                             |
|William &amp; Johnson, Virginia Masters|
+---------------------------------------+
only showing top 2 rows



In [None]:
# Viewing Authors with '"' expressions:

print('Total rows containing " ', df_books.select('Book-Author').filter(df_books['Book-Author'].contains('"')).count())
df_books.select('Book-Author').filter(df_books['Book-Author'].contains('"')).sample(fraction=0.1).show(2, truncate=False)

Total rows containing "  47
+-----------+
|Book-Author|
+-----------+
|Miss" "Read|
|"Lulu"     |
+-----------+
only showing top 2 rows



In [None]:
# Viewing Authors with '  ' expressions:

print('Total rows containing double space ', df_books.select('Book-Author').filter(df_books['Book-Author'].contains('  ')).count())
df_books.select('Book-Author').filter(df_books['Book-Author'].contains('  ')).sample(fraction=0.1).show(2, truncate=False)

Total rows containing double space  1310
+-------------+
|Book-Author  |
+-------------+
|Janet  Groene|
|Oren  Harari |
+-------------+
only showing top 2 rows



In [None]:
# Cleaning expressions in 'Book-Author':

df_books = df_books.withColumn('Book-Author', F.regexp_replace('Book-Author', '&amp;', 'and'))
df_books = df_books.withColumn('Book-Author', F.regexp_replace('Book-Author', '~', " "))
df_books = df_books.withColumn('Book-Author', F.regexp_replace('Book-Author', '[^a-zA-Z ]', ''))

while (df_books.select('Book-Author').filter(df_books['Book-Author'].contains('  ')).count() > 0):
       df_books = df_books.withColumn('Book-Author', F.regexp_replace('Book-Author', '  ', ' '))


df_books = df_books.withColumn('Book-Author', trim(col('Book-Author')))
df_books = df_books.withColumn('Book-Author', upper(col('Book-Author')))

---
**Year-Of-Publication Column**

---

In [None]:
# Checking for nulls

NullYear = df_books.filter(col('Year-Of-Publication').isNull()).count()
print('There are {} null Year-Of-Publication values.'.format(NullYear))

There are 0 null Year-Of-Publication values.


In [None]:
# Statistics of variable "Year-Of-Publication":

df_books.select("Year-Of-Publication").describe().show(truncate=False)

+-------+-------------------+
|summary|Year-Of-Publication|
+-------+-------------------+
|count  |270949             |
|mean   |1960.156413199532  |
|stddev |256.5047202699509  |
|min    |0                  |
|max    |2050               |
+-------+-------------------+



The 'Year-Of-Publication' min value (0) and max value (2050) don't make sense.

In [None]:
# As the dataset was curated at 2004, we check to find how many "Year-Of-Publication" higher than 2004 are listed:

FutureYear = df_books.filter(df_books["Year-Of-Publication"] > 2004).count()
print('There are {} books with Year-Of-Publication higher than 2004.'.format(FutureYear))
df_books.groupby("Year-Of-Publication").count().where(col('Year-Of-Publication') > 2004).sort(desc("Year-Of-Publication")).show(truncate=False)

There are 72 books with Year-Of-Publication higher than 2004.
+-------------------+-----+
|Year-Of-Publication|count|
+-------------------+-----+
|2050               |2    |
|2038               |1    |
|2037               |1    |
|2030               |7    |
|2026               |1    |
|2024               |1    |
|2021               |1    |
|2020               |3    |
|2012               |1    |
|2011               |2    |
|2010               |2    |
|2008               |1    |
|2006               |3    |
|2005               |46   |
+-------------------+-----+



Books started to get published in the 15th century with the invention of the printing press. Therefore, we'll assume a minimum reasonable Year-Of-Publication for our purpose is 1500.


In [None]:
# We check to find how many "Year-Of-Publication" lower than 1500 are listed:

LowYear = df_books.filter(df_books["Year-Of-Publication"] < 1500).count()
print('There are {} books with Year-Of-Publication lower than 1500.'.format(LowYear))

There are 4559 books with Year-Of-Publication lower than 1500.


In [None]:
# Viewing what seems to be incorrect years in context of the dataframe:

df_books.filter((df_books['Year-Of-Publication'] > 2004) | (df_books['Year-Of-Publication'] < 1500)).sample(fraction=0.1).show(5, truncate=False)

+----------+-----------+------------------+-------------------+-----------------------+
|ISBN      |Book-Title |Book-Author       |Year-Of-Publication|Publisher              |
+----------+-----------+------------------+-------------------+-----------------------+
|0002257378|MOST WANTED|JACQUELYN MITCHARD|0                  |Harpercollins          |
|0006474950|MARINE LIFE|LINDA SVENDSEN    |0                  |Harpercollins Publisher|
|0006485294|MINUS TIME |CATHERINE BUSH    |0                  |Harpercollins Publisher|
|0006548040|JULIP      |JIM HARRISON      |0                  |Flamingo               |
|0006550231|STATEMENT  |BRIAN MOORE       |0                  |Flamingo               |
+----------+-----------+------------------+-------------------+-----------------------+
only showing top 5 rows



As the dataset was collected in 2004, we will assume that the books with year 2005 are still reasonable as the data collection might have been updated shortly after curating it. The fact that there are 46 of these books from 2005 contributed to this intuition. We also randomally googled some of these books ISBN to confirm this.
However, all 'Year-Of-Publication' above 2005 should be considered as errors. Same goes for the entries with 'Year-Of-Publication' lower than 1500.
We'll convert all unreasonable years to '-1' (equivalent to Unknown).

In [None]:
# Replacing incorrect values with -1:

df_books = df_books.withColumn("Year-Of-Publication",
       when(col("Year-Of-Publication") < 1500 ,-1).when(col("Year-Of-Publication") > 2005 ,-1).otherwise(col("Year-Of-Publication")))

In [None]:
# Verifying min and max years:

df_books.select("Year-Of-Publication").where(col('Year-Of-Publication') != -1).describe().show(truncate=False)

+-------+-------------------+
|summary|Year-Of-Publication|
+-------+-------------------+
|count  |266364             |
|mean   |1993.6893499121502 |
|stddev |8.144241390384382  |
|min    |1806               |
|max    |2005               |
+-------+-------------------+



---
**Publisher Column**

---

In [None]:
# Looking for punctuation marks:

print('Total rows containing punctuations: {}'.format(df_books.filter(col('Publisher').rlike('[!"#$%&()*+-./:;<=>?@_`{|}~]')).count()))
df_books.filter(col('Publisher').rlike('[!"#$%&()*+-./:;<=>?@_`{|}~]')).show(10, truncate=False)

Total rows containing punctuations: 46528
+----------+--------------------------------------------------------------+--------------------------+-------------------+-----------------------------------------------------------------+
|ISBN      |Book-Title                                                    |Book-Author               |Year-Of-Publication|Publisher                                                        |
+----------+--------------------------------------------------------------+--------------------------+-------------------+-----------------------------------------------------------------+
|0000913154|THE WAY THINGS WORK: AN ILLUSTRATED ENCYCLOPEDIA OF TECHNOLOGY|C VAN AMERONGEN TRANSLATOR|1967               |Simon &amp; Schuster                                             |
|0002219980|CONTRACT                                                      |GERALD SEYMOUR            |1980               |Harper &amp; Collins                                             |
|000222674X|M

In [None]:
# Viewing Publishers with '&amp;' expressions:

print('Total rows containing "&amp;" ', df_books.select('Publisher').filter(df_books['Publisher'].contains('&amp;')).count())
df_books.select('Publisher').filter(df_books['Publisher'].contains('&amp;')).sample(fraction=0.1).show(2, truncate=False)

Total rows containing "&amp;"  15764
+-----------------------------+
|Publisher                    |
+-----------------------------+
|Simon &amp; Schuster         |
|Holmes &amp; Meier Publishers|
+-----------------------------+
only showing top 2 rows



In [None]:
# Viewing Publishers with ' ; ' expressions:

print('Total rows containing " ; " ', df_books.select('Publisher').filter(df_books['Publisher'].contains(' ; ')).count())
df_books.select('Publisher').filter(df_books['Publisher'].contains(' ; ')).show(2, truncate=False)

Total rows containing " ; "  4
+------------------------------------------------------------+
|Publisher                                                   |
+------------------------------------------------------------+
|Allen &amp; Unwin ; distributed by Books Reps ( New Zealand)|
|Editions du NoroÃ£it ; Buschek Books                        |
+------------------------------------------------------------+
only showing top 2 rows



In [None]:
# Viewing Publishers with '"' expressions:

print('Total rows containing " ', df_books.select('Publisher').filter(df_books['Publisher'].contains('"')).count())
df_books.select('Publisher').filter(df_books['Publisher'].contains('"')).sample(fraction=0.5).show(2, truncate=False)

Total rows containing "  16
+-------------------+
|Publisher          |
+-------------------+
|Wydawn. PTTK "Kraj"|
|Wydawn. PTTK "Kraj"|
+-------------------+
only showing top 2 rows



In [None]:
# Viewing Publishers with '  ' expressions:

print('Total rows containing double space ', df_books.select('Publisher').filter(df_books['Publisher'].contains('  ')).count())
df_books.select('Publisher').filter(df_books['Publisher'].contains('  ')).show(2, truncate=False)

Total rows containing double space  2
+-----------------------+
|Publisher              |
+-----------------------+
|Batsford Brassey,  Inc.|
|Ku PaÂ°a  Publishing   |
+-----------------------+



In [None]:
# Cleaning expressions in 'Publisher':

df_books = df_books.withColumn('Publisher', F.regexp_replace('Publisher', '&amp;', 'and'))
df_books = df_books.withColumn('Publisher', F.regexp_replace('Publisher', '[^a-zA-Z ]', ''))

while (df_books.select('Publisher').filter(df_books['Publisher'].contains('  ')).count() > 0):
       df_books = df_books.withColumn('Publisher', F.regexp_replace('Publisher', '  ', ' '))


df_books = df_books.withColumn('Publisher', trim(col('Publisher')))
df_books = df_books.withColumn('Publisher', upper(col('Publisher')))

---
**General Cleaning**

---

In [None]:
# Checking for nulls:

Dict_Null = {col:df_books.filter(df_books[col].isNull()).count() for col in df_books.columns}
Dict_Null

{'ISBN': 0,
 'Book-Title': 0,
 'Book-Author': 0,
 'Year-Of-Publication': 0,
 'Publisher': 0}

In [None]:
# Checking for 'UNKNOWN' values: 

Dict_UNKNOWN = {col:df_books.filter(df_books[col] == 'UNKNOWN').count() for col in df_books.columns}
Dict_UNKNOWN

{'ISBN': 0,
 'Book-Title': 1,
 'Book-Author': 44,
 'Year-Of-Publication': 0,
 'Publisher': 2}

In [None]:
# Checking for some typical values that should be converted to 'Unknown' for consistency: 

Dict_NA = {col:df_books.filter(df_books[col] == 'NA').count() for col in df_books.columns}
Dict_NA

{'ISBN': 0,
 'Book-Title': 0,
 'Book-Author': 6,
 'Year-Of-Publication': 0,
 'Publisher': 2}

In [None]:
# Checking for some typical values that should be converted to 'Unknown' for consistency: 

Dict_NotAvailable = {col:df_books.filter(df_books[col] == 'NOT AVAILABLE').count() for col in df_books.columns}
Dict_NotAvailable

{'ISBN': 0,
 'Book-Title': 0,
 'Book-Author': 9,
 'Year-Of-Publication': 0,
 'Publisher': 0}

In [None]:
# Checking for some typical values that should be converted to 'Unknown' for consistency: 

Dict_NotApplicable = {col:df_books.filter(df_books[col] == 'NOT APPLICABLE NA').count() for col in df_books.columns}
Dict_NotApplicable

{'ISBN': 0,
 'Book-Title': 0,
 'Book-Author': 286,
 'Year-Of-Publication': 0,
 'Publisher': 0}

In [None]:
# Replacing typical expressions values with 'UNKNOWN' for 'Book-Author' column:

df_books = df_books.withColumn("Book-Author",
       when(col("Book-Author") == 'NA', 'UNKNOWN')
       .when(col("Book-Author") == 'NOT AVAILABLE', 'UNKNOWN')
       .when(col("Book-Author") == 'NOT APPLICABLE', 'UNKNOWN')
       .otherwise(col("Book-Author")))

while (df_books.select('Book-Author').filter(df_books['Book-Author'].contains('  ')).count() > 0):
       df_books = df_books.withColumn('Book-Author', F.regexp_replace('Book-Author', '  ', ' '))

In [None]:
# Replacing typical expressions values with 'UNKNOWN' for 'Publisher' column:

df_books = df_books.withColumn("Publisher",
       when(col("Publisher") == 'NA', 'UNKNOWN')
       .when(col("Publisher") == 'NOT APPLICABLE', 'UNKNOWN')
       .otherwise(col("Publisher")))

while (df_books.select('Publisher').filter(df_books['Publisher'].contains('  ')).count() > 0):
       df_books = df_books.withColumn('Publisher', F.regexp_replace('Publisher', '  ', ' '))

In [None]:
# Summary of counts and distinct counts for all columns:

df_books.select(df_books.columns).summary('count', 'count_distinct').show(truncate=False)

+--------------+------+----------+-----------+-------------------+---------+
|summary       |ISBN  |Book-Title|Book-Author|Year-Of-Publication|Publisher|
+--------------+------+----------+-----------+-------------------+---------+
|count         |270950|270950    |270950     |270950             |270950   |
|count_distinct|270950|238880    |97577      |101                |16178    |
+--------------+------+----------+-----------+-------------------+---------+



In [None]:
df_books.sample(fraction=0.1).show(truncate=False)

+----------+--------------------------------------------------------------------------------------+------------------+-------------------+---------------------------------------------------------------+
|ISBN      |Book-Title                                                                            |Book-Author       |Year-Of-Publication|Publisher                                                      |
+----------+--------------------------------------------------------------------------------------+------------------+-------------------+---------------------------------------------------------------+
|0001053744|PEARL AND SIR ORFEO                                                                   |J R R TOLKIEN     |1999               |TRAFALGAR SQUARE PUBLISHING                                    |
|0001848445|THE COAL HOUSE T/PB                                                                   |ANDREW TAYLOR     |1986               |HARPERCOLLINS PUBLISHERS                          

In [None]:
# Saving cleaned dataset:

df_books.coalesce(1).write.format('csv').options(header=True, delimiter=';').save('/content/drive/MyDrive/Spark/ProjectBookCrossing/Clean-Books.csv')



---
## **USERS DATASET**
---



In [3]:
# Install PyCountry

!pip install pycountry
import pycountry as pc

# Install python-levenshtein

!pip install python-levenshtein
import Levenshtein as lev

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pycountry
  Downloading pycountry-22.3.5.tar.gz (10.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.1/10.1 MB[0m [31m83.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: pycountry
  Building wheel for pycountry (pyproject.toml) ... [?25l[?25hdone
  Created wheel for pycountry: filename=pycountry-22.3.5-py2.py3-none-any.whl size=10681845 sha256=9ced6a02f371b678e45cb81a6ea6aa971afb9c867fde86e735df923901fe09d0
  Stored in directory: /root/.cache/pip/wheels/e2/aa/0f/c224e473b464387170b83ca7c66947b4a7e33e8d903a679748
Successfully built pycountry
Installing collected packages: pycountry
Successfully installed pycountry-22.3.5
Looking in indexes: https://pypi.o

In [None]:
# Defining a schema:

data_schema = StructType([StructField('User-ID', IntegerType(), True), 
               StructField('Location', StringType(), True),
               StructField('Age', IntegerType(), True)])

# Load Users dataset into a dataframe:

df_users = spark.read.options(delimiter=";", encoding='windows-1252' ,header=True).csv('/content/drive/MyDrive/Spark/ProjectBookCrossing/BX-Users.csv', schema = data_schema)


# First look at the data: 

df_users.sample(fraction=0.1).show(5, truncate=False)
print('Total rows in the tadaset: {}\n'.format(df_users.count()))
df_users.printSchema()

+-------+-------------------------------+----+
|User-ID|Location                       |Age |
+-------+-------------------------------+----+
|3      |moscow, yukon territory, russia|null|
|14     |mediapolis, iowa, usa          |null|
|16     |albuquerque, new mexico, usa   |null|
|22     |erfurt, thueringen, germany    |null|
|30     |anchorage, alaska, usa         |24  |
+-------+-------------------------------+----+
only showing top 5 rows

Total rows in the tadaset: 278859

root
 |-- User-ID: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: integer (nullable = true)



---
**User-ID Column**

---

In [None]:
# Checking for duplicates:

df_users.select("User-ID").summary('count', 'count_distinct').show(truncate=False)

TotalCount = df_users.select("User-ID").summary('count').head()[1]
DistinctCount = df_users.select("User-ID").summary('count_distinct').head()[1]

if TotalCount == DistinctCount:
  print('No Duplicates')
else:
  print(TotalCount - DistinctCount, 'Duplicates')

# Validating that User-ID's are numeric values:
print('\n')

nonNumeric = df_users.filter(col('User-ID').rlike('[^0-9]')).count()
if nonNumeric == 0:
  print('User-ID values are numbers only')
else:
  print('There are {} non numeric User-ID values'.format(nonNumeric))

# Checking for nulls:
print('\n')

df_users.filter(df_users['User-ID'].isNull()).show()

NullCount = df_users.filter(df_users['User-ID'].isNull()).count()

if NullCount == 0:
  print('No null values')
else:
  print('Nulls: {} value\\s'.format(NullCount))

  # Checking for empty space:
print('\n')

df_users.filter(df_users['User-ID']=='').show()

EmptyCount = df_users.filter(df_users['User-ID']=='').count()

if EmptyCount == 0:
  print('No Empty space values')
else:
  print('Empty space: {} value\\s'.format(NullCount))

+--------------+-------+
|summary       |User-ID|
+--------------+-------+
|count         |278858 |
|count_distinct|278858 |
+--------------+-------+

No Duplicates


User-ID values are numbers only


+-------+--------+----+
|User-ID|Location| Age|
+-------+--------+----+
|   null|    NULL|null|
+-------+--------+----+

Nulls: 1 value\s


+-------+--------+---+
|User-ID|Location|Age|
+-------+--------+---+
+-------+--------+---+

No Empty space values


In [None]:
# Dropping nulls:

df_users = df_users.na.drop(subset='User-ID')

---
**Location Column**

---

First we'll split the column into 3 separate columns: City, State, Country and explore the data in each column.

In [None]:
# Uppercase:

df_users = df_users.withColumn("Location", upper(col('Location')))

# Splitting the column:

df_users = df_users.withColumn('City', split(df_users['Location'], ',').getItem(0)) \
       .withColumn('State', split(df_users['Location'], ',').getItem(1)) \
       .withColumn('Country', split(df_users['Location'], ',').getItem(2))

# Dropping the original 'Location' column:

df_users = df_users.drop("Location")


df_users.sample(0.1).show(5, truncate=False)

+-------+----+----------+-----------+-------+
|User-ID|Age |City      |State      |Country|
+-------+----+----------+-----------+-------+
|7      |null|WASHINGTON| DC        | USA   |
|10     |26  |ALBACETE  | WISCONSIN | SPAIN |
|35     |17  |GRAFTON   | WISCONSIN | USA   |
|41     |14  |SANTEE    | CALIFORNIA| USA   |
|58     |null|EDMONTON  | ALBERTA   | CANADA|
+-------+----+----------+-----------+-------+
only showing top 5 rows



In [None]:
# Peeking at punctuation marks and numbers that we'll need to clean out:

df_users.filter(col('City').rlike('[0-9!"#$%&()*+,-./:;<=>?@^_`{|}~]')).sample(0.1).show(10, truncate=False)
df_users.filter(col('State').rlike('[0-9!"#$%&()*+,-./:;<=>?@^_`{|}~]')).sample(0.1).show(10, truncate=False)
df_users.filter(col('Country').rlike('[0-9!"#$%&()*+,-./:;<=>?@^_`{|}~]')).sample(0.1).show(10, truncate=False)

+-------+----+-----------------+--------------------+----------+
|User-ID|Age |City             |State               |Country   |
+-------+----+-----------------+--------------------+----------+
|120    |13  |VILLENEUVE D`ASCQ| NORD               | FRANCE   |
|1648   |67  |MONT-SAINT-AIGNAN| NORMANDIE          | FRANCE   |
|2065   |16  |N/A              | N/A                | SINGAPORE|
|2197   |26  |N/A              | TRAVELLING         | CANADA   |
|2229   |27  |SODDY-DAISY      | TENNESSEE          | USA      |
|2279   |18  |COLOGNE/KÖLN     | NORDRHEIN-WESTFALEN| GERMANY  |
|2563   |26  |BERLIN-KREUZBERG | BERLIN             | GERMANY  |
|3203   |null|ST. CATHARINES   | ONTARIO            | CANADA   |
|3367   |30  |MEM-MARTINS      | N/A                | PORTUGAL |
|3702   |48  |FT.STEWART       | GEORGIA            | USA      |
+-------+----+-----------------+--------------------+----------+
only showing top 10 rows

+-------+----+------------+-------------------+---------------+


There are many issues with the City, State, and Country columns:
1. There are whitespaces.
2. There are City values which belong to State or Country, and vice versa.
3. There is a clutter of many punctuation marks.
4. There are numbers that need to be removed.
5. There are misspelling and intentional user incorrect values (i.e. 'SPACE').

In [None]:
# Replacing '&' with 'AND':
df_users = df_users.withColumn('City', F.regexp_replace('City', "&", "AND"))
df_users = df_users.withColumn('State', F.regexp_replace('State', "&", "AND"))
df_users = df_users.withColumn('Country', F.regexp_replace('Country', "&", "AND"))

# Replacing '-' with ' ':
df_users = df_users.withColumn('City', F.regexp_replace('City', "-", " "))
df_users = df_users.withColumn('State', F.regexp_replace('State', "-", " "))
df_users = df_users.withColumn('Country', F.regexp_replace('Country', "-", " "))

# Replacing "'" with '':
df_users = df_users.withColumn('City', F.regexp_replace('City', "'", ""))
df_users = df_users.withColumn('State', F.regexp_replace('State', "'", ""))
df_users = df_users.withColumn('Country', F.regexp_replace('Country', "'", ""))

# Replacing punctuation marks: 

df_users = df_users.withColumn('City', F.regexp_replace('City', '[!"#$%()*+,./:;<=>?@^_`{|}~]', ""))
df_users  = df_users.withColumn('State', F.regexp_replace('State', '[!"#$%()*+,./:;<=>?@^_`{|}~]', ""))
df_users = df_users.withColumn('Country', F.regexp_replace('Country', '[!"#$%()*+,./:;<=>?@^_`{|}~]', ""))

# Replacing numbers: 

df_users = df_users.withColumn('City', F.regexp_replace('City', '[0-9]', ""))
df_users  = df_users.withColumn('State', F.regexp_replace('State', '[0-9]', ""))
df_users = df_users.withColumn('Country', F.regexp_replace('Country', '[0-9]', ""))

# Trimming whitespace:

df_users = df_users.withColumn("City", trim(df_users.City))
df_users = df_users.withColumn("State", trim(df_users.State))
df_users = df_users.withColumn("Country", trim(df_users.Country))

# Removing extra spaces:

while (df_users.select('City').filter(df_users['City'].contains('  ')).count() > 0):
  df_users = df_users.withColumn('City', F.regexp_replace('City', '  ', ' '))

while (df_users.select('State').filter(df_users['State'].contains('  ')).count() > 0):
  df_users = df_users.withColumn('State', F.regexp_replace('State', '  ', ' '))

while (df_users.select('Country').filter(df_users['Country'].contains('  ')).count() > 0):
  df_users = df_users.withColumn('Country', F.regexp_replace('Country', '  ', ' '))


  # Replacing 'SAINT ' with 'ST ':
df_users = df_users.withColumn('City', F.regexp_replace('City', "SAINT ", "ST "))
df_users = df_users.withColumn('State', F.regexp_replace('State', "SAINT ", "ST "))
df_users = df_users.withColumn('Country', F.regexp_replace('Country', "SAINT ", "ST "))

# Replacing 'FORT ' with 'FT ':
df_users = df_users.withColumn('City', F.regexp_replace('City', "FORT ", "FT "))
df_users = df_users.withColumn('State', F.regexp_replace('State', "FORT ", "FT "))
df_users = df_users.withColumn('Country', F.regexp_replace('Country', "FORT ", "FT "))

# Replacing 'TOWNSHIP' with 'TWP':
df_users = df_users.withColumn('City', F.regexp_replace('City', "TOWNSHIP", "TWP"))
df_users = df_users.withColumn('State', F.regexp_replace('State', "TOWNSHIP", "TWP"))
df_users = df_users.withColumn('Country', F.regexp_replace('Country', "TOWNSHIP", "TWP"))


# Replacing 'UNKNOWN' 'NONE' 'NA' and '' with nulls:

df_users = df_users.withColumn('City', when(df_users['City']=='UNKNOWN', None).when(df_users['City']=='NONE', None)
                                      .when(df_users['City']=='NA', None).when(df_users['City']=='', None).otherwise(df_users['City']))
df_users = df_users.withColumn('State', when(df_users['State']=='UNKNOWN', None).when(df_users['State']=='NONE', None)
                                      .when(df_users['State']=='NA', None).when(df_users['State']=='', None).otherwise(df_users['State']))
df_users = df_users.withColumn('Country', when(df_users['Country']=='UNKNOWN', None).when(df_users['Country']=='NONE', None)
                                      .when(df_users['Country']=='NA', None).when(df_users['Country']=='', None).otherwise(df_users['Country']))

In [None]:
df_users.show(truncate=False)

+-------+----+--------------+---------------+--------------+
|User-ID|Age |City          |State          |Country       |
+-------+----+--------------+---------------+--------------+
|1      |null|NYC           |NEW YORK       |USA           |
|2      |18  |STOCKTON      |CALIFORNIA     |USA           |
|3      |null|MOSCOW        |YUKON TERRITORY|RUSSIA        |
|4      |17  |PORTO         |VNGAIA         |PORTUGAL      |
|5      |null|FARNBOROUGH   |HANTS          |UNITED KINGDOM|
|6      |61  |SANTA MONICA  |CALIFORNIA     |USA           |
|7      |null|WASHINGTON    |DC             |USA           |
|8      |null|TIMMINS       |ONTARIO        |CANADA        |
|9      |null|GERMANTOWN    |TENNESSEE      |USA           |
|10     |26  |ALBACETE      |WISCONSIN      |SPAIN         |
|11     |14  |MELBOURNE     |VICTORIA       |AUSTRALIA     |
|12     |null|FT BRAGG      |CALIFORNIA     |USA           |
|13     |26  |BARCELONA     |BARCELONA      |SPAIN         |
|14     |null|MEDIAPOLIS

In [None]:
# Checking how many nulls in each column:

CityNulls = df_users.filter(col('City').isNull()).count()
StateNulls = df_users.filter(col('State').isNull()).count()
CountryNulls = df_users.filter(col('Country').isNull()).count()

print("City Nulls: {}, State Nulls: {}, Country Nulls: {}".format(CityNulls, StateNulls, CountryNulls))

City Nulls: 765, State Nulls: 17457, Country Nulls: 4739


Uploading a file downloaded from data.world website containing 195 country names, 3401 state names and 132149 city names around the world.

In [None]:
# Load 'country-state-city' csv file:
data_schema = StructType([StructField('Country_Name', StringType(), True), 
               StructField('State_Name', StringType(), True),
               StructField('City_Name', StringType(), True)])

world = spark.read.options(delimiter=",", header=False).csv('/content/drive/MyDrive/Spark/ProjectBookCrossing/country-state-city.csv', schema = data_schema)

# >> Applying all cleaning transformations on world dataframe to achieve consistency:

# Uppercase:

world = world.withColumn("City_Name", upper(col('City_Name')))
world = world.withColumn("State_Name", upper(col('State_Name')))
world = world.withColumn("Country_Name", upper(col('Country_Name')))

# Replacing '&' with 'AND':

world = world.withColumn('City_Name', regexp_replace('City_Name', "&", "AND"))
world = world.withColumn('State_Name', regexp_replace('State_Name', "&", "AND"))
world = world.withColumn('Country_Name', regexp_replace('Country_Name', "&", "AND"))

# Replacing '-' with ' ':

world = world.withColumn('City_Name', regexp_replace('City_Name', "-", " "))
world = world.withColumn('State_Name', regexp_replace('State_Name', "-", " "))
world = world.withColumn('Country_Name', regexp_replace('Country_Name', "-", " "))

# Replacing "'" with '':

world = world.withColumn('City_Name', regexp_replace('City_Name', "'", ""))
world = world.withColumn('State_Name', regexp_replace('State_Name', "'", ""))
world = world.withColumn('Country_Name', regexp_replace('Country_Name', "'", ""))

# Replacing punctuation marks: 

world = world.withColumn('City_Name', F.regexp_replace('City_Name', '[!"#$%()*+,./:;<=>?@^_`{|}~]', ""))
world = world.withColumn('State_Name', F.regexp_replace('State_Name', '[!"#$%()*+,./:;<=>?@^_`{|}~]', ""))
world = world.withColumn('Country_Name', F.regexp_replace('Country_Name', '[!"#$%()*+,./:;<=>?@^_`{|}~]', ""))

# Replacing numbers: 

world = world.withColumn('City_Name', F.regexp_replace('City_Name', '[0-9]', ""))
world = world.withColumn('State_Name', F.regexp_replace('State_Name', '[0-9]', ""))
world = world.withColumn('Country_Name', F.regexp_replace('Country_Name', '[0-9]', ""))

# Trimming whitespace:

world = world.withColumn("City_Name", trim(world.City_Name))
world = world.withColumn("State_Name", trim(world.State_Name))
world = world.withColumn("Country_Name", trim(world.Country_Name))

# Removing extra spaces:

while (world.select('City_Name').filter(world['City_Name'].contains('  ')).count() > 0):
  world = world.withColumn('City_Name', F.regexp_replace('City_Name', '  ', ' '))

while (world.select('State_Name').filter(world['State_Name'].contains('  ')).count() > 0):
  world = world.withColumn('State_Name', F.regexp_replace('State_Name', '  ', ' '))

while (world.select('Country_Name').filter(world['Country_Name'].contains('  ')).count() > 0):
  world = world.withColumn('Country_Name', F.regexp_replace('Country_Name', '  ', ' '))

# Replacing 'SAINT with 'ST':

world = world.withColumn('City_Name', regexp_replace('City_Name', "SAINT ", "ST "))
world = world.withColumn('State_Name', regexp_replace('State_Name', "SAINT ", "ST "))
world = world.withColumn('Country_Name', regexp_replace('Country_Name', "SAINT ", "ST "))

# Replacing 'Fort' with 'FT':

world = world.withColumn('City_Name', regexp_replace('City_Name', "FORT ", "FT "))
world = world.withColumn('State_Name', regexp_replace('State_Name', "FORT ", "FT "))
world = world.withColumn('Country_Name', regexp_replace('Country_Name', "FORT ", "FT "))

# Replacing 'TOWNSHIP with 'TWP':

world = world.withColumn('City_Name', regexp_replace('City_Name', "TOWNSHIP", "TWP"))
world = world.withColumn('State_Name', regexp_replace('State_Name', "TOWNSHIP", "TWP"))
world = world.withColumn('Country_Name', regexp_replace('Country_Name', "TOWNSHIP", "TWP"))

world.sample(0.1).show(5, truncate=False)

+------------+----------+---------+
|Country_Name|State_Name|City_Name|
+------------+----------+---------+
|AFGHANISTAN |BALKH     |KHULM    |
|AFGHANISTAN |BAMYAN    |PANJĀB   |
|AFGHANISTAN |FARYAB    |ANDKHOY  |
|AFGHANISTAN |FARYAB    |MAYMANA  |
|AFGHANISTAN |HELMAND   |GERESHK  |
+------------+----------+---------+
only showing top 5 rows



---
**City Column**

---

In [None]:
# Preparing storage lists for cities not found by validation functions and a city dictionary to store validated cities map:

city_not_found_1 = []
city_not_found_2 = []
city_not_found_final = []
city_names_dict = {}

# Creating a list of cities need to be validated:

cities_list = df_users.rdd.map(lambda x:x.City).collect()
cities_list = list(set(cities_list))

# Creating a list of cities from the csv world file to validate city names against it:

world_cities_list = world.rdd.map(lambda x:x.City_Name).collect()
world_cities_list = list(set(world_cities_list))

In [None]:
# Defining functions for City names validation:

def city_in_file():

  '''Iterating and comparing city names to list of cities uploaded from csv file.
  Entering valid names into a dictionary'''

  for ct in cities_list:

    if ct in world_cities_list:
      try:
        city_names_dict[ct] = ct
      except:
        continue
    else:
      city_not_found_1.append(ct)


def check_pycountry_city():

  '''Iterating and comparing city names to pycountry.subdivisions.
  Entering valid names into a dictionary.'''

  for ct in city_not_found_1:
    try:
      city_names_dict[ct] = pc.subdivisions.lookup(ct).name.upper()
    except:
      city_not_found_2.append(ct)


def city_lev_distance():

  '''Iterating and comparing city names to list of cities uploaded from csv file, in terms of their Levenshtein distance.
  Transformation is conditioned by value length and distance to try and minimize wrong transformations. 
  Entering valid names into a dictionary.'''

  for ct in city_not_found_2:
    try:
      if len(ct) >= 7:
        for wct in world_cities_list:
          if lev.distance(ct, wct) == 1:
            city_names_dict[ct] = wct
            break
          else:
            city_not_found_final.append(ct)
      else:
        city_not_found_final.append(ct)
    except:
      city_not_found_final.append(ct)

In [None]:
# Calling the city functions and transforming values based on the dictionary created:

city_in_file()
check_pycountry_city()
city_lev_distance()

CityUDF = udf(lambda x : city_names_dict[x] if x in city_names_dict.keys() else 'UNKNOWN' , StringType())
df_users = df_users.withColumn('City_New', CityUDF(col('City')))

In [None]:
# Counting how many 'UNKNOWN' cities we ended up with:

df_users.filter(col('City_New')=='UNKNOWN').count()

20557

In [None]:
# Exploring which cities couldn't be verified:

df_users.filter(col('City_New')=='UNKNOWN').groupBy('City').count().sort(desc(col('count'))).show(truncate=False)

+-----------------+-----+
|City             |count|
+-----------------+-----+
|null             |765  |
|MÜNCHEN          |346  |
|ISTANBUL         |258  |
|BANGALORE        |157  |
|PALMA DE MALLORCA|136  |
|HONG KONG        |120  |
|GIJON            |117  |
|ZURICH           |114  |
|A CORUÑA         |109  |
|CAPITAL FEDERAL  |107  |
|DEN HAAG         |106  |
|NY               |101  |
|DUESSELDORF      |90   |
|BRUXELLES        |77   |
|NYC              |68   |
|MEXICO CITY      |67   |
|NUERNBERG        |56   |
|MUENCHEN         |54   |
|LA               |53   |
|MT PLEASANT      |53   |
+-----------------+-----+
only showing top 20 rows



In [None]:
# Exploring counts of cities that were verified:

df_users.groupBy('City_New').count().sort(desc('count')).show(truncate=False)

+-------------+-----+
|City_New     |count|
+-------------+-----+
|UNKNOWN      |20557|
|LONDON       |4110 |
|BARCELONA    |2673 |
|TORONTO      |2346 |
|MADRID       |1935 |
|SYDNEY       |1885 |
|MELBOURNE    |1871 |
|PORTLAND     |1870 |
|VANCOUVER    |1707 |
|CHICAGO      |1569 |
|SEATTLE      |1542 |
|NEW YORK     |1448 |
|MILANO       |1390 |
|SAN DIEGO    |1348 |
|SAN FRANCISCO|1314 |
|BERLIN       |1309 |
|OTTAWA       |1304 |
|HOUSTON      |1243 |
|PARIS        |1191 |
|LOS ANGELES  |1048 |
+-------------+-----+
only showing top 20 rows



In [None]:
# Exploring the original City column vs the transformed City_New column (in the context of the State and Country columns) to examine the transformations:

df_users.sample(0.1).where((df_users['City']!=df_users['City_New']) & (df_users['City_New']!='UNKNOWN')).show(30, truncate=False)

+-------+----+-------------+----------------------------+--------------+-------------+
|User-ID|Age |City         |State                       |Country       |City_New     |
+-------+----+-------------+----------------------------+--------------+-------------+
|104    |47  |FOXBORO      |MASSACHUSETTS               |USA           |ROXBORO      |
|1307   |null|BRIDGENORTH  |ONTARIO                     |CANADA        |BRIDGNORTH   |
|1767   |null|BISTRITA     |BISTRITA N                  |ROMANIA       |BISTRICA     |
|2488   |65  |TAUBATE      |SAO PAULO                   |BRAZIL        |TAUBATÉ      |
|2511   |31  |THESSALONIKI |null                        |GREECE        |THESSALONÍKI |
|3456   |null|BIRMUNGHAM   |ALABAMA                     |USA           |BIRMINGHAM   |
|3974   |38  |DUSSELDORF   |NORDRHEIN WESTFALEN         |GERMANY       |DÜSSELDORF   |
|4496   |26  |CORDOBA      |CORDOBA                     |ARGENTINA     |CORDOVA      |
|4620   |50  |CASTELAR     |null           

In [None]:
# Dropping the original City column and renaming the City_New column:

df_users = df_users.drop('City')
df_users= df_users.withColumnRenamed('City_New', 'City')

In [None]:
# Dropping city lists and dictionaries to free-up RAM:

del city_not_found_1, city_not_found_2, city_not_found_final, city_names_dict, cities_list, world_cities_list

---
**State Column**

---

In [None]:
# Filling null or empty 'State' cells with 'City' values, as it might help turn null state values by their corresponding city values:

df_users = df_users.withColumn('State', when(df_users['State'].isNull(), df_users['City']).otherwise(df_users['State']))

df_users = df_users.withColumn('State', when(df_users['State'] == '', df_users['City']).otherwise(df_users['State']))

In [None]:
# Preparing storage lists for states not found by validation functions and a state dictionary to store validated states map:

state_not_found_1 = []
state_not_found_2 = []
state_not_found_final = []
state_names_dict = {}

# Creating a list of states need to be validated:

states_list = df_users.rdd.map(lambda x:x.State).collect()
states_list = list(set(states_list))

# Creating a list of states from the csv world file to validate state names against it:
world_states_list = world.rdd.map(lambda x:x.State_Name).collect()
world_states_list = list(set(world_states_list))

In [None]:
# Defining functions for State names validation:

def check_pycountry_state():

  '''Iterating and comparing state names to pycountry.subdivisions.
  Entering valid names into a dictionary.'''

  for st in states_list:
    temp_list = []

    try:
      country_code = pc.countries.search_fuzzy(st)[0].alpha_2
      for i in range(len(list(pc.subdivisions.get(country_code=country_code)))):
        name = list(pc.subdivisions.get(country_code=country_code))[i].name.upper()
        temp_list.append(name)
      if st in temp_list:
        state_names_dict[st] = st
    except:
      state_not_found_1.append(st)


def state_in_file():

  '''Iterating and comparing state names to list of states uploaded from csv file.
  Entering valid names into a dictionary'''

  for st in state_not_found_1:

    if st in world_states_list:
      try:
        state_names_dict[st] = st
      except:
        state_not_found_2.append(st)
    else:
      state_not_found_2.append(st)


def state_lev_distance():

  '''Iterating and comparing state names to list of states uploaded from csv file, in terms of their Levenshtein distance.
  Transformation is conditioned by value length and distance to try and minimize wrong transformations. 
  Entering valid names into a dictionary.'''

  for st in state_not_found_2:
    try:
      if len(st) >= 7:
        for wst in world_states_list:
          if lev.distance(st, wst) == 1:
            state_names_dict[st] = wst
            break
          else:
            continue
    except:
      state_not_found_final.append(st)

In [None]:
# Calling the state functions and transforming values based on the dictionary created:

check_pycountry_state()
state_in_file()
state_lev_distance()


StateUDF = udf(lambda x : state_names_dict[x] if x in state_names_dict.keys() else 'UNKNOWN' , StringType())
df_users = df_users.withColumn('State_New', StateUDF(col('State')))

In [None]:
# Counting how many 'UNKNOWN' states we ended up with:

df_users.filter(col('State_New')=='UNKNOWN').count()

50312

In [None]:
# Exploring which states couldn't be verified:

df_users.filter(col('State_New')=='UNKNOWN').groupBy('State').count().sort(desc(col('count'))).show(truncate=False)

+-------------------+-----+
|State              |count|
+-------------------+-----+
|NORDRHEIN WESTFALEN|3946 |
|GEORGIA            |3362 |
|UNKNOWN            |2393 |
|BADEN WUERTTEMBERG |1978 |
|BARCELONA          |1825 |
|CATALUNYA          |1229 |
|LONDON             |1001 |
|RHEINLAND PFALZ    |754  |
|SINGAPORE          |748  |
|WALES              |671  |
|DC                 |629  |
|ZUID HOLLAND       |510  |
|NOORD HOLLAND      |418  |
|STOCKHOLM          |373  |
|NRW                |371  |
|ITALY              |348  |
|YUKON TERRITORY    |292  |
|NEWFOUNDLAND       |290  |
|CATALUÑA           |284  |
|SHANGHAI           |265  |
+-------------------+-----+
only showing top 20 rows



In [None]:
# Exploring counts of states that were verified:

df_users.groupBy('State_New').count().sort(desc('count')).show(truncate=False)

+----------------+-----+
|State_New       |count|
+----------------+-----+
|UNKNOWN         |50312|
|CALIFORNIA      |19901|
|ENGLAND         |10692|
|ONTARIO         |8739 |
|TEXAS           |8393 |
|NEW YORK        |7891 |
|FLORIDA         |7028 |
|PENNSYLVANIA    |6085 |
|ILLINOIS        |5877 |
|WASHINGTON      |5835 |
|BRITISH COLUMBIA|5403 |
|OHIO            |4685 |
|MICHIGAN        |4588 |
|OREGON          |4323 |
|VIRGINIA        |4312 |
|MASSACHUSETTS   |3971 |
|MISSOURI        |3925 |
|NORTH CAROLINA  |3731 |
|NEW JERSEY      |3603 |
|VICTORIA        |3402 |
+----------------+-----+
only showing top 20 rows



In [None]:
# Exploring the original State column vs the transformed State_New column (in the context of the City and Country columns) to examine the transformations:

df_users.sample(0.9).where((df_users['State']!=df_users['State_New']) & (df_users['State_New']!='UNKNOWN')).show(30, truncate=False)

+-------+----+----------------+-----------+-------------------------+-----------------+
|User-ID|Age |State           |Country    |City                     |State_New        |
+-------+----+----------------+-----------+-------------------------+-----------------+
|206    |60  |ILE DE FRANCE   |FRANCE     |VERSAILLES               |ÎLE DE FRANCE    |
|341    |50  |ILE DE FRANCE   |FRANCE     |UNKNOWN                  |ÎLE DE FRANCE    |
|525    |29  |ILE DE FRANCE   |FRANCE     |ISSY LES MOULINEAUX      |ÎLE DE FRANCE    |
|577    |null|KHOZESTAN       |IRAN       |ANDIMESHK                |KHUZESTAN        |
|712    |29  |BASELLAND       |SWITZERLAND|PRATTELN                 |BASEL LAND       |
|1442   |36  |ILE DE FRANCE   |FRANCE     |PARIS                    |ÎLE DE FRANCE    |
|1581   |null|SHANANXI        |CHINA      |UNKNOWN                  |SHAANXI          |
|2224   |23  |ILE DE FRANCE   |FRANCE     |PARIS                    |ÎLE DE FRANCE    |
|2416   |21  |SEVILLE         |S

In [None]:
# Dropping the original State column and renaming the State_New column:

df_users = df_users.drop('State')
df_users= df_users.withColumnRenamed('State_New', 'Region')

In [None]:
# Dropping states lists and dictionaries to free-up RAM:

del state_not_found_1, state_not_found_2, state_not_found_final, state_names_dict, states_list, world_states_list

---
**Country Column**

---

In [None]:
# Filling null or empty 'Country' cells with either 'State' or 'City' values, as it might help turn null country values by their corresponding state values:

df_users = df_users.withColumn('Country', when(df_users['Country'].isNull(), df_users['Region']).otherwise(df_users['Country']))

df_users = df_users.withColumn('Country', when(df_users['Country'] == '', df_users['Region']).otherwise(df_users['Country']))

In [None]:
# Preparing storage lists for countries not found by validation functions and a country dictionary to store validated countries map:

country_not_found_1 = []
country_not_found_2 =[]
country_not_found_final = []
country_names_dict = {}

# Creating a list of countries need to be validated:

countries_list = df_users.rdd.map(lambda x:x.Country).collect()
countries_list = list(set(countries_list))

# Creating a list of countries from the csv world file to validate country names against it:
world_countries_list = world.rdd.map(lambda x:x.Country_Name).collect()
world_countries_list = list(set(world_countries_list))

In [None]:
# Defining functions for Country names validation:

def check_pycountry_country():

  '''Iterating and comparing country names to pycountry.countries.
  Entering valid names into a dictionary.'''

  for cn in countries_list:

    try:
      country_names_dict[cn] = (pc.countries.search_fuzzy(cn))[0].name.upper()
    except:
      try:
        country_names_dict[cn] = (pc.countries.lookup(pc.subdivisions.lookup(cn).code))[0].name.upper()
      except:
        country_not_found_1.append(cn)


def country_in_file():

  '''Iterating and comparing country names to list of countries uploaded from csv file.
  Entering valid names into a dictionary'''

  for cn in country_not_found_1:

    if cn in world_countries_list:
      try:
        country_names_dict[cn] = cn
      except:
        continue
    else:
      country_not_found_2.append(cn)


def country_lev_distance():

  '''Iterating and comparing country names to list of countries uploaded from csv file, in terms of their Levenshtein distance.
  Transformation is conditioned by value length and distance to try and minimize wrong transformations. 
  Entering valid names into a dictionary.'''

  for cn in country_not_found_2:
    try:
      if len(cn) >= 5:
        for wcn in world_countries_list:
          if lev.distance(cn, wcn) == 1:
            country_names_dict[cn] = wcn
            break
          else:
            country_not_found_final.append(cn)
    except:
      country_not_found_final.append(cn)

In [None]:
# Calling the country functions and transforming values based on the dictionary created:

check_pycountry_country()
country_in_file()
country_lev_distance()


CountryUDF = udf(lambda x : country_names_dict[x] if x in country_names_dict.keys() else 'UNKNOWN' , StringType())
df_users = df_users.withColumn('Country_New', CountryUDF(col('Country')))

In [None]:
# Counting how many 'UNKNOWN' countries we ended up with:

df_users.filter(col('Country_New')=='UNKNOWN').count()

4294

In [None]:
# Exploring which countries couldn't be verified:

df_users.filter(col('Country_New')=='UNKNOWN').groupBy('Country').count().sort(desc(col('count'))).show(truncate=False)

+-----------------+-----+
|Country          |count|
+-----------------+-----+
|UNKNOWN          |2998 |
|ENGLAND          |232  |
|YUGOSLAVIA       |174  |
|ESPAÑA           |69   |
|SCOTLAND         |54   |
|LA FRANCE        |34   |
|LITALIA          |30   |
|DEUTSCHLAND      |28   |
|UAE              |21   |
|BURMA            |18   |
|CARIBBEAN SEA    |15   |
|BADEN WÜRTTEMBERG|14   |
|ST LOUIS         |13   |
|EAST AFRICA      |12   |
|COTE DIVOIRE     |11   |
|ALDERNEY         |10   |
|CATALONIA        |9    |
|VIENNA           |8    |
|ÖÐ¹Ú             |8    |
|LA ARGENTINA     |8    |
+-----------------+-----+
only showing top 20 rows



In [None]:
# Exploring counts of countries that were verified:

df_users.groupBy('Country_New').count().sort(desc('count')).show(truncate=False)

+--------------+------+
|Country_New   |count |
+--------------+------+
|UNITED STATES |140327|
|CANADA        |21767 |
|UNITED KINGDOM|18473 |
|GERMANY       |17123 |
|SPAIN         |13317 |
|AUSTRALIA     |11873 |
|ITALY         |11566 |
|UNKNOWN       |4294  |
|FRANCE        |3495  |
|PORTUGAL      |3396  |
|NEW ZEALAND   |3147  |
|NETHERLANDS   |3077  |
|SWITZERLAND   |1772  |
|BRAZIL        |1695  |
|CHINA         |1480  |
|SWEDEN        |1452  |
|INDIA         |1300  |
|AUSTRIA       |1155  |
|MALAYSIA      |1123  |
|ARGENTINA     |1089  |
+--------------+------+
only showing top 20 rows



In [None]:
# Exploring the original Country column vs the transformed Country_New column (in the context of the City and State columns) to examine the transformations:

df_users.sample(0.9).where((df_users['Country']!=df_users['Country_New']) & (df_users['Country_New']!='UNKNOWN')).show(30, truncate=False)

+-------+----+----------------+--------------+--------------+------------------+
|User-ID|Age |Country         |City          |Region        |Country_New       |
+-------+----+----------------+--------------+--------------+------------------+
|1      |null|USA             |UNKNOWN       |NEW YORK      |UNITED STATES     |
|2      |18  |USA             |STOCKTON      |CALIFORNIA    |UNITED STATES     |
|3      |null|RUSSIA          |MOSCOW        |UNKNOWN       |RUSSIAN FEDERATION|
|6      |61  |USA             |SANTA MONICA  |CALIFORNIA    |UNITED STATES     |
|7      |null|USA             |WASHINGTON    |UNKNOWN       |UNITED STATES     |
|9      |null|USA             |GERMANTOWN    |TENNESSEE     |UNITED STATES     |
|12     |null|USA             |FT BRAGG      |CALIFORNIA    |UNITED STATES     |
|16     |null|USA             |ALBUQUERQUE   |NEW MEXICO    |UNITED STATES     |
|17     |null|USA             |CHESAPEAKE    |VIRGINIA      |UNITED STATES     |
|23     |null|USA           

In [None]:
# Dropping the original Country column and renaming the Country_New column:

df_users = df_users.drop('Country')
df_users= df_users.withColumnRenamed('Country_New', 'Country')

In [None]:
# Dropping country lists and dictionaries to free-up RAM:

del country_not_found_1, country_not_found_2, country_not_found_final, country_names_dict, countries_list, world_countries_list

---
**Age Column**

---

In [None]:
# Viewing the 'Age' column:

df_users.select('Age').show(10, truncate=False)

+----+
|Age |
+----+
|null|
|18  |
|null|
|17  |
|null|
|61  |
|null|
|null|
|null|
|26  |
+----+
only showing top 10 rows



In [None]:
# How many nulls in the 'Age' column?

print('Nulls count: ', df_users.filter(df_users['Age'].isNull()).count())
print('Nulls percentage: ', (df_users.filter(df_users['Age'].isNull()).count() / df_users.select('Age').count()), '%')

Nulls count:  110762
Nulls percentage:  0.3971985741847105 %


In [None]:
# What's the min and max age?
df_users.agg({'Age' : 'min'}).show()
df_users.agg({'Age' : 'max'}).show()

+--------+
|min(Age)|
+--------+
|       0|
+--------+

+--------+
|max(Age)|
+--------+
|     244|
+--------+



Age 0 doesn't make sense, nor doe's age 244. We'll assume acceptable min age for book reviews is 8 and max acceptable age is 95.  

In [None]:
print('Unreasonable values count: ', df_users.filter((df_users['Age'] < 8) | (df_users['Age'] > 95)).count())
print('Unreasonable values percentage: ', (df_users.filter((df_users['Age'] < 8) | (df_users['Age'] > 95)).count() / df_users.select('Age').count()), '%')

Unreasonable values count:  1338
Unreasonable values percentage:  0.00479814098932073 %


About 40% of the age column are nulls or unreasonable values.

In [None]:
# Setting nulls and unreasonable values to -1 (equivalent to Unknown):

df_users = df_users.withColumn('Age', when(df_users['Age'] < 8, -1)
                                      .when(df_users['Age'] > 95, -1)
                                      .when(df_users['Age'].isNull(), -1)
                                      .otherwise(df_users['Age']))

In [None]:
df_users.show(10, truncate=False)

+-------+---+------------+----------+------------------+
|User-ID|Age|City        |Region    |Country           |
+-------+---+------------+----------+------------------+
|1      |-1 |UNKNOWN     |NEW YORK  |UNITED STATES     |
|2      |18 |STOCKTON    |CALIFORNIA|UNITED STATES     |
|3      |-1 |MOSCOW      |UNKNOWN   |RUSSIAN FEDERATION|
|4      |17 |PORTO       |UNKNOWN   |PORTUGAL          |
|5      |-1 |FARNBOROUGH |UNKNOWN   |UNITED KINGDOM    |
|6      |61 |SANTA MONICA|CALIFORNIA|UNITED STATES     |
|7      |-1 |WASHINGTON  |UNKNOWN   |UNITED STATES     |
|8      |-1 |TIMMINS     |ONTARIO   |CANADA            |
|9      |-1 |GERMANTOWN  |TENNESSEE |UNITED STATES     |
|10     |26 |ALBACETE    |WISCONSIN |SPAIN             |
+-------+---+------------+----------+------------------+
only showing top 10 rows



In [None]:
# Saving cleaned dataset:

df_users.coalesce(1).write.format('csv').options(header=True, delimiter=';').save('/content/drive/MyDrive/Spark/ProjectBookCrossing/Clean-Users.csv')

---
## **RATINGS DATASET**
---

In [None]:
# Install isbnlib:

!pip install isbnlib
import isbnlib as ISBN

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting isbnlib
  Downloading isbnlib-3.10.12-py2.py3-none-any.whl (66 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m66.5/66.5 KB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: isbnlib
Successfully installed isbnlib-3.10.12


In [None]:
# Defining a schema:

data_schema = StructType([StructField('User-ID', IntegerType(), True), 
               StructField('ISBN', StringType(), True),
               StructField('Book-Rating', IntegerType(), True)])


# Loading the Ratings dataset into a dataframe:

df_ratings = spark.read.options(delimiter='""', header=True).csv('/content/drive/MyDrive/Spark/ProjectBookCrossing/BX-Book-Ratings.csv')

In [None]:
df_ratings.show(5, truncate=False)

+---------+----------+---+-----------+---+
|"User-ID;|ISBN      |;  |Book-Rating|",,|
+---------+----------+---+-----------+---+
|"276725; |034545104X|;  |0          |",,|
|"276726; |0155061224|;  |5          |",,|
|"276727; |0446520802|;  |0          |",,|
|"276729; |052165615X|;  |3          |",,|
|"276729; |0521795028|;  |6          |",,|
+---------+----------+---+-----------+---+
only showing top 5 rows



In [None]:
# Dropping columns:

df_ratings = df_ratings.drop(';', '",,')
df_ratings.show(5, truncate=False)

+---------+----------+-----------+
|"User-ID;|ISBN      |Book-Rating|
+---------+----------+-----------+
|"276725; |034545104X|0          |
|"276726; |0155061224|5          |
|"276727; |0446520802|0          |
|"276729; |052165615X|3          |
|"276729; |0521795028|6          |
+---------+----------+-----------+
only showing top 5 rows



In [None]:
# Viewing the schema:

print('There are {} rows in the dataset.\n'.format(df_ratings.count()))

df_ratings.printSchema()

There are 1048575 rows in the dataset.

root
 |-- "User-ID;: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- Book-Rating: string (nullable = true)



---
**User-ID Column**

---

In [None]:
# Renaming 'User-ID':

df_ratings = df_ratings.withColumnRenamed('"User-ID;', 'User-ID')
df_ratings.show(5, truncate=False)

+--------+----------+-----------+
|User-ID |ISBN      |Book-Rating|
+--------+----------+-----------+
|"276725;|034545104X|0          |
|"276726;|0155061224|5          |
|"276727;|0446520802|0          |
|"276729;|052165615X|3          |
|"276729;|0521795028|6          |
+--------+----------+-----------+
only showing top 5 rows



In [None]:
# Replacing '"' with '' and ';' with '' per 'User-ID':

df_ratings = df_ratings.withColumn('User-ID', F.regexp_replace('User-ID', '"', ''))
df_ratings = df_ratings.withColumn('User-ID', F.regexp_replace('User-ID', ';', ''))
df_ratings = df_ratings.withColumn('User-ID', trim('User-ID'))
df_ratings.show(5, truncate=False)

+-------+----------+-----------+
|User-ID|ISBN      |Book-Rating|
+-------+----------+-----------+
|276725 |034545104X|0          |
|276726 |0155061224|5          |
|276727 |0446520802|0          |
|276729 |052165615X|3          |
|276729 |0521795028|6          |
+-------+----------+-----------+
only showing top 5 rows



In [None]:
# Checking if there are non-numeric characters in 'User-ID' values:

NonNum = df_ratings.select('User-ID').filter(df_ratings['User-ID'].rlike("[^0-9]")).count()
print('There are {} non-numeric values.\n'.format(NonNum))

# Checking if there are values in User-ID longer than expected length = 6:

TooLong = df_ratings.select('User-ID').filter(length(df_ratings['User-ID']) > 6).count()
print('There are {} values longer than expected length (6).\n'.format(TooLong))

# Checking if there are nulls in 'User-ID' values:

NullNum = df_ratings.select('User-ID').filter(df_ratings['User-ID'].isNull()).count()
print('There are {} null values.\n'.format(NullNum))

There are 0 non-numeric values.

There are 0 values longer than expected length (6).

There are 0 null values.



In [None]:
# Casting 'User-ID' to IntegerType:

df_ratings = df_ratings.withColumn("User-ID", df_ratings['User-ID'].cast('int'))
df_ratings.printSchema()

root
 |-- User-ID: integer (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- Book-Rating: string (nullable = true)



---
**ISBN Column**

---

In [None]:
# Uppercase column ISBN:

df_ratings = df_ratings.withColumn('ISBN', upper(col('ISBN')))

# Trimming column ISBN:

df_ratings = df_ratings.withColumn('ISBN', trim(col('ISBN')))

In [None]:
# Checking for nulls or empty spaces:

NumNull = df_ratings.filter(col('ISBN').isNull()).count()
NumEmpt = df_ratings.filter(col('ISBN')=='').count()

print('There are {} Nulls and {} empty spaces in ISBN column'.format(NumNull, NumEmpt))

There are 0 Nulls and 0 empty spaces in ISBN column


In [None]:
# Checking if there are characters in 'ISBN' that are not digits or the letter 'X':

InvalidChar = df_ratings.filter(df_ratings['ISBN'].rlike("[^0-9Xx]")).count()
print("There are {} ISBN's that include invalid characters.".format(InvalidChar))
df_ratings.filter(df_ratings['ISBN'].rlike("[^0-9X]")).show(truncate=False)

There are 2203 ISBN's that include invalid characters.
+-------+-------------+-----------+
|User-ID|ISBN         |Book-Rating|
+-------+-------------+-----------+
|276762 |B0000BLD7X   |0          |
|276762 |N3453124715  |4          |
|276884 |B158991965   |6          |
|276929 |2.02.032126.2|0          |
|276929 |2.264.03602.8|0          |
|276959 |680ISBN359623|6          |
|277305 |O6712345670  |7          |
|278054 |88O6166255   |8          |
|278418 |0451E64100   |0          |
|278491 |01420.01740  |10         |
|278559 |DITISEENSOORT|0          |
|183    |100940/86    |9          |
|183    |10622/86     |0          |
|183    |10745/85     |0          |
|183    |10756/85     |0          |
|183    |127420/98    |8          |
|183    |13440/86     |7          |
|183    |15019/87     |7          |
|183    |16560/87     |10         |
|183    |16785/87     |7          |
+-------+-------------+-----------+
only showing top 20 rows



In [None]:
# Extracting all invalid characters:

df_ratings = df_ratings.withColumn('ISBN', F.regexp_replace('ISBN', '[^0-9X]', ''))

In [None]:
# Checking if there are 'ISBN's in different length than expected length of 10:

InvalidLength = df_ratings.filter(length(col('ISBN')) != 10).count()
print("There are {} ISBN's that are invalid length.".format(InvalidLength))
df_ratings.filter(length(col('ISBN')) != 10).show(10, truncate=False)

There are 9503 ISBN's that are invalid length.
+-------+------------+-----------+
|User-ID|ISBN        |Book-Rating|
+-------+------------+-----------+
|276745 |342310538   |10         |
|276762 |342662429   |0          |
|276762 |00007X      |0          |
|276856 |20103389    |0          |
|276861 |344242529   |9          |
|276875 |00273755    |7          |
|276875 |014366020444|8          |
|276875 |88741800047 |8          |
|276884 |899792145   |0          |
|276884 |158991965   |6          |
+-------+------------+-----------+
only showing top 10 rows



In [None]:
def validate_isbn(x):

  try:
    x = ISBN.get_isbnlike(x)
    if ISBN.is_isbn13(x[0]) is True:
      return ISBN.to_isbn10(x[0])
    elif ISBN.is_isbn10(x[0]) is True:
      return x[0]
    else:
      return None
  except:
    return None


isbnUDF = udf(lambda x: validate_isbn(x))
df_ratings = df_ratings.withColumn('ISBN', isbnUDF(col('ISBN')))

In [None]:
# How many ISBN's haven't been validated? 

NullISBN = df_ratings.filter(col('ISBN').isNull()).count()
print('There are {} invalid ISBN found and transformed to Nulls.'.format(NullISBN))

There are 12730 invalid ISBN found and transformed to Nulls.


In [None]:
# Dropping null ISBN's:

df_ratings = df_ratings.dropna(subset=['ISBN'])

In [None]:
# Creating a list of validated ISBN from the Books data file in order to use it in a function to validate ISBN's in the Ratings data file.

ISBN_list = df_books.rdd.map(lambda x:x.ISBN).collect()

In [None]:
# Transform ISBN columns by searching for them in the books data file and if not found, return ISBN as 'UNKNOWN':

FindisbnUDF = udf(lambda x: x if x in ISBN_list else 'Unknown', StringType())
df_ratings = df_ratings.withColumn('ISBN_New', FindisbnUDF(col('ISBN')))


In [None]:
df_ratings.show(truncate=False)

+-------+----------+-----------+----------+
|User-ID|ISBN      |Book-Rating|ISBN_New  |
+-------+----------+-----------+----------+
|276725 |034545104X|0          |034545104X|
|276726 |0155061224|5          |0155061224|
|276727 |0446520802|0          |0446520802|
|276729 |052165615X|3          |052165615X|
|276729 |0521795028|6          |0521795028|
|276733 |2080674722|0          |2080674722|
|276736 |3257224281|8          |Unknown   |
|276737 |0600570967|6          |Unknown   |
|276744 |038550120X|7          |038550120X|
|276746 |0425115801|0          |0425115801|
|276746 |0449006522|0          |0449006522|
|276746 |0553561618|0          |0553561618|
|276746 |055356451X|0          |055356451X|
|276746 |0786013990|0          |0786013990|
|276746 |0786014512|0          |0786014512|
|276747 |0060517794|9          |0060517794|
|276747 |0451192001|0          |0451192001|
|276747 |0609801279|0          |0609801279|
|276747 |0671537458|9          |0671537458|
|276747 |0679776818|8          |

---
**Book-Rating Column**

---

In [None]:
# Checking for nulls in 'Book-Rating' values:

Nnulls = df_ratings.select('Book-Rating').filter(df_ratings['Book-Rating'].isNull()).count()
print('There are {} null ratings.'.format(Nnulls))

There are 0 null ratings.


In [None]:
# Checking that all rating values are numeric:

NonNumer = df_ratings.filter(col('Book-Rating').rlike('[^0-9]')).count()
print('There are {} non-numeric rating values.'.format(NonNumer))

There are 0 non-numeric rating values.


In [None]:
# Casting 'Book-Rating' to IntegerType:

df_ratings = df_ratings.withColumn("Book-Rating", df_ratings['Book-Rating'].cast('int'))
df_ratings.printSchema()

root
 |-- User-ID: integer (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- Book-Rating: integer (nullable = true)
 |-- ISBN_New: string (nullable = true)



In [None]:
# Checking for duplicates:

NumDup = df_ratings.groupBy(['User-ID', 'ISBN', 'Book-Rating']).count().filter('count > 1').count()
print('There are {} duplicates.'.format(NumDup))
df_ratings.groupBy(['User-ID', 'ISBN', 'Book-Rating']).count().filter('count > 1').show(truncate=False)

There are 17 duplicates.
+-------+----------+-----------+-----+
|User-ID|ISBN      |Book-Rating|count|
+-------+----------+-----------+-----+
|56178  |0553275690|7          |2    |
|247490 |0425180638|5          |2    |
|6092   |0330258648|0          |2    |
|11676  |8433966324|0          |2    |
|84784  |0140177396|8          |2    |
|141971 |1550544683|8          |2    |
|175886 |0722184492|0          |2    |
|179075 |0533132681|9          |2    |
|115490 |0345443284|0          |2    |
|212898 |0451195663|0          |2    |
|164292 |0446610100|10         |2    |
|144131 |0006144942|0          |2    |
|216444 |0006121837|0          |2    |
|225087 |0440202043|0          |2    |
|14229  |3927658316|8          |2    |
|91931  |0770422632|0          |2    |
|183669 |0060911131|10         |2    |
+-------+----------+-----------+-----+



In [None]:
# Drop duplicates:

df_ratings = df_ratings.dropDuplicates()

In [None]:
# Check unique ratings in 'Book-Rating':

df_ratings.groupBy('Book-Rating').count().sort(asc('Book-Rating')).show(truncate=False)

+-----------+------+
|Book-Rating|count |
+-----------+------+
|0          |644256|
|1          |1559  |
|2          |2443  |
|3          |5349  |
|4          |7875  |
|5          |46305 |
|6          |33220 |
|7          |68965 |
|8          |93702 |
|9          |61313 |
|10         |70841 |
+-----------+------+



---
**Explicit-Implicit Ratings**

---

*   We can see that 'Book-Rating' is between 0-10.
*   One way of distinguishing types of users feedback is whether they are explicit or implicit.
*   **Explicit Ratings - when the users actively provide information about their prefence of an item by rating it.**
*   In this dataset, ratings between 1-10 given by users are explicit ratings.
*   **Implicit Ratings - when users preferences are derived from observing their behavior.** 
*   In this dataset, ratings equal to 0 are implicit ratings.
*   We'll consider this when analizing the data (in Tableau), by analizing explicit/implicit ratings seperately.









In [None]:
# Dropping the original ISBN column:

df_ratings = df_ratings.drop('ISBN')

In [None]:
# Saving cleaned datasets:

df_ratings.coalesce(1).write.format('csv').options(header=True, delimiter=';').save('/content/drive/MyDrive/Spark/ProjectBookCrossing/Clean-Ratings.csv')