In [25]:
import findspark
findspark.init("/opt/spark")

In [26]:
from pyspark.sql import SparkSession, functions as F
from pyspark import SparkContext
import configparser

#### accessKeyId,secretAccessKey'i db_conn'dan okuyacagız.

In [27]:
config = configparser.RawConfigParser()

config.read('/dataops/db_conn')
accessKeyId = config.get('DB', 'user_name')
secretAccessKey = config.get('DB', 'password')

In [28]:
spark = SparkSession.builder \
.appName("final project") \
.master("local[2]") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0,io.delta:delta-core_2.12:2.4.0,io.delta:delta-storage:2.4.0") \
.config("spark.hadoop.fs.s3a.access.key", accessKeyId) \
.config("spark.hadoop.fs.s3a.secret.key", secretAccessKey) \
.config("spark.hadoop.fs.s3a.path.style.access", True) \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate() 

## **read from minio**

### **- credits data**

In [29]:
df_cred = spark.read \
.option("header", "true") \
.option("inferSchema",True) \
.load('s3a://tmdb-bronze/credits')

In [30]:
df_cred.limit(5).show()

+--------+--------------------+--------------------+--------------------+--------------------+
|movie_id|               title|                cast|                crew|          event_time|
+--------+--------------------+--------------------+--------------------+--------------------+
|   19995|              Avatar|[{"cast_id": 242,...|[{"credit_id": "5...|2023-11-18 01:13:...|
|     285|Pirates of the Ca...|[{"cast_id": 4, "...|[{"credit_id": "5...|2023-11-18 01:13:...|
|  206647|             Spectre|[{"cast_id": 1, "...|[{"credit_id": "5...|2023-11-18 01:13:...|
|   49026|The Dark Knight R...|[{"cast_id": 2, "...|[{"credit_id": "5...|2023-11-18 01:13:...|
|   49529|         John Carter|[{"cast_id": 5, "...|[{"credit_id": "5...|2023-11-18 01:13:...|
+--------+--------------------+--------------------+--------------------+--------------------+



In [32]:
df_cred.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- event_time: timestamp_ntz (nullable = true)



##### notlar:
cast ve crew sütunları --> json structured.

In [33]:
from pyspark.sql.functions import from_json, schema_of_json

In [34]:
cast_schema = schema_of_json(df_cred.select("cast").limit(1).collect()[0][0])
crew_schema = schema_of_json(df_cred.select("crew").limit(1).collect()[0][0])

In [35]:
df1_cred = df_cred.withColumn("cast", from_json(df_cred["cast"], cast_schema)) \
.withColumn("crew", from_json(df_cred["crew"], crew_schema))

In [36]:
df1_cred.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cast_id: long (nullable = true)
 |    |    |-- character: string (nullable = true)
 |    |    |-- credit_id: string (nullable = true)
 |    |    |-- gender: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- order: long (nullable = true)
 |-- crew: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- credit_id: string (nullable = true)
 |    |    |-- department: string (nullable = true)
 |    |    |-- gender: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- job: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- event_time: timestamp_ntz (nullable = true)



In [37]:
from pyspark.sql.functions import explode, col, format_number

df_cast = df1_cred.select("movie_id", "title", explode("cast").alias("cast"), "event_time")
df_crew = df1_cred.select("movie_id", "title", explode("crew").alias("crew"), "event_time")

In [38]:
df_cast.limit(5).show()

+--------+------+--------------------+--------------------+
|movie_id| title|                cast|          event_time|
+--------+------+--------------------+--------------------+
|   19995|Avatar|{242, Jake Sully,...|2023-11-18 01:13:...|
|   19995|Avatar|{3, Neytiri, 52fe...|2023-11-18 01:13:...|
|   19995|Avatar|{25, Dr. Grace Au...|2023-11-18 01:13:...|
|   19995|Avatar|{4, Col. Quaritch...|2023-11-18 01:13:...|
|   19995|Avatar|{5, Trudy Chacon,...|2023-11-18 01:13:...|
+--------+------+--------------------+--------------------+



In [39]:
df_cast.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast: struct (nullable = true)
 |    |-- cast_id: long (nullable = true)
 |    |-- character: string (nullable = true)
 |    |-- credit_id: string (nullable = true)
 |    |-- gender: long (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- order: long (nullable = true)
 |-- event_time: timestamp_ntz (nullable = true)



In [40]:
df_crew.limit(5).show()

+--------+------+--------------------+--------------------+
|movie_id| title|                crew|          event_time|
+--------+------+--------------------+--------------------+
|   19995|Avatar|{52fe48009251416c...|2023-11-18 01:13:...|
|   19995|Avatar|{539c47ecc3a36810...|2023-11-18 01:13:...|
|   19995|Avatar|{54491c89c3a3680f...|2023-11-18 01:13:...|
|   19995|Avatar|{54491cb70e0a2674...|2023-11-18 01:13:...|
|   19995|Avatar|{539c4a4cc3a36810...|2023-11-18 01:13:...|
+--------+------+--------------------+--------------------+



In [41]:
df_crew.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- crew: struct (nullable = true)
 |    |-- credit_id: string (nullable = true)
 |    |-- department: string (nullable = true)
 |    |-- gender: long (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- job: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- event_time: timestamp_ntz (nullable = true)



#### **cast table**

In [42]:
pre_cast = df_cast.select(
    col("movie_id"),
    col("title"),
    col("cast.cast_id").alias("cast_id"),
    col("cast.character").alias("character"),
    col("cast.credit_id").alias("credit_id"),
    col("cast.gender").alias("gender"),
    col("cast.id").alias("id"),
    col("cast.name").alias("name"))

In [43]:
pre_cast.limit(5).show()

+--------+------+-------+-------------------+--------------------+------+-----+------------------+
|movie_id| title|cast_id|          character|           credit_id|gender|   id|              name|
+--------+------+-------+-------------------+--------------------+------+-----+------------------+
|   19995|Avatar|    242|         Jake Sully|5602a8a7c3a368553...|     2|65731|   Sam Worthington|
|   19995|Avatar|      3|            Neytiri|52fe48009251416c7...|     1| 8691|       Zoe Saldana|
|   19995|Avatar|     25|Dr. Grace Augustine|52fe48009251416c7...|     1|10205|  Sigourney Weaver|
|   19995|Avatar|      4|      Col. Quaritch|52fe48009251416c7...|     2|32747|      Stephen Lang|
|   19995|Avatar|      5|       Trudy Chacon|52fe48009251416c7...|     1|17647|Michelle Rodriguez|
+--------+------+-------+-------------------+--------------------+------+-----+------------------+



In [44]:
pre_cast.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast_id: long (nullable = true)
 |-- character: string (nullable = true)
 |-- credit_id: string (nullable = true)
 |-- gender: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [45]:
# null value check
for column in pre_cast.columns:
 col_count = pre_cast.filter( F.col(column).isNull() ).count()
 if ( col_count >= 0):
     print("{} has {} null values.".format(column, col_count))

movie_id has 0 null values.
title has 0 null values.


                                                                                

cast_id has 0 null values.


                                                                                

character has 0 null values.


                                                                                

credit_id has 0 null values.


                                                                                

gender has 0 null values.


[Stage 27:>                                                         (0 + 2) / 2]

id has 0 null values.


[Stage 30:>                                                         (0 + 2) / 2]

name has 0 null values.


                                                                                

In [46]:
# hidden null check
for column in pre_cast.columns:
 col_count = pre_cast.filter( F.col(column) == "").count()
 if ( col_count >= 0):
     print("{} has {} null values.".format(column, col_count))

movie_id has 0 null values.
title has 0 null values.
cast_id has 0 null values.


                                                                                

character has 683 null values.
credit_id has 0 null values.
gender has 0 null values.
id has 0 null values.
name has 0 null values.


In [47]:
cast = pre_cast

#### **crew table**

In [48]:
pre_crew = df_crew.select(
    col("movie_id"),
    col("title"),
    col("crew.credit_id").alias("credit_id"),
    col("crew.department").alias("department"),
    col("crew.gender").alias("gender"),
    col("crew.id").alias("id"),
    col("crew.job").alias("job"),
    col("crew.name").alias("name"))

In [49]:
pre_crew.limit(5).show()

+--------+------+--------------------+----------+------+----+--------------------+-----------------+
|movie_id| title|           credit_id|department|gender|  id|                 job|             name|
+--------+------+--------------------+----------+------+----+--------------------+-----------------+
|   19995|Avatar|52fe48009251416c7...|   Editing|     0|1721|              Editor|Stephen E. Rivkin|
|   19995|Avatar|539c47ecc3a36810e...|       Art|     2| 496|   Production Design|      Rick Carter|
|   19995|Avatar|54491c89c3a3680fb...|     Sound|     0| 900|      Sound Designer|Christopher Boyes|
|   19995|Avatar|54491cb70e0a26748...|     Sound|     0| 900|Supervising Sound...|Christopher Boyes|
|   19995|Avatar|539c4a4cc3a36810c...|Production|     1|1262|             Casting|        Mali Finn|
+--------+------+--------------------+----------+------+----+--------------------+-----------------+



In [50]:
pre_crew.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- credit_id: string (nullable = true)
 |-- department: string (nullable = true)
 |-- gender: long (nullable = true)
 |-- id: long (nullable = true)
 |-- job: string (nullable = true)
 |-- name: string (nullable = true)



In [51]:
# null value check
for column in pre_crew.columns:
 col_count = pre_crew.filter( F.col(column).isNull() ).count()
 if ( col_count >= 0):
     print("{} has {} null values.".format(column, col_count))

movie_id has 0 null values.
title has 0 null values.


                                                                                

credit_id has 0 null values.


                                                                                

department has 0 null values.


                                                                                

gender has 0 null values.


                                                                                

id has 0 null values.


                                                                                

job has 0 null values.


[Stage 75:>                                                         (0 + 2) / 2]

name has 0 null values.




In [52]:
# hidden null check
for column in pre_crew.columns:
 col_count = pre_crew.filter( F.col(column) == "").count()
 if ( col_count >= 0):
     print("{} has {} null values.".format(column, col_count))

                                                                                

movie_id has 0 null values.
title has 0 null values.


                                                                                

credit_id has 0 null values.
department has 0 null values.
gender has 0 null values.
id has 0 null values.


                                                                                

job has 0 null values.
name has 0 null values.


                                                                                

In [53]:
crew = pre_crew

### **-movies data**

In [54]:
df_movies = spark.read \
.option("header", "true") \
.option("inferSchema",True) \
.load('s3a://tmdb-bronze/movies')

In [55]:
import pandas as pd
pd.set_option('display.max_columns', None)

In [56]:
df_movies.printSchema()

root
 |-- budget: long (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: long (nullable = true)
 |-- keywords: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: double (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: long (nullable = true)
 |-- event_time: timestamp_ntz (nullable = true)



In [57]:
genres_schema = schema_of_json(df_movies.select("genres").limit(1).collect()[0][0])
keywords_schema = schema_of_json(df_movies.select("keywords").limit(1).collect()[0][0])
prod_comp_schema=schema_of_json(df_movies.select("production_companies").limit(1).collect()[0][0])
prod_countries_schema=schema_of_json(df_movies.select("production_countries").limit(1).collect()[0][0])
spoken_lang_schema=schema_of_json(df_movies.select("spoken_languages").limit(1).collect()[0][0])

In [58]:
df1_movies = df_movies.withColumn("genres", from_json(df_movies["genres"], genres_schema)) \
.withColumn("keywords", from_json(df_movies["keywords"], keywords_schema)) \
.withColumn("production_companies", from_json(df_movies["production_companies"], prod_comp_schema)) \
.withColumn("production_countries", from_json(df_movies["production_countries"], prod_countries_schema)) \
.withColumn("spoken_languages", from_json(df_movies["spoken_languages"], spoken_lang_schema)) 

In [59]:
df1_movies.printSchema()

root
 |-- budget: long (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: long (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- production_companies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- production_countries: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- iso_3166_1: string (nullable = true)
 |    |    |-- name: str

In [60]:
pre_movies = df1_movies.select("id", "title", "budget", "homepage", "original_language", "original_title", "overview", "popularity", "release_date", "revenue", "runtime", "status", "tagline","vote_average", "vote_count")
pre_genres = df1_movies.select("id", explode("genres").alias("genres"))
pre_keywords = df1_movies.select("id", explode("keywords").alias("keywords"))
pre_production_companies = df1_movies.select("id", explode("production_companies").alias("production_companies"))
pre_production_countries = df1_movies.select("id", explode("production_countries").alias("production_countries"))
pre_spoken_languages = df1_movies.select("id", explode("spoken_languages").alias("spoken_languages"))

#### **movies table**

In [61]:
pre1_movies = pre_movies.select(
    col("id").alias("movie_id"),
    col("title"),
    col("budget"),
    col("homepage"),
    col("original_language"),
    col("original_title"),
    col("overview"),
    col("popularity"),
    col("release_date"),
    col("revenue"),
    col("runtime"),
    col("status"),
    col("tagline"),
    col("vote_average"),
    col("vote_count"))

In [62]:
pre1_movies.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- budget: long (nullable = true)
 |-- homepage: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: double (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: long (nullable = true)



In [63]:
from pyspark.sql.types import *

movies = pre1_movies.withColumn("movie_id", F.col("movie_id").cast(StringType())) \
.withColumn("budget", col("budget").cast(DoubleType())) \
.withColumn("popularity", col("popularity").cast(FloatType())) \
.withColumn("release_date", F.to_date(F.col("release_date"),"yyyy-MM-dd")) \
.withColumn("revenue", col("revenue").cast(DoubleType())) \
.withColumn("runtime", col("runtime").cast(IntegerType())) \
.withColumn("vote_average", col("vote_average").cast(FloatType())) \
.withColumn("vote_count", col("vote_count").cast(IntegerType()))

In [64]:
movies.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- budget: double (nullable = true)
 |-- homepage: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: double (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: integer (nullable = true)



#### **genres table**

In [65]:
pre1_genres = pre_genres.select(
    col("id").alias("movie_id"),
    col("genres.id").alias("id"),
    col("genres.name").alias("name"))

In [66]:
pre1_genres.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [67]:
genres = pre1_genres.withColumn("movie_id", F.col("movie_id").cast(StringType())) \
.withColumn("id", col("id").cast(IntegerType()))

In [68]:
genres.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



#### **keywords table**

In [69]:
pre1_keywords = pre_keywords.select(
    col("id").alias("movie_id"),
    col("keywords.id").alias("id"),
    col("keywords.name").alias("name"))

In [70]:
pre1_keywords.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [71]:
keywords = pre1_keywords.withColumn("movie_id", F.col("movie_id").cast(StringType())) \
.withColumn("id", col("id").cast(IntegerType()))

In [72]:
keywords.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



#### **production_companies table**

In [73]:
pre1_production_companies = pre_production_companies.select(
    col("id").alias("movie_id"),
    col("production_companies.id").alias("id"),
    col("production_companies.name").alias("name"))

In [74]:
pre1_production_companies.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [75]:
production_companies = pre1_production_companies.withColumn("movie_id", F.col("movie_id").cast(StringType())) \
.withColumn("id", col("id").cast(IntegerType()))

In [76]:
production_companies.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



#### **production_countries table**

In [77]:
pre1_production_countries = pre_production_countries.select(
    col("id").alias("movie_id"),
    col("production_countries.iso_3166_1").alias("iso_3166_1"),
    col("production_countries.name").alias("name"))

In [78]:
pre1_production_countries.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- iso_3166_1: string (nullable = true)
 |-- name: string (nullable = true)



In [79]:
production_countries = pre1_production_countries.withColumn("movie_id", F.col("movie_id").cast(StringType()))

In [80]:
production_countries.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- iso_3166_1: string (nullable = true)
 |-- name: string (nullable = true)



#### **spoken_languages table**

In [81]:
pre1_spoken_languages = pre_spoken_languages.select(
    col("id").alias("movie_id"),
    col("spoken_languages.iso_639_1").alias("iso_639_1"),
    col("spoken_languages.name").alias("name"))

In [82]:
pre1_spoken_languages.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- iso_639_1: string (nullable = true)
 |-- name: string (nullable = true)



In [83]:
spoken_languages = pre1_spoken_languages.withColumn("movie_id", F.col("movie_id").cast(StringType()))

In [84]:
spoken_languages.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- iso_639_1: string (nullable = true)
 |-- name: string (nullable = true)



## **write to minio in the form deltatable**

In [186]:
table_dict = {'cast': cast, 'crew': crew, 'movies': movies, 'genres': genres, 'keywords': keywords, 'production_companies': production_companies, 'production_countries': production_countries, 'spoken_languages': spoken_languages}

for table_name, table in table_dict.items():
    deltaPath = f"s3a://tmdb-silver/{table_name}"
    
    table.write \
    .mode("overwrite") \
    .format("delta") \
    .save(deltaPath)

                                                                                