In [1]:
import digitalhub as dh
import os
# Name of the digitalhub project under which every function below is registered.
PROJECT_NAME = "mobility-data"
# Project handle used by all later cells (presumably reuses an existing project or creates it — see digitalhub docs).
project = dh.get_or_create_project(PROJECT_NAME)

## Open Data functions

In [2]:
# Directory that receives the handler modules written by the %%writefile cells below.
new_folder = 'src'
# exist_ok=True keeps the cell re-runnable and avoids the check-then-create race.
os.makedirs(new_folder, exist_ok=True)

In [3]:
%%writefile "src/download-open-data.py"

import pandas as pd
import requests
import json as js
import geopandas as gpd
import os
import boto3
import datetime

# GeoJSON export endpoint of the Bologna open-data portal; `{name}` is replaced with the dataset identifier.
base_url = "https://opendata.comune.bologna.it/api/explore/v2.1/catalog/datasets/{name}/exports/geojson"

def download_share_geo(project, bucket, name, artifact_name):
    """Download one open-data GeoJSON export, convert it to Parquet and upload it.

    The dataset is fetched from ``base_url`` formatted with ``name``, saved as
    ``./data/<name>.geojson``, re-read with geopandas, written as
    ``./data/<name>.parquet`` and finally handed to ``share_files``.

    args:
        project: Forwarded unchanged to ``share_files``.
        bucket: Name of the destination S3 bucket.
        name: Dataset identifier on the open-data portal; also the local file stem.
        artifact_name: Folder name used for the uploaded object keys.

    raises:
        requests.HTTPError: If the portal answers with a 4xx/5xx status.
    """
    # A timeout stops the job from hanging forever on a stalled connection;
    # raise_for_status prevents an error page from being stored as if it were data.
    response = requests.get(base_url.format(name=name), timeout=120)
    response.raise_for_status()
    payload = response.json()

    # exist_ok avoids the check-then-create race of the exists()/makedirs() pair.
    os.makedirs('./data', exist_ok=True)

    path_geojson = './data/' + name + '.geojson'
    with open(path_geojson, 'w') as out_file:
        out_file.write(js.dumps(payload, indent=4))
    gdf = gpd.read_file(path_geojson)

    path_parquet = './data/' + name + '.parquet'
    gdf.to_parquet(path_parquet)
    share_files(project, bucket, path_parquet, artifact_name)

