In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Reuse (or create) the Spark session for this notebook. The captured warning
# below shows an existing session was picked up, so only runtime SQL configs set
# here would take effect; the `demo` catalog / MinIO settings are presumably
# provided by the cluster configuration — TODO confirm.
spark = (
    SparkSession.builder
    .appName("Iceberg-MinIO")
    .getOrCreate()
)



25/09/22 10:01:15 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:

# Make sure the target namespace exists, then list namespaces and tables.
spark.sql("CREATE NAMESPACE IF NOT EXISTS demo.imdb")
for listing_query in ("SHOW NAMESPACES IN demo", "SHOW TABLES IN demo.imdb"):
    spark.sql(listing_query).show()



+---------+
|namespace|
+---------+
|     imdb|
+---------+

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [3]:
# Start from a clean slate: drop every IMDb table this notebook rebuilds.
for qualified_table in (
    "demo.imdb.title_basics",
    "demo.imdb.title_akas",
    "demo.imdb.title_crew",
    "demo.imdb.title_episode",
    "demo.imdb.title_ratings",
    "demo.imdb.title_principals",
    "demo.imdb.name_basics",
):
    spark.sql(f"DROP TABLE IF EXISTS {qualified_table}")


DataFrame[]

In [4]:
# Declare the Iceberg table for title.basics (all columns kept as STRING).
# Note: the later `writeTo(...).createOrReplace()` replaces this table, so the
# stored schema ultimately comes from the DataFrame written there.
title_basics_ddl = """
CREATE TABLE IF NOT EXISTS demo.imdb.title_basics (
    title_id STRING,
    title_type STRING,
    primary_title STRING,
    original_title STRING,
    is_adult STRING,
    start_year STRING,
    end_year STRING,
    runtime_minutes STRING,
    genres STRING
) USING iceberg
"""
spark.sql(title_basics_ddl)

DataFrame[]

In [6]:
# Load title.basics from MinIO.
# IMDb's dataset docs state that `\N` marks a missing value in any field, so the
# reader maps it to NULL for *every* column. Previously only end_year was cleaned,
# leaving literal `\N` in start_year, runtime_minutes, genres, ...
# The file is plain, unquoted TSV, so quote handling is turned off (empty string).
# Otherwise a field that happens to begin with `"` would be parsed as a quoted
# value that can swallow following tabs/columns.
title_df = (
    spark.read
    .option("header", True)
    .option("sep", "\t")
    .option("nullValue", "\\N")
    .option("quote", "")
    .csv("s3a://data/imdb/year=2025/month=09/day=22/title.basics.tsv.gz")
)

title_rename_columns_df = title_df.withColumnsRenamed({
    "tconst": "title_id",
    "titleType": "title_type",
    "primaryTitle": "primary_title",
    "originalTitle": "original_title",
    "isAdult": "is_adult",
    "startYear": "start_year",
    "endYear": "end_year",
    "runtimeMinutes": "runtime_minutes",
})

# `\N` -> NULL already happened at read time; the stage name is kept because the
# write cell consumes it.
title_basics_set_null_values_df = title_rename_columns_df

# A bare `.schema` expression mid-cell displays nothing; print it explicitly.
title_basics_set_null_values_df.printSchema()
title_basics_set_null_values_df.show(5)

+---------+----------+--------------------+--------------------+--------+----------+--------+---------------+--------------------+
| title_id|title_type|       primary_title|      original_title|is_adult|start_year|end_year|runtime_minutes|              genres|
+---------+----------+--------------------+--------------------+--------+----------+--------+---------------+--------------------+
|tt0000001|     short|          Carmencita|          Carmencita|       0|      1894|    NULL|              1|   Documentary,Short|
|tt0000002|     short|Le clown et ses c...|Le clown et ses c...|       0|      1892|    NULL|              5|     Animation,Short|
|tt0000003|     short|        Poor Pierrot|      Pauvre Pierrot|       0|      1892|    NULL|              5|Animation,Comedy,...|
|tt0000004|     short|         Un bon bock|         Un bon bock|       0|      1892|    NULL|             12|     Animation,Short|
|tt0000005|     short|    Blacksmith Scene|    Blacksmith Scene|       0|      1893

