Big Data Project- Adam, Ami, Gal


# Contents 

1. Connecting to Spark & to Google Drive in order to upload files, creating paths for files

2. Preparing Books Dataframe & creating the DIM version

3. Preparing Users Dataframe & creating the DIM version

4. Preparing Rating Dataframe & creating FACT version

5. Writing new DIMS & FACT to CSV format

6. Extras

## 1. Connection!

In [None]:

!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null   # !apt-get --> install java
!wget -q https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
!tar xf spark-3.3.0-bin-hadoop3.tgz 
!pip install -q findspark 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"
import findspark
findspark.init("spark-3.3.0-bin-hadoop3") 
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
from pyspark.sql import Row
from pyspark.sql import functions as f



In [None]:
#Installing a library that confirms an ISBN as valid

!pip install isbnlib

from isbnlib import is_isbn10, is_isbn13


In [None]:

# Mount Google Drive under /content/drive so the project folder can be read and written.
from google.colab import drive
drive.mount(mountpoint='/content/drive')


In [None]:
# Folder on Google Drive that holds the BX-*.csv input files; the output
# folders (dim_users, dim_books, fact_rating) are written here as well.
# Each team member has a different Drive layout: keep exactly one FolderPath
# assignment uncommented (remove the leading '#' on your own line).

#Adam's File location
FolderPath = '/content/drive/MyDrive/Big Data Project'

#Ami's File location: 
#FolderPath =  '/content/drive/Othercomputers/Ami computer /Ami/טכניון/big data/Big Data Project/data'

#Gal's File location (no path filled in yet; add one before uncommenting): 
#FolderPath = 

In [None]:
# Read the three raw CSV exports from FolderPath.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Ratings: default ',' delimiter. The ';'-joined fields end up in a single
# column and are split apart in the ratings-cleaning cell.
BookRatings_DF = (spark.read
                  .option('header', True)
                  .option('inferSchema', True)
                  .csv(FolderPath + '/BX-Book-Ratings.csv'))

# Books: split on the two-character sequence ';"' rather than a single
# character (presumably so ';' or ',' inside text values do not break rows).
Books_DF = (spark.read
            .options(delimiter=';"', header=True)
            .csv(FolderPath + '/BX-Books.csv'))

# Users: supply explicit column names. The first and last columns still hold
# ';'-joined values that are split in the users-cleaning cell.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("UserstoSPLIT", "State", "AgeSPLIT")
])
Users_DF = (spark.read
            .schema(schema)
            .option('header', True)
            .csv(FolderPath + '/BX-Users.csv'))
       

In [None]:
# First look at the raw, uncleaned DataFrames (ratings shown without truncation).
for raw_frame, cut_long_values in ((BookRatings_DF, False), (Books_DF, True), (Users_DF, True)):
    raw_frame.show(truncate=cut_long_values)


## 2. Preparing Books Dataframe

In [None]:
#Cleaning Books DataFrame and building DIM_BOOKS
#
# Steps: decode '&amp;' -> strip junk characters -> normalise column names ->
# trim -> empty/null values to 'unknown' -> lowercase -> sanitise the
# publication year -> purely numeric text values to 'unknown' ->
# drop rows without a valid ISBN-10 -> add surrogate key BookSK.

import re
from pyspark.sql import Window
from pyspark.sql.functions import (col, count, isnan, monotonically_increasing_id,
                                   row_number, trim, when)
from pyspark.sql.types import BooleanType, IntegerType, StringType

# HTML-escaped ampersands ('&amp;') become 'and' before junk characters are
# stripped; otherwise they would degrade to the stray word 'amp'.
# Accumulate on BooksDF_1 so every column receives the replacement.
BooksDF_1 = Books_DF
for C1 in BooksDF_1.columns:
    BooksDF_1 = BooksDF_1.withColumn(C1, f.regexp_replace(C1, '&amp;', 'and'))

# Getting rid of junk characters: keep only letters, digits, spaces and '$'.
for C1 in BooksDF_1.columns:
    BooksDF_1 = BooksDF_1.withColumn(C1, f.regexp_replace(C1, '[^0-9a-zA-Z $]+', ''))

# Normalise header names produced by the ';"' delimiter: '-' becomes '_',
# then anything outside [0-9a-zA-Z_$] (e.g. stray quotes) is removed.
BooksDF_1 = BooksDF_1.select([
    f.col(raw_name).alias(re.sub("[^0-9a-zA-Z_$]+", "", raw_name.replace("-", "_")))
    for raw_name in BooksDF_1.columns
])

