Realizando importações necessárias para a implementação do Iceberg

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame

import os
import pyspark.sql.functions as F

Configurando o Spark para o Iceberg

In [7]:
# Credentials for the JDBC catalog are taken from the environment so that real
# secrets never need to be committed with the notebook. The fallbacks keep the
# previously hardcoded local defaults, so an unconfigured local setup behaves
# exactly as before.
jdbc_user = os.environ.get("ICEBERG_JDBC_USER", "postgres")
jdbc_password = os.environ.get("ICEBERG_JDBC_PASSWORD", "postgres")

# Spark configuration that registers an Iceberg catalog named
# `my_iceberg_catalog`, tracked through JDBC and stored on an S3-compatible
# endpoint.
conf = SparkConf() \
    .setAppName("Apache Iceberg") \
    .setAll([
        # Add Iceberg SQL extensions like UPDATE or DELETE in Spark
        ("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"),

        # Register `my_iceberg_catalog`
        ("spark.sql.catalog.my_iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog"),

        # Configure SQL connection to track tables inside `my_iceberg_catalog`
        ("spark.sql.catalog.my_iceberg_catalog.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog"),
        ("spark.sql.catalog.my_iceberg_catalog.uri", "jdbc:postgresql://postgres:5432/iceberg_db"),
        ("spark.sql.catalog.my_iceberg_catalog.jdbc.user", jdbc_user),
        ("spark.sql.catalog.my_iceberg_catalog.jdbc.password", jdbc_password),

        # Configure Warehouse on MinIO
        # NOTE(review): no S3 access key / secret is set here; presumably they
        # are supplied through the environment (e.g. AWS_* variables) — confirm.
        ("spark.sql.catalog.my_iceberg_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"),
        ("spark.sql.catalog.my_iceberg_catalog.s3.endpoint", "http://minio:9000"),
        ("spark.sql.catalog.my_iceberg_catalog.s3.path-style-access", "true"),
        ("spark.sql.catalog.my_iceberg_catalog.warehouse", "s3://warehouse"),
    ])

# Build the session from the configuration above (getOrCreate returns an
# existing session when one is already active).
spark = SparkSession.builder.config(conf=conf).getOrCreate()

Criando tabela com base no parquet

In [None]:
# Create the target Iceberg table if it does not exist yet.
#
# The temperature columns are declared as `double`: in Spark SQL a bare
# `numeric` is DECIMAL(10, 0), which would round every reading to a whole
# number when the data is written.
# NOTE(review): because of IF NOT EXISTS, a table that was already created
# with the old `numeric` columns keeps them — drop/alter it if needed.
# NOTE(review): partitioning by (dt, country) yields one partition per
# date/country pair — confirm this granularity is intended.
spark.sql("""
  CREATE TABLE IF NOT EXISTS my_iceberg_catalog.db.climate_change (
    dt date,
    average_temperature double,
    average_temperature_uncertainty double,
    city varchar(255),
    country varchar(255),
    latitude varchar(255),
    longitude varchar(255)
  ) USING iceberg PARTITIONED BY (dt, country)
""")

Lendo o dataset e gravando os dados na tabela Iceberg

In [None]:
# The dataset is expected at <parent of the working directory>/datasource/.
root = os.path.dirname(os.getcwd())
path = os.path.join(root, "datasource", "climate_change.parquet")

# The source is a Parquet file, so it must be read with the Parquet reader.
# Parquet carries its own schema, so the CSV-only `header` / `inferSchema`
# options are not needed.
climates: DataFrame = spark.read.parquet(path)

# The target table names its date column `dt`, so the converted date is
# written to `dt`. The source column is accepted under either name.
# NOTE(review): the source schema is not visible here; the remaining columns
# are assumed to already match the table's column names — confirm.
source_date_column = "dt" if "dt" in climates.columns else "date"
climates_for_table = climates.withColumn("dt", F.to_date(F.col(source_date_column)))
if source_date_column != "dt":
    # Drop the original column so only columns that exist in the table remain.
    climates_for_table = climates_for_table.drop(source_date_column)

# Append the rows to the Iceberg table. Re-running this cell appends the same
# rows again.
climates_for_table \
  .writeTo("my_iceberg_catalog.db.climate_change") \
  .append()

Exibindo resultados

In [None]:
# Read the rows back from the Iceberg table and preview the three earliest
# records. The table's date column is `dt` (it has no `date` column), so the
# ordering must use `dt`.
climates = spark.table("my_iceberg_catalog.db.climate_change")
climates.orderBy("dt").show(3)