In [5]:
import logging
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType,StringType, FloatType

In [26]:
# S3 credentials for the catalog are read from the environment so that real
# secrets never have to be committed with the notebook. The fallbacks are the
# values that were previously hardcoded here, so an unconfigured environment
# behaves exactly as before.
s3_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", "admin")
s3_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "password")

# Build (or reuse, via getOrCreate) the Spark session. It registers an Iceberg
# SparkCatalog named `lakehouse_prod_catalog` of type "nessie", pinned to the
# `main` ref, with its warehouse and S3 endpoint pointing at the MinIO service.
spark = (
    SparkSession.builder.appName("Iceberg-Nessie")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.lakehouse_prod_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.lakehouse_prod_catalog.type", "nessie")
    .config("spark.sql.catalog.lakehouse_prod_catalog.uri", "http://nessie:19120/api/v2")
    .config("spark.sql.catalog.lakehouse_prod_catalog.ref", "main")
    .config("spark.sql.catalog.lakehouse_prod_catalog.warehouse", "s3a://warehouse/")
    .config("spark.sql.catalog.lakehouse_prod_catalog.s3.endpoint", "http://minio:9000")
    .config("spark.sql.catalog.lakehouse_prod_catalog.s3.path-style-access", "true")
    .config("spark.sql.catalog.lakehouse_prod_catalog.s3.access-key-id", s3_access_key_id)
    .config("spark.sql.catalog.lakehouse_prod_catalog.s3.secret-access-key", s3_secret_access_key)
    # The endpoint above is plain HTTP, so SSL is disabled for s3a connections.
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .getOrCreate()
)

25/10/15 16:15:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [27]:
# Bare expression: the notebook renders the SparkSession object as the cell output.
spark

In [28]:
!pip install boto3

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [29]:
import boto3
import pandas as pd
from io import StringIO

In [30]:
# S3 credentials: read from the environment so secrets stay out of the notebook.
# The fallbacks are the values that were previously hardcoded here.
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", "admin")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "password")

# Create S3 client for MinIO
s3 = boto3.client(
    "s3",
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    endpoint_url="http://minio:9000",  # MinIO endpoint
    use_ssl=False,
)

# Bucket name
bucket_name = "landing-data"

# List every object in the bucket. A single list_objects_v2 call returns at
# most 1000 keys, so walk all result pages with a paginator instead of relying
# on the first response only.
paginator = s3.get_paginator("list_objects_v2")
objects_list = [
    obj
    for page in paginator.paginate(Bucket=bucket_name)
    for obj in page.get("Contents", [])
]

# Download each object and parse it as CSV into a pandas DataFrame
dfs = []  # one DataFrame per object
for obj in objects_list:
    obj_name = obj["Key"]
    response = s3.get_object(Bucket=bucket_name, Key=obj_name)
    object_content = response["Body"].read().decode("utf-8")

    # Convert CSV text to pandas DataFrame
    df = pd.read_csv(StringIO(object_content))
    dfs.append(df)

# pd.concat raises an opaque "No objects to concatenate" on an empty list;
# fail with a message that names the bucket instead.
if not dfs:
    raise ValueError(f"No objects found in bucket '{bucket_name}'; nothing to load")

# Combine all files into a single DataFrame with a fresh 0..n-1 index
final_df = pd.concat(dfs, ignore_index=True)

final_df.head()

