In [2]:
import os
from pyspark.sql import SparkSession
print(os.environ["MINIO_ENDPOINT_URL"])
print(f"s3a://{os.environ['WAREHOUSE_BUCKET_NAME']}/iceberg/")

http://minio:9000
s3a://lakehouse/iceberg/


In [6]:
spark = (
        SparkSession.builder.master("spark://spark-master:7077")
        .appName("stg_clockify__time_entries")
        .config(
            "spark.jars.packages",
            "org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2",
        )
        .config("spark.hadoop.fs.s3a.access.key", os.environ["MINIO_ACCESS_KEY"])
        .config("spark.hadoop.fs.s3a.secret.key", os.environ["MINIO_SECRET_KEY"])
        .config("spark.hadoop.fs.s3a.endpoint", os.environ["MINIO_ENDPOINT_URL"])
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.clockify_catalog", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.clockify_catalog.type", "hive")
        .config("spark.hadoop.hive.metastore.uris", "thrift://metastore:9083")
        .config("spark.sql.catalog.clockify_catalog.warehouse", f"s3a://{os.environ['WAREHOUSE_BUCKET_NAME']}/iceberg/")
        .enableHiveSupport()
        .getOrCreate()
    )

In [9]:
df = spark.read.parquet("s3a://raw/clockify/time-entries/parquet/*.parquet")
df.printSchema()

root
 |-- billable: boolean (nullable = true)
 |-- costRate: struct (nullable = true)
 |    |-- amount: long (nullable = true)
 |    |-- currency: string (nullable = true)
 |-- customFieldValues: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description: string (nullable = true)
 |-- hourlyRate: struct (nullable = true)
 |    |-- amount: long (nullable = true)
 |    |-- currency: string (nullable = true)
 |-- id: string (nullable = true)
 |-- isLocked: boolean (nullable = true)
 |-- kioskId: string (nullable = true)
 |-- projectId: string (nullable = true)
 |-- tagIds: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- taskId: string (nullable = true)
 |-- timeInterval: struct (nullable = true)
 |    |-- duration: string (nullable = true)
 |    |-- end: string (nullable = true)
 |    |-- start: string (nullable = true)
 |-- type: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- workspaceId: string (nullable = t

In [11]:
df.limit(5).toPandas().head()

Unnamed: 0,billable,costRate,customFieldValues,description,hourlyRate,id,isLocked,kioskId,projectId,tagIds,taskId,timeInterval,type,userId,workspaceId
0,True,"(0, R$)",[],FN,"(0, R$)",6812503ea9fefb2f03fdce00,False,,61e54e2ddc3256444ce00210,,,"(PT58M, 2025-04-30T17:28:54Z, 2025-04-30T16:30...",REGULAR,5e95c064ea8094116e8e0a54,5e95c064ea8094116e8e0a56
1,True,"(0, R$)",[],FN,"(0, R$)",68108631685bcf6ccbc048c3,False,,61e54e2ddc3256444ce00210,[],,"(PT8H, 2025-04-30T16:00:00Z, 2025-04-30T08:00:...",REGULAR,5e95c064ea8094116e8e0a54,5e95c064ea8094116e8e0a56
2,False,"(0, R$)",[],AIRFLOW,"(0, R$)",681d4cfa9ee5b440ea2a2aaa,False,,5e9f4704ea8094116e994d87,[],,"(None, None, 2025-05-09T00:31:54Z)",REGULAR,5e95c064ea8094116e8e0a54,5e95c064ea8094116e8e0a56
3,True,"(0, R$)",[],FN,"(0, R$)",681c62ae1f74a85f2fba2592,False,,61e54e2ddc3256444ce00210,,,"(PT8H13M, 2025-05-08T16:05:14Z, 2025-05-08T07:...",REGULAR,5e95c064ea8094116e8e0a54,5e95c064ea8094116e8e0a56
4,False,"(0, R$)",[],AIRFLOW,"(0, R$)",681086462e56197b9fdc38a3,False,,5e9f4704ea8094116e994d87,[],,"(PT1H30M, 2025-04-28T19:30:00Z, 2025-04-28T18:...",REGULAR,5e95c064ea8094116e8e0a54,5e95c064ea8094116e8e0a56


In [13]:
spark.sql("CREATE NAMESPACE IF NOT EXISTS bronze;").show()
# spark.sql("CREATE DATABASE IF NOT EXISTS bronze;").show()

AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to create database path file:/app/spark-warehouse/bronze.db, failed to create database bronze)

In [None]:
spark.sql("SHOW SCHEMAS;").show()

In [None]:
df.writeTo("clockify_catalog.bronze.time_entries").createOrReplace()

In [None]:
df = spark.read.format("iceberg").load("clockify_catalog.bronze.time_entries")

In [None]:
spark.read.json("s3a://raw/clockify/time-entries/*.json").printSchema()

In [None]:
spark.read.format("iceberg").load("s3a://staging/bronze/time_entries").show()

In [None]:
# spark.sql("SHOW NAMESPACES IN clockify_catalog").show()  # Lists all databases (namespaces)
spark.sql("SHOW TABLES IN clockify_catalog.bronze").show()  # Lists all tables in bronze

In [None]:
sp