In [0]:
# Storage locations for each layer of the movie pipeline.
# Raw JSON files are read from the DBFS FileStore; every Delta table lives under /movie.
_MOVIE_ROOT = "/movie"

rawPath = "dbfs:/FileStore/tables/*.json"
bronzePath = _MOVIE_ROOT + "/bronzes22"
silverPath = _MOVIE_ROOT + "/silver22"
silverGenrePath = _MOVIE_ROOT + "/silver_genre22"
silverMovieGenrePath = _MOVIE_ROOT + "/silver_movie_genre22"
silverOriginalLanguagePath = _MOVIE_ROOT + "/silver_originalLanguage22"
silverQuaTransPath = _MOVIE_ROOT + "/silver_qua_trans22"

In [0]:
from delta.tables import DeltaTable
from pyspark.sql import DataFrame
from pyspark.sql import DataFrameWriter
from pyspark.sql.functions import col, explode
from pyspark.sql.functions import current_timestamp, lit, to_json
from pyspark.sql.functions import from_json
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import row_number
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType, LongType, DateType, MapType
from pyspark.sql.window import Window


In [0]:
def read_batch_raw(rawPath):
    """Read the raw JSON files matched by ``rawPath`` as a batch DataFrame.

    ``multiLine`` is enabled so a single JSON record may span several lines in a file.

    :param rawPath: file path or glob of the raw JSON files to read
    :return: DataFrame parsed from the matched JSON files
    """
    # Honor the caller's path; it was previously hard-coded, so the argument was ignored.
    return spark.read.option("multiLine", "true").json(rawPath)


In [0]:
def read_batch_bronze(spark):
    """Return the rows of ``bronze_movie2`` still waiting to be processed (status 'new')."""
    bronze_table = spark.read.table("bronze_movie2")
    return bronze_table.filter("status= 'new'")

In [0]:
def transform_raw(rawDF):
    """Turn raw JSON documents into bronze rows, one row per movie.

    Each element of the ``movie`` column (expected to be an array) becomes its own row.
    The row holds the element re-serialised as a JSON string plus ingestion metadata.

    :param rawDF: DataFrame produced by ``read_batch_raw``
    :return: DataFrame with ``movie``, ``datasource``, ``ingesttime``, ``status``, ``ingestdate``
    """
    one_movie_per_row = rawDF.select(explode('movie').alias('movie'))
    ingest_ts = current_timestamp()
    return one_movie_per_row.select(
        to_json(col("movie")).alias("movie"),
        lit("antra_movieshops").alias("datasource"),
        ingest_ts.alias("ingesttime"),
        lit("new").alias("status"),
        ingest_ts.cast("date").alias("ingestdate"),
    )
    

In [0]:
def transform_bronze(bronzeDF):
    """Parse the ``movie`` JSON string of each bronze row into typed columns.

    The original JSON string is kept in ``movie``, because it is the key used to match
    rows back to bronze. ``ReleaseDate`` is exposed as ``p_ReleasedDate``, and exact
    duplicate rows are removed.

    :param bronzeDF: bronze rows with a ``movie`` JSON string column
    :return: DataFrame with ``movie``, the parsed movie fields and ``genres``
    """
    genre_struct = StructType([
        StructField('id', IntegerType(), True),
        StructField('name', StringType(), True),
    ])
    # (field name, Spark type) in the column order of the output; all fields nullable.
    movie_fields = [
        ('BackdropUrl', StringType()),
        ('Budget', StringType()),
        ('CreatedDate', DateType()),
        ('Id', IntegerType()),
        ('ImdbUrl', StringType()),
        ('OriginalLanguage', StringType()),
        ('Overview', StringType()),
        ('PosterUrl', StringType()),
        ('Price', DoubleType()),
        ('ReleaseDate', StringType()),
        ('Revenue', DoubleType()),
        ('RunTime', DoubleType()),
        ('Tagline', StringType()),
        ('Title', StringType()),
        ('TmdbUrl', StringType()),
        ('genres', ArrayType(genre_struct)),
    ]
    json_schema = StructType(
        [StructField(name, spark_type, True) for name, spark_type in movie_fields]
    )

    parsed = bronzeDF.withColumn("nested_json", from_json(col("movie"), json_schema))
    flattened = parsed.select("movie", "nested_json.*")
    # ReleaseDate keeps its position but is renamed; the silver writer partitions on p_ReleasedDate.
    return flattened.withColumnRenamed('ReleaseDate', 'p_ReleasedDate').dropDuplicates()
    