#Trimming
for C1 in BooksDF_1.columns:
    BooksDF_1 = BooksDF_1.withColumn(C1, trim(col(C1)))

#Empty strings -> 'unknown'
for C1 in BooksDF_1.columns:
    BooksDF_1 = BooksDF_1.withColumn(C1, when(col(C1) == '', 'unknown').otherwise(col(C1)))

#Finding out how many nulls in dataset
print('Null Count Before Clean')
BooksDF_1.select(
    [count(when(isnan(C1) | col(C1).isNull(), C1)).alias(C1) for C1 in BooksDF_1.columns]
).show()

#Fill Nulls with 'Unknown' because values are qualitative (lowercased below)
BooksDF_1 = BooksDF_1.fillna('Unknown')

#No Nulls
print('Null Count after Clean')
BooksDF_1.select(
    [count(when(isnan(C1) | col(C1).isNull(), C1)).alias(C1) for C1 in BooksDF_1.columns]
).show()

#Lowercase all columns (ISBN included, so a check digit 'X' is stored as 'x')
for C1 in BooksDF_1.columns:
    BooksDF_1 = BooksDF_1.withColumn(C1, f.lower(f.col(C1)))

#Check Nulls are filled with unknown
print('After filling empty values as unknown, we count the database again')
print('Unknown Count')
BooksDF_1.select(
    [count(when(col(C1) == 'unknown', C1)).alias(C1) for C1 in BooksDF_1.columns]
).show()

# Diagnostic: years below 1000 are implausible (many rows store 0).
Filter = BooksDF_1.filter(col('Year_Of_Publication') < 1000)
print('Count of Years with improbable values 0-1000')
Filter.describe().show(truncate=False)

# Turn implausible years (< 1000, the same bound as the diagnostic above) and
# non-numeric years into 'unknown'.
BooksDF_1 = BooksDF_1.withColumn(
    'Year_Of_Publication',
    when((col('Year_Of_Publication') < 1000) |
         (~col('Year_Of_Publication').rlike("^[0-9]*$")), 'unknown')
    .otherwise(col('Year_Of_Publication')))

#Columns which have only numeric values but should be limited to alphabetic, changed to 'unknown'
for C1 in BooksDF_1.columns:
    if C1 not in ('ISBN', 'Year_Of_Publication'):
        BooksDF_1 = BooksDF_1.withColumn(C1, when(col(C1).rlike("^[0-9]*$"), 'unknown').otherwise(col(C1)))


####################################################################################################################################################

# ISBN VALIDITY SCREENING

####################################################################################################################################################

# Validate inside Spark with a UDF instead of collecting every row to the
# driver and filtering with a (potentially huge) isin() list.
# is_isbn10 cannot take None, so a null ISBN counts as invalid.
isbn10_ok = f.udf(lambda isbn: isbn is not None and is_isbn10(isbn), BooleanType())
BooksDF_1 = BooksDF_1.withColumn('ISBN_is_valid', isbn10_ok(col('ISBN')))

valid_count = BooksDF_1.filter(col('ISBN_is_valid')).count()
invalid_count = BooksDF_1.count() - valid_count

print('#########################################################################################################################')
print("After Running our state of the art high tech algorythim, we now know how many bookshave valid and invalid ISBN's")
print('')
print(f'There are {valid_count} Books with Valid ISBN')
print(f'There are {invalid_count} Books with Invalid ISBN')

BooksDF_1 = BooksDF_1.filter(col('ISBN_is_valid')).drop('ISBN_is_valid')
print('')
print('')
print(f'There should be {valid_count} books with valid ISBN in the updated database')
print('')

print('Here is a count of the total rows in the updated Books dataframe')
print('')
print('#####  ' + f'{BooksDF_1.count()}' + '  #####' )

#CREATING DIM_BOOKS

print('Creating DIM_BOOKS...........')

BooksDF_1 = (BooksDF_1
             .drop('Image_URL_S', 'Image_URL_M', 'Image_URL_L')
             .withColumnRenamed('ISBN', 'BookBK'))
print('')
print('')

# Surrogate key: sequential numbers starting at 101, in the current row order.
BooksDF_1 = BooksDF_1.withColumn(
    "BookSK",
    row_number().over(Window.orderBy(monotonically_increasing_id())) + 100
)

BooksDF_1 = BooksDF_1.select('BookSK', 'BookBK', 'Book_Title', 'Book_Author', 'Year_Of_Publication', 'Publisher')

DIM_BOOKS = BooksDF_1
print('Final Look at the DIM')

