In [2]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Service endpoints used by the Nessie catalog configuration below.
# Each one can be overridden through an environment variable so the notebook is
# not tied to a single container/network layout; the defaults are the values
# the notebook was originally written against.
CATALOG_URI = os.environ.get("NESSIE_CATALOG_URI", "http://nessie:19120/api/v1")
WAREHOUSE = os.environ.get("ICEBERG_WAREHOUSE", "s3://warehouse/")
# NOTE(review): the default below is a raw container IP, which can change when
# the container is recreated -- set S3_ENDPOINT_URI (or use a stable hostname)
# if the object store is no longer reachable at this address.
STORAGE_URI = os.environ.get("S3_ENDPOINT_URI", "http://172.18.0.5:9000")

# Maven coordinates resolved at session start-up (passed to spark.jars.packages
# as a single comma-separated string).
SPARK_PACKAGES = [
    "org.postgresql:postgresql:42.7.3",
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0",
    "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.77.1",
    "software.amazon.awssdk:bundle:2.24.8",
    "software.amazon.awssdk:url-connection-client:2.24.8",
]

# SQL extensions registered on the session (Iceberg + Nessie), also passed as a
# comma-separated string.
SPARK_SQL_EXTENSIONS = [
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "org.projectnessie.spark.extensions.NessieSparkSessionExtensions",
]

# Spark configuration: registers a catalog named "nessie" backed by
# NessieCatalog, pinned to the "main" reference, without authentication, and
# using S3FileIO against STORAGE_URI for the WAREHOUSE location.
conf = (
    pyspark.SparkConf()
        .setAppName('sales_data_app')
        .set('spark.jars.packages', ",".join(SPARK_PACKAGES))
        .set('spark.sql.extensions', ",".join(SPARK_SQL_EXTENSIONS))
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', CATALOG_URI)
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        .set('spark.sql.catalog.nessie.s3.endpoint', STORAGE_URI)
        .set('spark.sql.catalog.nessie.warehouse', WAREHOUSE)
        .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)

# getOrCreate() reuses an already-running session if one exists in this kernel.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Session Started")

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.postgresql#postgresql added as a dependency
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-dcf859ba-767d-4d02-ad44-26decc70e92d;1.0
	confs: [default]
	found org.postgresql#postgresql;42.7.3 in central
	found org.checkerframework#checker-qual;3.42.0 in central
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.0 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.77.1 in central
	found software.amazon.awssdk#bundle;2.24.8 in central
	found software.amazon.awssdk#url-connection-client;2.24.8 in central
	found software.amazon.awssdk#utils;2.24

Spark Session Started


In [3]:
# Set the Iceberg table property 'gc.enabled' to 'true' on the raw sales table.
# NOTE(review): the recorded output of this cell contains a NessieUtil warning
# stating that the recommended value under a Nessie catalog is 'false' and that
# the 'nessie-gc' tool should be used instead -- confirm this setting is wanted.
enable_gc_sql = """
    ALTER TABLE nessie.sales.sales_data_raw 
    SET TBLPROPERTIES ('gc.enabled'='true')
"""
spark.sql(enable_gc_sql)

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
25/07/29 14:13:41 WARN NessieUtil: The Iceberg property 'gc.enabled' and/or 'write.metadata.delete-after-commit.enabled' is enabled on table 'sales.sales_data_raw' in NessieCatalog. This will likely make data in other Nessie branches and tags and in earlier, historical Nessie commits inaccessible. The recommended setting for those properties is 'false'. Use the 'nessie-gc' tool for Nessie reference-aware garbage collection.


DataFrame[]

In [4]:
# Call Iceberg's rewrite_manifests procedure for the raw sales table and print
# the procedure's result (rewritten_manifests_count / added_manifests_count).
rewrite_manifests_sql = """
    CALL nessie.system.rewrite_manifests(
        table => 'nessie.sales.sales_data_raw'
    )
"""
manifest_report = spark.sql(rewrite_manifests_sql)
manifest_report.show()

+-------------------------+---------------------+
|rewritten_manifests_count|added_manifests_count|
+-------------------------+---------------------+
|                        0|                    0|
+-------------------------+---------------------+



In [7]:
# Compact the data files of the raw sales table with rewrite_data_files.
# Options passed to the procedure:
#   - target-file-size-bytes = 5368709120 (5 * 1024**3; the inline SQL comment,
#     written in French, reads "5GB per file")
#   - rewrite-all = true
#   - min-input-files = 1
# The result row reports rewritten/added file counts, rewritten bytes and
# failed file count.
compact_sales_sql = """
    CALL nessie.system.rewrite_data_files(
        table => 'nessie.sales.sales_data_raw',
        options => map(
            'target-file-size-bytes', '5368709120',   -- 5GB par fichier
            'rewrite-all', 'true',
            'min-input-files', '1'
        )
    )
"""
compaction_report = spark.sql(compact_sales_sql)
compaction_report.show()

25/07/29 14:43:04 WARN NessieUtil: The Iceberg property 'gc.enabled' and/or 'write.metadata.delete-after-commit.enabled' is enabled on table 'sales.sales_data_raw' in NessieCatalog. This will likely make data in other Nessie branches and tags and in earlier, historical Nessie commits inaccessible. The recommended setting for those properties is 'false'. Use the 'nessie-gc' tool for Nessie reference-aware garbage collection.


+--------------------------+----------------------+---------------------+-----------------------+
|rewritten_data_files_count|added_data_files_count|rewritten_bytes_count|failed_data_files_count|
+--------------------------+----------------------+---------------------+-----------------------+
|                         6|                     1|                10183|                      0|
+--------------------------+----------------------+---------------------+-----------------------+