In [0]:
def adjust_bronze(transformedBronzeDF, languageDF):
    """Shape transformed bronze movies for the silver movie table.

    Drops the nested ``genres`` column and replaces ``OriginalLanguage`` with the
    matching ``language_id`` from ``languageDF``.

    The join is null-safe. ``generate_originalLanguage_silver`` keeps a null language
    as one of its distinct values, but a plain equi-join never matches null to null.
    Every movie with a null ``OriginalLanguage`` would then be silently dropped: it
    would never reach silver or get its bronze status updated.

    :param transformedBronzeDF: output of ``transform_bronze``
    :param languageDF: DataFrame with ``language_id`` and ``OriginalLanguage`` columns
    :return: movies with ``language_id`` in place of ``OriginalLanguage``
    """
    movies = transformedBronzeDF.drop("genres")
    # Rename the lookup key first so the condition compares two distinct columns. In this
    # notebook languageDF is derived from the same bronze DataFrame as the movies.
    languages = languageDF.withColumnRenamed("OriginalLanguage", "_language_key")
    joined = movies.join(
        languages,
        movies["OriginalLanguage"].eqNullSafe(languages["_language_key"]),
    )
    return joined.drop("OriginalLanguage", "_language_key")

In [0]:
def generate_clean_and_quarantine_dataframes(transformedBronzeDF: DataFrame):
    """Split movies into ``(clean, quarantine)`` by ``RunTime``.

    Clean rows have a non-negative runtime. Everything else is quarantined, including
    missing (null) runtimes, so every input row lands in exactly one output. Previously
    null runtimes matched neither filter and were lost.

    :param transformedBronzeDF: movies with a numeric ``RunTime`` column
    :return: tuple ``(clean_df, quarantine_df)``
    """
    clean_condition = "RunTime >= 0"
    quarantine_condition = "RunTime < 0 OR RunTime IS NULL"
    return (
        transformedBronzeDF.filter(clean_condition),
        transformedBronzeDF.filter(quarantine_condition),
    )

In [0]:
def generate_genre_silver(transformedBronzeDF: DataFrame):
    """Build the genre lookup table: one row per distinct genre id.

    :param transformedBronzeDF: output of ``transform_bronze`` (uses ``genres``)
    :return: DataFrame with ``genre_id`` and ``genre_name``
    """
    # One row per genre element across all movies, with identical structs collapsed.
    distinct_genres = (
        transformedBronzeDF
        .select(explode("genres").alias("genre_json"))
        .dropDuplicates()
    )
    lookup = distinct_genres.select(
        col("genre_json.id").alias("genre_id"),
        col("genre_json.name").alias("genre_name"),
    )
    # If one id appears with several names, which name survives is unspecified.
    return lookup.dropDuplicates(["genre_id"])

In [0]:
def generate_movie_genre_silver(transformedBronzeDF: DataFrame):
    """Build the movie/genre junction table.

    :param transformedBronzeDF: output of ``transform_bronze`` (uses ``Id`` and ``genres``)
    :return: DataFrame with ``movie_id``, ``genre_id`` and a surrogate ``movie_genre_id``
    """
    movie_genre_pairs = (
        transformedBronzeDF
        .select("Id", explode("genres").alias("genre_json"))
        .dropDuplicates()
        .select(
            col("Id").alias("movie_id"),
            col("genre_json.id").alias("genre_id"),
        )
        .dropDuplicates()
    )
    # monotonically_increasing_id() values are unique within this DataFrame but not consecutive.
    return movie_genre_pairs.withColumn("movie_genre_id", monotonically_increasing_id())

In [0]:
def generate_originalLanguage_silver(transformedBronzeDF: DataFrame):
    """Build the language lookup table: one row per distinct ``OriginalLanguage``.

    ``language_id`` is a 1-based sequence assigned in ascending ``OriginalLanguage``
    order (nulls sort first). The ids therefore depend on the input batch and are not
    stable across runs.

    :param transformedBronzeDF: output of ``transform_bronze``
    :return: DataFrame with columns ``language_id`` and ``OriginalLanguage``
    """
    # (Removed an unused local ``to_json`` import.)
    distinct_languages = transformedBronzeDF.select("OriginalLanguage").dropDuplicates()
    # NOTE(review): a window without partitionBy moves all rows to a single partition.
    # That is fine for a small lookup table, but worth knowing if the input grows.
    numbered = distinct_languages.withColumn(
        "language_id", row_number().over(Window.orderBy("OriginalLanguage"))
    )
    return numbered.select("language_id", "OriginalLanguage")

In [0]:
def batch_writer_bronze(dataframe: DataFrame, partition_column: str) -> DataFrameWriter:
    """Build a Delta append writer for the bronze movie table.

    Keeps the bronze columns and exposes ``ingestdate`` under ``partition_column``,
    which is also used as the table's partition column.

    :param dataframe: output of ``transform_raw``
    :param partition_column: name given to the ``ingestdate`` column and partitioned on
    :return: a configured writer (not a DataFrame); call ``.save(path)`` to write
    """
    bronze_columns = dataframe.select(
        "movie",
        "datasource",
        "ingesttime",
        "status",
        col("ingestdate").alias(partition_column),
    )
    return (
        bronze_columns.write.format("delta")
        .mode("append")
        .partitionBy(partition_column)
    )
    

