# ETL

### Importing the required libraries

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, collect_set, flatten, array_distinct

### Defining the input and output file paths

In [5]:
INPUT_FILE = '../data/people.json'
OUTPUT_FILE = '../data/people.parquet'

### Creating a Spark session

In [6]:
spark = SparkSession.builder.appName("ETL").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

### Reading the JSON file

##### **multiLine=True** is used to read the JSON file with multiple lines to avoid the **_corrupted_record** error.

In [7]:
extract_df = spark.read.json(INPUT_FILE, multiLine=True)
extract_df.printSchema()

root
 |-- age: long (nullable = true)
 |-- city: string (nullable = true)
 |-- friends: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- hobbies: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [8]:
extract_df.show(truncate=True)

+---+----------+--------------------+---+--------+
|age|      city|             friends| id|    name|
+---+----------+--------------------+---+--------+
| 78|    Sydney|[{[Watching Sport...|  0|  Elijah|
| 97| Melbourne|[{[Watching Sport...|  1|    Noah|
| 48|  Adelaide|[{[Reading, Volun...|  2|     Evy|
| 39|     Perth|[{[Watching Sport...|  3|  Oliver|
| 95|     Perth|[{[Movie Watching...|  4| Michael|
| 19|  Brisbane|[{[Painting, Tele...|  5| Michael|
| 76|    Sydney|[{[Genealogy, Coo...|  6|   Lucas|
| 25|    Hobart|[{[Music, Golf], ...|  7|Michelle|
| 61|    Sydney|[{[Bicycling, Ski...|  8|   Emily|
| 33|Launceston|[{[Traveling, Bic...|  9|    Liam|
+---+----------+--------------------+---+--------+



### Exploding the friends column

In [9]:
explode_df = extract_df.withColumn('friends', explode(col('friends')))
explode_df.printSchema()

root
 |-- age: long (nullable = true)
 |-- city: string (nullable = true)
 |-- friends: struct (nullable = true)
 |    |-- hobbies: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [10]:
explode_df.show(truncate=False)

+---+---------+--------------------------------------------------------------+---+-------+
|age|city     |friends                                                       |id |name   |
+---+---------+--------------------------------------------------------------+---+-------+
|78 |Sydney   |{[Watching Sports, Reading, Skiing & Snowboarding], Michelle} |0  |Elijah |
|78 |Sydney   |{[Traveling, Video Games], Robert}                            |0  |Elijah |
|97 |Melbourne|{[Watching Sports, Skiing & Snowboarding, Collecting], Oliver}|1  |Noah   |
|97 |Melbourne|{[Running, Music, Woodworking], Olivia}                       |1  |Noah   |
|97 |Melbourne|{[Woodworking, Calligraphy, Genealogy], Robert}               |1  |Noah   |
|97 |Melbourne|{[Walking, Church Activities], Ava}                           |1  |Noah   |
|97 |Melbourne|{[Music, Church Activities], Michael}                         |1  |Noah   |
|97 |Melbourne|{[Martial Arts, Painting, Jewelry Making], Michael}           |1  |Noah   |

### Flattening the friends column

##### 1. **collect_set** collects unique values of the friends' names and hobbies, including fields within arrays.
##### 2. **flatten** combines the friends' hobbies arrays into a single array.
##### 3. **array_distinct** removes duplicate values from the friends' hobbies array.


In [11]:
flatten_df = explode_df.groupby('id', 'name', 'age', 'city').agg(collect_set('friends.name').alias('friends_name'), array_distinct(flatten(collect_set('friends.hobbies'))).alias('friends_hobbies'))
flatten_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- city: string (nullable = true)
 |-- friends_name: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- friends_hobbies: array (nullable = false)
 |    |-- element: string (containsNull = true)



In [12]:
flatten_df.show(truncate=False)

+---+--------+---+----------+--------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |name    |age|city      |friends_name                          |friends_hobbies                                                                                                                                                              |
+---+--------+---+----------+--------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|6  |Lucas   |76 |Sydney    |[John]                                |[Genealogy, Cooking, Socializing, Yoga]                                                                                                                                      |
|1  |Noah    |97 |Melbourne 

### Exploding the friends_name and friends_hobbies columns

##### **explode** is used to explode the friends' names and hobbies arrays.

In [13]:
explode_array_df = flatten_df.withColumn('friends_name', explode(col('friends_name'))).withColumn('friends_hobbies', explode(col('friends_hobbies')))
explode_array_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- city: string (nullable = true)
 |-- friends_name: string (nullable = false)
 |-- friends_hobbies: string (nullable = true)



In [14]:
explode_array_df.show(truncate=False, n=100)

+---+-------+---+---------+------------+---------------------+
|id |name   |age|city     |friends_name|friends_hobbies      |
+---+-------+---+---------+------------+---------------------+
|6  |Lucas  |76 |Sydney   |John        |Genealogy            |
|6  |Lucas  |76 |Sydney   |John        |Cooking              |
|6  |Lucas  |76 |Sydney   |John        |Socializing          |
|6  |Lucas  |76 |Sydney   |John        |Yoga                 |
|1  |Noah   |97 |Melbourne|Robert      |Walking              |
|1  |Noah   |97 |Melbourne|Robert      |Church Activities    |
|1  |Noah   |97 |Melbourne|Robert      |Watching Sports      |
|1  |Noah   |97 |Melbourne|Robert      |Skiing & Snowboarding|
|1  |Noah   |97 |Melbourne|Robert      |Collecting           |
|1  |Noah   |97 |Melbourne|Robert      |Martial Arts         |
|1  |Noah   |97 |Melbourne|Robert      |Painting             |
|1  |Noah   |97 |Melbourne|Robert      |Jewelry Making       |
|1  |Noah   |97 |Melbourne|Robert      |Running        

### Ordering the DataFrame by the id column

In [15]:
oder_df = explode_array_df.orderBy('id', ascending=True)
oder_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- city: string (nullable = true)
 |-- friends_name: string (nullable = false)
 |-- friends_hobbies: string (nullable = true)



In [16]:
oder_df.show(truncate=False)

+---+------+---+---------+------------+---------------------+
|id |name  |age|city     |friends_name|friends_hobbies      |
+---+------+---+---------+------------+---------------------+
|0  |Elijah|78 |Sydney   |Robert      |Watching Sports      |
|0  |Elijah|78 |Sydney   |Robert      |Reading              |
|0  |Elijah|78 |Sydney   |Robert      |Skiing & Snowboarding|
|0  |Elijah|78 |Sydney   |Robert      |Traveling            |
|0  |Elijah|78 |Sydney   |Robert      |Video Games          |
|0  |Elijah|78 |Sydney   |Michelle    |Watching Sports      |
|0  |Elijah|78 |Sydney   |Michelle    |Reading              |
|0  |Elijah|78 |Sydney   |Michelle    |Skiing & Snowboarding|
|0  |Elijah|78 |Sydney   |Michelle    |Traveling            |
|0  |Elijah|78 |Sydney   |Michelle    |Video Games          |
|1  |Noah  |97 |Melbourne|Michael     |Collecting           |
|1  |Noah  |97 |Melbourne|Ava         |Calligraphy          |
|1  |Noah  |97 |Melbourne|Michael     |Martial Arts         |
|1  |Noa

### Converting the column names to uppercase

In [17]:
uppercase_df = oder_df.select([col(c).alias(c.upper()) for c in oder_df.columns])
uppercase_df.printSchema()

root
 |-- ID: long (nullable = true)
 |-- NAME: string (nullable = true)
 |-- AGE: long (nullable = true)
 |-- CITY: string (nullable = true)
 |-- FRIENDS_NAME: string (nullable = false)
 |-- FRIENDS_HOBBIES: string (nullable = true)



In [18]:
uppercase_df.show(truncate=False)

+---+------+---+---------+------------+---------------------+
|ID |NAME  |AGE|CITY     |FRIENDS_NAME|FRIENDS_HOBBIES      |
+---+------+---+---------+------------+---------------------+
|0  |Elijah|78 |Sydney   |Robert      |Watching Sports      |
|0  |Elijah|78 |Sydney   |Robert      |Reading              |
|0  |Elijah|78 |Sydney   |Robert      |Skiing & Snowboarding|
|0  |Elijah|78 |Sydney   |Robert      |Traveling            |
|0  |Elijah|78 |Sydney   |Robert      |Video Games          |
|0  |Elijah|78 |Sydney   |Michelle    |Watching Sports      |
|0  |Elijah|78 |Sydney   |Michelle    |Reading              |
|0  |Elijah|78 |Sydney   |Michelle    |Skiing & Snowboarding|
|0  |Elijah|78 |Sydney   |Michelle    |Traveling            |
|0  |Elijah|78 |Sydney   |Michelle    |Video Games          |
|1  |Noah  |97 |Melbourne|Michael     |Collecting           |
|1  |Noah  |97 |Melbourne|Ava         |Calligraphy          |
|1  |Noah  |97 |Melbourne|Michael     |Martial Arts         |
|1  |Noa

### Writing the DataFrame to a Parquet file

In [19]:
uppercase_df.write.parquet(OUTPUT_FILE)

                                                                                

### Reading the Parquet file

In [20]:
read_parquet_df = spark.read.parquet(OUTPUT_FILE)
read_parquet_df.printSchema()

root
 |-- ID: long (nullable = true)
 |-- NAME: string (nullable = true)
 |-- AGE: long (nullable = true)
 |-- CITY: string (nullable = true)
 |-- FRIENDS_NAME: string (nullable = true)
 |-- FRIENDS_HOBBIES: string (nullable = true)



In [21]:
read_parquet_df.show(truncate=False)

+---+------+---+---------+------------+---------------------+
|ID |NAME  |AGE|CITY     |FRIENDS_NAME|FRIENDS_HOBBIES      |
+---+------+---+---------+------------+---------------------+
|0  |Elijah|78 |Sydney   |Robert      |Watching Sports      |
|0  |Elijah|78 |Sydney   |Robert      |Reading              |
|0  |Elijah|78 |Sydney   |Robert      |Skiing & Snowboarding|
|0  |Elijah|78 |Sydney   |Robert      |Traveling            |
|0  |Elijah|78 |Sydney   |Robert      |Video Games          |
|0  |Elijah|78 |Sydney   |Michelle    |Watching Sports      |
|0  |Elijah|78 |Sydney   |Michelle    |Reading              |
|0  |Elijah|78 |Sydney   |Michelle    |Skiing & Snowboarding|
|0  |Elijah|78 |Sydney   |Michelle    |Traveling            |
|0  |Elijah|78 |Sydney   |Michelle    |Video Games          |
|1  |Noah  |97 |Melbourne|Robert      |Walking              |
|1  |Noah  |97 |Melbourne|Robert      |Church Activities    |
|1  |Noah  |97 |Melbourne|Robert      |Watching Sports      |
|1  |Noa