In [9]:
# Load the raw sales table, collapse it to a single partition, and create a new
# Iceberg table "sales_data_raw_compacted" from it. create() is used, so the
# target table is expected not to exist yet.
df = spark.table("nessie.sales.sales_data_raw")
single_partition_df = df.coalesce(1)
compacted_writer = single_partition_df.writeTo("nessie.sales.sales_data_raw_compacted").using("iceberg")
compacted_writer.create()
# Disabled alternative kept from the original cell: replace the source table
# itself instead of writing a separate compacted copy.
#df.coalesce(1).writeTo("nessie.sales.sales_data_raw").using("iceberg").replace()

                                                                                

In [None]:
# Compact only the data files of nessie.customers that match a filter.
# The `where` argument must be ONE SQL string literal containing the whole
# predicate; the value being compared is double-quoted inside that
# single-quoted literal. (The previous version closed the literal right after
# the column name, leaving an unbalanced quote, so the CALL could not parse.)
# NOTE(review): 'colonne_partition' looks like a placeholder column name --
# replace it with the table's real partition column before running.
spark.sql("""
    CALL nessie.system.rewrite_data_files(
        table => 'nessie.customers',
        where => 'colonne_partition = "premium"'
    )
""").show()

In [14]:
# Per-partition statistics over the data_files metadata table of
# nessie.customers: number of files plus average / minimum / maximum / total
# file size, converted from bytes to MB (bytes / 1024 / 1024, rounded to two
# decimals). Partitions with the most files are listed first.
file_size_stats_sql = """
    SELECT 
        partition,
        COUNT(*) as file_count,
        ROUND(AVG(file_size_in_bytes)/1024/1024, 2) as avg_size_mb,
        ROUND(MIN(file_size_in_bytes)/1024/1024, 2) as min_size_mb,
        ROUND(MAX(file_size_in_bytes)/1024/1024, 2) as max_size_mb,
        ROUND(SUM(file_size_in_bytes)/1024/1024, 2) as total_size_mb
    FROM nessie.customers.data_files
    GROUP BY partition
    ORDER BY file_count DESC
"""
file_size_stats = spark.sql(file_size_stats_sql)
file_size_stats.show()

+---------+----------+-----------+-----------+-----------+-------------+
|partition|file_count|avg_size_mb|min_size_mb|max_size_mb|total_size_mb|
+---------+----------+-----------+-----------+-----------+-------------+
|   {W, B}|         1|        0.0|        0.0|        0.0|          0.0|
|   {S, J}|         1|        0.0|        0.0|        0.0|          0.0|
|   {B, C}|         1|        0.0|        0.0|        0.0|          0.0|
|   {B, F}|         1|        0.0|        0.0|        0.0|          0.0|
|   {A, E}|         1|        0.0|        0.0|        0.0|          0.0|
|   {Y, I}|         1|        0.0|        0.0|        0.0|          0.0|
|   {C, G}|         1|        0.0|        0.0|        0.0|          0.0|
|   {R, L}|         1|        0.0|        0.0|        0.0|          0.0|
|   {M, K}|         1|        0.0|        0.0|        0.0|          0.0|
|   {D, J}|         1|        0.0|        0.0|        0.0|          0.0|
|   {J, A}|         1|        0.0|        0.0|     

In [16]:
# Count, per partition, the data files of nessie.customers that are smaller
# than 64 * 1024 * 1024 bytes (the inline SQL comment, in French, reads
# "files < 64MB"). Partitions with the most small files come first.
small_files_sql = """
    SELECT 
        partition,
        COUNT(*) as small_files_count
    FROM nessie.customers.data_files
    WHERE file_size_in_bytes < 64*1024*1024  -- Fichiers < 64MB
    GROUP BY partition
    ORDER BY small_files_count DESC
"""
small_files_report = spark.sql(small_files_sql)
small_files_report.show()



+---------+-----------------+
|partition|small_files_count|
+---------+-----------------+
|   {W, B}|                1|
|   {S, J}|                1|
|   {B, C}|                1|
|   {B, F}|                1|
|   {A, E}|                1|
|   {Y, I}|                1|
|   {C, G}|                1|
|   {R, L}|                1|
|   {M, K}|                1|
|   {D, J}|                1|
|   {J, A}|                1|
|   {G, H}|                1|
+---------+-----------------+



                                                                                

In [None]:
# Copy-on-write table (copy-on-write is Iceberg's default for all three
# row-level operations; the modes are spelled out here so both tables in this
# cell are configured in parallel).
spark.sql("""
    CREATE TABLE nessie.cow_table (id INT, name STRING)
    USING ICEBERG
    TBLPROPERTIES (
        'write.delete.mode'='copy-on-write',
        'write.update.mode'='copy-on-write',
        'write.merge.mode'='copy-on-write'
    )
""")

# Merge-on-read table intended for frequent updates.
# 'write.delete.mode' only governs DELETE; UPDATE and MERGE INTO have their own
# properties, so all three are set -- otherwise updates on this table would
# still be executed copy-on-write.
spark.sql("""
    CREATE TABLE nessie.mor_table (id INT, name STRING)
    USING ICEBERG
    TBLPROPERTIES (
        'write.delete.mode'='merge-on-read',
        'write.update.mode'='merge-on-read',
        'write.merge.mode'='merge-on-read'
    )
""")

In [None]:
# Shut down the Spark session created in the first cell; cells that use `spark`
# cannot be run after this until that cell is executed again.
spark.stop()