In [0]:
def batch_writer_silver(dataframe: DataFrame, partition_column: str) -> DataFrameWriter:
    """Build a Delta append writer for the silver movie table.

    The raw ``movie`` JSON column is dropped; every other column is written as-is,
    partitioned by ``p_ReleasedDate`` (the column ``transform_bronze`` produces).

    NOTE(review): ``partition_column`` is ignored. Callers in this notebook pass
    ``"p_ReleaseDate"``, which is not the produced column name. The argument is kept
    for interface compatibility. Reconcile the names before honoring it: the existing
    silver Delta table is presumably already partitioned by ``p_ReleasedDate``.

    :return: a configured writer (not a DataFrame); call ``.save(path)`` to write
    """
    return (
        dataframe.drop("movie")
        .write.format("delta")
        .mode("append")
        .partitionBy("p_ReleasedDate")
    )

In [0]:
def batch_writer_silver_genre_movie_language(dataframe: DataFrame) -> DataFrameWriter:
    """Build an unpartitioned Delta append writer for a silver lookup/junction table.

    :param dataframe: rows to append (all columns are written)
    :return: a configured writer (not a DataFrame); call ``.save(path)`` to write
    """
    return dataframe.write.format("delta").mode("append")

In [0]:
def create_table(dataframe, filePath):
    """Drop and re-create a metastore table over the Delta files at ``filePath``.

    :param dataframe: the *name* of the table to (re)create, e.g. ``"silver_movie2"``.
        Despite the parameter name, it is interpolated into SQL as a table identifier,
        so pass a string, not a DataFrame.
    :param filePath: storage location of an existing Delta table
    """
    table_name = dataframe
    # Both statements must be f-strings; the DROP previously sent the literal text
    # "{dataframe}". The name is left unquoted like the other DDL in this notebook,
    # because by default Spark SQL parses double quotes as a string literal, not an
    # identifier.
    spark.sql(
        f"""
    DROP TABLE IF EXISTS {table_name}
    """
    )

    spark.sql(
        f"""
    CREATE TABLE {table_name}
    USING DELTA
    LOCATION "{filePath}"
    """
    )

In [0]:
def update_bronze_table_status(
    spark: SparkSession, bronzeTablePath: str, dataframe: DataFrame, status: str
) -> bool:
    """Set ``status`` on every bronze row whose ``movie`` value appears in ``dataframe``.

    Runs a Delta MERGE against the bronze table stored at ``bronzeTablePath``. Rows are
    matched on the raw ``movie`` JSON string; bronze rows without a match are untouched.

    :param spark: active Spark session
    :param bronzeTablePath: storage path of the bronze Delta table
    :param dataframe: rows carrying the ``movie`` column to match on
    :param status: value written to ``status`` for matched rows
    :return: True once the merge has executed
    """
    # Use the path the caller passed; previously the global ``bronzePath`` was read and
    # this argument was ignored.
    bronze_table = DeltaTable.forPath(spark, bronzeTablePath)
    source = dataframe.withColumn("status", lit(status))

    # NOTE: Delta MERGE fails if several source rows match the same bronze row, so
    # ``dataframe`` is expected to hold each ``movie`` value at most once.
    (
        bronze_table.alias("bronze")
        .merge(source.alias("dataframe"), "bronze.movie = dataframe.movie")
        .whenMatchedUpdate(set={"status": "dataframe.status"})
        .execute()
    )

    return True

# Raw to Bronze

In [0]:
# Ingest the raw JSON files and reshape them into bronze rows (one per movie).
rawDF = read_batch_raw(rawPath)
transformedRawDF = transform_raw(rawDF)

# Append the batch to the bronze Delta location, partitioned by ingestion date.
rawToBronzeWriter = batch_writer_bronze(
    dataframe=transformedRawDF,
    partition_column="p_ingestdate",
)
rawToBronzeWriter.save(bronzePath)

# Re-register the metastore table over the bronze Delta files.
drop_bronze_sql = """
DROP TABLE IF EXISTS bronze_movie2
"""
create_bronze_sql = f"""
CREATE TABLE bronze_movie2
USING DELTA
LOCATION "{bronzePath}"
"""
for bronze_ddl in (drop_bronze_sql, create_bronze_sql):
    spark.sql(bronze_ddl)

# Bronze to Silver

In [0]:
# Parse the bronze rows still marked 'new'; every silver table below derives from them.
bronzeDF = read_batch_bronze(spark)
transformedBronzeDF = transform_bronze(bronzeDF)


