In this notebook we will complete a small end to end data science tutorial that employs LakeFS-spec for data versioning.
We will use weather data to train a random forest classifier and aim to predict whether it is raining a day from now given the current weather. 

Lets set the environment up. You can use the commands below. We first create an environment  (`python -m venv .venv`), activate it (`source .venv/bin/activate`) and install the dependencies (`pip install -r requirements.txt`).

In [2]:
!python -m venv .venv
!source .venv/bin/activate
%pip install -r requirements.txt

Collecting lakefs-spec@ git+https://github.com/appliedAI-Initiative/lakefs-spec.git@e2d188c937310b6053759e7db56804235c64e352
  Using cached lakefs_spec-0.1.5.dev3+ge2d188c-py3-none-any.whl

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3.1[0m[39;49m -> [0m[32;49m23.2.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


Next, let us set up LakeFS. You can do this by executing the `docker run` command given here [lakeFS quickstart](https://docs.lakefs.io/quickstart/launch.html) in a terminal of your choice. Open a browser and navigate to the lakeFS instance (by default`localhost:8000`). Authenticate with the credentials given in the terminal where you executed the docker container. As an email, you can enter anything, we won't need it in this example. Proceed to create an empty repository. You may call it `weather`.


We will also install the CLI of lakeFS, `lakectl`. Then lakeFS-spec can automatically handle authentication. To do so, open a terminal of your choice and `brew install lakefs`. Then use `lakectl config`. You find the authentication information in the terminal window where you started the LakeFS Docker container. 

Note: for this to work, you need the `pyyaml` package which is not a default dependency of LakeFS-spec. We already installed via the `demo-requirements.txt`. In you own projects you need to add the dependency manually, for example by running `pip install --upgrade pyyaml`, if you want to use automatic authentication.

In [1]:
REPO_NAME = 'weather'

Now it's time to get some data. We will use the [Open-Meteo api](https://open-meteo.com/), where we can pull weather data from an API for free (as long as we are non-commercial) and without an API-token.  

For training, we get the full data of the 2010s from Munich (where I am writing this right now ;) ) 

In [None]:
!curl -o 'data/weather-2010s.json' 'https://archive-api.open-meteo.com/v1/archive?latitude=52.52&longitude=13.41&start_date=2010-01-01&end_date=2019-12-31&hourly=temperature_2m,relativehumidity_2m,rain,pressure_msl,surface_pressure,cloudcover,cloudcover_low,cloudcover_mid,cloudcover_high,windspeed_10m,windspeed_100m,winddirection_10m,winddirection_100m'

The data is in JSON format so we need to wrangle the data a bit to make it usable. But first we will save it into our lakeFS instance. We will create a new branch, `transform-raw-data`.

In [2]:
from lakefs_spec import LakeFSFileSystem

NEW_BRANCH_NAME = 'transform-raw-data'


fs = LakeFSFileSystem()
fs.put('./data/weather-2010s.json',  f'{REPO_NAME}/{NEW_BRANCH_NAME}/weather-2010.json')



Created new branch 'transform-raw-data' from branch 'main'.


Now, on LakeFS in your browser, can change the branch to `transform-raw-data` and see the saved file. However, the change is not yet committed. 
While you can do that manually via the uncommitted changes tab in the lakeFS UI, we will commit the file in a different way. 

To commit changes programmatically, we can register a hook. This hook needs to have the signature `(client, context) -> None`, where the `client` is the file system's LakeFS client. The context object contains information about the requested resource. 



In [3]:
from lakefs_client.client import LakeFSClient
from lakefs_spec.client_helpers import commit
from lakefs_spec.hooks import FSEvent, HookContext

#Define the commit hook
def commit_on_put(client: LakeFSClient, ctx:HookContext) -> None:
    commit_message = f"Add file {ctx.resource}"
    print(f"Attempting Commit: {commit_message}")
    commit(client, repository=ctx.repository, branch=ctx.ref, message=commit_message)
    

#Register the commit hook to be executed after a PUT_FILE event
fs.register_hook(FSEvent.PUT_FILE, commit_on_put)


However, when you execute the next cell, you will see a message indicating that the upload of the resource has been skipped because the file is uploaded to lakeFS already, and the checksums match. This is useful when we work with large files to reduce the amount of network traffic. Nonetheless, in this specific situation the `PUT` is not executed and neither is our commit hook. 


In [4]:
fs.put('data/weather-2010s.json',  f'{REPO_NAME}/{NEW_BRANCH_NAME}/weather-2010.json')

Skipping upload of resource '/Users/maxmynter/Desktop/appliedAI/lakefs/spec/demos/data/weather-2010s.json' to remote path 'weather/transform-raw-data/weather-2010.json': Resource 'weather/transform-raw-data/weather-2010.json' exists and checksums match.


We can circumvent this by disabling pre-checking file checksums on a specific put operation. We do this by passing `precheck=False` to the `PUT` operation.

In [5]:
fs.put('data/weather-2010s.json',  f'{REPO_NAME}/{NEW_BRANCH_NAME}/weather-2010.json', precheck=False)

Attempting Commit: Add file weather-2010.json


Now let's transform the data for our use case. We put the transformation into a function such that we can reuse it later

In this notebook, we follow a simple toy example to predict whether it is raining at the same time tomorrow given weather data from right now. 

We will skip a lot of possible feature engineering etc. in order to focus on the application of lakeFS and the `LakeFSFileSystem`.

In [6]:
import pandas as pd
import numpy as np
import json 

def transform_json_weather_data(filepath):
    f = open(filepath)
    data = json.load(f)
    f.close()

    df = pd.DataFrame.from_dict(data["hourly"])
    df.time = pd.to_datetime(df.time)
    df['is_raining'] = df.rain > 0
    df['is_raining_in_1_day'] = df.is_raining.shift(24)
    df = df.dropna()
    return df
    
df = transform_json_weather_data('data/weather-2010s.json')
df.head(5)


Unnamed: 0,time,temperature_2m,relativehumidity_2m,rain,pressure_msl,surface_pressure,cloudcover,cloudcover_low,cloudcover_mid,cloudcover_high,windspeed_10m,windspeed_100m,winddirection_10m,winddirection_100m,is_raining,is_raining_in_1_day
24,2010-01-02 00:00:00,-3.0,88,0.0,1004.0,999.2,100,54,100,70,8.3,17.3,18,31,False,False
25,2010-01-02 01:00:00,-3.2,89,0.0,1004.5,999.7,100,37,98,92,9.1,18.6,9,22,False,False
26,2010-01-02 02:00:00,-3.4,89,0.0,1005.2,1000.4,100,24,98,77,10.1,20.3,2,13,False,False
27,2010-01-02 03:00:00,-3.5,89,0.0,1005.6,1000.8,93,8,98,89,11.2,21.7,4,12,False,False
28,2010-01-02 04:00:00,-3.7,90,0.0,1006.2,1001.4,94,6,100,95,11.2,21.8,358,9,False,False


Let us now save this data as a csv into the main branch. The `.to_csv` method calls a `put` operation behind the scenes, our commit hook is called and the file committed. You can verify the saving worked in your LakeFS GUI in the browser when looking at the Comits and uncommitted changes tabs of the main branch. 

In [7]:
df.to_csv(f'lakefs://{REPO_NAME}/main/weather_2010s.csv')

Calling open() in write mode results in unbuffered file uploads, because the lakeFS Python client does not support multipart uploads. Note that uploading large files unbuffered can have performance implications.
Attempting Commit: Add file weather_2010s.csv


Now we will do a train-test split.

In [8]:
import sklearn.model_selection

model_data=df.drop('time', axis=1)

train, test = sklearn.model_selection.train_test_split(model_data)

We save these train and test datasets into a new `training` branch. If the branch does not yet exist, as in this case, it is implicitly created by default. You can control this behaviour with the `create_branch_ok` flag when initializing the `LakeFSFileSystem`.  

In [9]:
TRAINING_BRANCH = 'training'
train.to_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv')
test.to_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv')


Calling open() in write mode results in unbuffered file uploads, because the lakeFS Python client does not support multipart uploads. Note that uploading large files unbuffered can have performance implications.
Created new branch 'training' from branch 'main'.
Attempting Commit: Add file train_weather.csv
Calling open() in write mode results in unbuffered file uploads, because the lakeFS Python client does not support multipart uploads. Note that uploading large files unbuffered can have performance implications.
Attempting Commit: Add file test_weather.csv


Implicit branch creation is a convenient way to create new branches programmatically. However, one drawback is that typos in your code might result in new accidental branch creations. If you want to avoid this implicit behavior and raise errors instead, you can disable implicit branch creation by setting `fs.create_branch_ok=False`.

We can now read these files directly from the remote LakeFS instance. (You can verify that neither the train nor the test file are saved in the `/data` directory). 

In [10]:
train = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv', index_col=0)
test = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv', index_col=0)

train.head()

Unnamed: 0,temperature_2m,relativehumidity_2m,rain,pressure_msl,surface_pressure,cloudcover,cloudcover_low,cloudcover_mid,cloudcover_high,windspeed_10m,windspeed_100m,winddirection_10m,winddirection_100m,is_raining,is_raining_in_1_day
84769,13.3,72,0.0,1023.3,1018.7,1,0,1,0,5.6,12.2,255,270,False,False
65384,16.2,60,0.0,1019.2,1014.6,90,100,0,1,27.0,38.7,301,302,False,True
56122,18.6,65,0.0,1017.3,1012.8,54,15,17,100,4.8,5.9,318,317,False,True
34991,7.2,86,0.3,1004.9,1000.3,100,91,99,95,5.4,12.1,176,203,True,False
70540,0.1,88,0.0,1001.7,997.0,34,38,0,0,17.3,32.1,228,233,False,False


We now proceed to train a random forest classifier and evaluate it on the test set. 

In [11]:
from sklearn.ensemble import RandomForestClassifier

dependent_variable = 'is_raining_in_1_day'

model = RandomForestClassifier()
x_train, y_train = train.drop(dependent_variable, axis=1), train[dependent_variable].astype(bool)
x_test, y_test = test.drop(dependent_variable, axis=1), test[dependent_variable].astype(bool)

model.fit(x_train, y_train)

test_acc = model.score(x_test, y_test)

print(f"Test accuracy: {round(test_acc, 4) * 100 } %")

Test accuracy: 88.55 %


Until now, we only have used data from the 2010s. Let's download additional 2020s data, transform it, and save it to LakeFS. 

In [12]:
!curl -o './data/weather-2020s.json' 'https://archive-api.open-meteo.com/v1/archive?latitude=52.52&longitude=13.41&start_date=2020-01-01&end_date=2023-08-31&hourly=temperature_2m,relativehumidity_2m,rain,pressure_msl,surface_pressure,cloudcover,cloudcover_low,cloudcover_mid,cloudcover_high,windspeed_10m,windspeed_100m,winddirection_10m,winddirection_100m'

new_data = transform_json_weather_data('./data/weather-2020s.json')
new_data.to_csv(f'lakefs://{REPO_NAME}/main/weather_2020s.csv')

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 2308k    0 2308k    0     0  1128k      0 --:--:--  0:00:02 --:--:-- 1130k
Calling open() in write mode results in unbuffered file uploads, because the lakeFS Python client does not support multipart uploads. Note that uploading large files unbuffered can have performance implications.
Attempting Commit: Add file weather_2020s.csv


Let's test how well our model performs on 2020s data.

First, we drop the `time` column such that we have the same variables as during the fit in the data. 

In [13]:
new_data = new_data.drop('time', axis=1)

In [14]:
acc_2020s = model.score(new_data.drop(dependent_variable, axis=1), new_data[dependent_variable].astype(bool))

print(f"Test accuracy: {round(acc_2020s, 4) * 100 } %")

Test accuracy: 84.6 %


We have an accuracy similar to the one we had on the 2020s data. Yet, it makes sense to utilize this data for training as well. We will create a concatenated dataframe and perform a new train test split. 

However, this means that we now have multiple models which will perform differently on different datasets. For example, if someone takes the model we are about to train and evaluates it on the 2020s data, the accuracy will probably be higher, because of data leakage. We are going to use some of the data points in the 2020s data to train. 

To circumvent this issue (or at least enable the traceability and reproducibility) we should save the `ref` of the specific datasets. `ref` can be the branch we are pulling the file from LakeFS from.  


We are going to implement versioning with the commit ids now.

First we create the new train test split and save it in the training branch.


In [15]:
df_train = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv', index_col=0)
df_test = pd.read_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv', index_col=0)

full_data = pd.concat([new_data, df_train, df_test])

train_df, test_df = sklearn.model_selection.train_test_split(full_data, test_size=0.9)

train_df.to_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv')
test_df.to_csv(f'lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv')

Calling open() in write mode results in unbuffered file uploads, because the lakeFS Python client does not support multipart uploads. Note that uploading large files unbuffered can have performance implications.
Attempting Commit: Add file train_weather.csv
Calling open() in write mode results in unbuffered file uploads, because the lakeFS Python client does not support multipart uploads. Note that uploading large files unbuffered can have performance implications.
Attempting Commit: Add file test_weather.csv


This concatenates the old data, creates a new train-test split, and overwrites the files. Of course, this presents problems with respect to strict versioning. When we get the data directly from a branch, we only get the version from the latest commit. 

Let's use explicit versioning instead and get the actual commit SHA. For that, go into the lakeFS UI, select the training branch, and navigate to the "Commits" tab. 

You should see the latest two commits `Add file test_weather.csv` and `Add file train_weather.csv`.

Copy the ID to your clipboard and paste it below. (They should look something like this `be9e4c17be128bd86a082e9c5eb63135160699edd135b8b6eb78180d070b31a1`).

In [None]:
test_commit_id  = ''
train_commit_id = ''

We can now get the specific dataset versions irrespective of subsequent changes to the files on the branches.

In [None]:
train = pd.read_csv(f"lakefs://{REPO_NAME}/{train_commit_id}/train_weather.csv", index_col=0)
test = pd.read_csv(f"lakefs://{REPO_NAME}/{test_commit_id}/test_weather.csv", index_col=0)

Let's train the model

In [None]:
model.fit(train.drop(dependent_variable, axis=1), train[dependent_variable].astype(bool))

test_acc = model.score(test.drop(dependent_variable, axis=1), test[dependent_variable].astype(bool))

print(f"Test accuracy: {round(test_acc, 4) * 100 } %")

We're done. We have a model trained on the new data. 

Now, we can save the commit SHAs as well as the model and accuracy metrics into an experiment tracking tool of our choice. With this, we achieve reproducible experiments and have a clear trail on what input data and hyperparameters resulted in which set of model weights.  

If in the future we decide to train another model, use different data, or invest more time in feature engineering, we can always come back to the current state to study the model performance and draw insights that might help us down the road. 