DIM_BOOKS.show()
DIM_BOOKS.describe().show()


## 3. Preparing Users Dataframe

In [None]:
#Preparing Users Dataframe and building DIM_USERS

from pyspark.sql import Window
from pyspark.sql.functions import (col, count, isnan, monotonically_increasing_id,
                                   row_number, trim, when)

# The raw columns still hold ';'-joined pairs:
#   UserstoSPLIT -> "<user id>;<city>",  AgeSPLIT -> "<country>;<age>"
user_city = f.split(Users_DF['UserstoSPLIT'], ';')
country_age = f.split(Users_DF['AgeSPLIT'], ';')
UsersDF_1 = (Users_DF
             .withColumn('User_ID', user_city.getItem(0))
             .withColumn('City', user_city.getItem(1))
             .withColumn('Country', country_age.getItem(0))
             .withColumn('Age', country_age.getItem(1)))

# NOTE: per the authors' inspection, ~4000 rows whose values were written into
# extra CSV columns (D, E, F) without a user ID are lost here.

#Drop the raw combined columns
UsersDF_1 = UsersDF_1.drop('UserstoSPLIT', 'AgeSPLIT')

#Trimming leading and trailing spaces
for C1 in UsersDF_1.columns:
    UsersDF_1 = UsersDF_1.withColumn(C1, trim(col(C1)))

# Spell out ' & ' as ' and ' so the junk-character filter below does not just
# delete the '&' from city/country names.
for C1 in UsersDF_1.columns:
    UsersDF_1 = UsersDF_1.withColumn(C1, f.regexp_replace(C1, ' & ', ' and '))

#Tidying up the column order
UsersDF_1 = UsersDF_1.select('User_ID', 'City', 'State', 'Country', 'Age')

#Getting rid of junk characters: keep only letters, digits, spaces and '$'
for C1 in UsersDF_1.columns:
    UsersDF_1 = UsersDF_1.withColumn(C1, f.regexp_replace(C1, '[^0-9a-zA-Z $]+', ''))

#Remove literal 'NULL' markers
for C1 in UsersDF_1.columns:
    UsersDF_1 = UsersDF_1.withColumn(C1, f.regexp_replace(C1, 'NULL', ''))

#Trim again after the replacements above
for C1 in UsersDF_1.columns:
    UsersDF_1 = UsersDF_1.withColumn(C1, trim(col(C1)))

print('UsersDF_1 before transformation')
UsersDF_1.show(truncate=False)

#Finding out how many nulls in dataset
print('Null count before clean')
UsersDF_1.select([
    count(when(isnan(C1) | (col(C1) == 'NULL') | (col(C1) == 'na') | (col(C1) == '') | col(C1).isNull(), C1)).alias(C1)
    for C1 in UsersDF_1.columns
]).show()

#Getting rid of empty values and nulls
for C1 in UsersDF_1.columns:
    UsersDF_1 = UsersDF_1.withColumn(
        C1,
        when((col(C1) == '') | (col(C1) == 'na') | (col(C1) == 0) |
             (col(C1) == 'NULL') | (col(C1) == ' ') | (col(C1).isNull()), 'unknown')
        .otherwise(col(C1)))

# Ages above 250 are implausible -> 'unknown'. Target 'Age' explicitly rather
# than relying on whichever column the loop above happened to end on.
UsersDF_1 = UsersDF_1.withColumn('Age', when(col('Age') > 250, 'unknown').otherwise(col('Age')))

#Remove the header line that came through as a data row
UsersDF_1 = UsersDF_1.where(col('User_ID') != 'UserID')

#Location fields must be letters/spaces only, anything else -> 'unknown'
UsersDF_1 = UsersDF_1.withColumn('City', when(~(col('City').rlike("^[a-zA-Z ]*$")), 'unknown').otherwise(col('City')))
UsersDF_1 = UsersDF_1.withColumn('State', when(~(col('State').rlike("^[a-zA-Z ]*$")), 'unknown').otherwise(col('State')))
UsersDF_1 = UsersDF_1.withColumn('Country', when(~(col('Country').rlike("^[a-zA-Z ]*$")), 'unknown').otherwise(col('Country')))

#Map alternative spellings of the same city/country to one name
UsersDF_1 = UsersDF_1.withColumn('City', when((col('City') == 'brighton') | (col('City') == 'brighton and hove') | (col('City') == 'hove'), 'brighton and hove').otherwise(col('City')))
UsersDF_1 = UsersDF_1.withColumn('City', when((col('City') == 'sandiego') | (col('City') == 'san diego'), 'san diego').otherwise(col('City')))
UsersDF_1 = UsersDF_1.withColumn('Country', when((col('Country') == 'brasil') | (col('Country') == 'brazil'), 'brazil').otherwise(col('Country')))

