In [22]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Iceberg Catalog Setup") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") \
    .config("spark.sql.catalog.iceberg.uri", "http://iceberg-rest:8181") \
    .config("spark.sql.catalog.iceberg.warehouse", "warehouse") \
    .config("spark.sql.catalog.iceberg.s3.access-key", "admin") \
    .config("spark.sql.catalog.iceberg.s3.secret-key", "password") \
    .config("spark.sql.catalog.iceberg.s3.endpoint", "http://minio:9000") \
    .config("spark.sql.catalog.iceberg.s3.path-style-access", "true") \
    .config("spark.sql.catalog.iceberg.client.factory", "com.starrocks.connector.iceberg.IcebergAwsClientFactory") \
    .getOrCreate()

print("Spark Running")
print(spark.sparkContext.getConf().getAll())
print("current catalog:", spark.catalog.currentCatalog())
print("Spark UI:", spark.sparkContext.uiWebUrl)

24/07/16 20:02:32 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Spark Running
[('spark.eventLog.enabled', 'true'), ('spark.driver.cores', '4'), ('spark.driver.host', '30c3b29e5e01'), ('spark.task.cpus', '4'), ('spark.executor.cores', '4'), ('spark.history.fs.logDirectory', '/home/iceberg/spark-events'), ('spark.driver.port', '41981'), ('spark.sql.catalog.demo.s3.endpoint', 'http://minio:9000'), ('spark.eventLog.dir', '/home/iceberg/spark-events'), ('spark.serializer.objectStreamReset', '100'), ('spark.master', 'local[*]'), ('spark.executor.memory', '8g'), ('spark.submit.deployMode', 'client'), ('spark.app.startTime', '1721158989297'), ('spark.app.id', 'local-1721158989882'), ('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED 

In [60]:
from pathlib import Path

emissions_data_path = "/home/iceberg/data/emissions_data"

file_path = f"{emissions_data_path}/co2_emissions_passenger_cars_2020.json"
file_name = Path(file_path).stem

df = spark.read.option("multiline","true").json(file_path)
df.createOrReplaceTempView(f"{file_name}_tempTable")
spark.sql(f"CREATE TABLE IF NOT EXISTS raw.co2_passenger_cars_emissions.{file_name} as select * from {file_name}_tempTable")
spark.catalog.dropTempView(f"{file_name}_tempTable")

                                                                                

True

In [61]:
co2_emissions_2020_df = spark.read.table("raw.co2_passenger_cars_emissions.co2_emissions_passenger_cars_2020")

co2_emissions_2020_df.printSchema()


root
 |-- At1 (mm): long (nullable = true)
 |-- At2 (mm): long (nullable = true)
 |-- Cn: string (nullable = true)
 |-- Cr: string (nullable = true)
 |-- Ct: string (nullable = true)
 |-- De: double (nullable = true)
 |-- E (g/km): string (nullable = true)
 |-- Enedc (g/km): long (nullable = true)
 |-- Enedc (g/km) V2: double (nullable = true)
 |-- Er (g/km): string (nullable = true)
 |-- Ernedc (g/km): double (nullable = true)
 |-- Erwltp (g/km): double (nullable = true)
 |-- Ewltp (g/km): long (nullable = true)
 |-- Fm: string (nullable = true)
 |-- Ft: string (nullable = true)
 |-- ID: long (nullable = true)
 |-- It: string (nullable = true)
 |-- MMS: string (nullable = true)
 |-- MS: string (nullable = true)
 |-- Man: string (nullable = true)
 |-- Mh: string (nullable = true)
 |-- Mk: string (nullable = true)
 |-- Mp: string (nullable = true)
 |-- Mt: long (nullable = true)
 |-- Status: string (nullable = true)
 |-- T: string (nullable = true)
 |-- TAN: string (nullable = true)
 |-

In [62]:
import re
import pyspark.sql.functions as F

# ------ Applying the data quality filters --------

# Replace spaces in column names with underscores and remove ()
co2_emissions_columns = co2_emissions_2020_df.columns
print(f"Original Column names:: {co2_emissions_2020_df.columns}")

co2_emissions_2020_df = (co2_emissions_2020_df.select(
                      [F.col(col).alias(re.sub('[()]', '', col.replace(' ', '_'))) for col in co2_emissions_2020_df.columns]
                    ))


print(f"Updated Column names:: {co2_emissions_2020_df.columns}")

# Drop null records
print(f"Number of records of CO2 emissions dataframe before dropping nulls: {co2_emissions_2020_df.count()}")
co2_emissions_2020_df = co2_emissions_2020_df.na.drop('all')
print(f"Number of records of CO2 emissions dataframe after dropping nulls: {co2_emissions_2020_df.count()}")

# Drop duplicates
print(f"Number of records of CO2 emissions dataframe before dropping duplicates: {co2_emissions_2020_df.count()}")
co2_emissions_2020_df = co2_emissions_2020_df.distinct()
print(f"Number of records of CO2 emissions dataframe after dropping duplicates: {co2_emissions_2020_df.count()}")

# Filter records with corrupt Member State code - We keep values with two uppercase letters
print(f"Number of records of CO2 emissions dataframe before MS filter: {co2_emissions_2020_df.count()}")
co2_emissions_2020_df = co2_emissions_2020_df.filter(co2_emissions_2020_df['MS'].rlike('^[A-Z][A-Z]$'))
print(f"Number of records of CO2 emissions dataframe after MS filter: {co2_emissions_2020_df.count()}")

Original Column names:: ['At1 (mm)', 'At2 (mm)', 'Cn', 'Cr', 'Ct', 'De', 'E (g/km)', 'Enedc (g/km)', 'Enedc (g/km) V2', 'Er (g/km)', 'Ernedc (g/km)', 'Erwltp (g/km)', 'Ewltp (g/km)', 'Fm', 'Ft', 'ID', 'It', 'MMS', 'MS', 'Man', 'Mh', 'Mk', 'Mp', 'Mt', 'Status', 'T', 'TAN', 'VFN', 'Va', 'Ve', 'Vf', 'W (mm)', 'Zr', 'ec (cm3)', 'ep (KW)', 'm (kg)', 'r', 'version_file', 'year', 'z (Wh/km)']
Updated Column names:: ['At1_mm', 'At2_mm', 'Cn', 'Cr', 'Ct', 'De', 'E_g/km', 'Enedc_g/km', 'Enedc_g/km_V2', 'Er_g/km', 'Ernedc_g/km', 'Erwltp_g/km', 'Ewltp_g/km', 'Fm', 'Ft', 'ID', 'It', 'MMS', 'MS', 'Man', 'Mh', 'Mk', 'Mp', 'Mt', 'Status', 'T', 'TAN', 'VFN', 'Va', 'Ve', 'Vf', 'W_mm', 'Zr', 'ec_cm3', 'ep_KW', 'm_kg', 'r', 'version_file', 'year', 'z_Wh/km']
Number of records of CO2 emissions dataframe before dropping nulls: 100000
Number of records of CO2 emissions dataframe after dropping nulls: 100000
Number of records of CO2 emissions dataframe before dropping duplicates: 100000
Number of records of C

In [63]:
# Notice the new column in the output (Enedc_g/km_V2)
display(co2_emissions_2020_df.show())

+------+------+--------------------+---+---+------+------+----------+-----------------+-------+-----------+-----------+----------+---+------+--------+---+--------------------+---+--------------------+----------------+--------------------+-------------+----+------+------------+-------------------+--------------------+--------------+--------------------+----+----+----+------+-----+----+---+------------+----+-------+
|At1_mm|At2_mm|                  Cn| Cr| Ct|    De|E_g/km|Enedc_g/km|    Enedc_g/km_V2|Er_g/km|Ernedc_g/km|Erwltp_g/km|Ewltp_g/km| Fm|    Ft|      ID| It|                 MMS| MS|                 Man|              Mh|                  Mk|           Mp|  Mt|Status|           T|                TAN|                 VFN|            Va|                  Ve|  Vf|W_mm|  Zr|ec_cm3|ep_KW|m_kg|  r|version_file|year|z_Wh/km|
+------+------+--------------------+---+---+------+------+----------+-----------------+-------+-----------+-----------+----------+---+------+--------+---+----------

None

In [64]:
from pyspark.sql.types import LongType
# We use repartition() to get one file per partition value
# We're dropping the column z_Wh/km because it only contains null values for this year
# Do the same to other columns that may cause issues, except the column Enedc_g/km_V2
co2_emissions_2020_df = co2_emissions_2020_df.repartition('year')
co2_emissions_2020_df = co2_emissions_2020_df.withColumn('z_Wh/km', F.col('z_Wh/km').cast(LongType()))
(
  co2_emissions_2020_df
  .write
  .mode('append')
  .partitionBy('year')
  .format('iceberg')
  .saveAsTable('curated.co2_passenger_cars_emissions')
)

AnalysisException: [INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA] Cannot write incompatible data for the table `demo`.`curated`.`co2_passenger_cars_emissions`: Cannot find data for the output column `Enedc_g/km_deprecated`.

In [66]:
# Updating the columns to match business requirements
co2_emissions_2020_df = (co2_emissions_2020_df
                      .withColumnRenamed('Enedc_g/km', 'Enedc_g/km_deprecated')
                      .withColumnRenamed('Enedc_g/km_V2', 'Enedc_g/km')
                      )

spark.sql("""
  ALTER TABLE curated.co2_passenger_cars_emissions
  ADD COLUMNS (`Enedc_g/km_deprecated` DOUBLE)
""")

# Trying the ingest again
df_co2_emissions = co2_emissions_2020_df.repartition('year')
(
  df_co2_emissions
  .write
  .mode('append')
  .partitionBy('year')
  .format('iceberg')
  .option('mergeSchema', 'true')
  .saveAsTable('curated.co2_passenger_cars_emissions')
)

In [87]:
# Query the history of the table
history_df = spark.read.format("iceberg") \
    .load("curated.co2_passenger_cars_emissions.history")

# Show the results to display the history
history_df.show(truncate=False)

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2024-07-16 19:12:17.524|873909206733898876 |NULL               |true               |
|2024-07-16 20:37:13.642|6502908078235518678|873909206733898876 |true               |
|2024-07-16 20:38:48.019|5011446837778532254|6502908078235518678|true               |
+-----------------------+-------------------+-------------------+-------------------+

