## Define Variables

Assign the DBFS paths and table names used by the pipeline. We will update five silver tables for the movie casting activity.

In [0]:
# Metastore table names. Index i of `silver_tablename` is paired with
# index i of `path_list` when the silver tables are written and registered.
bronzetable = "movie_classic_bronze"
silver_tablename = [
    'clean_genres_silver',
    'clean_movie_silver',
    'clean_OriginalLanguage_silver',
    'clean_revenue_silver',
    'clean_movie_genres_silver',
]

user = 'Han_Wu'
# NOTE(review): `user` is not interpolated into this path. The original was an
# f-string with no placeholder, so the literal ".../movie/user/" was always used.
# The literal path is kept here so existing data/table locations do not move;
# confirm whether a per-user path (e.g. f".../movie/{user}/") was intended.
classicPipelinePath = "/dbassignment1_v2/movie/user/"
bronzePath = classicPipelinePath + "bronze/"

# One Delta directory per silver table: silver1/ ... silver5/.
# Deriving the list from `silver_tablename` keeps the two lists the same length;
# the unpacking below fails loudly if that count ever stops being five.
path_list = [f"{classicPipelinePath}silver{n}/" for n in range(1, len(silver_tablename) + 1)]
silverPath1, silverPath2, silverPath3, silverPath4, silverPath5 = path_list

#silverQuarantinePath = classicPipelinePath + "silverQuarantine/"
#goldPath = classicPipelinePath + "gold/"

## Import Operation Functions

In [0]:
%run /Users/goodareny@gmail.com/movie_function

## Load Datasource
Load the raw data with the helper function `read_batch_raw`.

In [0]:
# DBFS folder that holds the uploaded raw source files.
rawpath = "dbfs:/FileStore/shared_uploads/goodareny@gmail.com"
rawDF = read_batch_raw(rawpath)

## Inspect the Raw Data

In [0]:
# Number of raw rows loaded (9992 in the recorded run).
# DataFrame.count() counts every row, duplicates included; it is not a distinct count.
rawDF.count()

## Ingestion Metadata
Use the helper function `transform_raw` to add the metadata columns `datasource`, `ingesttime`, `status`, and `p_ingestdate`. Newly imported records have `status` set to `new`.

In [0]:
# Raw DataFrame enriched with the ingestion metadata columns.
transformedRawDF = transform_raw(rawDF)

## WRITE Batch to a Bronze
Use the helper function `batch_writer` to write Delta files into `bronzePath`.

In [0]:
# Empty the bronze location first so that a fresh run does not append a
# duplicate copy of the raw data.
dbutils.fs.rm(bronzePath, recurse=True)

In [0]:
# Partition the bronze Delta files by ingest date. Per the original note,
# batch_writer's default mode is append (confirm in movie_function).
rawToBronzeWriter = batch_writer(transformedRawDF, partition_column="p_ingestdate")
rawToBronzeWriter.save(bronzePath)

## Register the Bronze Table in the Metastore
Use the `create_table` function to register the bronze table from `bronzePath`.

In [0]:
# Register the bronze Delta directory under the bronze table name.
create_table(spark, bronzetable, bronzePath)

## Purge Raw File Path
Manually delete the uploaded raw files after ingestion; the cell below is intentionally left commented out.

In [0]:
# Intentionally disabled: the raw files are purged manually.
# NOTE(review): the original line referenced `rawPath`, but the variable defined
# in the load cell is `rawpath`; uncommenting `rawPath` would raise NameError.
# Caution: this recursively deletes everything under that shared-uploads folder.
# dbutils.fs.rm(rawpath, recurse=True)

## Bronze to Silver Step

## Make Notebook Idempotent

In [0]:
# Remove every silver output directory so each run rebuilds the silver
# tables from scratch instead of appending to results of a previous run.
# `path_list` holds silverPath1 ... silverPath5 in order.
for silver_path in path_list:
    dbutils.fs.rm(silver_path, recurse=True)

## Load New Records from the Bronze Records
By default, `read_batch_bronze` loads only the records with status `new` from the Bronze table.

In [0]:
# Pull the bronze records to process (status defaults to "new" per the helper).
bronzeDF = read_batch_bronze(bronzetable)
bronzeDF.count()

### Split the Bronze
Use the `bronze_silver_tracker` function to split the Bronze records into `silver_tracker_clean` and `silver_tracker_quarantine`.

In [0]:
# Separate the bronze batch into clean records and records to quarantine.
(silver_tracker_clean,
 silver_tracker_quarantine) = bronze_silver_tracker(bronzeDF)

### Write Clean Batch to a Silver Table
* `df_clean_to_silver` returns five silver DataFrames: `genres`, `OriginalLanguage`, `movies`, `revenue`, and `movie_genres`.
* `silver_batch_writer` then writes each silver DataFrame as Delta files into its own `silverPath`; the default mode is `append`.

In [0]:
# Fan the clean records out into five silver DataFrames. The return order
# (genres, OriginalLanguage, movie, revenue, movie_genres) differs from the
# order used by `silver_tablename` / `path_list`.
(
    silver_tracker_clean_genres,
    silver_tracker_clean_OriginalLanguage,
    silver_tracker_clean_movie,
    silver_tracker_clean_revenue,
    silver_tracker_clean_movie_genres,
) = df_clean_to_silver(cleanrecords=silver_tracker_clean)

