In [0]:
from pyspark.sql.functions import abs
from pyspark.sql.types import StringType, DoubleType, StructField, StructType, TimestampType, ArrayType, LongType
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, col, desc, explode, when, trim
from pyspark.sql.window import Window

from pyspark.sql.functions import to_timestamp

In [0]:

def read_files_to_bronze():
    json_directory = "dbfs:/FileStore/tables/bronze/"

    movie_schema = StructType([
    StructField("Id", LongType(), True),
    StructField("Title", StringType(), True),
    StructField("Overview", StringType(), True),
    StructField("Tagline", StringType(), True),
    StructField("Budget", DoubleType(), True),
    StructField("Revenue", DoubleType(), True),
    StructField("ImdbUrl", StringType(), True),
    StructField("TmdbUrl", StringType(), True),
    StructField("PosterUrl", StringType(), True),
    StructField("BackdropUrl", StringType(), True),
    StructField("OriginalLanguage", StringType(), True),
    StructField("ReleaseDate", StringType(), True),
    StructField("RunTime", LongType(), True),
    StructField("Price", DoubleType(), True),
    StructField("CreatedDate", StringType(), True),
    StructField("UpdatedDate", StringType(), True),
    StructField("UpdatedBy", StringType(), True),
    StructField("CreatedBy", StringType(), True),
    StructField("genres", ArrayType(StructType([
        StructField("id", LongType(), True),
        StructField("name", StringType(), True)
        ])), True)
    ])

    root_schema = StructType([
        StructField("movie", ArrayType(movie_schema), True)
    ])
    
    movie_df = (spark.read
            .option("multiline","true")
            .option("schema", "root_schema")
            #  .option("inferSchema","true")
            .json(json_directory + "*.json")
    )

    # flatten json file
    bronze_df = (movie_df.select(explode(col("movie")).alias("movie"))
                    .select("movie.Id", "movie.Title", "movie.Overview", "movie.Tagline", "movie.Budget", "movie.Revenue",
                            "movie.ImdbUrl", "movie.TmdbUrl", "movie.PosterUrl", "movie.BackdropUrl",
                            "movie.OriginalLanguage", 
                            to_timestamp("movie.ReleaseDate", "yyyy-MM-dd'T'HH:mm:ss").alias("ReleaseDate"),
                            "movie.RunTime", "movie.Price",
                            to_timestamp("movie.CreatedDate", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS").alias("CreatedDate"),
                            "movie.UpdatedDate", "movie.UpdatedBy", "movie.CreatedBy",
                            explode("movie.genres").alias("genre"))
                    .select("Id", "Title", "Overview", "Tagline", "Budget", "Revenue", "ImdbUrl", "TmdbUrl",
                            "PosterUrl", "BackdropUrl", "OriginalLanguage", "ReleaseDate", "RunTime",
                            "Price", "CreatedDate",col("genre.id").alias("genre_id"), "genre.name"))
    return bronze_df


In [0]:
def process_bronze_to_silver():
    bronze_df = read_files_to_bronze()
    bronze_df = bronze_df.withColumn("quarantined", col("RunTime") < 0)

    quarantined_df = bronze_df.filter(col("quarantined") == True)
    quarantined_df.write.format("delta").save("dbfs:/FileStore/tables/bronze/quarantine/area", mode='overwrite')

    clean_df = bronze_df.filter(col("quarantined") == False)
    clean_df = clean_df.withColumn("Budget", when(col("Budget") < 1000000.0, 1000000.0).otherwise(col("Budget")))

    silver_movies_df = clean_df.unionByName(quarantined_df)

    # creat genres_lookup_df
    genres_lookup_df = (silver_movies_df
                    .select("genre_id", "name")
                    .filter(trim(col("name")) != "")
                    .distinct()
                    )
    
    genres_lookup_df.write.format("delta").mode("overwrite").save("dbfs:/FileStore/tables/silver/genres_lookup")
    
    # movie_genres_df
    movie_genres_df = silver_movies_df.select("Id", "genre_id")
    movie_genres_df.write.format("delta").mode("overwrite").save("dbfs:/FileStore/tables/silver/movie_genres")
    
    # create original_languages_df
    original_languages_df = silver_movies_df.select("OriginalLanguage").distinct()
    original_languages_df.write.format("delta").mode("overwrite").save("dbfs:/FileStore/tables/silver/original_languages")

    # create m:m relationship
    silver_movies_df = silver_movies_df.select("Id", "Title", "Overview", "Tagline", "Budget", "Revenue",
                                               "ImdbUrl", "TmdbUrl", "PosterUrl", "BackdropUrl",
                                               "OriginalLanguage", "ReleaseDate", "RunTime", "Price",
                                               "CreatedDate", "UpdatedDate", "UpdatedBy", "CreatedBy")
    
    bronze_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("dbfs:/FileStore/tables/bronze/movies")

    # 使用 partitionBy() 和 orderBy() 函数确保每部电影只出现一次
    silver_movies_df = silver_movies_df.withColumn("row_num", row_number().over(Window.partitionBy("Id").orderBy(col("CreatedDate").desc())))
    silver_movies_df = silver_movies_df.filter(col("row_num") == 1).drop("row_num")

    # write movie table to silver
    silver_movies_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("dbfs:/FileStore/tables/silver/movies")

process_bronze_to_silver()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1569639608351509>, line 45[0m
[1;32m     42[0m     [38;5;66;03m# write movie table to silver[39;00m
[1;32m     43[0m     silver_movies_df[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39moption([38;5;124m"[39m[38;5;124moverwriteSchema[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)[38;5;241m.[39msave([38;5;124m"[39m[38;5;124mdbfs:/FileStore/tables/silver/movies[39m[38;5;124m"[39m)
[0;32m---> 45[0m process_bronze_to_silver()

File [0;32m<command-1569639608351509>, line 6[0m, in [0;36mprocess_bronze_to_silver[0;34m()[0m
[1;32m      3[0m bronze_df [38;5;241m=[39m bronze_df[38;5;241m.[39mw