In [8]:
# Create (or replace) the Iceberg table from the cleaned title.basics frame.
(
    title_basics_set_null_values_df
    .writeTo("demo.imdb.title_basics")
    .createOrReplace()
)

                                                                                

In [10]:
# Spot-check a few rows that landed in the table.
title_basics_preview_sql = "SELECT * FROM demo.imdb.title_basics LIMIT 10"
spark.sql(title_basics_preview_sql).show()

+---------+----------+--------------------+--------------------+--------+----------+--------+---------------+--------------------+
| title_id|title_type|       primary_title|      original_title|is_adult|start_year|end_year|runtime_minutes|              genres|
+---------+----------+--------------------+--------------------+--------+----------+--------+---------------+--------------------+
|tt0000001|     short|          Carmencita|          Carmencita|       0|      1894|    NULL|              1|   Documentary,Short|
|tt0000002|     short|Le clown et ses c...|Le clown et ses c...|       0|      1892|    NULL|              5|     Animation,Short|
|tt0000003|     short|        Poor Pierrot|      Pauvre Pierrot|       0|      1892|    NULL|              5|Animation,Comedy,...|
|tt0000004|     short|         Un bon bock|         Un bon bock|       0|      1892|    NULL|             12|     Animation,Short|
|tt0000005|     short|    Blacksmith Scene|    Blacksmith Scene|       0|      1893

In [11]:
# Declare the Iceberg table for title.akas (all columns kept as STRING).
# Note: the later `writeTo(...).createOrReplace()` replaces this table, so the
# stored schema ultimately comes from the DataFrame written there.
title_akas_ddl = """
CREATE TABLE IF NOT EXISTS demo.imdb.title_akas (
    title_id STRING,
    ordering STRING,
    title STRING,
    region STRING,
    language STRING,
    types STRING,
    attributes STRING,
    is_original_title STRING
) USING iceberg
"""
spark.sql(title_akas_ddl)

DataFrame[]

In [12]:
# Load title.akas from MinIO.
# `\N` (IMDb's missing-value marker) is mapped to NULL by the reader for every
# column, instead of patching region/language/types/attributes one at a time and
# leaving any other column uncleaned.
# The file is plain, unquoted TSV, so quote handling is turned off (empty string).
# Otherwise a title that happens to begin with `"` would be parsed as a quoted
# value that can swallow following tabs/columns.
title_akas_df = (
    spark.read
    .option("header", True)
    .option("sep", "\t")
    .option("nullValue", "\\N")
    .option("quote", "")
    .csv("s3a://data/imdb/year=2025/month=09/day=22/title.akas.tsv.gz")
)

title_akas_rename_columns_df = title_akas_df.withColumnsRenamed({
    "titleId": "title_id",
    "isOriginalTitle": "is_original_title",
})

# `\N` -> NULL already happened at read time; the stage name is kept because the
# write cell consumes it.
title_akas_set_null_values_df = title_akas_rename_columns_df

# A bare `.schema` expression mid-cell displays nothing; print it explicitly.
title_akas_set_null_values_df.printSchema()
title_akas_set_null_values_df.show(5)

+---------+--------+--------------------+------+--------+-----------+-------------+-----------------+
| title_id|ordering|               title|region|language|      types|   attributes|is_original_title|
+---------+--------+--------------------+------+--------+-----------+-------------+-----------------+
|tt0000001|       1|          Carmencita|  NULL|    NULL|   original|         NULL|                1|
|tt0000001|       2|          Carmencita|    DE|    NULL|       NULL|literal title|                0|
|tt0000001|       3|          Carmencita|    US|    NULL|imdbDisplay|         NULL|                0|
|tt0000001|       4|Carmencita - span...|    HU|    NULL|imdbDisplay|         NULL|                0|
|tt0000001|       5|          Καρμενσίτα|    GR|    NULL|imdbDisplay|         NULL|                0|
+---------+--------+--------------------+------+--------+-----------+-------------+-----------------+
only showing top 5 rows



In [13]:
# Create (or replace) the Iceberg table from the cleaned title.akas frame.
(
    title_akas_set_null_values_df
    .writeTo("demo.imdb.title_akas")
    .createOrReplace()
)

                                                                                

