# Spark Installation

In [None]:
# 1) First: install Java, Spark and and run a local Spark session by just running this on Google Colab:
!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null   # !apt-get --> install java
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz  # !wget  --> download file from url
!tar xf spark-3.1.2-bin-hadoop3.2.tgz  # !tar --> like unzip 
!pip install -q findspark  # !pip  --> instal a package, we cant import a library without installing it first, most libraries that we used were already installed
# This are INSTALLATION COMMANDS IN LINUX that we run in our collab space, it's similar to downloading programs an installing them on our computers
# installs Apache Spark 2.4.7, Java 8, and Findspark, a library that makes it easy for Python to find Spark

# 2) Second: set the locations where Spark and Java are installed to let know Colab where to find it.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

# 3) Third: import spark libraries and use them
import findspark
findspark.init("spark-3.1.2-bin-hadoop3.2") # SPARK_HOME
from pyspark.sql import SparkSession

# Create the session - We need to remember to close it at the end
# The session is basically our connection to Spark layer in the Hadoop ecosystem
spark = SparkSession.builder.appName("walmart").getOrCreate()
from pyspark.sql import Row
from pyspark.sql import functions

[33m0% [Working][0m            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
[33m0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.18[0m                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,622 B]
[33m0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.18[0m[33m0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.18[0m                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
[33m0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.18[0m[33m0% [2 InRelease gpgv 3,622 B] [Connecting to archive.ubuntu.com] [Connecting to[0m                                                                          

In [None]:
import pyspark.sql.functions as f

# Functions

In [None]:
from pyspark.sql import functions as F

def split_column(df, col_name, col_delimiter=';', new_cols_names=None):
  """Replace a delimited string column by one column per field.

  Args:
    df: input Spark DataFrame.
    col_name: name of the column to split. Double quotes are removed from it (and it is
      stripped) to build the name of the temporary array column.
    col_delimiter: delimiter passed to ``F.split`` (which treats it as a regex pattern) and,
      when ``new_cols_names`` is None, to ``str.split`` on the column name itself.
    new_cols_names: names for the resulting columns. When None they are derived from the
      column header: each part has '-' replaced by '_' and is lower-cased.

  Returns:
    A DataFrame with every other column of ``df`` first, followed by the split columns;
    neither ``col_name`` nor the temporary array column is kept.
  """
  array_col = col_name.replace('\"', '').strip()

  if new_cols_names is not None:
    target_names = new_cols_names
  else:
    # e.g. a header "User-ID;ISBN" yields the target names user_id, isbn.
    target_names = [part.replace('-', '_').lower() for part in array_col.split(col_delimiter)]

  pick_exprs = [f'`{array_col}`[{idx}] as `{name}`' for idx, name in enumerate(target_names)]

  other_cols = [c for c in df.columns if c != col_name]
  with_array = df.select(*other_cols, F.split(col_name, col_delimiter).alias(array_col))
  expanded = with_array.selectExpr('*', *pick_exprs)

  return expanded.select(*[c for c in expanded.columns if c != array_col])
    
def _quote_ident(name):
  """Quote a column name for Spark SQL: wrap it in backticks, doubling embedded backticks."""
  return '`' + name.replace('`', '``') + '`'

def _regexp_strip(df, columns, pattern):
  """Apply ``regexp_replace(col, pattern, '')`` to every column in ``columns``.

  Columns not listed in ``columns`` are passed through first, unchanged; the rewritten
  columns are appended after them, in the order given by ``columns``. All names are
  backtick-quoted, so columns such as ``Book-Title`` are not parsed as expressions.

  ``pattern`` is inserted verbatim into a Spark SQL string literal. Spark un-escapes that
  literal once before the regex engine sees it, so a regex backslash escape must be doubled
  (e.g. r'\\?' to match a literal '?'). A character class such as '[?]' or '[$]' avoids
  the issue. The pattern must not contain an unescaped single quote.
  """
  kept = [_quote_ident(c) for c in df.columns if c not in columns]
  stripped = [f"regexp_replace({_quote_ident(c)}, '{pattern}', '') as {_quote_ident(c)}" for c in columns]
  return df.selectExpr(*kept, *stripped)

def remove_prefix(df, columns, prefix='\"'):
  """Remove a leading ``prefix`` (a regex fragment anchored with '^') from each column in ``columns``.

  See ``_regexp_strip`` for column ordering and the escaping rules for ``prefix``.
  """
  return _regexp_strip(df, columns, f'^{prefix}')

def remove_suffix(df, columns, suffix='\"'):
  """Remove a trailing ``suffix`` (a regex fragment anchored with '$') from each column in ``columns``.

  See ``_regexp_strip`` for column ordering and the escaping rules for ``suffix``.
  """
  return _regexp_strip(df, columns, f'{suffix}$')

# Data Loading & Processing

In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive; the CSV inputs read below live under /content/drive/MyDrive/.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
books_rating_df = spark.read.csv('/content/drive/MyDrive/BDA/big data project/BX-Book-Ratings.csv', sep=';', header=True, inferSchema=True)
# The first raw column packs several ';'-separated fields (its header names them): split it into
# user_id / isbn. The rating comes from a separate raw column.
books_rating_clean_df = split_column(books_rating_df, books_rating_df.columns[0])\
      .selectExpr('user_id', 'isbn', '`""Book-Rating""",,` as book_rating')
# Strip the stray quote characters left around each value.
books_rating_clean_df = remove_prefix(books_rating_clean_df, columns=['user_id'])
books_rating_clean_df = remove_prefix(books_rating_clean_df, columns=['isbn', 'book_rating'], prefix='\"\"')
books_rating_clean_df = remove_suffix(books_rating_clean_df, columns=['isbn'], suffix='\"\"')
books_rating_clean_df = remove_suffix(books_rating_clean_df, columns=['book_rating'], suffix='\"\"\",,')
books_rating_clean_df = books_rating_clean_df.selectExpr('int(user_id) as user_id', 'isbn', 'int(book_rating) as book_rating')
books_rating_clean_df = books_rating_clean_df.select(f.regexp_replace(f.col("user_id"), "[^0-9]", "").alias("user_id"), "isbn", "book_rating")
# Keep only ISBN characters. Upper-case first so an ISBN-10 check digit 'x'/'X' is kept as 'X'
# (the previous pattern [^0-9x] deleted an upper-case X, so those ratings failed the length filter).
books_rating_clean_df = books_rating_clean_df.select(f.regexp_replace(f.upper(f.col("isbn")), "[^0-9X]", "").alias("isbn"), "user_id", "book_rating")
books_rating_clean_df = books_rating_clean_df.where("length(isbn) = 10 or length(isbn) = 13")

# Explicit imports instead of `from pyspark.sql.functions import *`, which shadowed Python
# built-ins such as sum, max, min and round for the rest of the notebook.
from pyspark.sql.functions import col, trim, when

# Defensive: after the regexp + length filter above, isbn has no whitespace and is never empty,
# so these two steps do not change any value.
books_rating_clean_df = books_rating_clean_df.withColumn('isbn', trim(books_rating_clean_df.isbn))

def blank_as_null(x):
    """Return a Column equal to column `x`, with empty strings turned into null."""
    return when(col(x) != "", col(x)).otherwise(None)

books_rating_clean_df = books_rating_clean_df.withColumn("isbn", blank_as_null("isbn"))

# Missing ratings become 0 (the string "0" is cast to the integer column type by na.fill).
books_rating_clean_df = books_rating_clean_df.na.fill({"book_rating": "0" })


books_rating_clean_df.printSchema()
books_rating_clean_df.show()


root
 |-- isbn: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- book_rating: integer (nullable = true)

+----------+-------+-----------+
|      isbn|user_id|book_rating|
+----------+-------+-----------+
|0155061224| 276726|          5|
|0446520802| 276727|          0|
|0521795028| 276729|          6|
|2080674722| 276733|          0|
|3257224281| 276736|          8|
|0600570967| 276737|          6|
|0425115801| 276746|          0|
|0449006522| 276746|          0|
|0553561618| 276746|          0|
|0786013990| 276746|          0|
|0786014512| 276746|          0|
|0060517794| 276747|          9|
|0451192001| 276747|          0|
|0609801279| 276747|          0|
|0671537458| 276747|          9|
|0679776818| 276747|          8|
|0943066433| 276747|          7|
|1570231028| 276747|          0|
|1885408226| 276747|          7|
|0747558167| 276748|          6|
+----------+-------+-----------+
only showing top 20 rows



In [None]:
books_df = spark.read.csv('/content/drive/MyDrive/BDA/big data project/BX-Books.csv', sep=';', header=True, inferSchema=True)
books_clean_df = books_df.selectExpr('ISBN as isbn', '`Book-Title` as book_title', '`Book-Author` as book_author',
                                     'int(`Year-Of-Publication`) as year_of_publication', 'Publisher as publisher')
# Keep only ISBN characters. Upper-case first so an ISBN-10 check digit 'x'/'X' is kept as 'X'
# (the previous pattern [^0-9x] deleted an upper-case X and truncated the ISBN to 9 characters).
books_clean_df = books_clean_df.select(f.regexp_replace(f.upper(f.col("isbn")), "[^0-9X]", "").alias("isbn"),
                                       "year_of_publication", "book_title", "book_author", "publisher")
# Year 0 is replaced by the placeholder 1900. The CASE mixes the string '1900' with an int,
# so Spark types the resulting column as string.
books_clean_df = books_clean_df.selectExpr("isbn", "book_title", "book_author",
                                           "CASE WHEN year_of_publication = 0  THEN '1900' ELSE year_of_publication END as year_of_publication",
                                           "publisher")

from pyspark.sql.functions import udf

def ascii_ignore(x):
    """Return `x` with every non-ASCII character dropped; a null (None) value stays null.

    The None guard matters: Spark hands null values to Python UDFs as None, and
    `None.encode` would raise AttributeError and fail the whole job when such a row
    is evaluated (e.g. by toPandas() on the full table).
    """
    if x is None:
        return None
    return x.encode('ascii', 'ignore').decode('ascii')

ascii_udf = udf(ascii_ignore)

# Free-text columns; the quote / '?' rules below only apply to author and publisher.
TEXT_COLS = ['book_title', 'book_author', 'publisher']
NAME_COLS = ['book_author', 'publisher']

for text_col in TEXT_COLS:
    books_clean_df = books_clean_df.withColumn(text_col, ascii_udf(text_col))

# Strip leading/trailing junk characters. The prefix/suffix strings are regex fragments placed inside a
# Spark SQL string literal, which un-escapes backslashes once before the regex sees them. Character
# classes avoid that trap: '[$]' matches a literal '$' (the old '\$' reached the regex as '^$' and
# stripped nothing), and '[?]' matches the same literal '?' as the old '\\\?'.
# The whole pass runs three times so that several layers of junk are peeled off.
for _ in range(3):
    books_clean_df = remove_prefix(books_clean_df, columns=TEXT_COLS, prefix='@')
    books_clean_df = remove_prefix(books_clean_df, columns=TEXT_COLS, prefix='[$]')
    books_clean_df = remove_prefix(books_clean_df, columns=TEXT_COLS, prefix='-')
    books_clean_df = remove_prefix(books_clean_df, columns=TEXT_COLS, prefix='-')
    books_clean_df = remove_suffix(books_clean_df, columns=TEXT_COLS, suffix='@')
    books_clean_df = remove_suffix(books_clean_df, columns=TEXT_COLS, suffix='-')

    books_clean_df = remove_suffix(books_clean_df, columns=NAME_COLS, suffix='[?]')
    books_clean_df = remove_suffix(books_clean_df, columns=NAME_COLS, suffix='"')
    books_clean_df = remove_prefix(books_clean_df, columns=NAME_COLS, prefix='"')
    books_clean_df = remove_prefix(books_clean_df, columns=NAME_COLS, prefix='[?]')

books_clean_df.printSchema()
books_clean_df.show()

root
 |-- isbn: string (nullable = true)
 |-- year_of_publication: string (nullable = true)
 |-- book_title: string (nullable = true)
 |-- book_author: string (nullable = true)
 |-- publisher: string (nullable = true)

+----------+-------------------+--------------------+--------------------+--------------------+
|      isbn|year_of_publication|          book_title|         book_author|           publisher|
+----------+-------------------+--------------------+--------------------+--------------------+
|0195153448|               2002| Classical Mythology|  Mark P. O. Morford|Oxford University...|
|0002005018|               2001|        Clara Callan|Richard Bruce Wright|HarperFlamingo Ca...|
|0060973129|               1991|Decision in Normandy|        Carlo D'Este|     HarperPerennial|
|0374157065|               1999|Flu: The Story of...|    Gina Bari Kolata|Farrar Straus Giroux|
|0393045218|               1999|The Mummies of Ur...|     E. J. W. Barber|W. W. Norton &amp...|
|0399135782| 

In [None]:
# Sanity check: no book should be left without an ISBN.
books_clean_df.where("isbn IS NULL").show()


+----+-------------------+----------+-----------+---------+
|isbn|year_of_publication|book_title|book_author|publisher|
+----+-------------------+----------+-----------+---------+
+----+-------------------+----------+-----------+---------+



In [None]:
users_df = spark.read.csv('/content/drive/MyDrive/BDA/big data project/BX-Users.csv', sep=';',header=True, inferSchema=True)
users_clean_df = users_df.selectExpr('int(`User-ID`) as user_id', 'Location as location', 'int(Age) as age')
users_clean_df = split_column(users_clean_df, 'location', col_delimiter=',', new_cols_names=['city', 'district', 'country'])
# Ages outside [-1, 120] become the sentinel '-1' (the CASE makes age a string column).
# A location with fewer than three comma-separated parts has no country (the out-of-range array index
# yields null with ANSI mode off, Spark's default). coalesce() maps that null to 'unknown' as well;
# otherwise such users were silently dropped by the country filter below.
users_clean_df = users_clean_df.selectExpr('user_id', "CASE WHEN age > 120 or age < -1 THEN '-1' ELSE age END as age",
                                           "regexp_replace(city, '[^a-zA-Z]+', '') as city",
                                           "regexp_replace(district, '[^a-zA-Z]+', '') as district",
                                           "CASE WHEN coalesce(trim(country), '') = '' THEN 'unknown' ELSE trim(country) END as country")
users_clean_df = users_clean_df.selectExpr('user_id', 'age', "CASE WHEN city = '' or city = 'na' THEN 'unknown' ELSE city END as city",
                                    "CASE WHEN district = '' or district = 'na' THEN 'unknown' ELSE district END as district",'country')

# Reference list of country names (first column of the file), lower-cased for matching.
# distinct(): a country listed twice would otherwise duplicate every matching user in the left join.
countries_df = spark.read.csv('/content/drive/MyDrive/BDA/big data project/WORLD COUNTRIES.csv.xls', sep=';', header=False, inferSchema=True)
countries_df = countries_df.selectExpr('lower(_c0) as country_output').distinct()

# Keep users whose country is 'unknown' or appears in the reference list.
users_clean_df = users_clean_df.join(countries_df, users_clean_df['country']==countries_df['country_output'], how='left')\
  .where("country = 'unknown' or country_output is not null").select('user_id', 'age', 'city', 'district', 'country')

# Trim city/district and turn empty strings into null (when() without otherwise() yields null);
# na.fill below then replaces those nulls with 'unknown'.
for text_col in ['city', 'district']:
    trimmed = f.trim(f.col(text_col))
    users_clean_df = users_clean_df.withColumn(text_col, f.when(trimmed != "", trimmed))

users_clean_df = users_clean_df.na.fill({"user_id": "-1", "age": "-1","city": "unknown","district": "unknown", "country":"unknown" })

users_clean_df.printSchema()
users_clean_df.show()

root
 |-- user_id: integer (nullable = true)
 |-- age: string (nullable = false)
 |-- city: string (nullable = false)
 |-- district: string (nullable = false)
 |-- country: string (nullable = false)

+-------+---+------------+--------------+--------------+
|user_id|age|        city|      district|       country|
+-------+---+------------+--------------+--------------+
|      1| -1|         nyc|       newyork|           usa|
|      2| 18|    stockton|    california|           usa|
|      3| -1|      moscow|yukonterritory|        russia|
|      4| 17|       porto|        vngaia|      portugal|
|      5| -1| farnborough|         hants|united kingdom|
|      6| 61| santamonica|    california|           usa|
|      7| -1|  washington|            dc|           usa|
|      8| -1|     timmins|       ontario|        canada|
|      9| -1|  germantown|     tennessee|           usa|
|     10| 26|    albacete|     wisconsin|         spain|
|     11| 14|   melbourne|      victoria|     australia|
| 

In [None]:
# Sanity check: city/district should never be null or empty after cleaning.
users_clean_df.where("city IS NULL OR city = '' OR district = ''").show()

+-------+---+----+--------+-------+
|user_id|age|city|district|country|
+-------+---+----+--------+-------+
+-------+---+----+--------+-------+



In [None]:
# Sanity check: the placeholder city 'na' should have been mapped to 'unknown'.
users_clean_df.filter(users_clean_df['city'] == 'na').show()

+-------+---+----+--------+-------+
|user_id|age|city|district|country|
+-------+---+----+--------+-------+
+-------+---+----+--------+-------+



In [None]:
# Export each cleaned table as a ';'-separated CSV in the working directory.
# toPandas() collects the whole table to the driver before writing.
for out_name, clean_df in [("books_rating_clean_df.csv", books_rating_clean_df),
                           ("books_clean_df.csv", books_clean_df),
                           ("user_clean_df.csv", users_clean_df)]:
    clean_df.toPandas().to_csv(out_name, sep=';', index=False)

In [None]:
# Stop the Spark session created at the top of the notebook now that all tables are exported.
spark.stop()