#Removing rows that don't have a user ID
UsersDF_1 = UsersDF_1.filter(col("User_ID") != 'unknown')

print('Showing Dataframe after adding unknowns')
UsersDF_1.show(truncate=False)

UsersDF_1.describe().show(truncate=False)

#Finding out how many nulls in dataset
print('Null count after dealing with nulls')
UsersDF_1.select([
    count(when(isnan(C1) | (col(C1) == 'NULL') | col(C1).isNull(), C1)).alias(C1)
    for C1 in UsersDF_1.columns
]).show()

#Finding out how many unknown in dataset
print('unknown count After replacing nulls to unknown')
UsersDF_1.select([count(when(col(C1) == 'unknown', C1)).alias(C1) for C1 in UsersDF_1.columns]).show()

#Creating DIM_Users

print('Creating DIM_USERS...........')

UsersDF_1 = UsersDF_1.withColumnRenamed('User_ID', 'UserBK')

# Surrogate key: sequential numbers starting at 101, in the current row order.
UsersDF_1 = UsersDF_1.withColumn(
    "UserSK",
    row_number().over(Window.orderBy(monotonically_increasing_id())) + 100
)

UsersDF_1 = UsersDF_1.select('UserSK', 'UserBK', 'City', 'State', 'Country', 'Age')

DIM_USERS = UsersDF_1

print('Final look at DIM USERS')
DIM_USERS.show()
DIM_USERS.describe().show()
   

## 4. Preparing Ratings DataFrame




In [None]:

#Cleaning BookRating DataFrame and building FACT_RATING

from pyspark.sql import Window
from pyspark.sql.functions import (col, count, isnan, monotonically_increasing_id,
                                   row_number, trim, when)
from pyspark.sql.types import BooleanType, StringType

# The raw ratings were read with ',' as delimiter, so user id, ISBN and rating
# still sit ';'-joined in one column whose name is the raw header line.
raw_rating_parts = f.split(BookRatings_DF['"User-ID;""ISBN"";""Book-Rating"""'], ';')
BookRatingsDF_1 = (BookRatings_DF
                   .withColumn('User_ID', raw_rating_parts.getItem(0))
                   .withColumn('ISBN', raw_rating_parts.getItem(1))
                   .withColumn('Book_Rating', raw_rating_parts.getItem(2)))

#Dropping Old Columns
BookRatingsDF_1 = BookRatingsDF_1.drop('"User-ID;""ISBN"";""Book-Rating"""', '_c1', '_c2')

#Getting rid of junk characters (letters, digits and '$' only)
for C1 in BookRatingsDF_1.columns:
    BookRatingsDF_1 = BookRatingsDF_1.withColumn(C1, f.regexp_replace(C1, '[^0-9a-zA-Z$]+', ''))

#Trimming
for C1 in BookRatingsDF_1.columns:
    BookRatingsDF_1 = BookRatingsDF_1.withColumn(C1, trim(col(C1)))

# DIM_BOOKS lowercases every column, so an ISBN-10 check digit 'X' is stored
# there as 'x'. Lowercase the ISBN here as well; otherwise those ratings never
# match a book in the FACT join below.
BookRatingsDF_1 = BookRatingsDF_1.withColumn('ISBN', f.lower(col('ISBN')))

#Finding out how many nulls in dataset
print('Null count before before replacing nulls with column mean')
BookRatingsDF_1.select(
    [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in BookRatingsDF_1.columns]
).show()

#Nulls only in 'Book Rating' Column, also checked unique values to see rating is from 0-10 only
#Replace nulls in 'Book Rating' with the column mean (as a string, like the column)
mean_rating = BookRatingsDF_1.agg(f.avg('Book_Rating').cast(StringType())).first()[0]
BookRatingsDF_1 = BookRatingsDF_1.fillna(mean_rating, subset=['Book_Rating'])

#Checking if any alphabetic-only values in strictly numeric columns, in this case all of them.
print('Stats counting Alphabetic values')
BookRatingsDF_1.select(
    [count(when(col(c).rlike("^[a-zA-Z]*$"), c)).alias(c) for c in BookRatingsDF_1.columns]
).show()


####################################################################################################################################################

# ISBN VALIDITY SCREENING

####################################################################################################################################################

