<img src="https://docs.lakefs.io/assets/logo.svg" alt="lakeFS logo" width=300/> 

## Write-Audit-Publish (WAP) pattern with multiple data formats and cross-collection consistency

**New to Write-Audit-Publish? This [talk](https://www.youtube.com/watch?v=fXHdeBnpXrg&t=1001s) explains it well.**

[@rmoff](https://twitter.com/rmoff/) 

# Initialisation

## Set up Spark 

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("lakeFS / Jupyter") \
        .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.endpoint", "http://lakefs:8000") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE") \
        .config("spark.hadoop.fs.s3a.secret.key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY") \
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
        .getOrCreate()
spark.sparkContext.setLogLevel("INFO")

spark




## Set up the connection to lakeFS

In [2]:
import lakefs_client
from lakefs_client.client import LakeFSClient

lakefs_config = lakefs_client.Configuration()
lakefs_config.username = 'AKIAIOSFODNN7EXAMPLE'
lakefs_config.password = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
lakefs_config.host = 'http://lakefs:8000'

lakefs = LakeFSClient(lakefs_config)
lakefs_api_client = lakefs_client.ApiClient(lakefs_config)

### Get the first repository present in lakeFS

In [3]:
repo=lakefs.repositories.list_repositories().results[0]
print(f"Using lakeFS repository '{repo.id}' with storage namespace {repo.storage_namespace}")

Using lakeFS repository 'example' with storage namespace s3://example


### Define the data storage directory based on the provided namespace

In [4]:
data_dir=repo.storage_namespace.replace('s3://','s3a://',1)
print(f"Using {data_dir} for data storage")

Using s3a://example for data storage


In [5]:
branch="main"
main_datapath=(f"{data_dir}/{branch}")
print(f"Main data path:\t{main_datapath}")

Main data path:	s3a://example/main


## Upload sample data files to lakeFS

In [8]:
import os
import boto3

s3 = boto3.client('s3',
                  endpoint_url='http://lakefs:8000/',
                  aws_access_key_id='AKIAIOSFODNN7EXAMPLE',
                  aws_secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY')


# Set the path to the folder you want to upload
folder_path = '/data/DOI-10-13012-b2idb-7865141_v1'

# Set the S3 bucket name and key prefix
bucket_name = repo.id
branch="main"
key_prefix = f"{branch}/src/"

# Iterate over the files in the folder and upload each file to S3
for root, dirs, files in os.walk(folder_path):
    for file in files:
        local_path = os.path.join(root, file)
        s3_key = os.path.join(key_prefix, os.path.relpath(local_path, folder_path))
        s3.upload_file(local_path, bucket_name, s3_key)
        print(f"Uploaded {local_path} to {bucket_name}/{s3_key}")
        

Uploaded /data/DOI-10-13012-b2idb-7865141_v1/dataset_info.txt to example/main/src/dataset_info.txt
Uploaded /data/DOI-10-13012-b2idb-7865141_v1/morrow-plots_README.txt to example/main/src/morrow-plots_README.txt
Uploaded /data/DOI-10-13012-b2idb-7865141_v1/morrow-plots_v01_1888-2019_soil.csv to example/main/src/morrow-plots_v01_1888-2019_soil.csv
Uploaded /data/DOI-10-13012-b2idb-7865141_v1/morrow-plots_v01_2020-2021_soil.csv to example/main/src/morrow-plots_v01_2020-2021_soil.csv
Uploaded /data/DOI-10-13012-b2idb-7865141_v1/morrow-plots_v01_codebook.pdf to example/main/src/morrow-plots_v01_codebook.pdf
Uploaded /data/DOI-10-13012-b2idb-7865141_v1/morrow-plots_v01x_2020-2021_soil.csv to example/main/src/morrow-plots_v01x_2020-2021_soil.csv
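The upload loop derives each object key from the file's path relative to the source folder. As a minimal sketch, the key can also be built with explicit forward slashes so the result does not depend on the operating system's path separator. The helper name `build_s3_key` is hypothetical and not part of boto3 or lakeFS.

```python
import posixpath
from pathlib import PurePath


def build_s3_key(key_prefix: str, folder_path: str, local_path: str) -> str:
    """Return the object key for local_path, relative to folder_path, under key_prefix."""
    # Split the relative path into its parts using the local OS rules...
    relative = PurePath(local_path).relative_to(folder_path)
    # ...then join the parts with "/" so the key is valid on any platform.
    return posixpath.join(key_prefix, *relative.parts)


print(build_s3_key("main/src/",
                   "/data/DOI-10-13012-b2idb-7865141_v1",
                   "/data/DOI-10-13012-b2idb-7865141_v1/dataset_info.txt"))
```

The returned key could be passed to `s3.upload_file` in place of the `os.path.join` result.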


### Commit the sample data to the `main` branch

In [9]:
from lakefs_client.api import commits_api
from lakefs_client.model.commit import Commit
from lakefs_client.model.commit_creation import CommitCreation

api_instance = commits_api.CommitsApi(lakefs_api_client)

api_instance.commit(repo.id, 
                    'main', 
                    CommitCreation(
                        message="Import the source data", 
                        metadata={'url': 'https://databank.illinois.edu/datasets/IDB-7865141', 
                                  'citation': 'Morrow Plots Data Curation Working Group (2022): Morrow Plots Treatment and Yield Data. University of Illinois at Urbana-Champaign. https://doi.org/10.13012/B2IDB-7865141_V1'
                                 }) )

{'committer': 'docker',
 'creation_date': 1684407937,
 'id': '5bea70de7a40f2ae5ab7528c62ef9f8035feb5b68c3e6122f16ee0d7629fe716',
 'message': 'Import the source data',
 'meta_range_id': '',
 'metadata': {'citation': 'Morrow Plots Data Curation Working Group (2022): '
                          'Morrow Plots Treatment and Yield Data. University '
                          'of Illinois at Urbana-Champaign. '
                          'https://doi.org/10.13012/B2IDB-7865141_V1',
              'url': 'https://databank.illinois.edu/datasets/IDB-7865141'},
 'parents': ['2393b3f1e52d51968bac08cd25cd8020e05a1534e8ac0312429c4fab0f0bcbb6']}

### 👉🏻 [View of the files in lakeFS](http://localhost:8000/repositories/example/objects?ref=main&path=src%2F)

----

# Import the first CSV file and store it as a Delta Lake table

## Create a branch

In [10]:
from lakefs_client.api import branches_api
from lakefs_client.model.branch_creation import BranchCreation

api_instance = branches_api.BranchesApi(lakefs_api_client)

branch="initial_load"
api_instance.create_branch(repo.id, 
                           BranchCreation(
                                name=branch,
                                source="main"
                            ))

'5bea70de7a40f2ae5ab7528c62ef9f8035feb5b68c3e6122f16ee0d7629fe716'

In [11]:
base_datapath=(f"{data_dir}/{branch}")
print(f"Branch:\t\t{branch}\nBase data path:\t{base_datapath}")

Branch:		initial_load
Base data path:	s3a://example/initial_load


## Load the data into a data frame

In [12]:
src_csv=(f"{base_datapath}/src/morrow-plots_v01_1888-2019_soil.csv")
df = spark.read.csv(src_csv,inferSchema=True,header=True)
df.createOrReplaceTempView("soil_src")

In [13]:
%sql SELECT min(year),max(year) FROM soil_src 

min(year),max(year)
1888,2019


## Write the data to Delta Table

In [14]:
df.write.format("delta").mode('overwrite').save(f"{base_datapath}/raw/soil")

## Table structure

In [15]:
%sql DESCRIBE EXTENDED delta.`{base_datapath}/raw/soil`

col_name,data_type,comment
phase,int,
year,int,
plot,string,
plot_num,int,
plot_dir,string,
rotation,string,
corn,string,
crop,string,
variety,string,
all_corn,string,


## Query the Delta Table

In [16]:
%sql SELECT COUNT(*) AS num_rows,min(year),max(year) FROM delta.`{base_datapath}/raw/soil`

num_rows,min(year),max(year)
3168,1888,2019


## Commit the Delta Table in lakeFS

In [17]:
api_client = lakefs_client.ApiClient(lakefs_config)

from lakefs_client.api import commits_api
from lakefs_client.model.commit import Commit
from lakefs_client.model.commit_creation import CommitCreation

api_instance = commits_api.CommitsApi(lakefs_api_client)

api_instance.commit(repo.id, branch, CommitCreation(
    message="Convert CSV to Delta Lake table",
    metadata={'src_file': 'morrow-plots_v01_1888-2019_soil.csv'}
) )

{'committer': 'docker',
 'creation_date': 1684407964,
 'id': 'b53446a273fb1fd91fed59b199ecde494518ad9fbf844fb7f70f2cf20614a740',
 'message': 'Convert CSV to Delta Lake table',
 'meta_range_id': '',
 'metadata': {'src_file': 'morrow-plots_v01_1888-2019_soil.csv'},
 'parents': ['5bea70de7a40f2ae5ab7528c62ef9f8035feb5b68c3e6122f16ee0d7629fe716']}

----

# Build a couple of aggregate tables

In [18]:
dims = {
    "variety",
    "plot"
}

for dim in dims:
    spark.sql(f"""CREATE OR REPLACE TABLE agg_{dim} USING DELTA 
                    LOCATION '{base_datapath}/aggs/agg_{dim}'
                    AS SELECT {dim}, 
                              COUNT(*) AS record_ct, 
                              PERCENTILE_APPROX(yield_bush,0.5) AS median_yield,
                              min(year) AS min_year,
                              max(year) AS max_year
                        FROM delta.`{base_datapath}/raw/soil`
                        GROUP BY {dim}""")

In [19]:
spark.sql("show tables").show()

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|  default|   agg_plot|      false|
|  default|agg_variety|      false|
|         |   soil_src|      false|
+---------+-----------+-----------+



## Commit the change 

### 👉🏻 [Uncommitted changes in lakeFS](http://localhost:8000/repositories/example/changes?ref=initial_load)

In [20]:
api_client = lakefs_client.ApiClient(lakefs_config)

from lakefs_client.api import commits_api
from lakefs_client.model.commit import Commit
from lakefs_client.model.commit_creation import CommitCreation

api_instance = commits_api.CommitsApi(lakefs_api_client)
api_instance.commit(repo.id, branch, CommitCreation(message="Build two aggregate tables"))

{'committer': 'docker',
 'creation_date': 1684407991,
 'id': '8815c6f65e2f019d718d4ce63e07b7a3cc6e2e01b8e00943c2cff784d65ec28b',
 'message': 'Build two aggregate tables',
 'meta_range_id': '',
 'metadata': {},
 'parents': ['b53446a273fb1fd91fed59b199ecde494518ad9fbf844fb7f70f2cf20614a740']}

## We're happy with the change, so merge the branch back into `main`

In [21]:
lakefs.refs.merge_into_branch(repository=repo.id, source_ref=branch, destination_branch='main')

{'reference': '996096b3ccaedbfa5c05ec2980beb0786049c11c8b74f400ad9fbf338b949cc4',
 'summary': {'added': 0, 'changed': 0, 'conflict': 0, 'removed': 0}}

----

----

# 👀 WAP stuff happens now 👇🏻

----

----

## Load new data for year 2020 and 2021

_and use the Write-Audit-Publish pattern as part of this process_
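As a rough outline of the control flow that the rest of this notebook walks through by hand, the sketch below wires the three WAP stages together. It assumes each stage is a plain Python callable and that a failed audit raises `ValueError`. The function and parameter names are hypothetical, and no lakeFS or Spark calls are made.

```python
from typing import Callable, List


def write_audit_publish(
    write: Callable[[], None],
    audits: List[Callable[[], None]],
    publish: Callable[[], None],
    rollback: Callable[[], None],
) -> bool:
    """Run write, then every audit; publish on success, roll back on failure."""
    write()
    try:
        for audit in audits:
            audit()  # an audit signals failure by raising ValueError
    except ValueError as err:
        print(f"Audit failed: {err}")
        rollback()
        return False
    publish()
    return True


def failing_audit() -> None:
    raise ValueError("dimension contains NULLs")


steps = []
published = write_audit_publish(
    write=lambda: steps.append("write to branch"),
    audits=[failing_audit],
    publish=lambda: steps.append("merge into main"),
    rollback=lambda: steps.append("reset branch"),
)
print(published, steps)
```

In this notebook the `write` step corresponds to appending to the Delta table on a branch, `publish` to merging that branch into `main`, and `rollback` to resetting the branch.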

## Start a new Spark session

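In a real pipeline this would be a separate Spark session anyway. It is needed here so that the locations of the tables registered earlier (`agg_variety`, `agg_plot`) don't get confused when the table names are reused on a new branch.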

In [22]:
spark.stop()

In [23]:
spark = SparkSession.builder.appName("lakeFS / Jupyter") \
        .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.endpoint", "http://lakefs:8000") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE") \
        .config("spark.hadoop.fs.s3a.secret.key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY") \
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
        .getOrCreate()
spark.sparkContext.setLogLevel("INFO")

spark

## Create a branch for our work

In [24]:
branch="load_new_data_20-21"

In [25]:
base_datapath=(f"{data_dir}/{branch}")
print(f"Branch:\t\t{branch}\nBase data path:\t{base_datapath}")

Branch:		load_new_data_20-21
Base data path:	s3a://example/load_new_data_20-21


In [26]:
from lakefs_client.api import branches_api
from lakefs_client.model.branch_creation import BranchCreation

api_instance = branches_api.BranchesApi(lakefs_api_client)

api_instance.create_branch(repo.id, BranchCreation(branch,"main"))

'996096b3ccaedbfa5c05ec2980beb0786049c11c8b74f400ad9fbf338b949cc4'

## Load the new data into a data frame

In [27]:
src_csv=(f"{data_dir}/{branch}/src/morrow-plots_v01x_2020-2021_soil.csv")

In [28]:
df = spark.read.csv(src_csv,inferSchema=True,header=True)
df.createOrReplaceTempView("soil_src")

In [29]:
spark.sql("SELECT min(year),max(year) FROM soil_src").show()

+---------+---------+
|min(year)|max(year)|
+---------+---------+
|     2020|     2021|
+---------+---------+



## Align schemas

Spark's CSV import infers data types on a best-guess basis, so the new data can match the existing table on field names yet differ on the inferred data types.

In [30]:
from pyspark.sql.functions import col

existing_table = spark.read.format("delta").load(f"{base_datapath}/raw/soil")

# Get the schema of each table
existing_schema = existing_table.schema
new_schema = df.schema

# Compare the schema of the tables
if existing_schema == new_schema:
    print("The schema of the two tables is the same.")
else:
    print("The schema of the two tables is different.")
    # Find any mismatched field types
    for field1, field2 in zip(existing_schema, new_schema):
        if field1 != field2:
            print(f"Field {field1.name} has type {field1.dataType} in existing table, but type {field2.dataType} in import data. Fixing.")
            # Update the newly-imported data to have the same schema as the existing data
            # (after the rename the column is known by field1.name, so cast that column)
            df = df.withColumnRenamed(field2.name, field1.name).withColumn(field1.name, col(field1.name).cast(field1.dataType))

The schema of the two tables is different.
Field rotation has type StringType() in existing table, but type IntegerType() in import data. Fixing.
Field corn has type StringType() in existing table, but type BooleanType() in import data. Fixing.
Field all_corn has type StringType() in existing table, but type BooleanType() in import data. Fixing.
Field yield_bush has type StringType() in existing table, but type IntegerType() in import data. Fixing.
Field treated has type StringType() in existing table, but type BooleanType() in import data. Fixing.
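The comparison above pairs fields by position. As a minimal sketch of a name-based alternative, the snippet below works out which incoming columns need casting, using plain `(name, type)` tuples as a stand-in for Spark's schema objects. The `cast_plan` helper and the shortened column lists are illustrative assumptions.

```python
from typing import Dict, List, Tuple

Schema = List[Tuple[str, str]]  # (column name, type name) pairs


def cast_plan(existing: Schema, incoming: Schema) -> Dict[str, str]:
    """Map each incoming column whose type differs to the existing table's type."""
    existing_types = dict(existing)
    plan = {}
    for name, type_name in incoming:
        target = existing_types.get(name)
        # Only columns present in both schemas with different types need a cast.
        if target is not None and target != type_name:
            plan[name] = target
    return plan


existing = [("year", "int"), ("plot", "string"), ("rotation", "string"), ("corn", "string")]
incoming = [("year", "int"), ("plot", "string"), ("rotation", "int"), ("corn", "boolean")]
print(cast_plan(existing, incoming))  # {'rotation': 'string', 'corn': 'string'}
```

In Spark, each entry in the plan could then be applied with `df.withColumn(name, col(name).cast(target))`.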


## Append the new data to existing Table

In [31]:
df.write.format("delta").mode('append').save(f"{base_datapath}/raw/soil")

## Inspecting the staged/unpublished data

### Staged/unpublished data

#### The changes are reflected in the table:

In [32]:
%sql SELECT min(year),max(year) FROM delta.`{base_datapath}/raw/soil`

min(year),max(year)
1888,2021


### Published data

The data on the `main` branch remains unchanged. We can validate this by running a query against the data, specifying `main` as the branch:

In [33]:
%sql SELECT min(year),max(year) FROM delta.`{main_datapath}/raw/soil`

min(year),max(year)
1888,2019


## Rebuild aggregates with new data

In [34]:
dims = {
    "variety",
    "plot"
}

for dim in dims:
    spark.sql(f"""CREATE OR REPLACE TABLE agg_{dim} USING DELTA 
                    LOCATION '{base_datapath}/aggs/agg_{dim}'
                    AS SELECT {dim}, 
                              COUNT(*) AS record_ct, 
                              PERCENTILE_APPROX(yield_bush,0.5) AS median_yield,
                              min(year) AS min_year,
                              max(year) AS max_year
                        FROM delta.`{base_datapath}/raw/soil`
                        GROUP BY {dim}""")

## Reminder: the changes are not published yet! The `main` copy of the data remains as it was before

### Staged/unpublished data

In [35]:
%sql SELECT "agg_variety" AS table,COUNT(*) AS row_ct,min(min_year),max(max_year) FROM delta.`{base_datapath}/aggs/agg_variety` UNION ALL SELECT "agg_plot" AS table,COUNT(*) AS row_ct,min(min_year),max(max_year) FROM delta.`{base_datapath}/aggs/agg_plot` 

table,row_ct,min(min_year),max(max_year)
agg_variety,46,1888,2021
agg_plot,25,1888,2021


### Published data

The data on the `main` branch remains unchanged. We can validate this by running a query against the data, specifying `main` as the branch:

In [36]:
%sql SELECT "agg_variety" AS table,COUNT(*) AS row_ct,min(min_year),max(max_year) FROM delta.`{main_datapath}/aggs/agg_variety` UNION ALL SELECT "agg_plot" AS table,COUNT(*) AS row_ct,min(min_year),max(max_year) FROM delta.`{main_datapath}/aggs/agg_plot` 

table,row_ct,min(min_year),max(max_year)
agg_variety,46,1888,2019
agg_plot,24,1888,2019


# Audit 

At the moment the data is written to the audit branch (`load_new_data_20-21`), but not published to `main`. 

How you audit the data is up to you. The nice thing about the data being staged is that you can do it within the same ETL job, or have another tool do it.

Here's a very simple example of doing it in Python. We're going to programmatically check that: 

1. The latest year in each table we've loaded matches the most recent year in the source CSV file
2. From a data quality point of view, there should be no NULLs in the dimension field for each aggregate
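The two checks listed above can be sketched as small functions that a runner collects results from, so that every failure is reported in one pass instead of stopping at the first. This is an illustrative outline only: the helper names are hypothetical, and the numbers passed in are placeholders for values that would come from Spark queries.

```python
from typing import Callable, Dict, List, Optional

# A check returns None when it passes, or a short message when it fails.
Check = Callable[[], Optional[str]]


def latest_year_check(table_year: int, source_year: int) -> Check:
    """Build a check that the table's latest year equals the source's."""
    def check() -> Optional[str]:
        if table_year != source_year:
            return f"latest year {table_year} does not match source {source_year}"
        return None
    return check


def null_dimension_check(null_rows: int) -> Check:
    """Build a check that no row has a NULL dimension value."""
    def check() -> Optional[str]:
        if null_rows != 0:
            return f"{null_rows} row(s) with a NULL dimension value"
        return None
    return check


def run_audits(checks: Dict[str, Check]) -> List[str]:
    """Run every check and return the failure messages (empty list = all passed)."""
    failures = []
    for name, check in checks.items():
        message = check()
        if message is not None:
            failures.append(f"{name}: {message}")
    return failures


# Placeholder numbers; in the notebook they would come from queries on the branch.
failures = run_audits({
    "raw/soil latest year": latest_year_check(table_year=2021, source_year=2021),
    "agg_plot NULL dimension": null_dimension_check(null_rows=1),
})
print(failures)
```

An empty list means the branch is safe to publish; anything else is a reason to stop and investigate.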

## Has the latest data been added to each table? 

In [37]:
from pyspark.sql.functions import desc

src_csv=(f"{data_dir}/{branch}/src/morrow-plots_v01x_2020-2021_soil.csv")
latest_year_src = spark.read.csv(src_csv,inferSchema=True,header=True).orderBy(desc("year")).first()["year"]
print(f"Latest year in source file is {latest_year_src}")

Latest year in source file is 2021


Then we get the latest year in each table in turn and compare it to the source

In [38]:
tables = {
    "raw/soil": "year",
    "aggs/agg_variety": "max_year",
    "aggs/agg_plot": "max_year"
}

for table, year_col in tables.items():
    df = spark.read.format("delta").load(f"{base_datapath}/{table}")
    latest_year = df.selectExpr(f"max({year_col})").collect()[0][0]

    if ( latest_year_src!=latest_year ):
        raise ValueError(f"Audit failed: latest year on {table} ({latest_year}) does not match source ({latest_year_src})")
    else:
        print(f"🙌 Audit has passed: the latest year on {table} ({latest_year}) matches the source ({latest_year_src})")

🙌 Audit has passed: the latest year on raw/soil (2021) matches the source (2021)
🙌 Audit has passed: the latest year on aggs/agg_variety (2021) matches the source (2021)
🙌 Audit has passed: the latest year on aggs/agg_plot (2021) matches the source (2021)


## Are there any NULLs in the dimension of each aggregate?

In [39]:
dims = {
    "variety",
    "plot"
}

for dim in dims:
    ct=spark.sql(f"SELECT COUNT(*) AS ct FROM agg_{dim} WHERE {dim} IS NULL").first()['ct']
    
    if ( ct!=0 ):
        raise ValueError(f"Audit failed: Aggregate agg_{dim} has {ct} row(s) where {dim} is NULL")
    else:
        print(f"🙌 Audit has passed: Aggregate agg_{dim} has 0 rows where {dim} is NULL")

ValueError: Audit failed: Aggregate agg_plot has 1 row(s) where plot is NULL

## The audit failed! Oh noes!

`ValueError: Audit failed: Aggregate agg_plot has 1 row(s) where plot is NULL`

Why would `plot` be null? Let's look at the underlying data. First, is it just the new data (for 2020 and 2021)? 

In [40]:
%sql SELECT * FROM delta.`{base_datapath}/raw/soil` WHERE plot IS NULL AND year NOT IN (2020,2021);

phase,year,plot,plot_num,plot_dir,rotation,corn,crop,variety,all_corn,yield_bush,yield_ton,treated,treatment,manure,lime,nit,p205,k20,stover,population,plant_date,plant_day,soil_sample,damage,notes


It's only in the new data. Let's look at the source CSV file:

In [41]:
src=spark.read.text(src_csv)

src.show(1)
src.sample(fraction=0.2, seed=42).show()

+--------------------+
|               value|
+--------------------+
|phase,year,plot,p...|
+--------------------+
only showing top 1 row

+--------------------+
|               value|
+--------------------+
|5,2020,,3,SE,1,tr...|
|5,2020,,4,SE,2,fa...|
|5,2020,,5,NW,3,fa...|
|5,2021,,3,NW,1,tr...|
|5,2021,,4,NE,2,tr...|
|5,2021,,4,SW,2,tr...|
|5,2021,,5,SE,3,tr...|
+--------------------+



The header shows that the third field is `plot`, and in a random sample of the file it appears to be always empty. 
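A random sample only suggests that the field is empty. As a minimal sketch of a full check, the snippet below counts empty values per column across every row, using Python's `csv` module on an in-memory string. The `empty_counts` helper is hypothetical and the sample text is a shortened, made-up stand-in for the real file.

```python
import csv
import io
from collections import Counter


def empty_counts(csv_text: str) -> Counter:
    """Count empty values per column across every data row of a CSV document."""
    reader = csv.DictReader(io.StringIO(csv_text))
    counts = Counter()
    for row in reader:
        for column, value in row.items():
            if column is None:
                continue  # surplus fields beyond the header are ignored here
            if value is None or value.strip() == "":
                counts[column] += 1
    return counts


# Shortened illustrative sample with the same first four header fields.
sample = "phase,year,plot,plot_num\n5,2020,,3\n5,2021,,4\n"
print(empty_counts(sample))  # Counter({'plot': 2})
```

A count equal to the number of data rows confirms that the column is empty throughout, not just in the sampled lines.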

🤦🏻 Turns out we're using the wrong source file! Let's fix that and check that it looks better.

In [42]:
src_csv=(f"{data_dir}/{branch}/src/morrow-plots_v01_2020-2021_soil.csv")

In [43]:
src=spark.read.text(src_csv)

src.show(1)
src.sample(fraction=0.2, seed=42).show()

+--------------------+
|               value|
+--------------------+
|phase,year,plot,p...|
+--------------------+
only showing top 1 row

+--------------------+
|               value|
+--------------------+
|5,2020,3SC,3,SE,1...|
|5,2020,4SD,4,SE,2...|
|5,2020,5NB,5,NW,3...|
|5,2021,3NB,3,NW,1...|
|5,2021,4ND,4,NE,2...|
|5,2021,4SB,4,SW,2...|
|5,2021,5SD,5,SE,3...|
+--------------------+



**That** does look better - we can see the plot value clearly in the source. 

Before we can re-run the data load, we need to reset the branch to undo the existing unpublished (and still uncommitted) work.

### Show list of uncommitted changes in lakeFS

👉🏻 [Web UI showing uncommitted changes in lakeFS](http://localhost:8000/repositories/example/changes?ref=load_new_data_20-21)

In [44]:
api_instance = branches_api.BranchesApi(api_client)

api_response = api_instance.diff_branch(repo.id, branch)
if api_response.pagination.results==0:
    display("Nothing to commit")
else:
    display(api_response.results)

[{'path': 'aggs/agg_plot/_delta_log/00000000000000000001.json',
  'path_type': 'object',
  'size_bytes': 8885,
  'type': 'added'},
 {'path': 'aggs/agg_plot/part-00000-5367224f-97f0-4661-897b-4be013fd503a-c000.snappy.parquet',
  'path_type': 'object',
  'size_bytes': 8885,
  'type': 'added'},
 {'path': 'aggs/agg_variety/_delta_log/00000000000000000001.json',
  'path_type': 'object',
  'size_bytes': 8885,
  'type': 'added'},
 {'path': 'aggs/agg_variety/part-00000-88223df5-3048-42e3-ab4e-b64672cae9b8-c000.snappy.parquet',
  'path_type': 'object',
  'size_bytes': 8885,
  'type': 'added'},
 {'path': 'raw/soil/_delta_log/00000000000000000001.json',
  'path_type': 'object',
  'size_bytes': 8885,
  'type': 'added'},
 {'path': 'raw/soil/part-00000-a9a3b430-621d-4486-84a6-d264eaf165e7-c000.snappy.parquet',
  'path_type': 'object',
  'size_bytes': 8885,
  'type': 'added'}]
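With more than a handful of changed objects, it can help to summarise the diff per table before deciding whether to reset. The sketch below groups entries by the first two path segments. It assumes the diff results are available as plain dictionaries shaped like the output above, and `changes_by_table` is a hypothetical helper, not a lakeFS API.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def changes_by_table(diff_results: Iterable[dict], depth: int = 2) -> Dict[str, List[str]]:
    """Group change types by the first `depth` segments of each object path."""
    grouped = defaultdict(list)
    for entry in diff_results:
        segments = entry["path"].split("/")
        table = "/".join(segments[:depth])
        grouped[table].append(entry["type"])
    return dict(grouped)


# A subset of the entries listed above, reduced to the two keys the helper reads.
diff_results = [
    {"path": "aggs/agg_plot/_delta_log/00000000000000000001.json", "type": "added"},
    {"path": "aggs/agg_plot/part-00000-5367224f-97f0-4661-897b-4be013fd503a-c000.snappy.parquet", "type": "added"},
    {"path": "raw/soil/_delta_log/00000000000000000001.json", "type": "added"},
]
print(changes_by_table(diff_results))
```

Here every touched table has only `added` entries, which matches an append to `raw/soil` and a rebuild of the aggregates.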

### Reset the branch

In [45]:
from lakefs_client.model.reset_creation import ResetCreation

print(f"Resetting branch {branch} on repo {repo.id}")
lakefs.branches.reset_branch(repo.id, branch,ResetCreation("reset"))

Resetting branch load_new_data_20-21 on repo example


### Now re-run the data load

# ⚠️⚠️⚠️⚠️⚠️⚠️ RESTART THE JUPYTER KERNEL BEFORE CONTINUING

This kills the kernel and forces it to restart

In [None]:
import os
os._exit(0)

# ⚠️⚠️⚠️⚠️⚠️⚠️ RESTART THE JUPYTER KERNEL BEFORE CONTINUING

### Set stuff back up again

In [1]:
import lakefs_client
from lakefs_client.client import LakeFSClient

lakefs_config = lakefs_client.Configuration()
lakefs_config.username = 'AKIAIOSFODNN7EXAMPLE'
lakefs_config.password = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
lakefs_config.host = 'http://lakefs:8000'

lakefs = LakeFSClient(lakefs_config)
lakefs_api_client = lakefs_client.ApiClient(lakefs_config)

In [2]:
repo=lakefs.repositories.list_repositories().results[0]
print(f"Using lakeFS repository '{repo.id}' with storage namespace {repo.storage_namespace}")

Using lakeFS repository 'example' with storage namespace s3://example


In [3]:
data_dir=repo.storage_namespace.replace('s3://','s3a://',1)
print(f"Using {data_dir} for data storage")
main_datapath=(f"{data_dir}/main")

Using s3a://example for data storage


In [4]:
branch="load_new_data_20-21"

In [5]:
base_datapath=(f"{data_dir}/{branch}")
print(f"Branch:\t\t{branch}\nBase data path:\t{base_datapath}")

Branch:		load_new_data_20-21
Base data path:	s3a://example/load_new_data_20-21


In [6]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(f"lakeFS / Jupyter {branch}") \
        .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.endpoint", "http://lakefs:8000") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE") \
        .config("spark.hadoop.fs.s3a.secret.key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY") \
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
        .getOrCreate()
spark.sparkContext.setLogLevel("INFO")

spark

## Load the new data from the corrected source file

In [7]:
src_csv=(f"{data_dir}/{branch}/src/morrow-plots_v01_2020-2021_soil.csv")

In [8]:
print(f"Source file: {src_csv}")

Source file: s3a://example/load_new_data_20-21/src/morrow-plots_v01_2020-2021_soil.csv


In [9]:
df = spark.read.csv(src_csv,inferSchema=True,header=True)
df.createOrReplaceTempView("soil_src")

In [10]:
%sql SELECT min(year),max(year) FROM soil_src 

min(year),max(year)
2020,2021


## Align schemas

In [11]:
from pyspark.sql.functions import col

existing_table = spark.read.format("delta").load(f"{base_datapath}/raw/soil")

# Get the schema of each table
existing_schema = existing_table.schema
new_schema = df.schema

# Compare the schema of the tables
if existing_schema == new_schema:
    print("The schema of the two tables is the same.")
else:
    print("The schema of the two tables is different.")
    # Find any mismatched field types
    for field1, field2 in zip(existing_schema, new_schema):
        if field1 != field2:
            print(f"Field {field1.name} has type {field1.dataType} in existing table, but type {field2.dataType} in import data. Fixing.")
            # Update the newly-imported data to have the same schema as the existing data
            # (after the rename the column is known by field1.name, so cast that column)
            df = df.withColumnRenamed(field2.name, field1.name).withColumn(field1.name, col(field1.name).cast(field1.dataType))

The schema of the two tables is different.
Field rotation has type StringType() in existing table, but type IntegerType() in import data. Fixing.
Field corn has type StringType() in existing table, but type BooleanType() in import data. Fixing.
Field all_corn has type StringType() in existing table, but type BooleanType() in import data. Fixing.
Field yield_bush has type StringType() in existing table, but type IntegerType() in import data. Fixing.
Field treated has type StringType() in existing table, but type BooleanType() in import data. Fixing.


## Append the new data to existing Table

In [12]:
df.write.format("delta").mode('append').save(f"{base_datapath}/raw/soil")

## Inspecting the staged/unpublished data

### Staged/unpublished data

#### The changes are reflected in the table:

In [13]:
%sql SELECT min(year),max(year) FROM delta.`{base_datapath}/raw/soil`

min(year),max(year)
1888,2021


### Published data

The data on the `main` branch remains unchanged. We can validate this by running a query against the data, specifying `main` as the branch:

In [14]:
%sql SELECT min(year),max(year) FROM delta.`{main_datapath}/raw/soil`

min(year),max(year)
1888,2019


## Rebuild aggregates with new data

In [15]:
dims = {
    "variety",
    "plot"
}

for dim in dims:
    spark.sql(f"""CREATE OR REPLACE TABLE agg_{dim} USING DELTA 
                    LOCATION '{base_datapath}/aggs/agg_{dim}'
                    AS SELECT {dim}, 
                              COUNT(*) AS record_ct, 
                              PERCENTILE_APPROX(yield_bush,0.5) AS median_yield,
                              min(year) AS min_year,
                              max(year) AS max_year
                        FROM delta.`{base_datapath}/raw/soil`
                        GROUP BY {dim}""")

## Reminder: the changes are not published yet! The `main` copy of the data remains as it was before

### Staged/unpublished data

In [16]:
%sql SELECT "agg_variety" AS table,COUNT(*) AS row_ct,min(min_year),max(max_year) FROM delta.`{base_datapath}/aggs/agg_variety` UNION ALL SELECT "agg_plot" AS table,COUNT(*) AS row_ct,min(min_year),max(max_year) FROM delta.`{base_datapath}/aggs/agg_plot` 

table,row_ct,min(min_year),max(max_year)
agg_variety,46,1888,2021
agg_plot,24,1888,2021


### Published data

The data on the `main` branch remains unchanged. We can validate this by running a query against the data, specifying `main` as the branch:

In [17]:
%sql SELECT "agg_variety" AS table,COUNT(*) AS row_ct,min(min_year),max(max_year) FROM delta.`{main_datapath}/aggs/agg_variety` UNION ALL SELECT "agg_plot" AS table,COUNT(*) AS row_ct,min(min_year),max(max_year) FROM delta.`{main_datapath}/aggs/agg_plot` 

table,row_ct,min(min_year),max(max_year)
agg_variety,46,1888,2019
agg_plot,24,1888,2019


# Audit 

At the moment the data is written to the audit branch (`load_new_data_20-21`), but not published to `main`. 

How you audit the data is up to you. The nice thing about the data being staged is that you can do it within the same ETL job, or have another tool do it.

Here's a very simple example of doing it in Python. We're going to programmatically check that: 

1. The latest year in each table we've loaded matches the most recent year in the source CSV file
2. From a data quality point of view, there should be no NULLs in the dimension field for each aggregate

## Has the latest data been added to each table? 

In [18]:
from pyspark.sql.functions import desc

src_csv=(f"{data_dir}/{branch}/src/morrow-plots_v01_2020-2021_soil.csv")
latest_year_src = spark.read.csv(src_csv,inferSchema=True,header=True).orderBy(desc("year")).first()["year"]
print(f"Latest year in source file is {latest_year_src}")

Latest year in source file is 2021


Then we get the latest year in each table in turn and compare it to the source

In [19]:
tables = {
    "raw/soil": "year",
    "aggs/agg_variety": "max_year",
    "aggs/agg_plot": "max_year"
}

for table, year_col in tables.items():
    df = spark.read.format("delta").load(f"{base_datapath}/{table}")
    latest_year = df.selectExpr(f"max({year_col})").collect()[0][0]

    if ( latest_year_src!=latest_year ):
        raise ValueError(f"Audit failed: latest year on {table} ({latest_year}) does not match source ({latest_year_src})")
    else:
        print(f"🙌 Audit has passed: the latest year on {table} ({latest_year}) matches the source ({latest_year_src})")

🙌 Audit has passed: the latest year on raw/soil (2021) matches the source (2021)
🙌 Audit has passed: the latest year on aggs/agg_variety (2021) matches the source (2021)
🙌 Audit has passed: the latest year on aggs/agg_plot (2021) matches the source (2021)


## Are there any NULLs in the dimension of each aggregate?

In [20]:
dims = {
    "variety",
    "plot"
}

for dim in dims:
    ct=spark.sql(f"SELECT COUNT(*) AS ct FROM agg_{dim} WHERE {dim} IS NULL").first()['ct']
    
    if ( ct!=0 ):
        raise ValueError(f"Audit failed: Aggregate agg_{dim} has {ct} row(s) where {dim} is NULL")
    else:
        print(f"🙌 Audit has passed: Aggregate agg_{dim} has 0 rows where {dim} is NULL")

🙌 Audit has passed: Aggregate agg_variety has 0 rows where variety is NULL
🙌 Audit has passed: Aggregate agg_plot has 0 rows where plot is NULL


----

----

# 🎈🎈🎈🎉🎉🎉🎉 The Audit passed! We are now ready to Publish

----

----

# Publish

Publishing data in lakeFS means merging the audit branch back into `main`, making it available to anyone working with the data in that branch.

## Commit the data to the audit [working] branch 

We can add a commit message, as well as optional metadata

In [21]:
from lakefs_client.api import commits_api
from lakefs_client.model.commit import Commit
from lakefs_client.model.commit_creation import CommitCreation

api_instance = commits_api.CommitsApi(lakefs_api_client)

api_instance.commit(repo.id, branch, CommitCreation(
    message="Add data for years 2020 and 2021",
    metadata={'src_file': 'morrow-plots_v01_2020-2021_soil.csv'}
) )

{'committer': 'docker',
 'creation_date': 1684408072,
 'id': 'c1170427f2e59a291e7a344a10efc057a4b83e3d37aab5427170f3e9405f2c1f',
 'message': 'Add data for years 2020 and 2021',
 'meta_range_id': '',
 'metadata': {'src_file': 'morrow-plots_v01_2020-2021_soil.csv'},
 'parents': ['996096b3ccaedbfa5c05ec2980beb0786049c11c8b74f400ad9fbf338b949cc4']}

## Merge the branch back into `main`

In [22]:
lakefs.refs.merge_into_branch(repository=repo.id, source_ref=branch, destination_branch='main')

{'reference': '00b07af2b550fdee6a19437999806b67bd2aae894220881293b790207205a6c1',
 'summary': {'added': 0, 'changed': 0, 'conflict': 0, 'removed': 0}}

## Inspect the published data

The data on `main` now includes the additional data for 2020 and 2021, and can be read by anyone using this trunk branch

In [23]:
%sql SELECT "agg_variety" AS table,COUNT(*) AS row_ct,min(min_year),max(max_year) FROM delta.`{main_datapath}/aggs/agg_variety` UNION ALL SELECT "agg_plot" AS table,COUNT(*) AS row_ct,min(min_year),max(max_year) FROM delta.`{main_datapath}/aggs/agg_plot` 

table,row_ct,min(min_year),max(max_year)
agg_variety,46,1888,2021
agg_plot,24,1888,2021


In [24]:
%sql SELECT min(year),max(year) FROM delta.`{main_datapath}/raw/soil`

min(year),max(year)
1888,2021


# Where Next?

* For more information about write-audit-publish see [this talk from Michelle Winters](https://www.youtube.com/watch?v=fXHdeBnpXrg&t=1001s) and [this talk from Sam Redai](https://www.dremio.com/wp-content/uploads/2022/05/Sam-Redai-The-Write-Audit-Publish-Pattern-via-Apache-Iceberg.pdf).
* To try out lakeFS check out the [hands-on Quickstart](https://docs.lakefs.io/quickstart/)