def share_files(project, bucket: str = "dataspace", path: str = "city", artifactName: str = 'artifact'):
    """
    Uploads a local file to an S3 bucket twice: under a 'latest' folder and under a folder named after today's date.
    Requires the environment variables for S3 endpoint and credentials (S3_ENDPOINT_URL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY).
    args:
        project: Not used by this function; kept for interface compatibility.
        bucket: The name of the bucket
        path: Local path of the file to upload. A bare file name (no directory part) is looked up in ./data/.
        artifactName: Top-level folder of the object keys: /<artifactName>/latest/<file> and /<artifactName>/<YYYY-MM-DD>/<file>
    """

    s3 = boto3.client('s3',
                    endpoint_url=os.environ.get('S3_ENDPOINT_URL'),
                    aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
                    aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'))

    fname = path.split("/")[-1]
    # Upload the file the caller pointed at. Previously the directory part of
    # `path` was discarded and the file was always read from ./data/.
    local_path = path if os.path.dirname(path) else "./data/" + fname

    path_date = datetime.datetime.now().strftime("%Y-%m-%d")
    path_latest = 'latest'
    extra_args = {'ContentType': 'application/octet-stream'}

    print(bucket)
    # NOTE(review): the object keys start with '/', unlike the keys built in
    # download-spire.py. Kept as-is because existing consumers may rely on it.
    for folder in (path_latest, path_date):
        s3.upload_file(local_path, bucket, '/' + artifactName + '/' + folder + '/' + fname, ExtraArgs=extra_args)

def download_road_areas(project, bucket):
    """Job handler: fetch the "aree-stradali" dataset and upload it to `bucket` as "ctm_road_areas"."""
    # download_share_geo returns nothing, so no result is kept.
    download_share_geo(project, bucket, "aree-stradali", "ctm_road_areas")

def download_curves(project, bucket):
    """Job handler: fetch "carta-tecnica-comunale-curve-livello-10-metri" and upload it to `bucket` as "ctm_level_curves_10m"."""
    # download_share_geo returns nothing, so no result is kept.
    download_share_geo(project, bucket, "carta-tecnica-comunale-curve-livello-10-metri", "ctm_level_curves_10m")

def download_sidewalks(project, bucket):
    """Job handler: fetch "carta-tecnica-comunale-marciapiedi" and upload it to `bucket` as "ctm_level_sidewalks"."""
    # download_share_geo returns nothing, so no result is kept.
    download_share_geo(project, bucket, "carta-tecnica-comunale-marciapiedi", "ctm_level_sidewalks")

def download_road_edges(project, bucket):
    """Job handler: fetch the "rifter_arcstra_li" dataset and upload it to `bucket` as "rifter_edges"."""
    # download_share_geo returns nothing, so no result is kept.
    download_share_geo(project, bucket, "rifter_arcstra_li", "rifter_edges")

def download_road_nodes(project, bucket):
    """Job handler: fetch the "rifter_nodi_pt" dataset and upload it to `bucket` as "rifter_nodes"."""
    # download_share_geo returns nothing, so no result is kept.
    download_share_geo(project, bucket, "rifter_nodi_pt", "rifter_nodes")

def download_city_30(project, bucket):
    """Job handler: fetch the "velocita-citta-30" dataset and upload it to `bucket` as "city_30"."""
    # download_share_geo returns nothing, so no result is kept.
    download_share_geo(project, bucket, "velocita-citta-30", "city_30")

def download_charging_stations(project, bucket):
    """Job handler: fetch the "colonnine-elettriche" dataset and upload it to `bucket` as "charging_stations"."""
    # download_share_geo returns nothing, so no result is kept.
    download_share_geo(project, bucket, "colonnine-elettriche", "charging_stations")

def download_bike_path(project, bucket):
    """Job handler: fetch the "piste-ciclopedonali" dataset and upload it to `bucket` as "bike_path"."""
    # download_share_geo returns nothing, so no result is kept.
    download_share_geo(project, bucket, "piste-ciclopedonali", "bike_path")

def download_incidents(project, bucket):
    """Job handler: fetch the "incidenti_new" dataset and upload it to `bucket` as "car_incidents"."""
    # download_share_geo returns nothing, so no result is kept.
    download_share_geo(project, bucket, "incidenti_new", "car_incidents")

def download_bike_parking_places(project, bucket):
    """Job handler: fetch the "rastrelliere-per-biciclette" dataset and upload it to `bucket` as "bike_parking_places"."""
    # download_share_geo returns nothing, so no result is kept.
    download_share_geo(project, bucket, "rastrelliere-per-biciclette", "bike_parking_places")

def download_car_parkings(project, bucket):
    """Job handler: fetch the "parcheggi" dataset and upload it to `bucket` as "car_parkings"."""
    # download_share_geo returns nothing, so no result is kept.
    download_share_geo(project, bucket, "parcheggi", "car_parkings")

def download_bus_stops_tper(project, bucket):
    """Job handler: fetch the "tper-fermate-autobus" dataset and upload it to `bucket` as "tper_bus_stops"."""
    # download_share_geo returns nothing, so no result is kept.
    download_share_geo(project, bucket, "tper-fermate-autobus", "tper_bus_stops")

def download_train_stops_tper(project, bucket):
    """Job handler: fetch the "stazioniferroviarie_20210401" dataset and upload it to `bucket` as "tper_train_stops"."""
    # download_share_geo returns nothing, so no result is kept.
    download_share_geo(project, bucket, "stazioniferroviarie_20210401", "tper_train_stops")

Overwriting src/download-open-data.py


In [4]:
# Register the road-areas download handler as a project function.
func_download_road_areas = project.new_function(
    name="download-road-areas",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-open-data.py",
        "handler": "download_road_areas",
    },
    requirements=["geopandas"],
)

