In [1]:
import pandas as pd
import requests
import os
import json

## 1. Data Exploration

### 1.1. Download data
Download data from the API, and load it into a pandas dataframe.

In [2]:
URL = "https://opendata.comune.bologna.it/api/explore/v2.1/catalog/datasets/disponibilita-parcheggi-storico/exports/csv?lang=it&timezone=UTC&use_labels=true&delimiter=%3B"

df = pd.read_csv(URL, sep=";")
df[['lat', 'lon']] = df['coordinate'].str.split(', ',expand=True)
df = df.drop(columns=['% occupazione', 'GUID', 'coordinate']).rename(columns={'Parcheggio': 'parcheggio', 'Data': 'data', 'Posti liberi': 'posti_liberi', 'Posti occupati': 'posti_occupati', 'Posti totali': 'posti_totali'})
df

Unnamed: 0,parcheggio,data,posti_liberi,posti_occupati,posti_totali,lat,lon
0,Riva Reno,2023-08-01T00:30:00+00:00,354.0,116.0,470,44.501153,11.336062
1,Riva Reno,2023-08-01T01:00:00+00:00,354.0,116.0,470,44.501153,11.336062
2,VIII Agosto,2023-08-01T02:30:00+00:00,379.0,246.0,625,44.500297,11.345368
3,Riva Reno,2023-08-01T02:30:00+00:00,354.0,116.0,470,44.501153,11.336062
4,Riva Reno,2023-08-01T03:00:00+00:00,354.0,116.0,470,44.501153,11.336062
...,...,...,...,...,...,...,...
67383,Autostazione,2024-03-07T16:09:00+00:00,163.0,102.0,265,44.504422,11.346514
67384,VIII Agosto,2024-03-07T16:19:00+00:00,235.0,390.0,625,44.500297,11.345368
67385,Autostazione,2024-03-07T16:39:00+00:00,168.0,97.0,265,44.504422,11.346514
67386,Autostazione,2024-03-07T16:49:00+00:00,169.0,96.0,265,44.504422,11.346514


### 1.2. Extract parkings
Extract distinct parkings from the dataframe.

In [3]:
KEYS = ['parcheggio', 'lat', 'lon']
df_parcheggi = df.groupby(['parcheggio']).first().reset_index()[KEYS]
df_parcheggi

Unnamed: 0,parcheggio,lat,lon
0,Autostazione,44.50411,11.346404
1,Riva Reno,44.501153,11.336062
2,VIII Agosto,44.500297,11.345368


### 1.3 Aggregate Parking Data
Aggregate Parking Data by date, hour, dow, and parking.

In [4]:
rdf = df.copy()
rdf['data'] = pd.to_datetime(rdf['data'])
rdf['day'] = rdf['data'].apply(lambda t: t.replace(second=0, minute=0))
rdf = rdf.drop(columns=['data','lat','lon'])
grouped = rdf.groupby(['parcheggio','day']).mean()
df_aggregated = grouped.reset_index()
df_aggregated

Unnamed: 0,parcheggio,day,posti_liberi,posti_occupati,posti_totali
0,Autostazione,2023-10-03 16:00:00+00:00,192.666667,72.333333,265.0
1,Autostazione,2023-10-03 17:00:00+00:00,198.250000,66.750000,265.0
2,Autostazione,2023-10-03 18:00:00+00:00,198.000000,67.000000,265.0
3,Autostazione,2023-10-03 19:00:00+00:00,199.750000,65.250000,265.0
4,Autostazione,2023-10-03 20:00:00+00:00,204.750000,60.250000,265.0
...,...,...,...,...,...
12840,VIII Agosto,2024-03-07 12:00:00+00:00,88.333333,536.666667,625.0
12841,VIII Agosto,2024-03-07 13:00:00+00:00,137.833333,487.166667,625.0
12842,VIII Agosto,2024-03-07 14:00:00+00:00,192.500000,432.500000,625.0
12843,VIII Agosto,2024-03-07 15:00:00+00:00,211.333333,413.666667,625.0


## 2. Platform Support - Data Ops

We use the platform support to load the data into the platform, version it, and automate the execution of the data management operations.


