## Extract - Transform - Load

#### this project' main objective is to show and explain how create an ETL with pySpark.

In [32]:
from pyspark.sql import SparkSession
from pyspark.sql import functions

In [33]:
spark = SparkSession.builder.appName("BooksReviewSession") \
    .config("spark.jars",
            "/Users/chipanatica/Library/Application Support/JetBrains/DataGrip2021.2/jdbc-drivers/PostgreSQl/42.2.22/postgresql-42.2.22.jar") \
    .getOrCreate()

#### Because I'm working locally I should create a Spark session specify the postgres driver due to I will to Extract and load data from this database.

## Extract

In [30]:
def get_dataset(table_name: str):
    """
    :var table_name: specific name from table
    """
    return spark.read.format('jdbc') \
    .option('url', 'jdbc:postgresql://localhost:5432/test') \
    .option('dbtable', f'public.{table_name}') \
    .option('user', 'postgres') \
    .option('password', '') \
    .option('driver', 'org.postgresql.Driver') \
    .load()

def save_dataset(db, table_name: str):
    db.write.format('jdbc') \
    .option('url', 'jdbc:postgresql://localhost:5432/test') \
    .option('dbtable', f'public.{table_name}') \
    .option('user', 'postgres') \
    .option('password', '') \
    .option('driver', 'org.postgresql.Driver') \
    .save()

#### I created two function to load and save data. In both case I must specify the database' url with auth parameters.

In [14]:
book_dataset = get_dataset('books')
user_dataset = get_dataset('book_user')
rating_dataset = get_dataset('book_rating')

## Transform

In [10]:
def show_basic_information(db):
    print(f'Rows: {db.count()} - Columns: {len(db.columns)}')


def show_nulls(db):
    db.select([
        functions.count(
            functions.when(functions.isnan(c) | functions.col(c).isNull(), c)
        ).alias(c) for c in db.columns]
    ).show()

#### I created other two functions: First to show detail about dataset and second function to show null by each column.

In [11]:
show_basic_information(book_dataset)
show_nulls(book_dataset)

Rows: 270494 - Columns: 8


[Stage 7:>                                                          (0 + 1) / 1]

+----+----------+-----------+-------------------+---------+-----------+-----------+-----------+
|isbn|Book-Title|Book-Author|Year-Of-Publication|Publisher|Image-URL-S|Image-URL-M|Image-URL-L|
+----+----------+-----------+-------------------+---------+-----------+-----------+-----------+
|   0|         0|          0|                  0|        0|          0|          0|          0|
+----+----------+-----------+-------------------+---------+-----------+-----------+-----------+



                                                                                

In [13]:
book_dataset.show(truncate=False)

+----------+--------------------------------------------------------------------------------------------------+--------------------+-------------------+---------------------------+------------------------------------------------------------+------------------------------------------------------------+------------------------------------------------------------+
|isbn      |Book-Title                                                                                        |Book-Author         |Year-Of-Publication|Publisher                  |Image-URL-S                                                 |Image-URL-M                                                 |Image-URL-L                                                 |
+----------+--------------------------------------------------------------------------------------------------+--------------------+-------------------+---------------------------+------------------------------------------------------------+-------------------------------

In [15]:
user_dataset.show(truncate=False)

+------+--------------------------------------+----+
|UserID|Location                              |Age |
+------+--------------------------------------+----+
|1     |nyc, new york, usa                    |NULL|
|2     |stockton, california, usa             |18  |
|3     |moscow, yukon territory, russia       |NULL|
|4     |porto, v.n.gaia, portugal             |17  |
|5     |farnborough, hants, united kingdom    |NULL|
|6     |santa monica, california, usa         |61  |
|7     |washington, dc, usa                   |NULL|
|8     |timmins, ontario, canada              |NULL|
|9     |germantown, tennessee, usa            |NULL|
|10    |albacete, wisconsin, spain            |26  |
|11    |melbourne, victoria, australia        |14  |
|12    |fort bragg, california, usa           |NULL|
|13    |barcelona, barcelona, spain           |26  |
|14    |mediapolis, iowa, usa                 |NULL|
|15    |calgary, alberta, canada              |NULL|
|16    |albuquerque, new mexico, usa          