In [5]:
# Run the road-areas download as a job (not locally), targeting the 'datalake' bucket.
run_download_road_areas = func_download_road_areas.run(
    action="job",
    parameters={'bucket': 'datalake'},
    local_execution=False,
)

In [6]:
# Register the road-edges download handler as a project function.
func_download_road_edges = project.new_function(
    name="download-road-edges",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-open-data.py",
        "handler": "download_road_edges",
    },
    requirements=["geopandas"],
)

In [7]:
# Run the road-edges download as a job (not locally), targeting the 'datalake' bucket.
run_download_road_edges = func_download_road_edges.run(
    action="job",
    parameters={'bucket': 'datalake'},
    local_execution=False,
)

In [8]:
# Register the road-nodes download handler as a project function.
func_download_road_nodes = project.new_function(
    name="download-road-nodes",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-open-data.py",
        "handler": "download_road_nodes",
    },
    requirements=["geopandas"],
)

In [9]:
# Run the road-nodes download as a job (not locally), targeting the 'datalake' bucket.
run_download_road_nodes = func_download_road_nodes.run(
    action="job",
    parameters={'bucket': 'datalake'},
    local_execution=False,
)

In [10]:
# Register the level-curves download handler as a project function.
func_download_curves = project.new_function(
    name="download-curves",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-open-data.py",
        "handler": "download_curves",
    },
    requirements=["geopandas"],
)

In [11]:
# Run the level-curves download as a job (not locally), targeting the 'datalake' bucket.
run_download_curves = func_download_curves.run(
    action="job",
    parameters={'bucket': 'datalake'},
    local_execution=False,
)

In [12]:
# Register the sidewalks download handler as a project function.
func_download_sidewalks = project.new_function(
    name="download-sidewalks",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-open-data.py",
        "handler": "download_sidewalks",
    },
    requirements=["geopandas"],
)

In [13]:
# Run the sidewalks download as a job (not locally), targeting the 'datalake' bucket.
run_download_sidewalks = func_download_sidewalks.run(
    action="job",
    parameters={'bucket': 'datalake'},
    local_execution=False,
)

In [18]:
# Register the city-30 speed-zone download handler as a project function.
func_download_city30 = project.new_function(
    name="download-city30",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-open-data.py",
        "handler": "download_city_30",
    },
    requirements=["geopandas"],
)

In [19]:
# Run the city-30 download as a job (not locally), targeting the 'datalake' bucket.
run_download_city30 = func_download_city30.run(
    action="job",
    parameters={'bucket': 'datalake'},
    local_execution=False,
)

In [20]:
# Register the charging-stations download handler as a project function.
func_download_charging_stations = project.new_function(
    name="download-charging-stations",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-open-data.py",
        "handler": "download_charging_stations",
    },
    requirements=["geopandas"],
)

In [21]:
# Run the charging-stations download as a job (not locally), targeting the 'datalake' bucket.
run_download_charging_stations = func_download_charging_stations.run(
    action="job",
    parameters={'bucket': 'datalake'},
    local_execution=False,
)

In [22]:
# Register the bike-path download handler as a project function.
func_download_bike_path = project.new_function(
    name="download-bike-path",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-open-data.py",
        "handler": "download_bike_path",
    },
    requirements=["geopandas"],
)

In [23]:
# Run the bike-path download as a job (not locally), targeting the 'datalake' bucket.
run_download_bike_path = func_download_bike_path.run(
    action="job",
    parameters={'bucket': 'datalake'},
    local_execution=False,
)

