In [2]:
import digitalhub as dh
import os
# Name of the project that every function registered in this notebook is attached to.
PROJECT_NAME = "city-data"
# Project handle used by all later cells (project.new_function / project.get_artifact).
# NOTE(review): per the helper's name this creates the project when it is missing — confirm.
project = dh.get_or_create_project(PROJECT_NAME)

## Open Data functions

In [3]:
# Folder that receives the scripts written by the %%writefile cells below.
new_folder = 'src'
# exist_ok=True makes the cell idempotent without a separate existence check,
# which could race with another process creating the folder in between.
os.makedirs(new_folder, exist_ok=True)

In [4]:
%%writefile "src/download-open-data.py"

import pandas as pd
import requests
import json as js
import geopandas as gpd
import os
import boto3
import datetime

# GeoJSON export endpoint of the opendata.comune.bologna.it catalog; the `{name}`
# placeholder is filled with the dataset identifier via str.format().
base_url = "https://opendata.comune.bologna.it/api/explore/v2.1/catalog/datasets/{name}/exports/geojson"

def download_share_geo(project, bucket, name, artifact_name):
    """
    Download one dataset as GeoJSON, convert it to Parquet and upload the Parquet file.

    The GeoJSON is written to ./data/<name>.geojson, read back with geopandas,
    saved as ./data/<name>.parquet and handed to share_files().
    args:
        project: passed through to share_files()
        bucket: destination bucket name, passed through to share_files()
        name: dataset identifier substituted into base_url; also the local file stem
        artifact_name: passed to share_files() as the artifact name
    returns:
        None
    raises:
        requests.HTTPError: when the export endpoint answers with a 4xx/5xx status
    """
    response = requests.get(base_url.format(name = name))
    # Fail here on an HTTP error instead of parsing the error payload and
    # writing it to disk as if it were the dataset.
    response.raise_for_status()
    payload = response.json()

    os.makedirs('./data', exist_ok=True)

    path_geojson = './data/'+name+'.geojson'
    with open(path_geojson, 'w') as out_file:
        out_file.write(js.dumps(payload, indent=4))
    gdf = gpd.read_file(path_geojson)

    path_parquet = './data/'+name+'.parquet'
    gdf.to_parquet(path_parquet)
    share_files(project, bucket, path_parquet, artifact_name)

def download_buildings(project, bucket):
    """Download dataset "rifter_edif_pl" and share it as "rifter_buildings_censiment"."""
    # download_share_geo() returns nothing, so its result is not bound to a local.
    download_share_geo(project, bucket, "rifter_edif_pl", "rifter_buildings_censiment")

def download_addresses(project, bucket):
    """Download dataset "rifter_civici_pt" and share it as "rifter_addresses"."""
    # download_share_geo() returns nothing, so its result is not bound to a local.
    download_share_geo(project, bucket, "rifter_civici_pt", "rifter_addresses")

def download_walls(project, bucket):
    """Download dataset "carta-tecnica-comunale-recinzioni" and share it as "ctm_walls"."""
    # download_share_geo() returns nothing, so its result is not bound to a local.
    download_share_geo(project, bucket, "carta-tecnica-comunale-recinzioni", "ctm_walls")

def download_buidlings_volumes(project, bucket):
    """Download dataset "c_a944ctc_edifici_pl" and share it as "ctm_buildings_volumes".

    The misspelled function name is kept on purpose: it is referenced by string
    as a handler name when the function is registered.
    """
    # download_share_geo() returns nothing, so its result is not bound to a local.
    download_share_geo(project, bucket, "c_a944ctc_edifici_pl", "ctm_buildings_volumes")

def download_buildings_symbols(project, bucket):
    """Download dataset "carta-tecnica-comunale-simboli-edifici-volumetrici" and share it as "ctm_buildings_symbols"."""
    # download_share_geo() returns nothing, so its result is not bound to a local.
    download_share_geo(project, bucket, "carta-tecnica-comunale-simboli-edifici-volumetrici", "ctm_buildings_symbols")

def download_terrain(project, bucket):
    """Download dataset "carta-tecnica-comunale-divisioni-del-terreno" and share it as "ctm_terrain"."""
    # download_share_geo() returns nothing, so its result is not bound to a local.
    download_share_geo(project, bucket, "carta-tecnica-comunale-divisioni-del-terreno", "ctm_terrain")
    