### 2.1. Initalization
Create the working context: data management project for the parking data processing. Project is a placeholder for the code, data, and management of the parking data operations. To keep it reproducible, we use the `git` source type to store the definition and code.

In [5]:
import mlrun 

project = mlrun.get_or_create_project("parcheggi", "./")
project.set_source(source="git://github.com/ffais/gdb-project-parkings.git#refs/heads/main", pull_at_runtime=True)
project.save()

> 2024-03-07 16:12:21,477 [info] Loading project from path: {'project_name': 'parcheggi', 'path': './'}
> 2024-03-07 16:12:21,542 [info] Project loaded successfully: {'project_name': 'parcheggi', 'path': './', 'stored_in_db': True}


<mlrun.projects.project.MlrunProject at 0x7f35f042c820>

### 2.2. Data management functions
We convert the data management ETL operations into functions - single executable operations that can be executed in the platform.

In [6]:
%%writefile "src/download-all.py"

import mlrun
import pandas as pd

@mlrun.handler(outputs=["dataset"])
def downloader(context, url: mlrun.DataItem):
    df = url.as_df(format='csv',sep=";")
    df[['lat', 'lon']] = df['coordinate'].str.split(', ',expand=True)
    df = df.drop(columns=['% occupazione', 'GUID', 'coordinate']).rename(columns={'Parcheggio': 'parcheggio', 'Data': 'data', 'Posti liberi': 'posti_liberi', 'Posti occupati': 'posti_occupati', 'Posti totali': 'posti_totali'})
    df["lat"] = pd.to_numeric(df["lat"])
    df["lon"] = pd.to_numeric(df["lon"])
    return df

Overwriting src/download-all.py


In [7]:
project.set_function("src/download-all.py", name="download-all", kind="job", image="mlrun/mlrun", handler="downloader")

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7fc734e22040>

In [8]:
%%writefile "src/extract_parkings.py"

import mlrun
import pandas as pd

@mlrun.handler(outputs=["parkings"])
def extract_parkings(context, di: mlrun.DataItem):
    KEYS = ['parcheggio', 'lat', 'lon', 'posti_totali']
    df_parcheggi = di.as_df().groupby(['parcheggio']).first().reset_index()[KEYS]
    return df_parcheggi

Overwriting src/extract_parkings.py


In [9]:
project.set_function("src/extract_parkings.py", name="extract-parkings", kind="job", image="mlrun/mlrun", handler="extract_parkings")

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7fc734e22910>

In [10]:
%%writefile "src/aggregate_parkings.py"
import mlrun
import pandas as pd

@mlrun.handler(outputs=["parking_data_aggregated"])
def aggregate_parkings(context, di: mlrun.DataItem):
    rdf = di.as_df()
    rdf['data'] = pd.to_datetime(rdf['data'])
    rdf['day'] = rdf['data'].apply(lambda t: t.replace(second=0, minute=0))
    rdf['hour'] = rdf['day'].dt.hour
    rdf['dow'] = rdf['day'].dt.dayofweek
    rdf = rdf.drop(columns=['data'])
    grouped = rdf.groupby(['parcheggio','day']).mean()
    df_aggregated = grouped.reset_index()
    return df_aggregated

Overwriting src/aggregate_parkings.py


In [11]:
project.set_function("src/aggregate_parkings.py", name="aggregate-parkings", kind="job", image="mlrun/mlrun", handler="aggregate_parkings")

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7fc734e22ca0>

In [12]:
%%writefile "src/parkings_to_db.py"

import mlrun
import pandas as pd
from sqlalchemy import create_engine
import datetime
import os

@mlrun.handler()
def to_db(context, agg_di: mlrun.DataItem, parkings_di: mlrun.DataItem):
    USERNAME = context.get_secret('DB_USERNAME')
    PASSWORD = context.get_secret('DB_PASSWORD')
    engine = create_engine('postgresql://'+USERNAME+':'+PASSWORD+'@database-postgres-cluster/digitalhub')
    agg_df = agg_di.as_df()
    # Keep only last two calendar years
    date = datetime.date.today() - datetime.timedelta(days=365*2)
    agg_df = agg_df[agg_df['day'].dt.date >= date]
    with engine.connect() as connection: 
        try: connection.execute("DELETE FROM parkings")
        except: pass
        try: connection.execute("DELETE FROM parking_data_aggregated")
        except: pass

    agg_df.to_sql("parking_data_aggregated", engine, if_exists="append")
    parkings_di.as_df().to_sql('parkings', engine, if_exists="append")
    return

