In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Spark session with the Iceberg runtime on the classpath, the Iceberg SQL
# extensions enabled, and Hive support turned on.
#   * `spark_catalog` (Spark's built-in catalog) is replaced by Iceberg's
#     SparkSessionCatalog with type `hive`, which adds Iceberg table support to
#     the built-in catalog (per the Iceberg Spark configuration docs).
#   * `hive_prod` registers an additional Iceberg SparkCatalog of type `hive`.
#     NOTE(review): not referenced by the cells visible here — confirm it is needed.
#   * `spark.sql.catalogImplementation=hive` is also what enableHiveSupport()
#     sets; it is kept so the effective configuration is unchanged.
#   * The Hive metastore endpoint is hardcoded to thrift://hive-metastore:9083.
SPARK_CONF = {
    "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
    "spark.sql.catalog.spark_catalog.type": "hive",
    "spark.sql.catalog.hive_prod": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.hive_prod.type": "hive",
    "spark.sql.catalogImplementation": "hive",
    "spark.sql.hive.thriftServer.singleSession": "false",
    "spark.hive.metastore.uris": "thrift://hive-metastore:9083",
    "spark.hive.metastore.schema.verification": "false",
}

session_builder = SparkSession.builder.appName("IcebergIntegration")
for conf_key, conf_value in SPARK_CONF.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.enableHiveSupport().getOrCreate()

In [None]:
# Read the raw movies CSV, taking column names from the header row. Schema
# inference is left off, so every column arrives as a string and is cast
# explicitly in a later cell.
raw_df = spark.read.option("header", "true").csv("./movies.csv")

In [None]:
# Inspect the schema exactly as loaded from the CSV (header names, string types).
raw_df.printSchema()

In [None]:
# Preview the first 20 rows; string values longer than 20 characters are truncated.
raw_df.show(n=20, truncate=True)

In [None]:
# Cast the numeric CSV columns (read as strings) to int, in the same order as before.
# This uses the original CSV header names, so it must run before the rename
# cell; re-running it afterwards fails because e.g. "Rotten Tomatoes %" no
# longer exists.
# NOTE(review): if "Profitability" holds fractional values, the int cast
# truncates them. The iceberg_db.movie table also declares `profitability int`,
# so widening the type means changing both places — confirm intended precision.
for numeric_col in ("Year", "Rotten Tomatoes %", "Profitability", "Audience score %"):
    raw_df = raw_df.withColumn(numeric_col, F.col(numeric_col).cast("int"))

In [None]:
# Normalise the CSV headers to snake_case so they match the column names used
# in the iceberg_db.movie table definition. withColumnRenamed is a no-op when
# the source column is absent, so re-running this cell does not fail.
column_renames = {
    "Film": "film",
    "Genre": "genre",
    "Lead Studio": "lead_studio",
    "Audience score %": "audience_score",
    "Rotten Tomatoes %": "rotten_tomatoes",
    "Worldwide Gross": "worldwide_gross",
    "Year": "year",
    "Profitability": "profitability",
}
for source_name, target_name in column_renames.items():
    raw_df = raw_df.withColumnRenamed(source_name, target_name)

In [None]:
# Confirm the int casts and snake_case renames took effect before writing to Iceberg.
raw_df.printSchema()

In [None]:
# Create the `iceberg_db` schema in the session catalog, rooted at the HDFS
# warehouse path below. IF NOT EXISTS keeps the cell re-runnable: a bare
# CREATE SCHEMA raises once the schema already exists in the metastore,
# which breaks Restart & Run All.
spark.sql("CREATE SCHEMA IF NOT EXISTS iceberg_db LOCATION 'hdfs://namenode:8020/user/hive/warehouse/iceberg';")

In [None]:
# List the schemas visible to the session to confirm `iceberg_db` exists.
schemas_df = spark.sql("SHOW SCHEMAS")
schemas_df.show()

In [None]:
# Create the Iceberg table that receives the cleaned movie rows, partitioned by
# `year`. IF NOT EXISTS keeps the cell re-runnable: a bare CREATE TABLE raises
# on the second run once the table exists in the metastore.
# NOTE(review): `profitability` is int, so if the CSV's Profitability column
# holds fractional values they are truncated before reaching this table —
# confirm whether a double is intended (the upstream cast would need to match).
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.movie(
        film string,
        genre string,
        lead_studio string,
        audience_score int,
        profitability int,
        rotten_tomatoes int,
        worldwide_gross string,
        year int
        )
USING iceberg
PARTITIONED BY (year)
LOCATION 'hdfs://namenode:8020/user/hive/warehouse/iceberg/movie';
""")

In [None]:
# Switch the session's current database to iceberg_db, then list its tables
# to confirm `movie` was created.
spark.sql("USE iceberg_db")
tables_df = spark.sql("SHOW TABLES")
tables_df.show()

In [None]:
# Load the cleaned rows into the Iceberg table. append() would add a second
# copy of every row each time the notebook is re-run. overwritePartitions()
# (dynamic overwrite) instead replaces only the `year` partitions that
# raw_df has rows for. On an empty table the result is identical to append(),
# and re-runs with the same CSV leave the table unchanged.
# NOTE(review): if other sources also write into these year partitions, switch
# back to append() or use MERGE INTO.
raw_df.writeTo("iceberg_db.movie").overwritePartitions()