# Tidying up our data - Part 1

# Learning objectives

- Manipulate deeply nested JSON and transform it into structured data ready to be loaded into a data warehouse

## Loading our data from S3

Because we expect you to know how to set things up so that you can load data from S3, we're doing it for you now.  
Make sure you go through our code and check that you were actually following our best practices.

---

On a side note, it is **tedious** to have to do this in every notebook, isn't it?  
And putting everything into a single notebook isn't a proper solution either.

The solution we would recommend is to store this configuration in an external file.  
_Sadly, Databricks doesn't work with regular Python files, only notebooks..._
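
To make the external-file idea concrete, here is a minimal sketch in plain Python. The file name `s3_config.json` and its keys are hypothetical, chosen only for illustration; the file is written to a temporary directory so the example is self-contained.

```python
import json
import tempfile
from pathlib import Path


def load_config(path):
    """Read a JSON configuration file and return its content as a dict."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


# Hypothetical configuration file, created here only so the example runs on its own.
config_path = Path(tempfile.mkdtemp()) / "s3_config.json"
config_path.write_text(
    json.dumps(
        {
            "bucket_name": "full-stack-bigdata-datasets",
            "prefix": "Big_Data/YOUTUBE",
            "scheme": "s3",
        }
    ),
    encoding="utf-8",
)

# Any notebook could then load the same settings with a single call.
config = load_config(config_path)
print(config["bucket_name"], config["prefix"], config["scheme"])
```

With something like this, each notebook would only need the call to `load_config(...)` instead of repeating the constants.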

In [None]:
!pip install boto3

In [None]:
import boto3

In [None]:
# TODO: using your previously created constants:
#       - create a s3 resource
#       - and a s3 bucket
### BEGIN STRIP ###
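import os

# Credentials of the student account, read from the environment rather than hard-coded.
# Assumption: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set on the cluster.
ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]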

hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", ACCESS_KEY_ID)
hadoop_conf.set("fs.s3a.secret.key", SECRET_ACCESS_KEY)
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

BUCKET_NAME = "full-stack-bigdata-datasets"
PREFIX = "Big_Data/YOUTUBE"
SCHEME = "s3"

session = boto3.Session(
    region_name='eu-west-3',  # Datacenters located in Paris, FR
    aws_access_key_id=ACCESS_KEY_ID, 
    aws_secret_access_key=SECRET_ACCESS_KEY
)
s3 = session.resource('s3')

### END STRIP ###

In [None]:
# We create a Bucket object from the S3 resource defined above
bucket = s3.Bucket(BUCKET_NAME)

Last time, we worked with one file at a time. That was fine for quick analysis, but for processing, it would be nice to handle all files at once.  
We just need to make sure their schemas are the same, or we might run into surprises.  
Since all the API calls were made with the same version and settings of the API, we can expect things to be right, but **always keep an eye open**.
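
As a rough illustration of what "same schema" can mean, the sketch below compares only the top-level keys of already-parsed JSON documents. The helper names and the sample documents are made up for this example; a real check would also have to look at nested fields and types.

```python
def top_level_keys(document):
    """Return the sorted top-level keys of a parsed JSON document (a dict)."""
    return sorted(document.keys())


def same_top_level_schema(documents):
    """Check that every document exposes exactly the same top-level keys."""
    key_sets = {tuple(top_level_keys(doc)) for doc in documents}
    return len(key_sets) <= 1


# Made-up documents for illustration; they are not taken from the real dataset.
responses = [
    {"kind": "a", "items": [1, 2]},
    {"kind": "b", "items": [3]},
]
odd_one = {"kind": "c", "results": []}

print(same_top_level_schema(responses))
print(same_top_level_schema(responses + [odd_one]))
```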

In [None]:
# TODO: print out the list of files inside {BUCKET}/{PREFIX}
### BEGIN STRIP ###
all_keys = [obj.key for obj in bucket.objects.filter(Prefix=PREFIX)]
all_keys
### END STRIP ###

As you can see in the [documentation](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.load) for the `.load(...)` method, its `path` argument accepts either a string or a list of strings.
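
A list of paths can be assembled from object keys with a small helper. This is only a sketch: `build_paths` and the example keys are hypothetical, and the filter on the `.json` suffix is an assumption about which objects should be loaded.

```python
def build_paths(scheme, bucket_name, keys, suffix=".json"):
    """Turn S3 object keys into full paths, keeping only keys ending with `suffix`."""
    return [f"{scheme}://{bucket_name}/{key}" for key in keys if key.endswith(suffix)]


# Made-up keys standing in for the result of a bucket listing.
example_keys = [
    "Big_Data/YOUTUBE/",
    "Big_Data/YOUTUBE/a.json",
    "Big_Data/YOUTUBE/b.json",
]

paths = build_paths("s3", "full-stack-bigdata-datasets", example_keys)
print(paths)
```

A list such as `paths` could then be passed directly as the `path` argument of `.load(...)`.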

In [None]:
# TODO: write the filepath using SCHEME, BUCKET_NAME, PREFIX: `file_paths`
### BEGIN STRIP ###
file_paths = f"{SCHEME}://{BUCKET_NAME}/{PREFIX}/songs.json"
file_paths
### END STRIP ###

In [None]:
# TODO: load all the files into a DataFrame: `df`
### BEGIN STRIP ###
df = spark.read.format('json').load(file_paths, multiline=True)
### END STRIP ###

In [None]:
# TODO: count the number of rows in your DataFrame, it should be 100
### BEGIN STRIP ###
df.count()
### END STRIP ###

Looking great! But there's more: what the documentation doesn't tell you is that you can use wildcards in your paths:
- `*`: matches any string of characters
- `?`: matches any single character

This pattern is called globbing; you can learn about it on [Wikipedia](https://en.wikipedia.org/wiki/Glob_(programming)).
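
To get a feel for the two wildcards, here is a small sketch using Python's standard `fnmatch` module. The file names are made up, and `fnmatch` only approximates the glob rules Spark applies to paths, so treat it as an illustration of `*` and `?` rather than of Spark's exact behavior.

```python
from fnmatch import fnmatchcase

# Made-up file names for illustration only.
names = ["songs.json", "songs_1.json", "songs_2.json", "songs_10.json", "notes.txt"]

# `*` matches any run of characters, including none.
star_matches = [name for name in names if fnmatchcase(name, "*.json")]

# `?` matches exactly one character, so "songs_10.json" is excluded.
question_matches = [name for name in names if fnmatchcase(name, "songs_?.json")]

print(star_matches)
print(question_matches)
```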

We'll just check that we get the same number of rows with this method.

In [None]:
# TODO: count the number of rows in the DataFrame
### BEGIN STRIP ###
df.count()
### END STRIP ###

Once again, we should have 100 rows. **If that's not the case, you have an issue, go back and fix it.**

## Tidying up

---

We have multiple issues with our data. **It does not look like "tidy data" at all.**  
First, we have rows within rows...  
And second, most of the data resides in a deeply nested structure within the `items` column...

We will fix the former, then handle the latter in the next notebook.

### 1. Fixing the rows
Ever heard about `EXPLODE` in SQL?

Luckily for us, there's an equivalent in PySpark: `.explode(...)`; here's the link to the [documentation](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.explode).  
According to the documentation, `.explode(...)` "Returns a new row for each element in the given array or map."

If you remember correctly, that's exactly the kind of structure we have in the schema of our DataFrame for the `items` column.
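
The idea behind exploding an array column can be sketched in plain Python, without Spark. The `explode` helper and the sample rows below are hypothetical and only mimic the behavior for a single array column; rows whose array is empty produce no output row, which is also how PySpark's `explode` behaves.

```python
def explode(rows, column):
    """Return one row per element found in `row[column]`, for every input row."""
    return [{column: element} for row in rows for element in row[column]]


# Made-up rows: each one holds an array in its "items" field.
rows = [
    {"items": ["song A", "song B"]},
    {"items": ["song C"]},
    {"items": []},
]

exploded = explode(rows, "items")
print(len(exploded))
print(exploded)
```

Three input rows holding three elements in total become three output rows, one per element.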

In [None]:
# TODO: print out the schema of `df`
### BEGIN STRIP ###
df.printSchema()
### END STRIP ###

In [None]:
# TODO: import the PySpark SQL functions following usual convention
### BEGIN STRIP ###
from pyspark.sql import functions as F
### END STRIP ###

In [None]:
# TODO: use `.explode(...)` on the `items` column and count the number of results
### BEGIN STRIP ###
df.select(F.explode('items')).count()
### END STRIP ###

If you got 3907 rows, you've made it, congrats! :)  
We will use this as our new working DataFrame:
- just do the same thing, but this time save into a variable named `items_df`
- don't forget to give a proper alias to your newly computed column: `items`
- at the end, as a sanity check, make sure we have the right number of rows in our new DataFrame

In [None]:
# TODO: follow previous instructions
### BEGIN STRIP ###
items_df = df.select(F.explode('items').alias('items'))
items_df.count()
### END STRIP ###

We're making progress, we now have one row per result (e.g. song)!

But each song is a deeply nested structure... We will take care of this in the following notebook.

In [None]:
# TODO: save the DataFrame as a parquet
### BEGIN STRIP ###
# items_exploded_path = "s3://full-stack-bigdata-datasets/Big_Data/YOUTUBE/items_exploded.parquet"
# items_df.write.parquet(items_exploded_path, mode='overwrite')
### END STRIP ###