In [1]:
import findspark
findspark.init("/opt/spark")

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *

In [2]:
accessKeyId='dataops'
secretAccessKey='Ankara06'

In [3]:
# create a SparkSession
spark = SparkSession.builder \
.appName("tmdb-silver") \
.master("local[2]") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0,io.delta:delta-core_2.12:2.4.0") \
.config("fs.s3a.access.key", accessKeyId) \
.config("fs.s3a.secret.key", secretAccessKey) \
.config("fs.s3a.path.style.access", True) \
.config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
.config("fs.s3a.endpoint", "http://minio:9000") \
.config("spark.sql.debug.maxToStringFields", 1000) \
.getOrCreate()



:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ca0cfc6d-0e94-4213-a3f3-4ef3d39ff77f;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.0 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.375 in central
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
downloading https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar ...
	[SUCCESSFUL ] org.apache.hadoop#hadoop-aws;3.2.0!hadoop-aws.jar (135ms)
downloading https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.4.0/delta-core_2.12-2.4.0.jar ...
	[SUCCESSFUL ] io.delta#delta-core_2.12;2.4.0!delta-core_2.12.jar (312ms)
downloading https://repo1.maven.org/maven2/com/amazonaws/aws-java-

In [4]:
cast = DeltaTable.forPath(spark, "s3a://tmdb-silver/cast").toDF()

crew = DeltaTable.forPath(spark, "s3a://tmdb-silver/crew").toDF()

movies = DeltaTable.forPath(spark, "s3a://tmdb-silver/movies").toDF()

genres = DeltaTable.forPath(spark, "s3a://tmdb-silver/genres").toDF()

keywords = DeltaTable.forPath(spark, "s3a://tmdb-silver/keywords").toDF()

production_companies = DeltaTable.forPath(spark, "s3a://tmdb-silver/production_companies").toDF()

production_countries = DeltaTable.forPath(spark, "s3a://tmdb-silver/production_countries").toDF()

spoken_languages = DeltaTable.forPath(spark, "s3a://tmdb-silver/spoken_languages").toDF()


25/04/26 05:28:00 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [5]:
# Which is the highest-grossing movie starring Tom Cruise?

cast_df = cast.select("movie_id", "character", "name")
movies_cast = movies.join(cast_df, "movie_id")
tom_cruise = movies_cast.filter(movies_cast["name"] == "Tom Cruise")
top_tom_cruise = tom_cruise.orderBy(desc("revenue")).select("title", "revenue", "character", "name").limit(1)
top_tom_cruise.toPandas()


                                                                                

Unnamed: 0,title,revenue,character,name
0,Mission: Impossible - Ghost Protocol,694713380.0,Ethan Hunt,Tom Cruise


In [6]:
# What is the relationship between movie genres and box office revenue?

genre_rev = genres.join(movies.select("movie_id", "revenue"), "movie_id")
avg_rev_by_genre = genre_rev.groupBy("name").avg("revenue").orderBy(desc("avg(revenue)"))

avg_rev_by_genre.toPandas()


                                                                                

Unnamed: 0,name,avg(revenue)
0,Animation,225693000.0
1,Adventure,208660200.0
2,Fantasy,193354200.0
3,Family,162345500.0
4,Science Fiction,152456500.0
5,Action,141213100.0
6,War,84155870.0
7,Thriller,81044290.0
8,Mystery,78300930.0
9,Comedy,71289500.0


In [7]:
# What is the relationship between release dates and revenue?
movies_yearly = movies.withColumn("year", year("release_date"))
rev_by_year = movies_yearly.groupBy("year").avg("revenue").orderBy("year")

rev_by_year.toPandas()

                                                                                

Unnamed: 0,year,avg(revenue)
0,,0.000000e+00
1,1916.0,8.394751e+06
2,1925.0,2.200000e+07
3,1927.0,6.504220e+05
4,1929.0,2.179000e+06
...,...,...
86,2013.0,1.013485e+08
87,2014.0,1.013466e+08
88,2015.0,1.054399e+08
89,2016.0,1.390496e+08


In [8]:
# Does a director always work with the same crew?