#### In the User dataset, one of the transformation that I can do is split Location in three columns. So,

In [16]:
rating_dataset.show(truncate=False)

+------+----------+-----------+
|UserID|isbn      |Book-Rating|
+------+----------+-----------+
|276725|034545104X|0          |
|276726|0155061224|5          |
|276727|0446520802|0          |
|276729|052165615X|3          |
|276729|0521795028|6          |
|276733|2080674722|0          |
|276736|3257224281|8          |
|276737|0600570967|6          |
|276744|038550120X|7          |
|276745|342310538 |10         |
|276746|0425115801|0          |
|276746|0449006522|0          |
|276746|0553561618|0          |
|276746|055356451X|0          |
|276746|0786013990|0          |
|276746|0786014512|0          |
|276747|0060517794|9          |
|276747|0451192001|0          |
|276747|0609801279|0          |
|276747|0671537458|9          |
+------+----------+-----------+
only showing top 20 rows



In [20]:
user_dataset = user_dataset.withColumn('Location_Array', functions.split('Location', ','))
user_dataset.show(truncate=False)

+------+--------------------------------------+----+------------------------------------------+
|UserID|Location                              |Age |Location_Array                            |
+------+--------------------------------------+----+------------------------------------------+
|1     |nyc, new york, usa                    |NULL|[nyc,  new york,  usa]                    |
|2     |stockton, california, usa             |18  |[stockton,  california,  usa]             |
|3     |moscow, yukon territory, russia       |NULL|[moscow,  yukon territory,  russia]       |
|4     |porto, v.n.gaia, portugal             |17  |[porto,  v.n.gaia,  portugal]             |
|5     |farnborough, hants, united kingdom    |NULL|[farnborough,  hants,  united kingdom]    |
|6     |santa monica, california, usa         |61  |[santa monica,  california,  usa]         |
|7     |washington, dc, usa                   |NULL|[washington,  dc,  usa]                   |
|8     |timmins, ontario, canada        

#### Well, in the before code I transform the location column from string to list. The reason is that is more easy to work with list that with string in this scenario.

In [27]:
user_dataset = user_dataset\
    .withColumn('State', user_dataset.Location_Array[0])\
    .withColumn('City', user_dataset.Location_Array[1])\
    .withColumn('Country', user_dataset.Location_Array[2])\
    .drop('Location_Array')\
    .drop('Location')

user_dataset.show(truncate=False)

+------+----+--------------+----------------+---------------+
|UserID|Age |State         |City            |Country        |
+------+----+--------------+----------------+---------------+
|1     |NULL|nyc           | new york       | usa           |
|2     |18  |stockton      | california     | usa           |
|3     |NULL|moscow        | yukon territory| russia        |
|4     |17  |porto         | v.n.gaia       | portugal      |
|5     |NULL|farnborough   | hants          | united kingdom|
|6     |61  |santa monica  | california     | usa           |
|7     |NULL|washington    | dc             | usa           |
|8     |NULL|timmins       | ontario        | canada        |
|9     |NULL|germantown    | tennessee      | usa           |
|10    |26  |albacete      | wisconsin      | spain         |
|11    |14  |melbourne     | victoria       | australia     |
|12    |NULL|fort bragg    | california     | usa           |
|13    |26  |barcelona     | barcelona      | spain         |
|14    |

#### With the new Location_Array column  I created three new columns State - City - Country based on the Location_Array column. Next step was that I drop Location_Array and Location columns because I will don't need.

## Load

In [31]:
save_dataset(user_dataset, 'user')

                                                                                

#### With the second function that I created I save the new  user data information on new table. This is only for example and show how we can save data.