In [15]:
# Spot-check a few rows that landed in the table.
title_akas_preview_sql = "SELECT * FROM demo.imdb.title_akas LIMIT 10"
spark.sql(title_akas_preview_sql).show()

+---------+--------+--------------------+------+--------+-----------+-------------+-----------------+
| title_id|ordering|               title|region|language|      types|   attributes|is_original_title|
+---------+--------+--------------------+------+--------+-----------+-------------+-----------------+
|tt0000001|       1|          Carmencita|  NULL|    NULL|   original|         NULL|                1|
|tt0000001|       2|          Carmencita|    DE|    NULL|       NULL|literal title|                0|
|tt0000001|       3|          Carmencita|    US|    NULL|imdbDisplay|         NULL|                0|
|tt0000001|       4|Carmencita - span...|    HU|    NULL|imdbDisplay|         NULL|                0|
|tt0000001|       5|          Καρμενσίτα|    GR|    NULL|imdbDisplay|         NULL|                0|
|tt0000001|       6|          Карменсита|    RU|    NULL|imdbDisplay|         NULL|                0|
|tt0000001|       7|          Карменсіта|    UA|    NULL|imdbDisplay|         NULL

In [16]:
# Declare the Iceberg table for title.crew (all columns kept as STRING).
# Note: the later `writeTo(...).createOrReplace()` replaces this table, so the
# stored schema ultimately comes from the DataFrame written there.
title_crew_ddl = """
CREATE TABLE IF NOT EXISTS demo.imdb.title_crew (
    title_id STRING,
    director_id STRING,
    writers STRING
) USING iceberg
"""
spark.sql(title_crew_ddl)

DataFrame[]

In [17]:
# Load title.crew from MinIO.
# `\N` (IMDb's missing-value marker) is mapped to NULL by the reader for every
# column. Previously only `writers` was cleaned, although `directors` (renamed to
# director_id) uses the same marker for titles with no known director.
# The file is plain, unquoted TSV, so quote handling is turned off (empty string).
title_crew_df = (
    spark.read
    .option("header", True)
    .option("sep", "\t")
    .option("nullValue", "\\N")
    .option("quote", "")
    .csv("s3a://data/imdb/year=2025/month=09/day=22/title.crew.tsv.gz")
)

# NOTE(review): director_id / writers hold comma-separated nconst lists
# (see the preview output), not single ids.
title_crew_rename_columns_df = title_crew_df.withColumnsRenamed({
    "tconst": "title_id",
    "directors": "director_id",
})

# `\N` -> NULL already happened at read time; the stage name is kept because the
# write cell consumes it.
title_crew_set_null_values_df = title_crew_rename_columns_df

# A bare `.schema` expression mid-cell displays nothing; print it explicitly.
title_crew_set_null_values_df.printSchema()
title_crew_set_null_values_df.show(5)




+---------+-----------+---------+
| title_id|director_id|  writers|
+---------+-----------+---------+
|tt0000001|  nm0005690|     NULL|
|tt0000002|  nm0721526|     NULL|
|tt0000003|  nm0721526|nm0721526|
|tt0000004|  nm0721526|     NULL|
|tt0000005|  nm0005690|     NULL|
+---------+-----------+---------+
only showing top 5 rows



In [18]:
# Create (or replace) the Iceberg table from the cleaned title.crew frame.
(
    title_crew_set_null_values_df
    .writeTo("demo.imdb.title_crew")
    .createOrReplace()
)

                                                                                

In [19]:
# Spot-check a few rows that landed in the table.
title_crew_preview_sql = "SELECT * FROM demo.imdb.title_crew LIMIT 10"
spark.sql(title_crew_preview_sql).show()

+---------+-------------------+---------+
| title_id|        director_id|  writers|
+---------+-------------------+---------+
|tt0000001|          nm0005690|     NULL|
|tt0000002|          nm0721526|     NULL|
|tt0000003|          nm0721526|nm0721526|
|tt0000004|          nm0721526|     NULL|
|tt0000005|          nm0005690|     NULL|
|tt0000006|          nm0005690|     NULL|
|tt0000007|nm0005690,nm0374658|     NULL|
|tt0000008|          nm0005690|     NULL|
|tt0000009|          nm0085156|nm0085156|
|tt0000010|          nm0525910|     NULL|
+---------+-------------------+---------+