directors = crew.filter(crew["job"] == "Director").select("movie_id", "name").withColumnRenamed("name", "director")
crew_no_directors = crew.filter(crew["job"] != "Director").select("movie_id", "name").distinct()

dir_crew = directors.join(crew_no_directors, "movie_id")
dir_crew_count = dir_crew.groupBy("director").agg(F.countDistinct("name").alias("unique_crew_count"))

dir_crew_count.toPandas()

                                                                                

Unnamed: 0,director,unique_crew_count
0,Aleksey German,10
1,Rob Bowman,253
2,Will Gluck,102
3,John Milius,79
4,Jim Jarmusch,7
...,...,...
2449,Eric Lavaine,2
2450,Mamoru Hosoda,1
2451,Andrucha Waddington,3
2452,Daniel Hsia,1


In [9]:
# As a crew, which movie sets and roles Ahmet has been involved in throughout his career?

cast_ahmet = cast.filter((col("name")).like("%Ahmet%")) \
    .select("movie_id", "name", "character") \
    .withColumnRenamed("character", "role")

crew_ahmet = crew.filter((col("name")).like("%Ahmet%")) \
    .select("movie_id", "name", "job") \
    .withColumnRenamed("job", "role")

ahmet_combined = cast_ahmet.unionByName(crew_ahmet)

ahmet_roles = ahmet_combined.join(movies.select("movie_id", "title"), "movie_id") \
    .select("title", "name", "role")

ahmet_roles.toPandas()

Unnamed: 0,title,name,role
0,Once in a Lifetime: The Extraordinary Story of...,Ahmet Ertegun,Himself
1,Taken 2,Ahmet Orhan Ozcam,Taxi Driver Kim
2,Ready to Rumble,Ahmet Zappa,Cashier
3,Darkness Falls,Ahmet Ahmet,Title Designer


In [10]:
# Tom Cruise toplam kariyer hasılatı?
cast_df = cast.select("movie_id", "name")
movies_cast = movies.join(cast_df, "movie_id")
tom_cruise = movies_cast.filter(movies_cast["name"] == "Tom Cruise")
total_revenue = tom_cruise.agg(F.sum("revenue").alias("total_revenue"))
total_revenue.toPandas()

Unnamed: 0,total_revenue
0,8993388000.0


In [11]:
# En çok hasılat hangi yıl?
movies_with_year = movies.withColumn("year", F.year("release_date"))
revenue_by_year = movies_with_year.groupBy("year").agg(F.sum("revenue").alias("total_revenue"))
top_year = revenue_by_year.orderBy(F.desc("total_revenue")).limit(1)
top_year.toPandas()

Unnamed: 0,year,total_revenue
0,2012,24141710000.0


In [12]:
# Bir oyuncunun oynadıgı film sayısı?
actor_movie_counts = cast.groupBy("name").agg(F.countDistinct("movie_id").alias("movie_count"))
top_actors = actor_movie_counts.orderBy(F.desc("movie_count")).limit(10)
top_actors.toPandas()

                                                                                

Unnamed: 0,name,movie_count
0,Samuel L. Jackson,67
1,Robert De Niro,57
2,Bruce Willis,51
3,Matt Damon,48
4,Morgan Freeman,46
5,,43
6,Steve Buscemi,43
7,Liam Neeson,41
8,Owen Wilson,40
9,Johnny Depp,40


In [13]:
# Which movie features the highest number of spoken languages?

lang_count = spoken_languages.groupBy("movie_id") \
    .agg(countDistinct("iso_639_1").alias("language_count"))

movies_lang = lang_count.join(movies.select("movie_id", "title"), "movie_id")

most_languages = movies_lang.orderBy(col("language_count").desc()).limit(1)

most_languages.select("title", "language_count").toPandas()

                                                                                

Unnamed: 0,title,language_count
0,2012,9


In [14]:
top_movie_id = most_languages.select("movie_id").first()["movie_id"]

spoken_languages.filter(col("movie_id") == top_movie_id) \
    .select("iso_639_1", "name") \
    .dropDuplicates() \
    .toPandas()

Unnamed: 0,iso_639_1,name
0,la,Latin
1,en,English
2,pt,Português
3,bo,
4,it,Italiano
5,fr,Français
6,ru,Pусский
7,zh,普通话
8,hi,हिन्दी