# Validate inside Spark with a UDF instead of collecting every rating to the
# driver and filtering with a (potentially huge) isin() list.
# is_isbn10 cannot take None, so a null ISBN counts as invalid.
isbn10_ok = f.udf(lambda isbn: isbn is not None and is_isbn10(isbn), BooleanType())
BookRatingsDF_1 = BookRatingsDF_1.withColumn('ISBN_is_valid', isbn10_ok(col('ISBN')))

valid_count = BookRatingsDF_1.filter(col('ISBN_is_valid')).count()
invalid_count = BookRatingsDF_1.count() - valid_count

print('#########################################################################################################################')
print("After Running our state of the art high tech algorythim, we now know how many bookshave valid and invalid ISBN's")
print('')
print(f'There are {valid_count} Bookratings with Valid ISBN')
print(f'There are {invalid_count} Booksratings with Invalid ISBN')

BookRatingsDF_1 = BookRatingsDF_1.filter(col('ISBN_is_valid')).drop('ISBN_is_valid')
print('')
print('')
print(f'There should be {valid_count} Booksratings with valid ISBN in the updated database')
print('')

print('Here is a count of the total rows in the updated BookRating dataframe')
print('')
print('#####  ' + f'{BookRatingsDF_1.count()}' + '  #####' )
print('')

print('quick look at the cleaned dataset')
BookRatingsDF_1.show()


#CREATING FACT_Rating
print('Creating FACT_RATING...........')
BookRatingsDF_1 = (BookRatingsDF_1
                   .withColumnRenamed('User_ID', 'UserBK')
                   .withColumnRenamed('ISBN', 'BookBK')
                   .withColumnRenamed('Book_Rating', 'Rating'))

# Surrogate key: sequential numbers starting at 101, in the current row order.
BookRatingsDF_1 = BookRatingsDF_1.withColumn(
    "RatingSK",
    row_number().over(Window.orderBy(monotonically_increasing_id())) + 100
)

BookRatingsDF_1 = BookRatingsDF_1.select('RatingSK', 'UserBK', 'BookBK', 'Rating')


#Final FACT table: swap business keys for the dimension surrogate keys.
#Inner joins keep only ratings whose user and book both exist in the DIMs.
UsersDF_1.createOrReplaceTempView("Users")
BooksDF_1.createOrReplaceTempView("Books")
BookRatingsDF_1.createOrReplaceTempView("Ratings")

FACT_RATING = spark.sql('''
Select RatingSK, BookSK, UserSK, Rating FROM Ratings R inner join 
Users U on R.UserBK = U.UserBK inner join Books B on R.BookBK = B.BookBK 
'''
)

#Info on Fact Table
print('Final look at Fact Table')
FACT_RATING.show(truncate=False)
FACT_RATING.describe().show(truncate=False)


## 5. Writing the completed DIMs and FACT as CSVs


In [None]:
# Persist the star schema as CSV folders (with header rows) under FolderPath,
# replacing any output from a previous run.
star_schema_outputs = (
    ('dim_users', DIM_USERS),
    ('dim_books', DIM_BOOKS),
    ('fact_rating', FACT_RATING),
)
for folder_name, table in star_schema_outputs:
    (table.write
          .mode("overwrite")
          .option("header", True)
          .csv(f'{FolderPath}/{folder_name}'))

print(f'Files are ready and waiting in {FolderPath}')

## 6. Extras

In [None]:
# Sanity check: list any book whose publication year is still below 1000
# (the list should be empty if the year cleaning removed them).
BooksDF_1.createOrReplaceTempView("Books")
Query = (spark.table("Books")
         .select('Book_Title', 'Year_Of_Publication')
         .where('Year_Of_Publication < 1000')
         .orderBy('Year_Of_Publication', ascending=False))

Query.show(truncate=False)

In [None]:
#Just a simple filter to spot-check the ratings of one user.
# By this point BookRatingsDF_1 has been renamed and projected to
# (RatingSK, UserBK, BookBK, Rating), so the user id lives in 'UserBK';
# the old 'User_ID' column no longer exists and would fail to resolve.
SimpleFilter = BookRatingsDF_1.filter(col("UserBK") == '130499')

SimpleFilter.show(truncate=False)


In [None]:
# Report how many rows each cleaning pipeline removed (raw count minus cleaned count).
users_removed = Users_DF.count() - UsersDF_1.count()
print(f" {users_removed} users rows deleted ")

books_removed = Books_DF.count() - BooksDF_1.count()
print(f"{books_removed} Books rows deleted")

ratings_removed = BookRatings_DF.count() - BookRatingsDF_1.count()
print(f"{ratings_removed} BookRatings rows deleted")