In [20]:
# Declare the Iceberg table for title.episode (all columns kept as STRING).
# Note: the later `writeTo(...).createOrReplace()` replaces this table, so the
# stored schema ultimately comes from the DataFrame written there.
title_episode_ddl = """
CREATE TABLE IF NOT EXISTS demo.imdb.title_episode (
    title_id STRING,
    parent_title_id STRING,
    season_number STRING,
    episode_number STRING
) USING iceberg
"""
spark.sql(title_episode_ddl)

DataFrame[]

In [21]:
# Load title.episode from MinIO.
# `\N` (IMDb's missing-value marker) is mapped to NULL by the reader for every
# column. This covers season_number / episode_number, which were cleaned by hand
# before, and any other column, consistently with the other IMDb loads.
# The file is plain, unquoted TSV, so quote handling is turned off (empty string).
title_episode_df = (
    spark.read
    .option("header", True)
    .option("sep", "\t")
    .option("nullValue", "\\N")
    .option("quote", "")
    .csv("s3a://data/imdb/year=2025/month=09/day=22/title.episode.tsv.gz")
)

title_episode_rename_columns_df = title_episode_df.withColumnsRenamed({
    "tconst": "title_id",
    "parentTconst": "parent_title_id",
    "seasonNumber": "season_number",
    "episodeNumber": "episode_number",
})

# `\N` -> NULL already happened at read time; the stage name is kept because the
# write cell consumes it.
title_episode_set_null_values_df = title_episode_rename_columns_df

title_episode_set_null_values_df.show(5)

+---------+---------------+-------------+--------------+
| title_id|parent_title_id|season_number|episode_number|
+---------+---------------+-------------+--------------+
|tt0031458|     tt32857063|         NULL|          NULL|
|tt0041951|      tt0041038|            1|             9|
|tt0042816|      tt0989125|            1|            17|
|tt0042889|      tt0989125|         NULL|          NULL|
|tt0043426|      tt0040051|            3|            42|
+---------+---------------+-------------+--------------+
only showing top 5 rows



In [22]:
# Create (or replace) the Iceberg table from the cleaned title.episode frame.
(
    title_episode_set_null_values_df
    .writeTo("demo.imdb.title_episode")
    .createOrReplace()
)

                                                                                

In [23]:
# Spot-check a few rows that landed in the table.
title_episode_preview_sql = "SELECT * FROM demo.imdb.title_episode LIMIT 10"
spark.sql(title_episode_preview_sql).show()

+---------+---------------+-------------+--------------+
| title_id|parent_title_id|season_number|episode_number|
+---------+---------------+-------------+--------------+
|tt0031458|     tt32857063|         NULL|          NULL|
|tt0041951|      tt0041038|            1|             9|
|tt0042816|      tt0989125|            1|            17|
|tt0042889|      tt0989125|         NULL|          NULL|
|tt0043426|      tt0040051|            3|            42|
|tt0043631|      tt0989125|            2|            16|
|tt0043693|      tt0989125|            2|             8|
|tt0043710|      tt0989125|            3|             3|
|tt0044093|      tt0959862|            1|             6|
|tt0044668|      tt0044243|            2|            16|
+---------+---------------+-------------+--------------+



In [25]:
# Declare the Iceberg table for title.principals (all columns kept as STRING).
# Note: the later `writeTo(...).createOrReplace()` replaces this table, so the
# stored schema ultimately comes from the DataFrame written there.
title_principals_ddl = """
CREATE TABLE IF NOT EXISTS demo.imdb.title_principals (
    title_id STRING,
    ordering STRING,
    name_id STRING,
    category STRING,
    job STRING,
    characters STRING
) USING iceberg
"""
spark.sql(title_principals_ddl)

DataFrame[]

In [26]:
# Load title.principals from MinIO.
# `\N` (IMDb's missing-value marker) is mapped to NULL by the reader for every
# column. This covers job / characters, which were cleaned by hand before, and any
# other column, consistently with the other IMDb loads.
# The file is plain, unquoted TSV, so quote handling is turned off (empty string).
# The `characters` column carries JSON-like text such as ["Self"] (see preview),
# which should be read verbatim.
title_principals_df = (
    spark.read
    .option("header", True)
    .option("sep", "\t")
    .option("nullValue", "\\N")
    .option("quote", "")
    .csv("s3a://data/imdb/year=2025/month=09/day=22/title.principals.tsv.gz")
)