In [25]:
# Register the incidents download handler as a project function.
func_download_incidents = project.new_function(
    name="download-incidents",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-open-data.py",
        "handler": "download_incidents",
    },
    requirements=["geopandas"],
)

In [26]:
# Run the incidents download as a job (not locally), targeting the 'datalake' bucket.
run_download_incidents = func_download_incidents.run(
    action="job",
    parameters={'bucket': 'datalake'},
    local_execution=False,
)

In [28]:
# Register the bike-parking-places download handler as a project function.
func_download_bike_parking_places = project.new_function(
    name="download-bike-parking-places",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-open-data.py",
        "handler": "download_bike_parking_places",
    },
    requirements=["geopandas"],
)

In [29]:
# Run the bike-parking-places download as a job (not locally), targeting the 'datalake' bucket.
run_download_bike_parking_places = func_download_bike_parking_places.run(
    action="job",
    parameters={'bucket': 'datalake'},
    local_execution=False,
)

In [31]:
# Register the car-parkings download handler as a project function.
# The name was previously "download-bike-parking-places" (copy-paste from the
# cell above), which collided with the bike-parking function; every other
# cell names the function after its handler, so this one does too.
func_download_car_parkings = project.new_function(
    name="download-car-parkings",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-open-data.py",
        "handler": "download_car_parkings",
    },
    requirements=["geopandas"],
)

In [32]:
# Run the car-parkings download as a job (not locally), targeting the 'datalake' bucket.
run_download_car_parkings = func_download_car_parkings.run(
    action="job",
    parameters={'bucket': 'datalake'},
    local_execution=False,
)

In [35]:
# Register the TPER bus-stops download handler as a project function.
func_download_bus_stops_tper = project.new_function(
    name="download-bus-stops-tper",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-open-data.py",
        "handler": "download_bus_stops_tper",
    },
    requirements=["geopandas"],
)

In [36]:
# Run the TPER bus-stops download as a job (not locally), targeting the 'datalake' bucket.
run_download_bus_stops_tper = func_download_bus_stops_tper.run(
    action="job",
    parameters={'bucket': 'datalake'},
    local_execution=False,
)

In [37]:
# Register the TPER train-stops download handler as a project function.
func_download_train_stops_tper = project.new_function(
    name="download-train-stops-tper",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-open-data.py",
        "handler": "download_train_stops_tper",
    },
    requirements=["geopandas"],
)

In [38]:
# Run the TPER train-stops download as a job (not locally), targeting the 'datalake' bucket.
run_download_train_stops_tper = func_download_train_stops_tper.run(
    action="job",
    parameters={'bucket': 'datalake'},
    local_execution=False,
)

## Google Drive Functions

Upload the Google OAuth token file to the project as an artifact named `token` before running the cells below.

### Define and Build Functions: Download Traffic Spire

In [41]:
# Look up the project artifact named 'token'; its key is passed to the spire jobs as the `token_uri` input.
token_uri = project.get_artifact('token')

In [42]:
%%writefile "src/download-spire.py"

import os

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload

import json
import boto3

import pandas as pd
import datetime

# OAuth scopes requested when building the Drive credentials (read-only Drive access).
# If modifying these scopes, delete the file token.json.
SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]

def getGService(project, token_uri):
    """Build a Google Drive v3 API client from a stored OAuth token.

    args:
        project: Not used by this function; kept for interface compatibility.
        token_uri: Object exposing ``as_file()``, which returns the local path of
            a JSON file with authorized-user credentials.

    returns:
        The Drive service object created by ``build("drive", "v3", ...)``.

    raises:
        HttpError: Logged and re-raised. Previously the error was only printed and
            the function then failed with UnboundLocalError on ``return service``.
    """
    token = token_uri.as_file()
    try:
        # Context manager so the token file handle is always closed.
        with open(token) as token_file:
            token_info = json.load(token_file)
        creds = Credentials.from_authorized_user_info(token_info, SCOPES)
        return build("drive", "v3", credentials=creds)
    except HttpError as error:
        print(f"An error occurred: {error}")
        raise

