In [1]:
import os

import pyspark
from pyspark import SparkFiles
from pyspark.sql import SparkSession, functions as F

In [2]:
AWS_S3_ENDPOINT = os.environ.get('AWS_S3_ENDPOINT')
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')

In [3]:
conf = (
    pyspark.SparkConf()
        .set('spark.jars.packages','org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262')
        .set('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
        .set('spark.hadoop.fs.s3a.path.style.access', 'true')
        .set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
        .set('spark.hadoop.fs.s3a.endpoint', AWS_S3_ENDPOINT)
        .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY_ID)
        .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_ACCESS_KEY)        
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') # Use Iceberg with Spark
        .set('spark.sql.catalog.silver', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.silver.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        .set('spark.sql.catalog.silver.warehouse', 's3a://local-data-lake-silver/')
        .set('spark.sql.catalog.silver.s3.endpoint', AWS_S3_ENDPOINT)
        .set('spark.sql.defaultCatalog', 'silver') # Name of the Iceberg catalog
        .set('spark.sql.catalogImplementation', 'in-memory')
        .set('spark.sql.catalog.silver.type', 'hadoop') # Iceberg catalog type
        .set('spark.executor.heartbeatInterval', '300000')
        .set('spark.network.timeout', '400000')
)

In [4]:
spark = SparkSession.builder \
    .master('local[5]') \
    .appName('local-data-lake-iceberg') \
    .config(conf=conf) \
    .getOrCreate()

sc = spark.sparkContext
print(f'The Pyspark version {spark.version} is running...')

:: loading settings :: url = jar:file:/Users/dennislafferty/.pyenv/versions/3.10.13/envs/data-platform/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/dennislafferty/.ivy2/cache
The jars for the packages stored in: /Users/dennislafferty/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.apache.iceberg#iceberg-aws-bundle added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-44d15b67-27c9-43c2-bcb1-f46fdae48616;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 in central
	found org.apache.iceberg#iceberg-aws-bundle;1.5.2 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 187ms :: artifacts dl 7ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from c

The Pyspark version 3.5.1 is running...


In [10]:
#Reads data from bronze bucket based on dt
dt = '2023-02'
df = spark.read.parquet(f's3a://local-data-lake-bronze/{dt}/yellow_taxi.parquet')

In [14]:
#creates a new df called df_dt with a new column with a constant of dt
df_dt = df.withColumn('dt', F.lit(dt))
df_dt.count()

2913955

In [15]:
#writes to the new catalog table and partitions by dt
# df_dt.writeTo('silver.nyc.taxi') \
#     .partitionedBy('dt') \
#     .createOrReplace() 

#appends data
df_dt.writeTo('silver.nyc.taxi') \
    .partitionedBy('dt') \
    .append()

                                                                                

In [13]:
spark.sql('SELECT COUNT(1) as row_count FROM silver.nyc.taxi').show()

+---------+
|row_count|
+---------+
|  3066766|
+---------+



In [16]:
spark.sql('SELECT COUNT(1) as row_count FROM silver.nyc.taxi').show()

+---------+
|row_count|
+---------+
|  5980721|
+---------+



In [17]:
spark.sql('''
        SELECT COUNT(1) as row_count 
        FROM silver.nyc.taxi
        WHERE dt = '2023-01'
    ''').show()

+---------+
|row_count|
+---------+
|  3066766|
+---------+



In [20]:
spark.sql('''
        SELECT * FROM silver.nyc.taxi.partitions;
    ''').show()

+---------+-------+------------+----------+-----------------------------+----------------------------+--------------------------+----------------------------+--------------------------+--------------------+------------------------+
|partition|spec_id|record_count|file_count|total_data_file_size_in_bytes|position_delete_record_count|position_delete_file_count|equality_delete_record_count|equality_delete_file_count|     last_updated_at|last_updated_snapshot_id|
+---------+-------+------------+----------+-----------------------------+----------------------------+--------------------------+----------------------------+--------------------------+--------------------+------------------------+
|{2023-02}|      0|     2913955|         1|                     45522430|                           0|                         0|                           0|                         0|2024-06-12 21:26:...|     9112964658152648116|
|{2023-01}|      0|     3066766|         1|                     47732041

In [22]:
spark.sql('''
        SELECT * FROM silver.nyc.taxi.files;
    ''').show()

+-------+--------------------+-----------+-------+---------+------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+------------+-------------+--------------------+
|content|           file_path|file_format|spec_id|partition|record_count|file_size_in_bytes|        column_sizes|        value_counts|   null_value_counts|    nan_value_counts|        lower_bounds|        upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id|    readable_metrics|
+-------+--------------------+-----------+-------+---------+------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+------------+-------------+--------------------+
|      0|s3a://local-data-...|    PARQUET|      0|{2023-02}|     2913955|          45522430|{1 -> 369127, 2 -...|{1 ->

In [23]:
spark.sql('''
        SELECT content
         , file_path
         , file_format
         , spec_id
         , partition
         , record_count
         , file_size_in_bytes
        FROM silver.nyc.taxi.files;
    ''').show()

+-------+--------------------+-----------+-------+---------+------------+------------------+
|content|           file_path|file_format|spec_id|partition|record_count|file_size_in_bytes|
+-------+--------------------+-----------+-------+---------+------------+------------------+
|      0|s3a://local-data-...|    PARQUET|      0|{2023-02}|     2913955|          45522430|
|      0|s3a://local-data-...|    PARQUET|      0|{2023-01}|     3066766|          47732041|
+-------+--------------------+-----------+-------+---------+------------+------------------+