title_principals_rename_columns_df = title_principals_df.withColumnsRenamed({
    "tconst": "title_id",
    "nconst": "name_id",
})

# `\N` -> NULL already happened at read time; the stage name is kept because the
# write cell consumes it.
title_principals_set_null_values_df = title_principals_rename_columns_df

title_principals_set_null_values_df.show(5)

+---------+--------+---------+---------------+--------------------+----------+
| title_id|ordering|  name_id|       category|                 job|characters|
+---------+--------+---------+---------------+--------------------+----------+
|tt0000001|       1|nm1588970|           self|                NULL|  ["Self"]|
|tt0000001|       2|nm0005690|       director|                NULL|      NULL|
|tt0000001|       3|nm0005690|       producer|            producer|      NULL|
|tt0000001|       4|nm0374658|cinematographer|director of photo...|      NULL|
|tt0000002|       1|nm0721526|       director|                NULL|      NULL|
+---------+--------+---------+---------------+--------------------+----------+
only showing top 5 rows



In [None]:
# Create (or replace) the Iceberg table from the cleaned title.principals frame.
(
    title_principals_set_null_values_df
    .writeTo("demo.imdb.title_principals")
    .createOrReplace()
)

[Stage 21:>                                                         (0 + 1) / 1]

In [None]:
# Spot-check a few rows that landed in the table.
title_principals_preview_sql = "SELECT * FROM demo.imdb.title_principals LIMIT 10"
spark.sql(title_principals_preview_sql).show()

In [42]:
# Declare the Iceberg table for title.ratings (all columns kept as STRING).
# Note: the later `writeTo(...).createOrReplace()` replaces this table, so the
# stored schema ultimately comes from the DataFrame written there.
title_ratings_ddl = """
CREATE TABLE IF NOT EXISTS demo.imdb.title_ratings (
    title_id STRING,
    average_rating STRING,
    num_votes STRING
) USING iceberg
"""
spark.sql(title_ratings_ddl)

DataFrame[]

In [16]:
# Load title.ratings from MinIO and switch to snake_case column names.
# No `\N` cleanup is applied here; the frame is only renamed.
ratings_df = (
    spark.read
    .option("header", True)
    .option("sep", "\t")
    .csv("s3a://data/imdb/year=2025/month=09/day=22/title.ratings.tsv.gz")
)

ratings_column_mapping = {
    "tconst": "title_id",
    "averageRating": "average_rating",
    "numVotes": "num_votes",
}
ratings_rename_columns_df = ratings_df.withColumnsRenamed(ratings_column_mapping)

ratings_rename_columns_df.show(5)


+---------+--------------+---------+
| title_id|average_rating|num_votes|
+---------+--------------+---------+
|tt0000001|           5.7|     2181|
|tt0000002|           5.5|      301|
|tt0000003|           6.4|     2248|
|tt0000004|           5.2|      194|
|tt0000005|           6.2|     2990|
+---------+--------------+---------+
only showing top 5 rows



In [43]:
# Create (or replace) the Iceberg table from the renamed ratings frame produced by
# the load cell above. The ratings load applies no `\N` cleanup, so the renamed
# frame is what gets written. It must be referenced by name here: a generic
# `set_null_values_df` is not defined by any cell, so it would only resolve from
# stale kernel state and could write unrelated data into title_ratings.
ratings_rename_columns_df.writeTo("demo.imdb.title_ratings").createOrReplace()

                                                                                

In [45]:
# Declare the Iceberg table for name.basics (all columns kept as STRING).
# Note: the later `writeTo(...).createOrReplace()` replaces this table, so the
# stored schema ultimately comes from the DataFrame written there.
name_basics_ddl = """
CREATE TABLE IF NOT EXISTS demo.imdb.name_basics (
    id STRING,
    primary_name STRING,
    birth_year STRING,
    death_year STRING,
    primary_profession STRING,
    titles STRING
) USING iceberg
"""
spark.sql(name_basics_ddl)