In [0]:
# Reordered to match `path_list` / `silver_tablename`:
# genres, movie, OriginalLanguage, revenue, movie_genres.
df_clean_list = [
    silver_tracker_clean_genres,
    silver_tracker_clean_movie,
    silver_tracker_clean_OriginalLanguage,
    silver_tracker_clean_revenue,
    silver_tracker_clean_movie_genres,
]
# Pair DataFrames with paths explicitly instead of a hard-coded range(0, 5);
# the check stops zip() from silently dropping a DataFrame if the lists drift.
assert len(df_clean_list) == len(path_list), "each clean silver DataFrame needs a target path"

for silver_df, silver_path in zip(df_clean_list, path_list):
    silver_batch_writer(silver_df).save(silver_path)

## Register the Silver Tables in the Metastore
Register each silver table with `create_table`.

In [0]:
# Register each silver table on top of its Delta directory (same index order).
# The length check stops zip() from silently skipping a table if the lists drift.
assert len(silver_tablename) == len(path_list), "each silver table needs a path"

for table_name, silver_path in zip(silver_tablename, path_list):
    create_table(spark, table_name, silver_path)


## Update Bronze table to Reflect the Loads
`update_status` changes the status to `loaded` for the clean silver records and their corresponding bronze records.

In [0]:
# Flag the bronze rows behind the clean silver records as loaded.
update_status(spark, silver_tracker_clean, path=bronzePath, status="loaded")

## Update Quarantined records
`update_status` changes the status to `quarantined` for the quarantined silver records and their corresponding bronze records.

In [0]:
# Flag the bronze rows that failed the clean checks as quarantined.
update_status(spark, silver_tracker_quarantine, path=bronzePath, status="quarantined")

## Handle Quarantined Records
At this point, some records have been quarantined; the query below shows the count per status.

In [0]:
%sql
-- Record count per status in the bronze table. GROUP BY alone does not
-- guarantee row order in Spark, so ORDER BY keeps re-run output stable.
SELECT status, count(*)
FROM movie_classic_bronze
GROUP BY status
ORDER BY status

status,count(1)
quarantined,6
loaded,9986


## Quarantined Bronze Transformation
* `read_batch_bronze` selects the quarantined records from the bronze table and returns them as a DataFrame to be cleaned.
* `bronze_QuarantinedDF_transform` then cleans the `runtime` and `budget` values as required.
* `bronze_QuarantinedDF_toSilver` performs the silver transformation and returns five silver DataFrames, each combining the repaired quarantined data with the existing clean silver data without duplicates.

In [0]:
# Re-read only the quarantined bronze rows, then repair them
# (the runtime/budget fixes live in bronze_QuarantinedDF_transform).
bronze_QuarantinedDF = read_batch_bronze(bronzetable, status="quarantined")
bronzeQuarantinedDF_tracker_trans = bronze_QuarantinedDF_transform(bronze_QuarantinedDF)

In [0]:
# Five silver DataFrames built from the repaired quarantined rows; returned in
# the order genres, OriginalLanguage, movie, revenue, movie_genre.
(
    silver_bronzeQuarantinedDF_genres,
    silver_bronzeQuarantinedDF_OriginalLanguage,
    silver_bronzeQuarantinedDF_movie,
    silver_bronzeQuarantinedDF_revenue,
    silver_bronzeQuarantinedDF_movie_genre,
) = bronze_QuarantinedDF_toSilver(bronzeQuarantinedDF_tracker_trans)

### Write Clean Quarantined Batch to Silver
`silver_batch_writer` overwrites the silver tables when `mode` is set to `overwrite`.

In [0]:
# Reordered to match `path_list` / `silver_tablename`:
# genres, movie, OriginalLanguage, revenue, movie_genres.
# `path_list` is the one defined in the variables cell; the duplicate
# redefinition that used to live here was removed.
df_quarantined_list = [
    silver_bronzeQuarantinedDF_genres,
    silver_bronzeQuarantinedDF_movie,
    silver_bronzeQuarantinedDF_OriginalLanguage,
    silver_bronzeQuarantinedDF_revenue,
    silver_bronzeQuarantinedDF_movie_genre,
]
# The check stops zip() from silently dropping a DataFrame if the lists drift.
assert len(df_quarantined_list) == len(path_list), "each silver DataFrame needs a target path"

# Overwrite mode replaces the earlier append-only load with the combined data.
for silver_df, silver_path in zip(df_quarantined_list, path_list):
    silver_batch_writer(silver_df, mode='overwrite').save(silver_path)

### Update Silver Tables from the Clean Quarantined Batch
Re-register the silver tables with `create_table`.

In [0]:
# Re-register each silver table on its (now overwritten) Delta directory.
# The length check stops zip() from silently skipping a table if the lists drift.
assert len(silver_tablename) == len(path_list), "each silver table needs a path"

for table_name, silver_path in zip(silver_tablename, path_list):
    create_table(spark, table_name, silver_path)

## Mark Repaired Quarantined Records as Loaded

In [0]:
# The repaired quarantined rows are now in silver, so flag them as loaded in bronze.
update_status(spark, bronzeQuarantinedDF_tracker_trans, path=bronzePath, status="loaded")

In [0]:
%sql
-- Final record count per status in the bronze table. ORDER BY keeps the row
-- order stable across re-runs (GROUP BY alone does not guarantee it).
SELECT status, count(*)
FROM movie_classic_bronze
GROUP BY status
ORDER BY status

status,count(1)
loaded,9992
