In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from datetime import datetime

# --- Iceberg / Spark session configuration ---------------------------------
# A local, Hadoop-type Iceberg catalog. `catalog_name`, `sc` and `batch`
# are referenced again by later cells, so their names must stay stable.
warehouse_path = "./warehouse"
iceberg_spark_jar = 'org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.3'
catalog_name = "demo"

# Every catalog-scoped Spark property shares this key prefix.
_catalog_key = f"spark.sql.catalog.{catalog_name}"

# Setup iceberg config
conf = SparkConf().setAppName("EgApp").setAll(
    [
        # Enables Iceberg's SQL extensions.
        (
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        ),
        (_catalog_key, "org.apache.iceberg.spark.SparkCatalog"),
        # The Iceberg runtime jar is resolved by Spark at session start-up.
        ("spark.jars.packages", iceberg_spark_jar),
        (f"{_catalog_key}.warehouse", warehouse_path),
        (f"{_catalog_key}.type", "hadoop"),
        # Lets later queries say `db.users` without the catalog prefix.
        ("spark.sql.defaultCatalog", catalog_name),
    ]
)

# NOTE: despite its name, `sc` is a SparkSession, not a SparkContext.
_session_builder = SparkSession.builder.master("local").config(conf=conf)
sc = _session_builder.getOrCreate()

# Run identifier: local wall-clock time at which this cell was executed.
batch = f"{datetime.now():%Y%m%d_%H%M%S}"
print(batch)

Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=ISO-8859-1
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=ISO-8859-1


:: loading settings :: url = jar:file:/Users/ericyeung/Library/Caches/pypoetry/virtualenvs/evergreen-DHscqOYO-py3.11/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/ericyeung/.ivy2/cache
The jars for the packages stored in: /Users/ericyeung/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.4_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-22b29bc2-d5f9-436e-a870-b71c5d3d6675;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.4.3 in central
:: resolution report :: resolve 82ms :: artifacts dl 4ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.4.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.ap

20240229_220216


Setup Iceberg

In [2]:
# `conf` here is a project package; it is unrelated to the `conf` SparkConf
# variable assigned in the first cell (the import system does not consult it).
from conf.init_schema import init_database

# Hand the active SparkSession (`sc`) to the project's schema initialiser.
# Presumably creates the `db` namespace and its tables — TODO confirm in
# conf/init_schema.py.
init_database(sc)

                                                                                

Postcode related

In [None]:
from extract_utils import extract_csv

# Read the postcode lookup CSV through the project helper. The result is
# used as a Spark DataFrame below (printSchema / count are called on it).
df_postcode_lookup = extract_csv(sc, "src_data/National_Statistics_Postcode_Lookup_Latest_Centroids.csv")
# Print the column names and types of the loaded data.
df_postcode_lookup.printSchema()
# Last expression of the cell: the row count is displayed as the cell output.
df_postcode_lookup.count()

In [None]:
from transform.postcode import PostcodeTransformer
# `transform` is called on the class itself, not on an instance.
# Presumably reshapes the raw lookup into the layout of the postcode table
# written in a later cell — TODO confirm in transform/postcode.py.
df_postcode = PostcodeTransformer.transform(df_postcode_lookup)

In [None]:
# Preview the transformed postcode rows (show() prints a limited sample).
df_postcode.show()

In [None]:
# Overwrite the postcode table in the configured catalog with this DataFrame.
# NOTE(review): insertInto resolves columns by position, not by name, so the
# DataFrame's column order must match the target table's — verify.
df_postcode.write.mode("overwrite").insertInto(f"{catalog_name}.db.postcode")

Users related

In [None]:
from pyspark.sql.types import DateType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

# Explicit read schema for the users JSON. Every leaf is a nullable string;
# note that dateOfBirth is read as a string too, not as a DateType.
_user_string_fields = ("email", "givenName", "familyName", "sex", "dateOfBirth")
_address_string_fields = ("street", "postcode")

# Nested struct stored under the "address" key.
_address_schema = StructType(
    [StructField(field, StringType(), True) for field in _address_string_fields]
)

# Top-level string columns first, then the nested address struct last.
users_schema = StructType(
    [StructField(field, StringType(), True) for field in _user_string_fields]
    + [StructField("address", _address_schema, True)]
)

In [None]:
from extract_utils import extract_json

# Read the users JSON through the project helper, passing the explicit
# `users_schema` defined in the previous cell.
df_users = extract_json(sc, "src_data/users.json", users_schema)
# Last expression of the cell: the raw user row count is displayed.
df_users.count()

Data Validation

In [None]:
from validation_utils import check_data_quality

# Run the project's data-quality check on the raw users, tagged with this
# run's `batch` timestamp string from the first cell.
# Presumably returns only the rows that passed validation — TODO confirm
# in validation_utils.py.
df_validated = check_data_quality(df_users, batch)

In [None]:
# Row count after validation; compare with the df_users count shown earlier
# to see whether check_data_quality changed the number of rows.
df_validated.count()

In [None]:
from transform.users import UserTransformer

# `transform` is called on the class itself, not on an instance.
# NOTE(review): later SQL reads first_name / last_name / dob from this
# DataFrame, so the transformer presumably renames the camelCase JSON
# fields — confirm in transform/users.py.
df_transformed = UserTransformer.transform(df_validated)
# Preview the transformed user rows.
df_transformed.show()

In [None]:
from load_utils import write_to_iceberg

# Persist the transformed users through the project helper.
# Presumably "users" names the table that later cells query as db.users —
# TODO confirm the write mode and target in load_utils.py.
write_to_iceberg(sc, df_transformed, "users")

In [None]:
# Row count of the users table. The catalog prefix is omitted, so the name
# resolves through spark.sql.defaultCatalog set in the first cell.
sc.sql("select count(1) from db.users").show()

In [None]:
# Upsert the transformed batch into db.users.
# Expose the DataFrame to SQL under a temporary view name.
df_transformed.createOrReplaceTempView("temp_source")

# A user is identified by (first_name, last_name, dob), and the whole key
# belongs in the ON clause. Matching on last_name alone meant that:
#   * a new user sharing a surname with any existing row counted as
#     "matched" and was therefore never inserted, and
#   * several source rows could match one target row, which MERGE rejects
#     with a cardinality error.
# With the full key in ON, matched rows are refreshed from the source and
# all other rows are inserted.
# NOTE(review): `=` never matches NULLs, so rows with a NULL key column are
# always inserted; switch to `<=>` if NULL keys can occur — verify.
query = """
    MERGE INTO db.users AS t USING temp_source AS s
        ON (s.first_name = t.first_name
            AND s.last_name = t.last_name
            AND s.dob = t.dob)
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
"""

# Plain-append alternative; defined for convenience but not executed here.
insert_query = "insert into db.users select * from temp_source"

sc.sql(query)

In [None]:
# Count (target row, source row) pairs that share a last_name. The join has
# no ON clause; the WHERE predicate supplies the join condition. This counts
# pairs, so a surname appearing m times in db.users and n times in
# temp_source contributes m * n to the total.
sc.sql("select count(1) from db.users t join temp_source s where s.last_name = t.last_name").show()