def download_areas(project, bucket):
    """Download dataset "zone_urbanistiche" and share it as "proximity_areas"."""
    # download_share_geo() returns nothing, so its result is not bound to a local.
    download_share_geo(project, bucket, "zone_urbanistiche", "proximity_areas")

def share_files(project, bucket: str = "dataspace", path: str = "city", artifactName: str = 'artifact'):
    """
    Uploads one file from the local ./data folder to an S3 bucket under two keys:
    /<artifactName>/latest/<file name> and /<artifactName>/<YYYY-MM-DD>/<file name>.
    Requires the environment variables for S3 endpoint and credentials (S3_ENDPOINT_URL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY).
    args:
        project: not used by this function; kept for signature compatibility
        bucket: The name of the bucket
        path: Local path of the file; only its last '/'-separated component is
            used, and the file is always read from ./data/<that component>
        artifactName: First segment of the object keys
    """

    s3 = boto3.client('s3',
                    endpoint_url=os.environ.get('S3_ENDPOINT_URL'),
                    aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
                    aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'))

    fname = path.split("/")[-1]
    local_file = "./data/" + fname
    # Dated folder uses the local date at upload time.
    path_date = datetime.datetime.now().strftime("%Y-%m-%d")

    print(bucket)
    # Same file, two destinations: the rolling 'latest' copy first, then the dated snapshot.
    # NOTE(review): the keys start with '/', exactly as before; confirm consumers rely on that layout.
    for folder in ('latest', path_date):
        s3.upload_file(local_file, bucket, '/' + artifactName + '/' + folder + '/' + fname, ExtraArgs={'ContentType': 'application/octet-stream'})


Writing src/download-open-data.py


In [7]:
# Register the "download-buildings" function, backed by the download_buildings
# handler in src/download-open-data.py.
func_download_buildings = project.new_function(
    name="download-buildings",
    kind="python",
    python_version="PYTHON3_10",
    source={"source": "src/download-open-data.py", "handler": "download_buildings"},
    requirements=["geopandas"],
)

In [8]:
# Run download-buildings as a job (local execution disabled), targeting the 'datalake' bucket.
run_download_buildings = func_download_buildings.run(
    action="job",
    parameters={"bucket": "datalake"},
    local_execution=False,
)

In [9]:
# Register the "download-addresses" function, backed by the download_addresses
# handler in src/download-open-data.py.
func_download_addresses = project.new_function(
    name="download-addresses",
    kind="python",
    python_version="PYTHON3_10",
    source={"source": "src/download-open-data.py", "handler": "download_addresses"},
    requirements=["geopandas"],
)

In [10]:
# Run download-addresses as a job (local execution disabled), targeting the 'datalake' bucket.
run_download_addresses = func_download_addresses.run(
    action="job",
    parameters={"bucket": "datalake"},
    local_execution=False,
)

In [11]:
# Register the "download-walls" function, backed by the download_walls
# handler in src/download-open-data.py.
func_download_walls = project.new_function(
    name="download-walls",
    kind="python",
    python_version="PYTHON3_10",
    source={"source": "src/download-open-data.py", "handler": "download_walls"},
    requirements=["geopandas"],
)

In [12]:
# Run download-walls as a job (local execution disabled), targeting the 'datalake' bucket.
run_download_walls = func_download_walls.run(
    action="job",
    parameters={"bucket": "datalake"},
    local_execution=False,
)

In [13]:
# Register the "download-buidlings-volumes" function, backed by the
# download_buidlings_volumes handler in src/download-open-data.py.
# The spelling of the name and handler is reproduced as-is.
func_download_buidlings_volumes = project.new_function(
    name="download-buidlings-volumes",
    kind="python",
    python_version="PYTHON3_10",
    source={"source": "src/download-open-data.py", "handler": "download_buidlings_volumes"},
    requirements=["geopandas"],
)

In [14]:
# Run download-buidlings-volumes as a job (local execution disabled), targeting the 'datalake' bucket.
run_download_buidlings_volumes = func_download_buidlings_volumes.run(
    action="job",
    parameters={"bucket": "datalake"},
    local_execution=False,
)

In [15]:
# Register the "download-buildings-symbols" function, backed by the
# download_buildings_symbols handler in src/download-open-data.py.
func_download_buildings_symbols = project.new_function(
    name="download-buildings-symbols",
    kind="python",
    python_version="PYTHON3_10",
    source={"source": "src/download-open-data.py", "handler": "download_buildings_symbols"},
    requirements=["geopandas"],
)

In [16]:
# Run download-buildings-symbols as a job (local execution disabled), targeting the 'datalake' bucket.
run_download_buildings_symbols = func_download_buildings_symbols.run(
    action="job",
    parameters={"bucket": "datalake"},
    local_execution=False,
)

In [17]:
# Register the "download-terrain" function, backed by the download_terrain
# handler in src/download-open-data.py.
func_download_terrain = project.new_function(
    name="download-terrain",
    kind="python",
    python_version="PYTHON3_10",
    source={"source": "src/download-open-data.py", "handler": "download_terrain"},
    requirements=["geopandas"],
)

In [18]:
# Run download-terrain as a job (local execution disabled), targeting the 'datalake' bucket.
run_download_terrain = func_download_terrain.run(
    action="job",
    parameters={"bucket": "datalake"},
    local_execution=False,
)

In [19]:
# Register the "download-proximity-areas" function, backed by the download_areas
# handler in src/download-open-data.py.
func_download_areas = project.new_function(
    name="download-proximity-areas",
    kind="python",
    python_version="PYTHON3_10",
    source={"source": "src/download-open-data.py", "handler": "download_areas"},
    requirements=["geopandas"],
)

In [20]:
# Run download-proximity-areas as a job (local execution disabled), targeting the 'datalake' bucket.
run_download_areas = func_download_areas.run(
    action="job",
    parameters={"bucket": "datalake"},
    local_execution=False,
)

## Google Drive Functions

Upload the token file as an artifact in the project.

In [21]:
# Look up the project artifact named 'token'. Despite the variable name this is the
# artifact object, not a URI string: its `.key` is what the Drive download runs
# below pass as their `token_uri` input.
token_uri = project.get_artifact('token')

In [26]:
%%writefile "src/download-data.py"

import os

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload
import json
import boto3

import pandas as pd

# OAuth scopes used when building the Drive credentials (read-only Drive scope).
# If modifying these scopes, delete the file token.json.
SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]