Unnamed: 0.1,Unnamed: 0,FirstName,LastName,StartDate,ExitDate,Title,Supervisor,ADEmail,BusinessUnit,EmployeeStatus,...,Satisfaction Score,Work-Life Balance Score,Training Date,Training Program Name,Training Type,Training Outcome,Location,Trainer,Training Duration(Days),Training Cost
0,0,Uriah,Bridges,20-Sep-19,,Production Technician I,Peter Oneill,uriah.bridges@bilearner.com,CCDR,Active,...,2,3,15-Jul-23,Leadership Development,Internal,Failed,South Marisa,Taylor Rodriguez,2,606.11
1,1,Paula,Small,11-Feb-23,,Production Technician I,Renee Mccormick,paula.small@bilearner.com,EW,Active,...,1,5,12-Sep-22,Customer Service,External,Incomplete,Tammieville,Kelly Patterson DDS,4,673.02
2,2,Edward,Buck,10-Dec-18,,Area Sales Manager,Crystal Walker,edward.buck@bilearner.com,PL,Active,...,2,1,13-Aug-22,Leadership Development,External,Failed,East Roberthaven,Taylor Thomas,2,413.28
3,3,Michael,Riordan,21-Jun-21,,Area Sales Manager,Rebekah Wright,michael.riordan@bilearner.com,CCDR,Active,...,5,4,15-Dec-22,Project Management,External,Completed,Garzatown,Holly Elliott,3,663.78
4,4,Jasmine,Onque,29-Jun-19,,Area Sales Manager,Jason Kim,jasmine.onque@bilearner.com,TNS,Active,...,5,3,13-Jul-23,Technical Skills,External,Failed,Lake Meganville,Donald Martinez,5,399.03


In [31]:
# Destination table, addressed downstream as <catalog>.<schema>.<table_name>.
catalog = "lakehouse_prod_catalog"
schema = "raw_bronze_zone"
table_name = "bronze_hris_employee"

# Save mode handed to the DataFrame writer when the table is loaded.
mode = "overwrite"

In [32]:
# Convert the combined pandas frame into a Spark DataFrame. No explicit schema
# is passed, so column types are left to Spark's inference.
df_spark = spark.createDataFrame(data=final_df)

# Print a preview using show()'s default limits, spelled out explicitly.
df_spark.show(n=20, truncate=True)

[Stage 0:>                                                          (0 + 1) / 1]

+----------+-----------+--------+---------+---------+--------------------+-----------------+--------------------+------------+--------------+------------+-------+--------------------------+---------------+----------------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+-----------+-----------+----------------+------------------+-----------------------+-------------+---------------------+-------------+----------------+-----------------+--------------------+-----------------------+-------------+
|Unnamed: 0|  FirstName|LastName|StartDate| ExitDate|               Title|       Supervisor|             ADEmail|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|TerminationDescription|   DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current

                                                                                

In [33]:
# Create the target namespace in the catalog unless it is already there.
create_namespace_sql = f"CREATE NAMESPACE IF NOT EXISTS {catalog}.{schema};"
spark.sql(create_namespace_sql)

DataFrame[]

In [34]:
# Fully qualified name of the destination table.
target_table = f"{catalog}.{schema}.{table_name}"

try:
    # Configure the writer with the chosen save mode, then persist as a table.
    writer = df_spark.write.mode(mode).format("parquet")
    writer.saveAsTable(target_table)
    print(f"Successfully load data into {target_table}")
except Exception as e:
    # Surface the failure in the cell output, then let the error propagate.
    print(f"Failed to load data into {target_table}: {e}")
    raise

                                                                                

Successfully load data into lakehouse_prod_catalog.raw_bronze_zone.bronze_hris_employee


In [36]:
# Query the table back through the catalog and print a preview of its rows.
select_sql = f"SELECT * FROM {catalog}.{schema}.{table_name};"
spark.sql(select_sql).show()

+----------+-----------+--------+---------+---------+--------------------+-----------------+--------------------+------------+--------------+------------+-------+--------------------------+---------------+----------------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+-----------+-----------+----------------+------------------+-----------------------+-------------+---------------------+-------------+----------------+-----------------+--------------------+-----------------------+-------------+
|Unnamed: 0|  FirstName|LastName|StartDate| ExitDate|               Title|       Supervisor|             ADEmail|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|TerminationDescription|   DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current

25/10/15 16:26:02 WARN NettyRpcEnv: Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from 95ac209e1b97:33799 in 10000 milliseconds
25/10/15 16:29:05 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000 milliseconds]. This timeout is controlled by spark.executor.heartbeatInterval
	at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.s

In [25]:
# Stop the Spark session; cells that use `spark` need the session to be
# created again before they can run.
spark.stop()