<img width="200" style="float:left" 
     src="https://upload.wikimedia.org/wikipedia/commons/f/f3/Apache_Spark_logo.svg" />

# Sections
* [Description](#0)
* [1. Setup](#1)
  * [1.1 Start Hadoop](#1.1)  
  * [1.2 Set Global Config](#1.2)
  * [1.3 Create SparkSession](#1.3)
* [2. Lab](#2)
  * [2.1 Check Files](#2.1)
  * [2.2 Read Bronze DataFrames](#2.2)
  * [2.3 Transform Bronze DataFrames](#2.3)
  * [2.4 Write DataFrames to Silver](#2.4)
  * [2.5 All at once](#2.5)
* [3. Tear Down](#3)
  * [3.1 Stop Hadoop](#3.1)

<a id='0'></a>
## Description
<p>
<div>The goals for this lab are:</div>
<ul>    
    <li>Get familiar with Spark DataFrames API</li>
    <li>Apply some transformations using Spark DataFrames API</li>
    <li>Promote data from bronze to silver layer in the datalake using Spark DataFrames API</li>
</ul>    
</p>

<a id='1'></a>
## 1. Setup

Since we are going to process data stored in HDFS, let's start the service first.

<a id='1.1'></a>
### 1.1 Start Hadoop

Start Hadoop <a href="http://localhost:2024/">here </a>

<p>
<img style="width:48px" src="https://cdn.iconscout.com/icon/free/png-256/free-hadoop-226007.png" /> 
</p>

<a id='1.2'></a>
### 1.2 Set Global Config

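Set pandas' `display.max_colwidth` option to `None` so that long column values are shown in full instead of being truncated when DataFrames are displayed with `toPandas()`.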

In [None]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

In [None]:
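import numpy as np
# Compatibility shim: the np.bool alias was removed in NumPy 1.24,
# but some libraries used in this notebook may still reference it
np.bool = np.bool_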

In [None]:
import os
# current notebook name, without directory or .ipynb extension
# (__session__ holds the notebook path inside Jupyter)
notebook_name = os.path.splitext(os.path.basename(__session__))[0]

In [None]:
# HDFS base paths (no trailing slash: paths below are built as f"{base}/...")
hdfs_lakehouse_base_path = 'hdfs://localhost:9000/lakehouse'
hdfs_warehouse_base_path = 'hdfs://localhost:9000/warehouse'

<a id='1.3'></a>
### 1.3 Create SparkSession
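By setting the `PYSPARK_SUBMIT_ARGS` environment variable we can add extra packages to our Spark application: here the Avro data source and the Delta Lake packages. The variable is only read when the Spark JVM is launched, so it must be set before the SparkSession is created.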

In [None]:
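import os
# Maven coordinates of the extra packages (Scala 2.12 builds matching Spark 3.5)
dependencies = ["org.apache.spark:spark-avro_2.12:3.5.0",
                "io.delta:delta-iceberg_2.12:3.0.0"]
# When launching from Python, the value must end with "pyspark-shell"
os.environ['PYSPARK_SUBMIT_ARGS']= f"--packages {','.join(dependencies)} pyspark-shell"
# Silences the pandas-on-Spark warning about PyArrow time zone handling
os.environ['PYARROW_IGNORE_TIMEZONE'] = 'true'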

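The first step is always to create the SparkSession, the entry point to the DataFrame and SQL APIs. Here it is configured with the Delta Lake SQL extension and catalog, the HDFS warehouse directory, and Hive support, so that the tables created later are registered in the metastore.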

In [None]:
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
    .appName(notebook_name)
    .config("spark.log.level","ERROR")
    .config("spark.sql.warehouse.dir",hdfs_warehouse_base_path)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .enableHiveSupport()
    .getOrCreate()
)

<a id='2'></a>
## 2. Lab

<a id='2.1'></a>
### 2.1 Check Files

Check that the bronze MovieLens data is available in HDFS using the NameNode web UI:

http://localhost:50070/explorer.html#/lakehouse/bronze/movielens
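The same check can be scripted against the WebHDFS REST API. This is a minimal sketch, assuming WebHDFS is enabled on the NameNode web port used above (50070) and that the bronze folders are named `movies`, `ratings`, `links` and `trailers`; `list_hdfs_dir`, `entry_names` and `missing_datasets` are hypothetical helper names.

```python
import json
from urllib.request import urlopen

# Assumption: WebHDFS is served on the NameNode web UI port used in this lab
WEBHDFS_BASE = "http://localhost:50070/webhdfs/v1"

def entry_names(payload):
    """Extract the entry names from a WebHDFS LISTSTATUS JSON response."""
    return [status["pathSuffix"] for status in payload["FileStatuses"]["FileStatus"]]

def list_hdfs_dir(path, base=WEBHDFS_BASE):
    """List the entries of an HDFS directory through WebHDFS (op=LISTSTATUS)."""
    with urlopen(f"{base}{path}?op=LISTSTATUS") as resp:
        payload = json.load(resp)
    return entry_names(payload)

def missing_datasets(names, expected=("movies", "ratings", "links", "trailers")):
    """Return the expected bronze folders that are not present."""
    return [d for d in expected if d not in names]

# Usage (requires a running NameNode):
# print(missing_datasets(list_hdfs_dir("/lakehouse/bronze/movielens")))
```

An empty list means every expected folder is in place.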

<a id='2.2'></a>
### 2.2 Read Bronze DataFrames

In [None]:
movies_brz = (spark.read
              .option("header","true")
              .option("escape","\"")
              .csv(f"{hdfs_lakehouse_base_path}/bronze/movielens/movies/"))
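Spark's CSV reader uses a backslash as its default `escape` character, while CSV files written in the common RFC 4180 style escape an embedded double quote by doubling it (`""`). Setting `escape` to `"` makes Spark follow that convention, assuming the MovieLens files use it for titles that contain quotes. Python's standard `csv` module applies the doubled-quote rule by default, so it can illustrate how such a line should be parsed; the sample line below is made up.

```python
import csv
import io

# Hypothetical movies.csv content: a header plus one row whose title
# contains both a comma and embedded (doubled) double quotes
sample = 'movieId,title,genres\n42,"""Quoted"" Title, The (2001)",Drama\n'

rows = list(csv.reader(io.StringIO(sample)))
header, first = rows[0], rows[1]
print(first)  # ['42', '"Quoted" Title, The (2001)', 'Drama']
```

The quoted title survives as a single field, comma included, with the doubled quotes collapsed.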

In [None]:
movies_brz.printSchema()

In [None]:
movies_brz.limit(10).toPandas()

In [None]:
ratings_brz = spark.read.option("header","true").csv(f"{hdfs_lakehouse_base_path}/bronze/movielens/ratings/")
ratings_brz.limit(10).toPandas()

In [None]:
ratings_brz.printSchema()

In [None]:
links_brz = spark.read.option("header","true").csv(f"{hdfs_lakehouse_base_path}/bronze/movielens/links/")
links_brz.show(10,False)

In [None]:
links_brz.printSchema()

In [None]:
trailers_brz = spark.read.option("header","true").csv(f"{hdfs_lakehouse_base_path}/bronze/movielens/trailers/")
trailers_brz.limit(10).show(truncate=False)

In [None]:
trailers_brz.printSchema()

<a id='2.3'></a>
### 2.3 Transform Bronze DataFrames

In [None]:
from pyspark.sql.functions import *

@udf("array<string>")
def parse_title(t:str):
    """Split 'Title (YYYY)' into [title, year]; year is None when absent."""
    import re
    if t is None:  # guard against missing titles
        return [None, None]
    titleRegex = re.compile(r'^(.+)\((\d{4})\)$')
    m = titleRegex.search(t.strip())
    if m:
        title,year= m.groups()
        return [title.strip(),year.strip()]
    else:
        return [t,None]


movies_slv = movies_brz.select(
    col("movieId").cast("bigint"),
    parse_title(col("title"))[0].alias("title"),
    parse_title(col("title"))[1].cast("integer").alias("year"),
    # split() takes a Java regex, so the pipe separator must be escaped
    split("genres",r"\|").alias("genres")
    )

movies_slv.toPandas()
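Because the body of a Python UDF is plain Python, its parsing logic can be checked locally before running it on the cluster. The sketch below mirrors the logic of `parse_title` as a regular function (the name `parse_title_local` and the sample titles are illustrative): only titles ending in a four-digit year in parentheses get a year, while anything else, such as a year range, keeps the full title and a `None` year. That is why some rows end up with a null `year`.

```python
import re

# Same pattern as the UDF: a title followed by "(YYYY)" at the very end
TITLE_REGEX = re.compile(r'^(.+)\((\d{4})\)$')

def parse_title_local(t):
    """Plain-Python copy of the parse_title UDF logic."""
    m = TITLE_REGEX.search(t.strip())
    if m:
        title, year = m.groups()
        return [title.strip(), year.strip()]
    return [t, None]

for t in ["Toy Story (1995)", "  Heat (1995) ", "Some Series (1975-1979)"]:
    print(parse_title_local(t))
```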

In [None]:
movies_slv.where(col("year").isNull()).toPandas()

In [None]:
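# Some titles carry no 4-digit year, so their year stays null (see above).
# Movies without genres carry the placeholder "(no genres listed)":
# remove it so that their genres array is empty instead
movies_slv = movies_slv.withColumn("genres",array_remove(col("genres"),"(no genres listed)"))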
movies_slv.where(col("year").isNull()).toPandas()
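Two details of the `genres` handling can be illustrated with Python's `re` module, whose syntax agrees with Java's for these simple patterns. Spark's `split` takes a regular expression, so the pipe has to be escaped (`\|`); unescaped, `|` is an empty alternation rather than a literal separator. And a movie whose only genre is the placeholder `(no genres listed)` ends up with an empty array once `array_remove` drops every occurrence of it. A sketch with made-up genre strings; `split_genres` is a hypothetical name:

```python
import re

def split_genres(genres):
    """Split a pipe-separated genre string and drop the placeholder,
    like split(...) followed by array_remove(...) in the Spark code."""
    parts = re.split(r"\|", genres)  # escaped: splits on a literal pipe
    return [g for g in parts if g != "(no genres listed)"]

print(split_genres("Adventure|Animation|Children"))  # ['Adventure', 'Animation', 'Children']
print(split_genres("(no genres listed)"))            # []

# Unescaped, "|" matches the empty string, so the text is not split on the pipe
print(re.split("|", "Comedy|Drama"))
```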

In [None]:
movies_slv.printSchema()

In [None]:
from pyspark.sql.functions import *
ratings_slv = ratings_brz.select(
    col("userId").cast("bigint"),
    col("movieId").cast("bigint"),
    col("rating").cast("double"),    
    to_timestamp(from_unixtime("timestamp")).alias("timestamp")
)
ratings_slv.toPandas()
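The `timestamp` column of the ratings file holds Unix epoch seconds. `from_unixtime` formats them as a `yyyy-MM-dd HH:mm:ss` string in the session time zone, and `to_timestamp` parses that string back into a timestamp column (since Spark 3.1, `timestamp_seconds` does the conversion in one step). The arithmetic can be checked with Python's `datetime`; this sketch uses UTC explicitly, whereas Spark renders the value in `spark.sql.session.timeZone`, and the epoch values are arbitrary.

```python
from datetime import datetime, timezone

def epoch_to_utc(seconds):
    """Convert Unix epoch seconds (read from CSV as a string) to an aware UTC datetime."""
    return datetime.fromtimestamp(int(seconds), tz=timezone.utc)

print(epoch_to_utc("0"))          # 1970-01-01 00:00:00+00:00
print(epoch_to_utc("964982703"))  # 2000-07-30 18:45:03+00:00
```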

In [None]:
ratings_slv.printSchema()

In [None]:
from pyspark.sql.functions import *

links_slv = links_brz.select(
    col("movieId").cast("bigint"),
    col("imdbId"),    
    concat(lit("http://www.imdb.com/title/tt"),col("imdbId"),lit("/")).alias("imdbUrl"),
    col("tmdbId"),    
    concat(lit("https://www.themoviedb.org/movie/"),col("tmdbId"),lit("/")).alias("tmdbUrl")
)
links_slv.toPandas()
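Each link is built by prefixing an identifier from `links.csv`: the IMDb id (kept as a string so its leading zeros survive) goes after `tt`, and the TMDB URL must be built from `tmdbId`, not `imdbId`. A plain-Python sketch of the same concatenation, using illustrative ids (those of *Toy Story*); `build_links` is a hypothetical helper. Spark's `concat` returns null when any argument is null, which the sketch models by returning `None` when an id is missing.

```python
def build_links(imdb_id, tmdb_id):
    """Mirror the concat(...) expressions used to build links_slv."""
    # concat() yields null if any input is null; model that with None
    imdb_url = None if imdb_id is None else f"http://www.imdb.com/title/tt{imdb_id}/"
    tmdb_url = None if tmdb_id is None else f"https://www.themoviedb.org/movie/{tmdb_id}/"
    return imdb_url, tmdb_url

print(build_links("0114709", "862"))
print(build_links("0114709", None))
```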

In [None]:
links_slv.printSchema()

In [None]:
from pyspark.sql.functions import *

trailers_slv = trailers_brz.select(
    col("movieId").cast("bigint"),
    col("youtubeId"),    
    concat(lit("https://www.youtube.com/embed/"),col("youtubeId"),lit("/")).alias("youtubeUrl")
)
trailers_slv.toPandas()

In [None]:
trailers_slv.printSchema()

<a id='2.4'></a>
### 2.4 Write DataFrames to Silver

In [None]:
spark.sql("DROP SCHEMA IF EXISTS movielens CASCADE")
spark.sql("CREATE SCHEMA IF NOT EXISTS movielens")

In [None]:
(movies_slv.write
           .format("delta")
           .mode("overwrite")
           .option("path",f"{hdfs_lakehouse_base_path}/silver/movielens/movies/")
           .saveAsTable("movielens.movies"))

(ratings_slv.write
            .format("delta")
            .mode("overwrite")
            .option("path",f"{hdfs_lakehouse_base_path}/silver/movielens/ratings/")
            .saveAsTable("movielens.ratings"))

(links_slv.write
          .format("delta")
          .mode("overwrite")
          .option("path",f"{hdfs_lakehouse_base_path}/silver/movielens/links/")
          .saveAsTable("movielens.links"))

(trailers_slv.write
             .format("delta")
             .mode("overwrite")
             .option("path",f"{hdfs_lakehouse_base_path}/silver/movielens/trailers/")
             .saveAsTable("movielens.trailers"))
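The four writes above differ only in the DataFrame and the table name. A minimal sketch of a hypothetical helper, `write_silver`, that derives both the silver path and the table name from a single argument; it relies only on the `DataFrameWriter` calls already used in this notebook (`format`, `mode`, `option`, `saveAsTable`) and strips a trailing slash from the base path so either form works.

```python
def silver_path(base_path, dataset, table):
    """Build the silver-layer location, tolerating a trailing slash in base_path."""
    return f"{base_path.rstrip('/')}/silver/{dataset}/{table}/"

def write_silver(df, base_path, dataset, table):
    """Write df as an external Delta table <dataset>.<table> under the silver path."""
    (df.write
       .format("delta")
       .mode("overwrite")
       .option("path", silver_path(base_path, dataset, table))
       .saveAsTable(f"{dataset}.{table}"))

# Usage in this notebook:
# for name, df in {"movies": movies_slv, "ratings": ratings_slv,
#                  "links": links_slv, "trailers": trailers_slv}.items():
#     write_silver(df, hdfs_lakehouse_base_path, "movielens", name)
```

Because the `path` option is set, each table is external: dropping it from the metastore does not delete the Delta files in HDFS.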

In [None]:
%%sparksql
show databases

In [None]:
%%sparksql
use movielens

In [None]:
%%sparksql
show tables

In [None]:
%%sparksql
select *
from movielens.movies
limit 10

<a id='2.5'></a>
### 2.5 All at once

Splitting your code across multiple cells is **NOT** how you would write a production workload.<br/>
We have been using intermediate variables and printing schemas and data (`toPandas` and `show`) only to check that our transformations are correct.<br/>
In production you would chain each read, transformation and write into a single statement, like this:

In [None]:
spark.sql("DROP SCHEMA IF EXISTS movielens CASCADE")
spark.sql("CREATE SCHEMA IF NOT EXISTS movielens")

In [None]:
from pyspark.sql.functions import *

@udf("array<string>")
def parse_title(t:str):
    import re
    if t is None:  # guard against missing titles
        return [None, None]
    titleRegex = re.compile(r'^(.+)\((\d{4})\)$')
    m = titleRegex.search(t.strip())
    if m:
        title,year= m.groups()
        return [title.strip(),year.strip()]
    else:
        return [t,None]

#movies
(spark.read
     .option("header","true")
     .option("escape","\"")
     .csv(f"{hdfs_lakehouse_base_path}/bronze/movielens/movies/")
     .select(
        col("movieId").cast("bigint"),
        parse_title(col("title"))[0].alias("title"),
        parse_title(col("title"))[1].cast("integer").alias("year"),
        split("genres",r"\|").alias("genres")
     )
     # same cleanup as in section 2.3: drop the "(no genres listed)" placeholder
     .withColumn("genres",array_remove(col("genres"),"(no genres listed)"))
     .write
     .format("delta")
     .mode("overwrite")
     .option("path",f"{hdfs_lakehouse_base_path}/silver/movielens/movies/")
     .saveAsTable("movielens.movies")
)
  
# ratings
(spark.read
    .option("header","true")
    .csv(f"{hdfs_lakehouse_base_path}/bronze/movielens/ratings/")
    .select(
        col("userId").cast("bigint"),
        col("movieId").cast("bigint"),
        col("rating").cast("double"),    
        to_timestamp(from_unixtime("timestamp")).alias("timestamp")
    )
    .write
    .format("delta")
    .mode("overwrite")
    .option("path",f"{hdfs_lakehouse_base_path}/silver/movielens/ratings/")
    .saveAsTable("movielens.ratings")
)

#links
(spark.read
     .option("header","true")
     .csv(f"{hdfs_lakehouse_base_path}/bronze/movielens/links/")
     .select(
        col("movieId").cast("bigint"),
        col("imdbId"),    
        concat(lit("http://www.imdb.com/title/tt"),col("imdbId"),lit("/")).alias("imdbUrl"),
        col("tmdbId"),    
        concat(lit("https://www.themoviedb.org/movie/"),col("tmdbId"),lit("/")).alias("tmdbUrl"))
     .write
     .format("delta")
     .mode("overwrite")
     .option("path",f"{hdfs_lakehouse_base_path}/silver/movielens/links/")
     .saveAsTable("movielens.links")
)

#trailers
(spark.read
     .option("header","true")
     .csv(f"{hdfs_lakehouse_base_path}/bronze/movielens/trailers/")
     .select(
        col("movieId").cast("bigint"),
        col("youtubeId"),    
        concat(lit("https://www.youtube.com/embed/"),col("youtubeId"),lit("/")).alias("youtubeUrl"))
     .write
     .format("delta")
     .mode("overwrite")
     .option("path",f"{hdfs_lakehouse_base_path}/silver/movielens/trailers/")
     .saveAsTable("movielens.trailers")
)

In [None]:
%%sparksql
select *
from movielens.movies
limit 10

<a id='3'></a>
## 3. Tear Down

Once we have completed the lab, we can stop all the services.

<a id='3.1'></a>
### 3.1 Stop Hadoop
Stop Hadoop <a href="http://localhost:2024/">here </a>
<p>
<img style="width:48px" src="https://cdn.iconscout.com/icon/free/png-256/free-hadoop-226007.png" /> 
</p>