def getGService(project, token_uri):
    """
    Build a Google Drive v3 API client from a stored authorized-user token.
    args:
        project: not used by this function; kept for signature compatibility
        token_uri: object whose as_file() returns the local path of the token JSON file
            (NOTE(review): presumably a project artifact — confirm against the caller)
    returns:
        The service object produced by build("drive", "v3", ...)
    raises:
        HttpError: logged and re-raised when building the service fails
    """
    token = token_uri.as_file()
    # Read the token inside a context manager so the file handle is always closed.
    with open(token) as token_file:
        token_info = json.load(token_file)

    try:
        creds = Credentials.from_authorized_user_info(token_info, SCOPES)
        service = build("drive", "v3", credentials=creds)
    except HttpError as error:
        print(f"An error occurred: {error}")
        # Re-raise: falling through to `return service` would fail with an
        # UnboundLocalError that hides the real cause.
        raise

    return service

def upload_file(s3, bucket: str, path: str, local_path: str, item_name: str):
    """
    Uploads a single local file to a bucket under the key "<path>/<item_name>".
    args:
        s3: Client object exposing upload_file()
        bucket: The name of the bucket
        path: The key prefix within the bucket
        local_path: Path of the local file to upload
        item_name: Last segment of the object key
    """
    object_key = '/'.join((path, item_name))
    extra_args = {'ContentType': 'application/octet-stream'}
    s3.upload_file(local_path, bucket, object_key, ExtraArgs=extra_args)

def _s3_key_exists(s3, bucket: str, key: str) -> bool:
    """Return True when head_object succeeds for the key, False when S3 reports it missing."""
    try:
        s3.head_object(Bucket=bucket, Key=key)
    except s3.exceptions.ClientError as error:
        code = str(error.response.get("Error", {}).get("Code", ""))
        if code in ("404", "NoSuchKey", "NotFound"):
            return False
        # Anything else (permissions, throttling, ...) is a real failure, not a
        # missing object; surface it instead of re-downloading everything.
        raise
    return True