Overwriting src/parkings_to_db.py


In [13]:
project.set_function("src/parkings_to_db.py", name="to-db", kind="job", image="mlrun/mlrun", handler="to_db", requirements=["sqlalchemy", "psycopg2-binary"])
project.set_secrets({"DB_USERNAME": "digitalhub_owner_user", "DB_PASSWORD": "XXXXXX"})
project.save()
project.build_function("to-db", base_image="mlrun/mlrun")

> 2024-02-14 13:36:45,080 [info] Started building image: .mlrun/func-parcheggi-to-db:latest
[36mINFO[0m[0000] Retrieving image manifest mlrun/mlrun:1.4.0  
[36mINFO[0m[0000] Retrieving image mlrun/mlrun:1.4.0 from registry index.docker.io 
[36mINFO[0m[0001] Built cross stage deps: map[]                
[36mINFO[0m[0001] Retrieving image manifest mlrun/mlrun:1.4.0  
[36mINFO[0m[0001] Returning cached image manifest              
[36mINFO[0m[0001] Executing 0 build triggers                   
[36mINFO[0m[0001] Unpacking rootfs as cmd RUN echo 'Installing /empty/requirements.txt...'; cat /empty/requirements.txt requires it. 
[36mINFO[0m[0020] RUN echo 'Installing /empty/requirements.txt...'; cat /empty/requirements.txt 
[36mINFO[0m[0020] Taking snapshot of full filesystem...        
[36mINFO[0m[0030] cmd: /bin/sh                                 
[36mINFO[0m[0030] args: [-c echo 'Installing /empty/requirements.txt...'; cat /empty/requirements.txt] 
[36mINFO[0m[0030]

BuildStatus(ready=True, outputs={'image': '.mlrun/func-parcheggi-to-db:latest'})

### 2.3 Data Management Pipeline
We create a data management pipeline that executes the data management functions in the platform.

In [14]:
%%writefile "src/parking_data_pipeline.py"

from kfp import dsl
import mlrun

URL = "https://opendata.comune.bologna.it/api/explore/v2.1/catalog/datasets/disponibilita-parcheggi-storico/exports/csv?lang=it&timezone=UTC&use_labels=true&delimiter=%3B"

@dsl.pipeline(name="Parking data pipeline")
def parking_pipeline():
    project = mlrun.get_current_project()

    run_download = project.run_function("download-all",inputs={'url':URL}, outputs=["dataset"])

    run_parkings = project.run_function("extract-parkings", inputs={'di':run_download.outputs["dataset"]}, outputs=["parkings"])

    run_aggregate = project.run_function("aggregate-parkings", inputs={'di':run_download.outputs["dataset"]}, outputs=["parking_data_aggregated"])
    
    project.run_function("to-db", inputs={'agg_di': run_aggregate.outputs["parking_data_aggregated"], 'parkings_di': run_parkings.outputs["parkings"]})


Overwriting src/parking_data_pipeline.py


In [15]:
project.set_workflow("parking-data-pipeline","./src/parking_data_pipeline.py", handler="parking_pipeline")
project.save()

<mlrun.projects.project.MlrunProject at 0x7fc6ea1a7070>

Now we schedule the data management pipeline to be executed every day for the data taken from API. The execution will take place on the platform using Kubernetes for the execution of single tasks and Kubeflow Pipeline for the orchestration of the data management pipeline.

``Note: to be executed remotely, the changes need to be pushed to the git repository.``

In [16]:
project.run("parking-data-pipeline", schedule="0 0 * * *")

> 2024-02-14 13:37:28,083 [info] executing workflow scheduling 'workflow-runner-pipeline' remotely with kfp engine
> 2024-02-14 13:37:28,086 [info] Storing function: {'name': 'pipeline', 'uid': '5fad70c5943e48a69ca55f436ea24d18', 'db': None}
> 2024-02-14 13:37:28,170 [info] task schedule modified: {'schedule': '0 0 * * *', 'project': 'parcheggi', 'name': 'pipeline'}