def _register_delta_table(table_name, location):
    """Drop ``table_name`` from the metastore and re-create it over the Delta files at ``location``."""
    drop_sql = f"""
DROP TABLE IF EXISTS {table_name}
"""
    create_sql = f"""
CREATE TABLE {table_name}
USING DELTA
LOCATION "{location}"
"""
    spark.sql(drop_sql)
    spark.sql(create_sql)


# genre silver table: genre_id, genre_name
silver_genre = generate_genre_silver(transformedBronzeDF)
bronzeToSilverWriter = batch_writer_silver_genre_movie_language(dataframe=silver_genre)
bronzeToSilverWriter.save(silverGenrePath)
_register_delta_table("silver_genre2", silverGenrePath)

# movie/genre junction table: movie_id, genre_id, movie_genre_id
silver_movie_genre = generate_movie_genre_silver(transformedBronzeDF)
bronzeToSilverWriter = batch_writer_silver_genre_movie_language(dataframe=silver_movie_genre)
bronzeToSilverWriter.save(silverMovieGenrePath)
_register_delta_table("silver_movie_genre2", silverMovieGenrePath)

# original language lookup table: language_id, OriginalLanguage
silver_originalLanguage = generate_originalLanguage_silver(transformedBronzeDF)
bronzeToSilverWriter = batch_writer_silver_genre_movie_language(dataframe=silver_originalLanguage)
bronzeToSilverWriter.save(silverOriginalLanguagePath)
_register_delta_table("silver_original_language2", silverOriginalLanguagePath)

# Drop 'genres' and swap OriginalLanguage for language_id before building the movie table.
adjustedBronzeDF = adjust_bronze(transformedBronzeDF, silver_originalLanguage)

# Split on RunTime: the clean part goes to the silver movie table.
silverCleanDF, silverQuarantineDF = generate_clean_and_quarantine_dataframes(adjustedBronzeDF)
bronzeToSilverWriter = batch_writer_silver(
    dataframe=silverCleanDF, partition_column="p_ReleaseDate"
)
bronzeToSilverWriter.save(silverPath)
_register_delta_table("silver_movie2", silverPath)

# Record in bronze which rows were loaded and which were quarantined.
for status_rows, status_value in ((silverCleanDF, "loaded"), (silverQuarantineDF, "quarantined")):
    update_bronze_table_status(spark, bronzePath, status_rows, status_value)

# Silver Update: Correct Negative Runtime

In [0]:
# Re-read the bronze rows that the Bronze to Silver step quarantined, and parse them.
bronzeQuarantinedDF = (
    spark.read
    .table("bronze_movie2")
    .filter("status= 'quarantined'")
)
TransformedQuarantinedDF = transform_bronze(bronzeQuarantinedDF)

In [0]:
# Alias the Spark function so the Python builtin ``abs`` is not shadowed.
from pyspark.sql.functions import abs as spark_abs

# Treat a negative runtime as a sign error. Keep the corrected frame so the next cell
# writes fixed values; previously it was computed and then discarded after .count().
# abs() is idempotent, so re-running this cell is harmless.
TransformedQuarantinedDF = TransformedQuarantinedDF.withColumn(
    "RunTime", spark_abs(col("RunTime"))
)
TransformedQuarantinedDF.count()

In [0]:
from pyspark.sql.functions import abs as spark_abs

# Append the corrected quarantined movies to the existing silver movie table.
# A Delta table can be appended to directly at its storage path, so no INSERT INTO is
# needed; the previous "INSERT INTO ... USING DELTA LOCATION" form is not valid SQL.

# abs() is idempotent, so applying the correction here keeps this cell correct whether
# or not the previous cell already replaced RunTime. Rows with a null runtime cannot
# be corrected; they are left out and stay quarantined in bronze.
correctedQuarantinedDF = TransformedQuarantinedDF.withColumn(
    "RunTime", spark_abs(col("RunTime"))
).filter(col("RunTime").isNotNull())

# Use the persisted language lookup. The in-memory silver_originalLanguage is lazy and
# would be recomputed from bronze rows still marked 'new', and these quarantined
# movies are no longer among them.
# NOTE(review): the lookup table is appended to on every run; deduplicating by
# language keeps the join (and the bronze MERGE below) one-to-one.
persistedLanguageDF = (
    spark.read.table("silver_original_language2")
    .dropDuplicates(["OriginalLanguage"])
)

# Match the clean silver rows' shape: drop genres, swap OriginalLanguage for language_id.
adjustedQuarantinedDF = adjust_bronze(correctedQuarantinedDF, persistedLanguageDF)

bronzeToSilverWriter = batch_writer_silver(
    dataframe=adjustedQuarantinedDF, partition_column="p_ReleaseDate"
)
bronzeToSilverWriter.save(silverPath)

# Mark the repaired rows as loaded so re-running this cell does not append them again.
update_bronze_table_status(spark, bronzePath, adjustedQuarantinedDF, "loaded")