def copy_rec(service, folder_id, s3, bucket: str, destination_path: str):
    """
    Recursively copies the content of a Google Drive folder into an S3 bucket.

    Sub-folders are mirrored under destination_path, except folders named 'back',
    which are skipped. Files whose key already exists in the bucket are not
    downloaded again.
    args:
        service: Drive API service object
        folder_id: Id of the Drive folder to list
        s3: S3 client used for the existence check and the upload
        bucket: The name of the bucket
        destination_path: Key prefix receiving the folder content
    raises:
        ClientError: when the existence check fails for a reason other than "not found"
    """
    files = service.files()
    request = files.list(q=f"'{folder_id}' in parents",
                         supportsAllDrives=True, includeItemsFromAllDrives=True,
                         fields="nextPageToken, files(id, name, mimeType)")
    while request is not None:
        results = request.execute()

        for item in results.get("files", []):
            item_name = item["name"]
            item_id = item["id"]
            item_type = item["mimeType"]

            # If it's a folder, recursively download its content
            if item_type == "application/vnd.google-apps.folder":
                print(f"Downloading folder {item_name}...")
                if item_name != 'back':
                    copy_rec(service, item_id, s3, bucket, destination_path + '/' + item_name)
                continue

            if _s3_key_exists(s3, bucket, destination_path + '/' + item_name):
                continue

            print(f"Downloading file {item_name}...")
            # Stream the file into a local scratch file, then upload that file.
            r = service.files().get_media(fileId=item_id)
            local_path = 'myfile'
            with open(local_path, "wb") as fh:
                downloader = MediaIoBaseDownload(fh, r)
                done = False
                while not done:
                    _, done = downloader.next_chunk()
            upload_file(s3, bucket, destination_path, local_path, item_name)
        # list_next() returns None after the last page, which ends the loop.
        request = files.list_next(request, results)

def copy_files(project, query: str, s3, bucket: str, destination_path: str, token_uri):
    """
    Runs a Drive search and copies the content of each returned item into the bucket.
    args:
        project: passed through to getGService()
        query: Drive search query selecting the root folder
        s3: S3 client passed through to copy_rec()
        bucket: The name of the bucket
        destination_path: Key prefix receiving the copied content
        token_uri: token input passed through to getGService()
    """
    drive = getGService(project, token_uri)
    # pageSize=1: only one page of at most one match is requested and processed.
    listing = drive.files().list(
        q=query,
        pageSize=1,
        fields="files(id, name, mimeType)",
        supportsAllDrives=True,
        includeItemsFromAllDrives=True,
    )
    response = listing.execute()
    print(response.get("files"))
    for root in response.get("files", []):
        copy_rec(drive, root["id"], s3, bucket, destination_path)

def doc_files(s3, bucket: str, prefix: str, name: str):
    """
    Writes an index of the objects stored under a prefix and uploads it next to them.

    Every object under the prefix is listed, the listing is saved locally as
    <name>.parquet (without the ETag, StorageClass and Owner columns) and that
    file is uploaded to <prefix>/<name>.parquet.
    args:
        s3: S3 client
        bucket: The name of the bucket
        prefix: Key prefix to list; also the destination prefix of the index file
        name: File stem of the index
    """
    records = []
    # The paginator follows truncated listings itself. The earlier manual loop
    # relied on 'NextMarker', which S3 only returns when a delimiter is set, and
    # read the page as an attribute of a dict.
    paginator = s3.get_paginator('list_objects')
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix, PaginationConfig={'PageSize': 1000})
    for page in pages:
        # 'Contents' is absent from a page that holds no objects.
        records.extend(page.get('Contents', []))

    df = pd.DataFrame.from_records(records)
    # errors='ignore' keeps an empty listing, or a store that omits one of these
    # fields, from raising.
    df = df.drop(columns=['ETag', 'StorageClass', 'Owner'], errors='ignore')

    parquet_name = name +'.parquet'
    df.to_parquet(parquet_name)
    s3.upload_file(parquet_name, bucket, prefix + '/' + parquet_name, ExtraArgs={'ContentType': 'application/octet-stream'})
        
def _drive_folder_query(folder: str) -> str:
    """Build the Drive search query matching folders whose name is exactly `folder`."""
    # Escape backslashes and single quotes so a folder name containing either
    # cannot break out of the quoted string in the query.
    escaped = folder.replace("\\", "\\\\").replace("'", "\\'")
    return f"mimeType='application/vnd.google-apps.folder' and name='{escaped}'"