def upload_file(s3, bucket: str, path: str, local_path: str, item_name: str):
    """
    Uploads one local file to an S3 bucket under the key "<path>/<item_name>".
    args:
        s3: Client object exposing upload_file(local_path, bucket, key, ExtraArgs=...)
        bucket: The name of the bucket
        path: The folder (key prefix) within the bucket
        local_path: The local file to upload
        item_name: The object name appended to path
    """
    object_key = '/'.join((path, item_name))
    extra_args = {'ContentType': 'application/octet-stream'}
    s3.upload_file(local_path, bucket, object_key, ExtraArgs=extra_args)


def extract_date_flussi(item_name):
    """Parse the date embedded in a flow file name.

    Accepts names of the form ``FLUSSI<YYYYMMDD>.txt`` and, as a fallback,
    ``FLUSS<YYYYMMDD>.txt``. Returns a ``datetime.datetime`` on success and
    ``None`` when neither pattern matches.
    """
    for pattern in ('FLUSSI%Y%m%d.txt', 'FLUSS%Y%m%d.txt'):
        try:
            return datetime.datetime.strptime(item_name, pattern)
        except Exception:
            # Try the next accepted spelling.
            continue
    return None

def extract_date_accuracy(item_name):
    """Parse the date embedded in an accuracy file name.

    Accepts names of the form ``accur<YYYYMMDD>.txt``. Returns a
    ``datetime.datetime`` on success and ``None`` when the name does not match
    (or is not a string).
    """
    # The original fallback retried the exact same format string, so it could
    # never succeed and has been removed.
    # NOTE(review): if a second spelling was intended (as in extract_date_flussi),
    # add it here.
    try:
        return datetime.datetime.strptime(item_name, 'accur%Y%m%d.txt')
    except (ValueError, TypeError):
        return None
       