DataFrame[]

In [3]:
# Load name.basics from MinIO.
# `\N` (IMDb's missing-value marker) is mapped to NULL by the reader for every
# column. Previously only death_year was cleaned, leaving literal `\N` in
# birth_year, primary_profession and titles (known-for titles).
# The file is plain, unquoted TSV, so quote handling is turned off (empty string).
# `col` / `when` are no longer needed here; they are imported in the first cell
# anyway, so the duplicate mid-notebook import is dropped.
name_basics_df = (
    spark.read
    .option("header", True)
    .option("sep", "\t")
    .option("nullValue", "\\N")
    .option("quote", "")
    .csv("s3a://data/imdb/year=2025/month=09/day=22/name.basics.tsv.gz")
)

name_rename_columns_df = name_basics_df.withColumnsRenamed({
    "nconst": "id",
    "primaryName": "primary_name",
    "birthYear": "birth_year",
    "deathYear": "death_year",
    "primaryProfession": "primary_profession",
    "knownForTitles": "titles",
})

# `\N` -> NULL already happened at read time for all columns; the stage name is
# kept because the write cell consumes it.
set_null_death_year_df = name_rename_columns_df

set_null_death_year_df.show(5)


+---------+---------------+----------+----------+--------------------+--------------------+
|       id|   primary_name|birth_year|death_year|  primary_profession|              titles|
+---------+---------------+----------+----------+--------------------+--------------------+
|nm0000001|   Fred Astaire|      1899|      1987|actor,miscellaneo...|tt0072308,tt00504...|
|nm0000002|  Lauren Bacall|      1924|      2014|actress,soundtrac...|tt0037382,tt00752...|
|nm0000003|Brigitte Bardot|      1934|      NULL|actress,music_dep...|tt0057345,tt00491...|
|nm0000004|   John Belushi|      1949|      1982|actor,writer,musi...|tt0072562,tt00779...|
|nm0000005| Ingmar Bergman|      1918|      2007|writer,director,a...|tt0050986,tt00694...|
+---------+---------------+----------+----------+--------------------+--------------------+
only showing top 5 rows



In [46]:
# Create (or replace) the Iceberg table from the cleaned name.basics frame built by
# the load cell above (`set_null_death_year_df`). A generic `set_null_values_df`
# is not defined by any cell, so it would only resolve from stale kernel state
# and could write a different dataset into name_basics.
set_null_death_year_df.writeTo("demo.imdb.name_basics").createOrReplace()

                                                                                

In [59]:
# Confirm every IMDb table now exists in the namespace.
imdb_tables_sql = "SHOW TABLES IN demo.imdb"
spark.sql(imdb_tables_sql).show()


+---------+----------------+-----------+
|namespace|       tableName|isTemporary|
+---------+----------------+-----------+
|     imdb|     name_basics|      false|
|     imdb|      title_akas|      false|
|     imdb|    title_basics|      false|
|     imdb|      title_crew|      false|
|     imdb|   title_episode|      false|
|     imdb|title_principals|      false|
|     imdb|   title_ratings|      false|
+---------+----------------+-----------+



In [51]:
# Final spot-check of the title_basics table.
final_preview_sql = "SELECT * FROM demo.imdb.title_basics LIMIT 10"
spark.sql(final_preview_sql).show()

+---------+----------+--------------------+--------------------+--------+----------+--------+---------------+--------------------+
| title_id|title_type|       primary_title|      original_title|is_adult|start_year|end_year|runtime_minutes|              genres|
+---------+----------+--------------------+--------------------+--------+----------+--------+---------------+--------------------+
|tt0000001|     short|          Carmencita|          Carmencita|       0|      1894|    NULL|              1|   Documentary,Short|
|tt0000002|     short|Le clown et ses c...|Le clown et ses c...|       0|      1892|    NULL|              5|     Animation,Short|
|tt0000003|     short|        Poor Pierrot|      Pauvre Pierrot|       0|      1892|    NULL|              5|Animation,Comedy,...|
|tt0000004|     short|         Un bon bock|         Un bon bock|       0|      1892|    NULL|             12|     Animation,Short|
|tt0000005|     short|    Blacksmith Scene|    Blacksmith Scene|       0|      1893