def _mirror_drive_folder(project, token_uri, folder, bucket, dataset: str, target: str):
    """Copy a Drive folder into city-data/<dataset>/<target> and write the <dataset> index there."""
    s3 = boto3.client('s3',
                endpoint_url=os.environ.get('S3_ENDPOINT_URL'),
                aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
                aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'))
    prefix = f"city-data/{dataset}/{target}"
    copy_files(project, _drive_folder_query(folder), s3, bucket, prefix, token_uri)
    doc_files(s3, bucket, prefix, dataset)


def get_lidar(project, token_uri, folder, bucket, target: str = 'data'):
    """
    Copies the Drive folder named `folder` into city-data/lidar/<target> and indexes it.
    Requires the environment variables for S3 endpoint and credentials (S3_ENDPOINT_URL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY).
    args:
        project: passed through to copy_files()
        token_uri: token input used to authenticate against Drive
        folder: Name of the Drive folder to copy
        bucket: The name of the destination bucket
        target: Last segment of the destination prefix
    """
    _mirror_drive_folder(project, token_uri, folder, bucket, "lidar", target)


def get_dtm(project, token_uri, folder, bucket, target: str = 'data'):
    """
    Copies the Drive folder named `folder` into city-data/dtm/<target> and indexes it.
    Requires the environment variables for S3 endpoint and credentials (S3_ENDPOINT_URL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY).
    args:
        project: passed through to copy_files()
        token_uri: token input used to authenticate against Drive
        folder: Name of the Drive folder to copy
        bucket: The name of the destination bucket
        target: Last segment of the destination prefix
    """
    _mirror_drive_folder(project, token_uri, folder, bucket, "dtm", target)


def get_dsm(project, token_uri, folder, bucket, target: str = 'data'):
    """
    Copies the Drive folder named `folder` into city-data/dsm/<target> and indexes it.
    Requires the environment variables for S3 endpoint and credentials (S3_ENDPOINT_URL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY).
    args:
        project: passed through to copy_files()
        token_uri: token input used to authenticate against Drive
        folder: Name of the Drive folder to copy
        bucket: The name of the destination bucket
        target: Last segment of the destination prefix
    """
    _mirror_drive_folder(project, token_uri, folder, bucket, "dsm", target)

Writing src/download-data.py


In [27]:
# Register the "download-lidar" function, backed by the get_lidar handler in
# src/download-data.py.
func_download_lidar = project.new_function(
    name="download-lidar",
    kind="python",
    python_version="PYTHON3_10",
    source=dict(source="src/download-data.py", handler="get_lidar"),
    requirements=["google-api-python-client", "google_auth_oauthlib"],
)

In [28]:
#!pip install google-api-python-client

In [29]:
# Run download-lidar as a job (local execution disabled): the token artifact key is
# the input, the Drive folder name and destination bucket are parameters.
run_lidar = func_download_lidar.run(
    action="job",
    inputs={"token_uri": token_uri.key},
    parameters={"folder": "LAS_CLASSIFICATO_TEST", "bucket": "datalake"},
    local_execution=False,
)

In [30]:
# Register the "download-dtm" function, backed by the get_dtm handler in
# src/download-data.py.
func_download_dtm = project.new_function(
    name="download-dtm",
    kind="python",
    python_version="PYTHON3_10",
    source=dict(source="src/download-data.py", handler="get_dtm"),
    requirements=["google-api-python-client", "google_auth_oauthlib"],
)

In [31]:
# Run download-dtm as a job (local execution disabled): the token artifact key is
# the input, the Drive folder name and destination bucket are parameters.
run_dtm = func_download_dtm.run(
    action="job",
    inputs={"token_uri": token_uri.key},
    parameters={"folder": "DTM_0.5_TEST", "bucket": "datalake"},
    local_execution=False,
)

In [32]:
# Register the "download-dsm" function, backed by the get_dsm handler in
# src/download-data.py.
func_download_dsm = project.new_function(
    name="download-dsm",
    kind="python",
    python_version="PYTHON3_10",
    source=dict(source="src/download-data.py", handler="get_dsm"),
    requirements=["google-api-python-client", "google_auth_oauthlib"],
)

In [33]:
# Run download-dsm as a job (local execution disabled): the token artifact key is
# the input, the Drive folder name and destination bucket are parameters.
run_dsm = func_download_dsm.run(
    action="job",
    inputs={"token_uri": token_uri.key},
    parameters={"folder": "DSM_0.5_TEST", "bucket": "datalake"},
    local_execution=False,
)