def _parse_spire_lines(lines, day):
    """Convert the lines of one downloaded text file into a list of row dicts.

    A line starting with 'Section' sets the current sensor id and resets the
    running value counter. Every other line is split on tabs and each non-blank
    token becomes one row.
    """
    date_label = day.strftime("%Y-%m-%d")  # identical for every row of the file
    sensor = None
    entries = []
    count = 0  # non-blank values seen so far in the current section
    for line in lines:
        if line.startswith('Section'):
            sensor = line.split('Section ')[1].strip()
            count = 0
            continue
        filled = 0
        for i, token in enumerate(line.split('\t')):
            if token.strip() == '':
                continue
            # Each column is treated as a 5-minute step, 12 steps per hour.
            # NOTE(review): hour uses count + i while minute uses only i — this
            # assumes every line holds a multiple of 12 values; confirm.
            t = datetime.time(hour=(count + i) // 12, minute=i % 12 * 5).strftime("%H:%M")
            entries.append({
                'sensor_id': sensor,
                'date': date_label,
                'time': t,
                'start': date_label + ' ' + t,
                'value': int(int(token) / 12)
            })
            filled += 1
        count += filled
    return entries


def process_file(service, item_id, item_name, date_extractor):
    """Download one Drive file, parse it and append its rows to the monthly Parquet file.

    args:
        service: Drive API service object (must expose ``files().get_media``).
        item_id: Drive file id to download.
        item_name: Drive file name; passed to ``date_extractor``.
        date_extractor: Callable returning the file's date, or None when the
            name is not recognised.

    Rows are written to ``data/<YYYY-MM>.parquet``; if that file already exists
    the new rows are appended to its content. Files whose name is not recognised
    are reported and skipped.
    """
    # Validate the name first so unrecognised files are never downloaded.
    data = date_extractor(item_name)
    if data is None:
        print(f"Error: unknown file : {item_name}")
        return

    local_path = 'myfile'
    r = service.files().get_media(fileId=item_id)
    with open(local_path, "wb") as fh:
        downloader = MediaIoBaseDownload(fh, r)
        done = False
        while not done:
            _, done = downloader.next_chunk()

    # Context manager so the handle is closed (the original left it open).
    with open(local_path, "r") as f:
        entries = _parse_spire_lines(f, data)

    df = pd.DataFrame(entries)
    fname = data.strftime("%Y-%m")
    target = f"data/{fname}.parquet"
    if os.path.exists(target):
        df = pd.concat([pd.read_parquet(target), df])
    df.to_parquet(target)
    
def process_folder(service, folder, date_extractor):
    """Walk a Drive folder page by page, recursing into sub-folders and processing every file.

    args:
        service: Drive API service object.
        folder: Dict with at least the keys 'id' and 'name'.
        date_extractor: Forwarded to process_file for each file found.
    """
    folder_mime = "application/vnd.google-apps.folder"
    drive_files = service.files()
    page_request = drive_files.list(
        q=f"'{folder['id']}' in parents",
        supportsAllDrives=True,
        includeItemsFromAllDrives=True,
        fields="nextPageToken, files(id, name, mimeType)",
    )
    print(f"folder {str(folder['name'])}")

    while page_request is not None:
        page = page_request.execute()

        for entry in page.get("files", []):
            entry_name = entry["name"]
            entry_id = entry["id"]
            is_folder = entry["mimeType"] == folder_mime

            if is_folder:
                # Sub-folder: descend into it.
                print(f"Downloading folder {entry_name}...")
                process_folder(service, entry, date_extractor)
                continue

            print(f"Downloading file {entry_name}...")
            process_file(service, entry_id, entry_name, date_extractor)

        # list_next returns None once there are no more pages.
        page_request = drive_files.list_next(page_request, page)

def process_all(project, token_uri, query: str, s3, bucket: str, destination_path: str, date_extractor):
    """Process every year folder under the Drive root folder matched by `query` and upload one Parquet per year.

    args:
        project: Forwarded to getGService.
        token_uri: Forwarded to getGService.
        query: Drive search query selecting the root folder (only the first match is used, pageSize=1).
        s3: S3 client forwarded to upload_file.
        bucket: Destination bucket name.
        destination_path: Key prefix; each year is uploaded to "<destination_path>/<year>/trafic-spire.parquet".
        date_extractor: Forwarded to process_folder.

    For each child of the root folder whose name is a valid year, the folder is
    processed, the monthly files data/<YYYY>-<MM>.parquet that exist are merged
    into data/<YYYY>.parquet, and the result is uploaded. The current year is
    additionally uploaded under "<destination_path>/latest". Children whose name
    is not a year are reported and skipped (previously they crashed the job on
    int() after their content had already been downloaded).
    """
    service = getGService(project, token_uri)
    results = (service.files()
               .list(q=query, pageSize=1, fields="files(id, name, mimeType)", supportsAllDrives=True, includeItemsFromAllDrives=True)
               .execute())

    current_year = datetime.datetime.now().strftime("%Y")
    files = service.files()

    for item in results.get("files", []):
        request = files.list(q=f"'{item['id']}' in parents",
                             supportsAllDrives=True, includeItemsFromAllDrives=True,
                             fields="nextPageToken, files(id, name, mimeType)")
        while request is not None:
            page = request.execute()
            for year in page.get("files", []):
                year_name = year["name"]
                try:
                    year_number = int(year_name)
                    month_keys = [datetime.date(year_number, month, 1).strftime("%Y-%m")
                                  for month in range(1, 13)]
                except ValueError:
                    print(f"Skipping '{year_name}': not a year folder")
                    continue

                process_folder(service, year, date_extractor)

                # Read the monthly files that exist and concatenate once,
                # instead of growing a DataFrame inside the loop.
                monthly = [pd.read_parquet(f"data/{key}.parquet")
                           for key in month_keys
                           if os.path.exists(f"data/{key}.parquet")]
                # With no monthly files an empty frame is still written and uploaded, as before.
                rdf = pd.concat(monthly) if monthly else pd.DataFrame()

                yearly_path = f"data/{year_name}.parquet"
                rdf.to_parquet(yearly_path)
                upload_file(s3, bucket, destination_path + "/" + year_name, yearly_path, "trafic-spire.parquet")
                if year_name == current_year:
                    upload_file(s3, bucket, destination_path + "/latest", yearly_path, "trafic-spire.parquet")

            request = files.list_next(request, page)

def get_spire(project, token_uri, bucket):
    """Job handler: process the Drive folder named 'Flussi spire' and upload the yearly Parquet files.

    args:
        project: Forwarded to process_all.
        token_uri: Token artifact forwarded to process_all.
        bucket: Destination S3 bucket; objects go under "mobility-data/trafic-spire".

    Requires the environment variables S3_ENDPOINT_URL, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
    """
    # Working directory for the monthly/yearly Parquet files; exist_ok avoids
    # the check-then-create race of the exists()/makedirs() pair.
    base_folder = './data'
    os.makedirs(base_folder, exist_ok=True)

    s3 = boto3.client('s3',
                endpoint_url=os.environ.get('S3_ENDPOINT_URL'),
                aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
                aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'))

    process_all(project, token_uri, "mimeType='application/vnd.google-apps.folder' and name='Flussi spire'", s3, bucket, "mobility-data/trafic-spire", extract_date_flussi)


def get_spire_accur(project, token_uri, bucket):
    """Job handler: process the Drive folder named 'Diagnostica' and upload the yearly Parquet files.

    args:
        project: Forwarded to process_all.
        token_uri: Token artifact forwarded to process_all.
        bucket: Destination S3 bucket; objects go under "mobility-data/trafic-spire-accur".

    Requires the environment variables S3_ENDPOINT_URL, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
    """
    # Working directory for the monthly/yearly Parquet files; exist_ok avoids
    # the check-then-create race of the exists()/makedirs() pair.
    base_folder = './data'
    os.makedirs(base_folder, exist_ok=True)

    s3 = boto3.client('s3',
                endpoint_url=os.environ.get('S3_ENDPOINT_URL'),
                aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
                aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'))

    process_all(project, token_uri, "mimeType='application/vnd.google-apps.folder' and name='Diagnostica'", s3, bucket, "mobility-data/trafic-spire-accur", extract_date_accuracy)

Overwriting src/download-spire.py


In [46]:
# Register the traffic-spire flow download (handler get_spire) as a project function.
func_download_traffic_spire = project.new_function(
    name="download-traffic-spire",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-spire.py",
        "handler": "get_spire",
    },
    requirements=[
        "google-api-python-client",
        "google_auth_oauthlib",
    ],
)

In [47]:
#!pip install google-api-python-client

In [48]:
# Run the spire flow download as a job (not locally); the token artifact key is passed as the `token_uri` input.
run_download_traffic_spire = func_download_traffic_spire.run(
    action="job",
    inputs={'token_uri': token_uri.key},
    parameters={'bucket': 'datalake'},
    local_execution=False,
)

In [49]:
# Register the traffic-spire accuracy download (handler get_spire_accur) as a project function.
func_download_traffic_spire_accuracy = project.new_function(
    name="download-traffic-spire-accuracy",
    kind="python",
    python_version="PYTHON3_10",
    source={
        "source": "src/download-spire.py",
        "handler": "get_spire_accur",
    },
    requirements=[
        "google-api-python-client",
        "google_auth_oauthlib",
    ],
)

In [50]:
# Run the spire accuracy download as a job (not locally); the token artifact key is passed as the `token_uri` input.
run_download_traffic_spire_accuracy = func_download_traffic_spire_accuracy.run(
    action="job",
    inputs={'token_uri': token_uri.key},
    parameters={'bucket': 'datalake'},
    local_execution=False,
)