In [1]:
import findspark
findspark.init("/opt/spark")

In [2]:
from pyspark.sql.functions import explode, from_json, explode_outer
from pyspark.sql.functions import col
from pyspark.sql.types import *
from delta.tables import *

In [3]:
accessKeyId='dataops9'
secretAccessKey='Ankara06'

# create a SparkSession
spark = SparkSession.builder \
.appName("credits-silver") \
.master("local[2]") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0,io.delta:delta-core_2.12:2.4.0") \
.config("fs.s3a.access.key", accessKeyId) \
.config("fs.s3a.secret.key", secretAccessKey) \
.config("fs.s3a.path.style.access", True) \
.config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
.config("fs.s3a.endpoint", "http://minio:9000") \
.config("spark.sql.debug.maxToStringFields", 1000) \
.getOrCreate()



:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a1607949-ca53-486e-848a-d8f898c40760;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.0 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.375 in central
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 307ms :: artifacts dl 10ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.375 from central in [default]
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.0 from central in [default]
	------------------------------------

In [4]:
df_credits = spark.read.format("parquet") \
.option("header", True) \
.option("inferSchema", True) \
.option("quote", "\"") \
.option("escape", "\"") \
.load("s3a://tmdb-bronze/credit/")

df_credits.printSchema()
df_credits1 = df_credits

24/03/23 03:34:03 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- event_time: timestamp_ntz (nullable = true)



In [5]:
json_cast = ArrayType(StructType([
    StructField("cast_id", IntegerType()),
    StructField("character", StringType()),
    StructField("credit_id", StringType()),
    StructField("gender", IntegerType()),
    StructField("id", IntegerType()),
    StructField("name", StringType())
]))
#cast sütunundaki json değerlerini from_json ile  yukarıda belirtilen Json şemasına göre çözüp yeni bir sütun oluşturdum.
df_credits1 = df_credits1.withColumn("cast", from_json(col("cast"), json_cast))


json_crew = ArrayType(StructType([
    StructField("credit_id", StringType()),
    StructField("department", StringType()),
    StructField("gender", IntegerType()),
    StructField("id", IntegerType()),
    StructField("job", StringType()),
    StructField("name", StringType())
]))

df_credits1 = df_credits1.withColumn("crew", from_json(col("crew"), json_crew))

df_credits1.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cast_id: integer (nullable = true)
 |    |    |-- character: string (nullable = true)
 |    |    |-- credit_id: string (nullable = true)
 |    |    |-- gender: integer (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- crew: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- credit_id: string (nullable = true)
 |    |    |-- department: string (nullable = true)
 |    |    |-- gender: integer (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- job: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- event_time: timestamp_ntz (nullable = true)



In [6]:
#explode_outer ile iç içe geçmiş arrayi düzleştirdim veistenen değerleri credits_cast tablosunda gösterdim/ explode_outerda boş veya null durumundaki satırlar korunur
credits_cast = df_credits1.select("movie_id", "title", explode_outer("cast").alias("cast"))
credits_cast = credits_cast.select("movie_id", "title", "cast.cast_id", "cast.character", "cast.credit_id", "cast.gender", "cast.id", "cast.name")
credits_cast = credits_cast.withColumn("movie_id", col("movie_id").cast("string"))

# id nulls must be imputed with -9999.
credits_cast = credits_cast.fillna({'credit_id': 0000000000})
credits_cast.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- cast_id: integer (nullable = true)
 |-- character: string (nullable = true)
 |-- credit_id: string (nullable = false)
 |-- gender: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



In [7]:
credits_cast.count()

                                                                                

20578

In [8]:
un_credits_cast = credits_cast.dropDuplicates(['movie_id', 'title','cast_id', 'character', 'credit_id','id', 'name'])

In [9]:
un_credits_cast.count()

                                                                                

20578

In [10]:
cast_deltaPath = "s3a://tmdb-silver/credits_cast"
cast_delta = DeltaTable.forPath(spark, cast_deltaPath)

In [11]:
cast_delta.alias("cast") \
    .merge(un_credits_cast.alias("cast_new"), "cast.movie_id = cast_new.movie_id AND cast.credit_id = cast_new.credit_id") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

                                                                                

In [12]:
credits_crew = df_credits1.select("movie_id", "title", explode_outer("crew").alias("crew"))
credits_crew = credits_crew.select("movie_id", "title","crew.credit_id","crew.department", "crew.gender", "crew.id", "crew.job", "crew.name")
credits_crew = credits_crew.withColumn("movie_id", col("movie_id").cast("string"))

In [13]:
# id nulls must be imputed with -9999.
credits_crew = credits_crew.fillna({'credit_id': 0000000000})

In [14]:
credits_crew.count()

35083

In [15]:
un_credits_crew = credits_crew.dropDuplicates(['movie_id', 'id','credit_id'])

In [16]:
un_credits_crew.count()

                                                                                

35083

In [17]:
crew_deltaPath = 's3a://tmdb-silver/credits_crew'
crew_delta = DeltaTable.forPath(spark, crew_deltaPath)

In [18]:
crew_delta.alias("crew") \
    .merge(un_credits_crew.alias("crew_new"), "crew.movie_id = crew_new.movie_id AND crew.credit_id = crew_new.credit_id") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

                                                                                

In [26]:
df1_de = spark.read.format("delta") \
.option("header", True) \
.option("inferSchema", True) \
.option("quote", "\"") \
.option("escape", "\"") \
.load('s3a://tmdb-silver/credits_cast')

In [27]:
df1_de.count()

                                                                                

106300

In [28]:
df1_de.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- cast_id: integer (nullable = true)
 |-- character: string (nullable = true)
 |-- credit_id: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



In [29]:
df11_credits = spark.read.format("delta") \
.option("header", True) \
.option("inferSchema", True) \
.option("quote", "\"") \
.option("escape", "\"") \
.load("s3a://tmdb-silver/credits_crew")

In [30]:
df11_credits.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- credit_id: string (nullable = true)
 |-- department: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- name: string (nullable = true)



In [31]:
un_credits_crew.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- credit_id: string (nullable = false)
 |-- department: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- name: string (nullable = true)



In [17]:
un_credits_crew.count()

                                                                                

129609

In [33]:
crew_delta.toDF().printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- credit_id: string (nullable = true)
 |-- department: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- name: string (nullable = true)

