In [1]:
import requests
import json
from datetime import datetime
#import findspark
#findspark.init('/opt/spark-3.5.1-bin-hadoop3')
#from pyspark.sql import SparkSession
#from pyspark.sql.types import *
#from pyspark.sql.functions import *
from minio import Minio
from io import BytesIO
import pandas as pd
import os
from geopy.geocoders import Nominatim, GoogleV3
from geopy.extra.rate_limiter import RateLimiter
from sqlalchemy import create_engine, MetaData, Table, insert, select, text
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.dialects.postgresql import insert as pg_insert


In [2]:
### Configs API ###

# Base URL
api_url = "https://broker.fiware.urbanplatform.portodigital.pt/v2/entities"

# API params
api_params = {
    'q': 'allowedVehicleType==twoWheeledVehicle',
    'type': 'OnStreetParking',
    'limit': '1000'
}

# Postcode matching URL
match_postcode_url = 'https://raw.githubusercontent.com/dssg-pt/mp-mapeamento-cp7/main/output_data/cod_post_freg_matched.csv'

####### Configs MinIO ########

# Define the MinIO endpoint URL and access keys
minio_endpoint = "localhost:9000"
access_key = "admin"
secret_key = "password123"

# Get current date
date = datetime.today().strftime('%Y%m%d%H%M%S')

# Define the bucket name and object name
bucket_name = "raw"
object_name = f"twowheel/twowheel{date}.json"

output_bucket_name = "preprocessed"


# 1. Save raw Json to bucket

In [4]:
# Function to fetch data from API
def get_data_from_api(api_url, api_params):
    try:
        # Call API with the base URL and params
        response = requests.get(api_url, params=api_params)
        # Print the final URL
        print(f"Final URL: {response.url}")
        # Printe the API respose
        print(f"API Response Code: {response.status_code}")
        response.raise_for_status()
        return response.json()

    # Handle errors
    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
    except requests.exceptions.ConnectionError as conn_err:
        print(f"Connection error occurred: {conn_err}")
    except requests.exceptions.Timeout as timeout_err:
        print(f"Timeout error occurred: {timeout_err}")
    except requests.exceptions.RequestException as req_err:
        print(f"An error occurred: {req_err}")
    return None

In [5]:
def save_json_to_minio(minio_endpoint, access_key, secret_key, bucket_name, object_name, json_data):
    try:
        # Create Minio client
        minio_client = Minio(minio_endpoint, access_key=access_key, secret_key=secret_key, secure=False)

        # Check if the bucket already exists
        if not minio_client.bucket_exists(bucket_name):
            # If not, create the bucket
            minio_client.make_bucket(bucket_name)

        # Convert the json_data to a string and then to bytes
        json_string = json.dumps(json_data)
        json_bytes = json_string.encode('utf-8')

        # Convert bytes to a byte stream
        byte_stream = BytesIO(json_bytes)

        # Save the json_data to the MinIO bucket
        minio_client.put_object(bucket_name, object_name, byte_stream, length=len(json_bytes), content_type='application/json')

        print(f"'{object_name}' is successfully uploaded to bucket '{bucket_name}'.")

    except Exception as e:
        print(f"An error occurred while saving JSON to MinIO: {e}")
    return None


In [6]:
# Fetch data from API
json_data = get_data_from_api(api_url, api_params)

Final URL: https://broker.fiware.urbanplatform.portodigital.pt/v2/entities?q=allowedVehicleType%3D%3DtwoWheeledVehicle&type=OnStreetParking&limit=1000
API Response Code: 200


In [7]:
# Save the JSON data to MinIO
save_json_to_minio(minio_endpoint, access_key, secret_key, bucket_name, object_name, json_data)

'twowheel/twowheel20240606225259.json' is successfully uploaded to bucket 'raw'.


# 2. Read JSON from bucket and preprocess (PANDAS)

In [32]:
def read_json_from_minio(minio_endpoint, access_key, secret_key, bucket_name, object_name):
    try:
        # Create Minio client
        minio_client = Minio(minio_endpoint, access_key=access_key, secret_key=secret_key, secure=False)

        # Read the JSON data from the MinIO bucket
        json_data = minio_client.get_object(bucket_name, object_name)

        # Read the content of the file-like object and decode JSON
        json_content = json_data.read().decode('utf-8')

        # Parse the JSON content
        json_parsed = json.loads(json_content)

        return json_parsed
    except Exception as e:
        print(f"An error occurred while reading JSON from MinIO: {e}")
        return None


In [33]:
# Create dataframe from JSON, use get for the keys so that it doesn't break if they don't exist
def create_dataframe_from_json(json_data):
    try:
        extracted_data = []

        for json_obj in json_data:
            extracted_info = {
                'id': json_obj.get('id'),
                'street_address': json_obj['address']['value'].get('streetAddress'),
                'allowed_vehicle_type': json_obj.get('allowedVehicleType', {}).get('value', [None])[0],
                'available_spot_number': json_obj.get('availableSpotNumber', {}).get('value'),
                'data_provider': json_obj.get('dataProvider', {}).get('value'),
                'description': json_obj.get('description', {}).get('value'),
                'latitude': json_obj['location']['value'].get('coordinates', [None, None])[1],
                'longitude': json_obj['location']['value'].get('coordinates', [None, None])[0],
                'occupied_spot_number': json_obj.get('occupiedSpotNumber', {}).get('value'),
                'total_spot_number': json_obj.get('totalSpotNumber', {}).get('value')
            }
            extracted_data.append(extracted_info)

        df = pd.DataFrame(extracted_data)
        return df
    except Exception as e:
        print(f"An error occurred while creating the dataframe: {e}")
        return None


## 2.1a Read single JSON

In [9]:
# Define the bucket name and object name
bucket_name = "raw"
object_name = "twowheel/twowheel20240604221915.json"

In [10]:
# Open JSON from Bucket
json_data = read_json_from_minio(minio_endpoint, access_key, secret_key, bucket_name, object_name)

An error occurred while reading JSON from MinIO: S3 operation failed; code: NoSuchKey, message: The specified key does not exist., resource: /raw/twowheel/twowheel_20240604221915.json, request_id: 17D68B2EC0FAC9F4, host_id: dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8, bucket_name: raw, object_name: twowheel/twowheel_20240604221915.json


In [10]:
# create dataframe:
df_raw = create_dataframe_from_json(json_data)

In [12]:
df_raw

222

## 2.1b Read and concatenate JSON files from MinIO

In [34]:
def read_bucket_json_df_concatenate(minio_endpoint, access_key, secret_key, bucket_name):
    try:
        # Create Minio client
        minio_client = Minio(minio_endpoint, access_key=access_key, secret_key=secret_key, secure=False)

        # List all objects in the bucket
        objects = minio_client.list_objects(bucket_name, recursive=True)

        # Get a glob of folder
        file_list = {obj.object_name[-19:-5]: obj.object_name for obj in objects}

        df_list = []

        for date, object_name in file_list.items():
            try:
                # Read the JSON data from MinIO
                json_data = read_json_from_minio(minio_endpoint, access_key, secret_key, bucket_name, object_name)

                # Create a dataframe from the JSON data
                df = create_dataframe_from_json(json_data)
                df["date"] = pd.to_datetime(date, format='%Y%m%d%H%M%S')

                df_list.append(df)
            except Exception as e:
                print(f"An error occurred while processing object '{object_name}': {e}")

        return pd.concat(df_list, ignore_index=True)

    except Exception as e:
        print(f"An error occurred while reading bucket '{bucket_name}': {e}")
        return None


In [35]:
# Concatenate the dataframes
final_df_raw = read_bucket_json_df_concatenate(minio_endpoint, access_key, secret_key, bucket_name)

In [36]:
final_df_raw

Unnamed: 0,id,street_address,allowed_vehicle_type,available_spot_number,data_provider,description,latitude,longitude,occupied_spot_number,total_spot_number,date
0,urn:ngsi-ld:OnStreetParking:porto:cmp:7cfee26e...,Pc Mouzinho De Albuquerque 56,twoWheeledVehicle,10,cmp,Praça de Mouzinho de Albuquerque,41.158887,-8.628684,0,10,2024-05-31 20:58:09
1,urn:ngsi-ld:OnStreetParking:porto:cmp:a7c593b1...,Passeio Das Virtudes 12,twoWheeledVehicle,5,cmp,Passeio das Virtudes,41.144293,-8.618336,5,10,2024-05-31 20:58:09
2,urn:ngsi-ld:OnStreetParking:porto:cmp:2a64f748...,Cpo Martires Da Patria 174,twoWheeledVehicle,-3,cmp,Campo dos Mártires da Pátria,41.145245,-8.617373,13,10,2024-05-31 20:58:09
3,urn:ngsi-ld:OnStreetParking:porto:cmp:37f0cf74...,Rua de Sá da Bandeira 637,twoWheeledVehicle,4,cmp,Pátio do Bolhão,41.151680,-8.606654,6,10,2024-05-31 20:58:09
4,urn:ngsi-ld:OnStreetParking:porto:cmp:e4a90d10...,R D João Coutinho 30,twoWheeledVehicle,7,cmp,Rua do Engenheiro Ferreira Dias,41.177007,-8.648385,3,10,2024-05-31 20:58:09
...,...,...,...,...,...,...,...,...,...,...,...
1771,urn:ngsi-ld:OnStreetParking:porto:cmp:5d4b6ef9...,Lg Alfandega 22,twoWheeledVehicle,23,cmp,Largo da Alfândega,41.143987,-8.621967,7,30,2024-06-07 21:10:02
1772,urn:ngsi-ld:OnStreetParking:porto:cmp:b8d167eb...,R Mouzinho Da Silveira 171,twoWheeledVehicle,6,cmp,Rua de Mouzinho da Silveira,41.143726,-8.612946,4,10,2024-06-07 21:10:02
1773,urn:ngsi-ld:OnStreetParking:porto:cmp:fa3f8cb7...,Rua do Clube dos Fenianos 4000-407,twoWheeledVehicle,15,cmp,Praça do General Humberto Delgado,41.149670,-8.611032,5,20,2024-06-07 21:10:02
1774,urn:ngsi-ld:OnStreetParking:porto:cmp:41374df4...,R Inf D Henrique 36,twoWheeledVehicle,10,cmp,Rua do Infante D. Henrique,41.141235,-8.614237,6,16,2024-06-07 21:10:02


In [37]:
final_df_raw.describe()

Unnamed: 0,available_spot_number,latitude,longitude,occupied_spot_number,total_spot_number,date
count,1776.0,1776.0,1776.0,1776.0,1776.0,1776
mean,7.073198,41.158596,-8.626108,3.625,10.698198,2024-06-04 08:44:27
min,-35.0,41.140887,-8.688397,0.0,9.0,2024-05-31 20:58:09
25%,5.75,41.149379,-8.646125,1.0,10.0,2024-06-02 15:00:53.500000
50%,8.0,41.157308,-8.620489,2.0,10.0,2024-06-04 10:10:29.500000
75%,10.0,41.165495,-8.606391,5.0,10.0,2024-06-06 02:43:15.500000
max,24.0,41.178858,-8.574214,45.0,30.0,2024-06-07 21:10:02
std,4.461067,0.010367,0.027123,4.488122,2.667513,


## 2.2 Geocode and preprocess data

In [38]:
# Helper function to geocode and extract post code from lat lon using Geopy Nominatim
def get_postcode_info_latlon(lat, lon):
    geolocator = Nominatim(user_agent="get_location")
    geocode = RateLimiter(geolocator.reverse, min_delay_seconds=1)
    try:
        location = geocode(f"{lat}, {lon}",exactly_one=True)
        address = location.raw.get('address', {})
        post_code = address.get('postcode', '')
        return pd.Series([post_code])
    except Exception as e:
        return pd.Series([''])

In [39]:
def complete_geocode_table(df):
    # Isolate and get unique lat/lon combinations
    df = df[["latitude", "longitude", "total_spot_number"]].groupby(["latitude", "longitude"]).first().reset_index()
    # get post code from APIO
    df["postal_code"] = df.apply(lambda x: get_postcode_info_latlon(x["latitude"], x["longitude"]), axis=1)
    # Convert to the format in the source table
    df["postal_code"] = df["postal_code"].str.replace("-","").astype(int)
    #Get source table
    df_match_postcode=pd.read_csv(match_postcode_url,encoding='utf-8')
    #Cleanup and rename
    to_geocode = df.merge(df_match_postcode[['CodigoPostal', 'Concelho', 'Freguesia Final (Pós RATF)']], left_on="postal_code", right_on='CodigoPostal', how="left")
    to_geocode = to_geocode.drop(columns=["CodigoPostal"]).rename(columns={"Concelho": "concelho", "Freguesia Final (Pós RATF)": "freguesia"})
    return to_geocode


In [40]:
def process_geocode_data(df_raw, to_geocode):
    try:
        # Calculate occupied spots percentage
        df_raw["occupied_perc"] = df_raw["occupied_spot_number"] / df_raw["total_spot_number"] * 100

        # Convert date to datetime and set as index
        df_raw['date'] = pd.to_datetime(df_raw['date'])
        df_raw.set_index('date', inplace=True)

        # Define the window size
        window_size = 2

        # Apply rolling calculations
        df_processed = df_raw.groupby(['id', 'street_address', 'latitude', 'longitude']).apply(
            lambda x: x.rolling(window=window_size, min_periods=1).agg({
                'occupied_spot_number': ['mean', 'max', 'min'],
                'occupied_perc': ['mean', 'max', 'min']
            })
        ).reset_index()

        # Flatten the multi-level columns
        df_processed.columns = ['id', 'street_address', 'latitude', 'longitude', 'date',
                                   'occupied_spot_number_mean', 'occupied_spot_number_max', 'occupied_spot_number_min',
                                   'occupied_perc_mean', 'occupied_perc_max', 'occupied_perc_min']

        # Add Freguesia and concelho
        df_processed = df_processed.merge(to_geocode[["latitude","longitude","concelho","freguesia", "total_spot_number"]], on=["latitude","longitude"], how="left")

        return df_processed

    except Exception as e:
        print(f"An error occurred while processing the data: {e}")
        return None

In [41]:
to_geocode = complete_geocode_table(final_df_raw)

In [42]:
to_geocode

Unnamed: 0,latitude,longitude,total_spot_number,postal_code,concelho,freguesia
0,41.140887,-8.617493,20,4050492,Porto,"União das freguesias de Cedofeita, Santo Ildef..."
1,41.141235,-8.614237,16,4050029,Porto,"União das freguesias de Cedofeita, Santo Ildef..."
2,41.141871,-8.604642,10,4000279,Porto,"União das freguesias de Cedofeita, Santo Ildef..."
3,41.142254,-8.615234,20,4050116,Porto,"União das freguesias de Cedofeita, Santo Ildef..."
4,41.142632,-8.608252,10,4000279,Porto,"União das freguesias de Cedofeita, Santo Ildef..."
...,...,...,...,...,...,...
217,41.177946,-8.609206,10,4200491,Porto,Paranhos
218,41.178148,-8.606578,10,4200135,Porto,Paranhos
219,41.178485,-8.586920,10,4200601,Porto,Paranhos
220,41.178844,-8.600135,20,4400319,Vila Nova de Gaia,União das freguesias de Santa Marinha e São Pe...


In [43]:
to_geocode[to_geocode.freguesia.isnull()]

Unnamed: 0,latitude,longitude,total_spot_number,postal_code,concelho,freguesia
95,41.155089,-8.61847,10,4050,,
150,41.162704,-8.611365,10,4200167,,
195,41.174638,-8.59894,10,4200298,,
208,41.176995,-8.596201,10,4200298,,
210,41.177056,-8.594558,10,4200298,,
216,41.177601,-8.597936,20,4200298,,


In [44]:
# Manual corrections
to_geocode.at[95, "concelho"] = "Porto"
to_geocode.at[95, "freguesia"] = "União das Freguesias de Cedofeita, Santo Ildefonso, Sé, Miragaia, São Nicolau e Vitória"
to_geocode.at[150, "concelho"] = "Porto"
to_geocode.at[150, "freguesia"] = "União das Freguesias de Cedofeita, Santo Ildefonso, Sé, Miragaia, São Nicolau e Vitória"
to_geocode.at[195, "concelho"] = "Porto"
to_geocode.at[195, "freguesia"] = "Paranhos"
to_geocode.at[208, "concelho"] = "Porto"
to_geocode.at[208, "freguesia"] = "Paranhos"
to_geocode.at[210, "concelho"] = "Porto"
to_geocode.at[210, "freguesia"] = "Paranhos"
to_geocode.at[216, "concelho"] = "Porto"
to_geocode.at[216, "freguesia"] = "Paranhos"

In [45]:
to_geocode[to_geocode.freguesia.isnull()]

Unnamed: 0,latitude,longitude,total_spot_number,postal_code,concelho,freguesia


In [46]:
df_processed = process_geocode_data(final_df_raw, to_geocode)

  df_processed = df_raw.groupby(['id', 'street_address', 'latitude', 'longitude']).apply(


In [47]:
df_processed.head(2)

Unnamed: 0,id,street_address,latitude,longitude,date,occupied_spot_number_mean,occupied_spot_number_max,occupied_spot_number_min,occupied_perc_mean,occupied_perc_max,occupied_perc_min,concelho,freguesia,total_spot_number
0,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-05-31 20:58:09,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10
1,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-06-01 12:48:22,2.5,3.0,2.0,25.0,30.0,20.0,Porto,Ramalde,10


In [48]:
df_processed.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1776 entries, 0 to 1775
Data columns (total 14 columns):
 #   Column                     Non-Null Count  Dtype         
---  ------                     --------------  -----         
 0   id                         1776 non-null   object        
 1   street_address             1776 non-null   object        
 2   latitude                   1776 non-null   float64       
 3   longitude                  1776 non-null   float64       
 4   date                       1776 non-null   datetime64[ns]
 5   occupied_spot_number_mean  1776 non-null   float64       
 6   occupied_spot_number_max   1776 non-null   float64       
 7   occupied_spot_number_min   1776 non-null   float64       
 8   occupied_perc_mean         1776 non-null   float64       
 9   occupied_perc_max          1776 non-null   float64       
 10  occupied_perc_min          1776 non-null   float64       
 11  concelho                   1776 non-null   object        
 12  fregue

In [49]:
# Save dataframe to csv -> Just for PowerBI prototype purposes
df_processed.drop(columns=["id"]).to_csv("twowheel20240601.csv", index=False, encoding='utf-16')

## 2.3. Save preprocessed table to bucket

In [50]:
def save_parquet_to_minio(df, minio_endpoint, access_key, secret_key, bucket_name, parquet_output_path = f"2wheel_{date}.parquet", object_name = f"twowheel/2wheel_{date}.parquet", secure=False):
    try:
        # Create Minio client
        minio_client = Minio(minio_endpoint, access_key=access_key, secret_key=secret_key, secure=secure)

        # Create a dynamic file name including the current date and time
        date = datetime.today().strftime('%Y%m%d%H%M%S')
        parquet_output_path = parquet_output_path
        object_name = object_name

        # Save the preprocessed DataFrame as Parquet file
        df.to_parquet(parquet_output_path)

        # Upload the Parquet file to MinIO
        minio_client.fput_object(bucket_name, object_name, parquet_output_path)
        print(f"Preprocessed data is saved as Parquet in the MinIO bucket: {bucket_name}/{object_name}")

        # Remove the local Parquet file after upload
        if os.path.exists(parquet_output_path):
            os.remove(parquet_output_path)
    except Exception as e:
        print(f"An error occurred: {e}")


In [51]:
save_parquet_to_minio(df_processed, minio_endpoint, access_key, secret_key, output_bucket_name)

Preprocessed data is saved as Parquet in the MinIO bucket: preprocessed/twowheel/2wheel_20240610230610.parquet


# 3- Read and send preprocessed to Postgres database

In [9]:
# PostgreSQL connection details
host = 'localhost'
port = 5432
database = 'data_warehouse'
user = 'postgres'
password = 'password123'

# Preprocess table to use
input_bucket_name = "preprocessed"
#input_object_name = "twowheel/2wheel_20240608093101.parquet"
input_object_name = "twowheel/2wheel_20240614185451.parquet"


In [4]:
# Read Parquet file into memory

def read_parquet_from_minio(minio_endpoint, access_key, secret_key, bucket_name, object_name):
    try:
        # Create Minio client
        minio_client = Minio(minio_endpoint, access_key=access_key, secret_key=secret_key, secure=False)

        # Get the Parquet file from MinIO as a stream
        response = minio_client.get_object(bucket_name, object_name)

        # Read the Parquet file into a Pandas DataFrame directly from the stream
        data = response.read()
        df = pd.read_parquet(BytesIO(data))

        # Close the response
        response.close()
        response.release_conn()

        return df
    except Exception as e:
        print(f"An error occurred while reading Parquet from MinIO: {e}")
        return None

In [2]:
def connect_to_postgresql(host, port, database, user, password, schema):
    # Define the database URL
    db_url = f'postgresql://{user}:{password}@{host}:{port}/{database}'

    try:
        # Create the SQLAlchemy engine with the specified schema
        engine = create_engine(db_url, connect_args={"options": f"-csearch_path={schema}"})

        # Print success message
        print("Connected to PostgreSQL successfully!")

        # Return the engine object
        return engine

    except Exception as e:
        print(f"An error occurred while connecting to PostgreSQL: {e}")
        return None

In [10]:
df_processed = read_parquet_from_minio(minio_endpoint, access_key, secret_key, input_bucket_name, input_object_name)

In [11]:
df_processed.head()

Unnamed: 0,id,street_address,latitude,longitude,date,occupied_spot_number_mean,occupied_spot_number_max,occupied_spot_number_min,occupied_perc_mean,occupied_perc_max,occupied_perc_min,concelho,freguesia,total_spot_number
0,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-05-31 20:58:09,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10
1,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-06-01 12:48:22,2.5,3.0,2.0,25.0,30.0,20.0,Porto,Ramalde,10
2,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-06-02 23:45:04,2.5,3.0,2.0,25.0,30.0,20.0,Porto,Ramalde,10
3,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-06-03 22:01:44,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10
4,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-06-04 22:19:15,5.0,7.0,3.0,50.0,70.0,30.0,Porto,Ramalde,10


In [49]:
df_processed.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1776 entries, 0 to 1775
Data columns (total 13 columns):
 #   Column                     Non-Null Count  Dtype         
---  ------                     --------------  -----         
 0   id                         1776 non-null   object        
 1   street_address             1776 non-null   object        
 2   latitude                   1776 non-null   float64       
 3   longitude                  1776 non-null   float64       
 4   date                       1776 non-null   datetime64[ns]
 5   occupied_spot_number_mean  1776 non-null   float64       
 6   occupied_spot_number_max   1776 non-null   float64       
 7   occupied_spot_number_min   1776 non-null   float64       
 8   occupied_perc_mean         1776 non-null   float64       
 9   occupied_perc_max          1776 non-null   float64       
 10  occupied_perc_min          1776 non-null   float64       
 11  concelho                   1776 non-null   object        
 12  fregue

In [70]:
conn = connect_to_postgresql(host, port, database, user, password)

Connected to PostgreSQL successfully!


In [71]:
# Define the table name
table_name = 'preprocessed_data'

# Send the preprocessed DataFrame to SQL
df_processed.to_sql(table_name, conn, if_exists='replace', index=False)

# Close the connection
conn.close()

print("Preprocessed data is sent to SQL successfully!")

Preprocessed data is sent to SQL successfully!


## 3.2 Final version

### 3.2.1 Upload INE table to bucket as parquet

In [4]:
input_bucket_name = "preprocessed"

In [15]:
ine = pd.read_csv("ine.csv", encoding='utf-16')

In [16]:
ine.head()

Unnamed: 0,freguesia,genero,faixa_etaria,count,concelho,ano
0,Revinhade,M,30 - 39 anos,55,Felgueiras,2021
1,Idães,M,30 - 39 anos,162,Felgueiras,2021
2,União das freguesias de Margaride (Santa Eulál...,M,30 - 39 anos,1169,Felgueiras,2021
3,"União das freguesias de Pedreira, Rande e Sern...",M,30 - 39 anos,212,Felgueiras,2021
4,União das freguesias de Vila Cova da Lixa e Bo...,M,30 - 39 anos,400,Felgueiras,2021


In [17]:
# Remove ano as it is always the same
ine = ine.drop(columns=["ano"])

In [18]:
save_parquet_to_minio(ine, minio_endpoint, access_key, secret_key, input_bucket_name, parquet_output_path = f"ine_{date}.parquet", object_name = f"ine/ine_{date}.parquet")

Preprocessed data is saved as Parquet in the MinIO bucket: preprocessed/ine/ine_20240610213356.parquet


## 3.2.2 Feed the SQL database with the twowheel data and the INE data

In [6]:
# PostgreSQL connection details
host = 'localhost'
port = 5432
database = 'datawarehouse'
schema='testschema'
user = 'postgres'
password = 'password123'

# Load current files
object_name_2wheel = "twowheel/2wheel_20240610230610.parquet"
object_name_ine = "ine/ine_20240610213356.parquet"

In [53]:
# Read the 2 wheel Parquet file from MinIO
data = read_parquet_from_minio(minio_endpoint, access_key, secret_key, input_bucket_name, object_name_2wheel)

In [54]:
data.head(2)

Unnamed: 0,id,street_address,latitude,longitude,date,occupied_spot_number_mean,occupied_spot_number_max,occupied_spot_number_min,occupied_perc_mean,occupied_perc_max,occupied_perc_min,concelho,freguesia,total_spot_number
0,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-05-31 20:58:09,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10
1,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-06-01 12:48:22,2.5,3.0,2.0,25.0,30.0,20.0,Porto,Ramalde,10


In [59]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1776 entries, 0 to 1775
Data columns (total 14 columns):
 #   Column                     Non-Null Count  Dtype         
---  ------                     --------------  -----         
 0   id                         1776 non-null   object        
 1   street_address             1776 non-null   object        
 2   latitude                   1776 non-null   float64       
 3   longitude                  1776 non-null   float64       
 4   date                       1776 non-null   datetime64[ns]
 5   occupied_spot_number_mean  1776 non-null   float64       
 6   occupied_spot_number_max   1776 non-null   float64       
 7   occupied_spot_number_min   1776 non-null   float64       
 8   occupied_perc_mean         1776 non-null   float64       
 9   occupied_perc_max          1776 non-null   float64       
 10  occupied_perc_min          1776 non-null   float64       
 11  concelho                   1776 non-null   object        
 12  fregue

In [60]:
data.columns

Index(['id', 'street_address', 'latitude', 'longitude', 'date',
       'occupied_spot_number_mean', 'occupied_spot_number_max',
       'occupied_spot_number_min', 'occupied_perc_mean', 'occupied_perc_max',
       'occupied_perc_min', 'concelho', 'freguesia', 'total_spot_number'],
      dtype='object')

In [56]:
# Load the population parquet
population_data = read_parquet_from_minio(minio_endpoint, access_key, secret_key, input_bucket_name, object_name_ine)c

In [57]:
population_data.head(2)

Unnamed: 0,freguesia,genero,faixa_etaria,count,concelho
0,Revinhade,M,30 - 39 anos,55,Felgueiras
1,Idães,M,30 - 39 anos,162,Felgueiras


In [61]:
population_data.columns

Index(['freguesia', 'genero', 'faixa_etaria', 'count', 'concelho'], dtype='object')

In [None]:
# Simplify and try only fact and dim location
if data is not None:
    # Ensure all column names are lowercase
    data.columns = map(str.lower, data.columns)

    # Establish connection to the PostgreSQL database
    engine = connect_to_postgresql(host, port, database, user, password, schema)

    if engine is not None:
        metadata = MetaData(schema=schema)
        try:
            metadata.reflect(bind=engine)

            # Access the required tables
            dim_location = Table('dimlocation', metadata, autoload_with=engine, schema=schema)
            fact_two_wheel = Table('facttwowheel', metadata, autoload_with=engine, schema=schema)

            with engine.connect() as connection:
                # Insert unique locations into DimLocation with conflict resolution
                unique_locations = data[['latitude', 'longitude', 'concelho', 'freguesia']].drop_duplicates()
                location_inserts = unique_locations.to_dict(orient='records')
                print("Inserting into dimlocation:", location_inserts)  # Log insert data for debugging
                insert_statement = pg_insert(dim_location).values(location_inserts)
                upsert_statement = insert_statement.on_conflict_do_nothing(index_elements=['latitude', 'longitude'])
                connection.execute(upsert_statement)
                connection.commit()  # Commit the transaction to ensure data is saved

                # Verify data exists before insertion
                if not data.empty:
                    # Log the data being inserted into FactTwoWheel
                    print("Inserting data into FactTwoWheel:")
                    print(data.head())

                    # Insert data into FactTwoWheel
                    fact_inserts = []
                    for index, row in data.iterrows():
                        fact_inserts.append({
                            'date': row['date'].date(),  # Directly use the date part of the Timestamp
                            'latitude': row['latitude'],
                            'longitude': row['longitude'],
                            'meanoccupation': row['occupied_spot_number_mean'],
                            'percentoccupation': row['occupied_perc_mean'],
                            'minoccupation': row['occupied_spot_number_min'],
                            'maxoccupation': row['occupied_spot_number_max'],
                            'totalspotnumber': row['total_spot_number']
                        })
                    try:
                        connection.execute(pg_insert(fact_two_wheel).on_conflict_do_nothing(), fact_inserts)
                        connection.commit()  # Commit the transaction to ensure data is saved
                    except SQLAlchemyError as e:
                        print(f"An error occurred during fact_two_wheel insertion: {e}")
                else:
                    print("No data to insert into FactTwoWheel.")

            print("Two-wheel data insertion completed successfully!")

        except KeyError as e:
            print(f"Column not found in the DataFrame: {e}")
        except SQLAlchemyError as e:
            print(f"An error occurred with SQLAlchemy: {e}")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")

### Attempt 2 Merge data tables and then populate

In [20]:
# PostgreSQL connection details
host = 'localhost'
port = 5432
database = 'datawarehouse'
schema='testschema1'
user = 'postgres'
password = 'password123'

# Load current files
object_name_2wheel = "twowheel/2wheel_20240610230610.parquet"
object_name_ine = "ine/ine_20240610213356.parquet"

In [8]:
# Read the 2 wheel Parquet file from MinIO
data = read_parquet_from_minio(minio_endpoint, access_key, secret_key, input_bucket_name, object_name_2wheel)

In [9]:
# Load the population parquet
population_data = read_parquet_from_minio(minio_endpoint, access_key, secret_key, input_bucket_name, object_name_ine)

In [15]:
# Merge tables on freguesia
full_table = pd.merge(data, population_data, on=["concelho", "freguesia"])
# Create a unique location_id for each combination of latitude and longitude
full_table['locationID'] = full_table.groupby(['latitude', 'longitude']).ngroup() + 1

In [22]:
full_table.head(30)

Unnamed: 0,id,street_address,latitude,longitude,date,occupied_spot_number_mean,occupied_spot_number_max,occupied_spot_number_min,occupied_perc_mean,occupied_perc_max,occupied_perc_min,concelho,freguesia,total_spot_number,genero,faixa_etaria,count,locationid
0,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-05-31 20:58:09,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10,M,30 - 39 anos,2378,176
1,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-05-31 20:58:09,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10,H,30 - 39 anos,2278,176
2,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-05-31 20:58:09,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10,M,90 - 99 anos,420,176
3,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-05-31 20:58:09,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10,H,90 - 99 anos,146,176
4,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-05-31 20:58:09,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10,M,10 - 19 anos,1884,176
5,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-05-31 20:58:09,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10,H,10 - 19 anos,1872,176
6,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-05-31 20:58:09,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10,M,20 - 29 anos,2127,176
7,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-05-31 20:58:09,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10,H,20 - 29 anos,2167,176
8,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-05-31 20:58:09,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10,M,60 - 69 anos,2937,176
9,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-05-31 20:58:09,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10,H,60 - 69 anos,2103,176


In [18]:
def insert_data_to_db(merged_data, host, port, database, user, password, schema):
    if merged_data is not None:
        # Ensure all column names are lowercase
        merged_data.columns = map(str.lower, merged_data.columns)

        # Establish connection to the PostgreSQL database
        engine = connect_to_postgresql(host, port, database, user, password, schema)

        if engine is not None:
            metadata = MetaData(schema=schema)
            try:
                metadata.reflect(bind=engine)

                # Access the required tables
                dim_location = Table('dimlocation', metadata, autoload_with=engine, schema=schema)
                fact_two_wheel = Table('facttwowheel', metadata, autoload_with=engine, schema=schema)
                dim_population = Table('dimpopulation', metadata, autoload_with=engine, schema=schema)

                with engine.connect() as connection:
                    # Insert unique locations into DimLocation with conflict resolution
                    unique_locations = merged_data[['locationid', 'latitude', 'longitude', 'concelho', 'freguesia']].drop_duplicates()
                    location_inserts = unique_locations.to_dict(orient='records')
                    print("Inserting into dimlocation:", location_inserts)  # Log insert data for debugging
                    insert_statement = pg_insert(dim_location).values(location_inserts)
                    upsert_statement = insert_statement.on_conflict_do_nothing(index_elements=['latitude', 'longitude'])
                    connection.execute(upsert_statement)
                    connection.commit()  # Commit the transaction to ensure data is saved

                    # Verify data exists before insertion
                    if not merged_data.empty:
                        # Log the data being inserted into FactTwoWheel
                        print("Inserting data into FactTwoWheel:")
                        print(merged_data.head())

                        # Insert data into FactTwoWheel and DimPopulation
                        fact_inserts = []
                        population_inserts = []
                        for index, row in merged_data.iterrows():
                            fact_inserts.append({
                                'date': row['date'].date(),
                                'latitude': row['latitude'],
                                'longitude': row['longitude'],
                                'meanoccupation': row['occupied_spot_number_mean'],
                                'percentoccupation': row['occupied_perc_mean'],
                                'minoccupation': row['occupied_spot_number_min'],
                                'maxoccupation': row['occupied_spot_number_max'],
                                'totalspotnumber': row['total_spot_number'],
                                'locationid': row['locationid']
                            })
                            population_inserts.append({
                                'freguesia': row['freguesia'],
                                'locationid': row['locationid'],
                                'genero': row['genero'],
                                'faixa_etaria': row['faixa_etaria'],
                                'count': row['count']
                            })

                        try:
                            connection.execute(pg_insert(fact_two_wheel).on_conflict_do_nothing(), fact_inserts)
                            connection.execute(pg_insert(dim_population).on_conflict_do_nothing(), population_inserts)
                            connection.commit()  # Commit the transaction to ensure data is saved
                        except SQLAlchemyError as e:
                            print(f"An error occurred during data insertion: {e}")
                    else:
                        print("No data to insert into FactTwoWheel or DimPopulation.")

                print("Data insertion completed successfully!")

            except KeyError as e:
                print(f"Column not found in the DataFrame: {e}")
            except SQLAlchemyError as e:
                print(f"An error occurred with SQLAlchemy: {e}")
            except Exception as e:
                print(f"An unexpected error occurred: {e}")

In [None]:
insert_data_to_db(full_table, host, port, database, user, password, schema)

## Final 2 table version

In [8]:
# PostgreSQL connection details
host = 'localhost'
port = 5432
database = 'datawarehouse'
schema='testschema'
user = 'postgres'
password = 'password123'

input_bucket_name = "preprocessed"

# Load current files
object_name_2wheel = "twowheel/2wheel_20240610230610.parquet"

In [9]:
# Read the 2 wheel Parquet file from MinIO
data = read_parquet_from_minio(minio_endpoint, access_key, secret_key, input_bucket_name, object_name_2wheel)

In [16]:
data

Unnamed: 0,id,street_address,latitude,longitude,date,occupied_spot_number_mean,occupied_spot_number_max,occupied_spot_number_min,occupied_perc_mean,occupied_perc_max,occupied_perc_min,concelho,freguesia,total_spot_number,locationid
0,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-05-31 20:58:09,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10,181
1,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-06-01 12:48:22,2.5,3.0,2.0,25.0,30.0,20.0,Porto,Ramalde,10,181
2,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-06-02 23:45:04,2.5,3.0,2.0,25.0,30.0,20.0,Porto,Ramalde,10,181
3,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-06-03 22:01:44,3.0,3.0,3.0,30.0,30.0,30.0,Porto,Ramalde,10,181
4,urn:ngsi-ld:OnStreetParking:porto:cmp:001b74a9...,Av Dr Antunes Guimarães 912,41.170801,-8.656739,2024-06-04 22:19:15,5.0,7.0,3.0,50.0,70.0,30.0,Porto,Ramalde,10,181
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1771,urn:ngsi-ld:OnStreetParking:porto:cmp:fe5304ff...,R Dr Aarao Lacerda 502,41.167479,-8.659702,2024-06-03 22:01:44,0.5,1.0,0.0,5.0,10.0,0.0,Porto,"União das freguesias de Aldoar, Foz do Douro e...",10,175
1772,urn:ngsi-ld:OnStreetParking:porto:cmp:fe5304ff...,R Dr Aarao Lacerda 502,41.167479,-8.659702,2024-06-04 22:19:15,0.0,0.0,0.0,0.0,0.0,0.0,Porto,"União das freguesias de Aldoar, Foz do Douro e...",10,175
1773,urn:ngsi-ld:OnStreetParking:porto:cmp:fe5304ff...,R Dr Aarao Lacerda 502,41.167479,-8.659702,2024-06-05 20:00:01,0.0,0.0,0.0,0.0,0.0,0.0,Porto,"União das freguesias de Aldoar, Foz do Douro e...",10,175
1774,urn:ngsi-ld:OnStreetParking:porto:cmp:fe5304ff...,R Dr Aarao Lacerda 502,41.167479,-8.659702,2024-06-06 22:52:59,1.5,3.0,0.0,15.0,30.0,0.0,Porto,"União das freguesias de Aldoar, Foz do Douro e...",10,175


In [18]:
def populate_tables(data, host, port, database, user, password, schema):
    # Ensure all column names are lowercase
    data.columns = map(str.lower, data.columns)

    # Create LocationID in the data DataFrame
    location_groups = data.groupby(['latitude', 'longitude']).ngroup() + 1
    data['locationid'] = location_groups

    # Establish connection to the PostgreSQL database
    engine = connect_to_postgresql(host, port, database, user, password, schema)

    if engine is not None:
        metadata = MetaData(schema=schema)
        try:
            metadata.reflect(bind=engine)

            # Access the required tables
            dim_location = Table('dimlocation', metadata, autoload_with=engine, schema=schema)
            fact_two_wheel = Table('facttwowheel', metadata, autoload_with=engine, schema=schema)

            with engine.connect() as connection:
                transaction = connection.begin()
                try:
                    # Insert unique locations into DimLocation with conflict resolution
                    unique_locations = data[['locationid', 'latitude', 'longitude', 'concelho', 'freguesia']].drop_duplicates()
                    location_inserts = unique_locations.to_dict(orient='records')
                    insert_statement = pg_insert(dim_location).values(location_inserts)
                    upsert_statement = insert_statement.on_conflict_do_nothing(index_elements=['latitude', 'longitude'])
                    connection.execute(upsert_statement)

                    # Commit the transaction to save data to DimLocation
                    transaction.commit()
                except SQLAlchemyError as e:
                    transaction.rollback()
                    print(f"Error during DimLocation insertion: {e}")

                # Start a new transaction for FactTwoWheel data insertion
                transaction = connection.begin()
                try:
                    # Verify data exists before insertion
                    if not data.empty:
                        # Insert data into FactTwoWheel
                        fact_inserts = []
                        for index, row in data.iterrows():
                            fact_inserts.append({
                                'date': row['date'].date(),  # Directly use the date part of the Timestamp
                                'latitude': row['latitude'],
                                'longitude': row['longitude'],
                                'meanoccupation': row['occupied_spot_number_mean'],
                                'percentoccupation': row['occupied_perc_mean'],
                                'minoccupation': row['occupied_spot_number_min'],
                                'maxoccupation': row['occupied_spot_number_max'],
                                'totalspotnumber': row['total_spot_number'],
                                'locationid': row['locationid']  # Use the precomputed LocationID
                            })

                        connection.execute(pg_insert(fact_two_wheel).on_conflict_do_nothing(), fact_inserts)

                        # Commit the transaction to save data to FactTwoWheel
                        transaction.commit()
                    else:
                        print("No data to insert into FactTwoWheel.")
                except SQLAlchemyError as e:
                    transaction.rollback()
                    print(f"Error during FactTwoWheel insertion: {e}")

            print("Data insertion completed successfully!")

        except KeyError as e:
            print(f"Column not found in the DataFrame: {e}")
        except SQLAlchemyError as e:
            print(f"Error with SQLAlchemy: {e}")
        except Exception as e:
            print(f"Unexpected error: {e}")

In [19]:
populate_tables(data, host, port, database, user, password, schema)

Connected to PostgreSQL successfully!
Data insertion completed successfully!


# SPARK and Geocode tests (Legacy for reference only)

### Geocoding tests

In [None]:
# Check the outputs of the geocoding
geolocator = Nominatim(user_agent="get_location")
geocode = RateLimiter(geolocator.reverse, min_delay_seconds=1)
location = geocode("41.158887, -8.628684",exactly_one=True)
address = location.raw

In [None]:
address

{'place_id': 247184657,
 'licence': 'Data © OpenStreetMap contributors, ODbL 1.0. http://osm.org/copyright',
 'osm_type': 'node',
 'osm_id': 8903663010,
 'lat': '41.1585937',
 'lon': '-8.6280332',
 'class': 'amenity',
 'type': 'bank',
 'place_rank': 30,
 'importance': 9.99999999995449e-06,
 'addresstype': 'amenity',
 'name': 'Banco CTT',
 'display_name': 'Banco CTT, 74, Praça Mouzinho de Albuquerque, Ramada Alta, Cedofeita, Cedofeita, Santo Ildefonso, Sé, Miragaia, São Nicolau e Vitória, Porto, 4100-999, Portugal',
 'address': {'amenity': 'Banco CTT',
  'house_number': '74',
  'road': 'Praça Mouzinho de Albuquerque',
  'neighbourhood': 'Ramada Alta',
  'suburb': 'Cedofeita',
  'city_district': 'Cedofeita, Santo Ildefonso, Sé, Miragaia, São Nicolau e Vitória',
  'city': 'Porto',
  'county': 'Porto',
  'ISO3166-2-lvl6': 'PT-13',
  'postcode': '4100-999',
  'country': 'Portugal',
  'country_code': 'pt'},
 'boundingbox': ['41.1585437', '41.1586437', '-8.6280832', '-8.6279832']}

In [None]:
# Testing Google version

api_key = 'AIzaSyDbmerooiniS0lhSGKr-ogdyXgTgKGJL6w'

geolocator = GoogleV3(api_key=api_key)
geocode = RateLimiter(geolocator.reverse, min_delay_seconds=1)

location = geocode("41.158887, -8.628684", exactly_one=True)
address_google = location.raw

In [None]:
address_google

{'address_components': [{'long_name': '41',
   'short_name': '41',
   'types': ['street_number']},
  {'long_name': 'Praça de Mouzinho de Albuquerque',
   'short_name': 'Praça de Mouzinho de Albuquerque',
   'types': ['route']},
  {'long_name': 'Porto',
   'short_name': 'Porto',
   'types': ['locality', 'political']},
  {'long_name': 'União das freguesias de Cedofeita, Santo Ildefonso, Sé, Miragaia, São Nicolau e Vitória',
   'short_name': 'União das freguesias de Cedofeita, Santo Ildefonso, Sé, Miragaia, São Nicolau e Vitória',
   'types': ['administrative_area_level_3', 'political']},
  {'long_name': 'Porto',
   'short_name': 'Porto',
   'types': ['administrative_area_level_2', 'political']},
  {'long_name': 'Porto',
   'short_name': 'Porto',
   'types': ['administrative_area_level_1', 'political']},
  {'long_name': 'Portugal',
   'short_name': 'PT',
   'types': ['country', 'political']},
  {'long_name': '4050',
   'short_name': '4050',
   'types': ['postal_code', 'postal_code_prefix'

In [None]:
# Helper function to geocode and extract freguesia and concelho from lat lon using Geopy Nominatim
def get_location_info_latlon(lat, lon):
    geolocator = Nominatim(user_agent="get_location")
    geocode = RateLimiter(geolocator.reverse, min_delay_seconds=1)
    try:
        location = geocode(f"{lat}, {lon}",exactly_one=True)
        address = location.raw.get('address', {})
        # Freguesia is in 'suburb'
        freguesia = address.get('suburb', '')
        # Concelho is in 'county'
        concelho = address.get('city', '')
        # Post code
        post_code = address.get('postcode', '')
        return pd.Series([freguesia, concelho, post_code])
    except Exception as e:
        return pd.Series(['', '', ''])

In [None]:
google_api_key = 'AIzaSyDbmerooiniS0lhSGKr-ogdyXgTgKGJL6w'

In [None]:
# Initialize global row counters
nominatim_counter = 0
google_counter = 0

def get_location_info_latlon_v2(lat, lon, google_api_key):
    global nominatim_counter, google_counter
    try:
        # Initialize Nominatim geocoder
        nominatim_geolocator = Nominatim(user_agent="get_location")
        nominatim_geocode = RateLimiter(nominatim_geolocator.reverse, min_delay_seconds=1)

        # Try with Nominatim
        location = nominatim_geocode(f"{lat}, {lon}", exactly_one=True)
        if location:
            address = location.raw.get('address', {})
            # Freguesia in priority order: city_district, suburb, town
            freguesia = address.get('city_district', '') or address.get('suburb', '') or address.get('town', '')
            concelho = address.get('city', '') or address.get('county', '')
            post_code = address.get('postcode', '')  # Post code

            # If Nominatim provides all needed information
            if freguesia and concelho and post_code:
                nominatim_counter += 1
                return pd.Series([freguesia, concelho, post_code])

        # If Nominatim fails, initialize Google Maps geocoder
        google_geolocator = GoogleV3(api_key=google_api_key)
        google_geocode = RateLimiter(google_geolocator.reverse, min_delay_seconds=1)

        # Try with Google Maps
        location = google_geocode((lat, lon))
        if location:
            address_components = location[0].raw['address_components']
            freguesia = ''
            concelho = ''
            post_code = ''
            for component in address_components:
                if 'sublocality_level_1' in component['types'] or 'locality' in component['types']:
                    freguesia = component['long_name']
                elif 'administrative_area_level_2' in component['types']:
                    concelho = component['long_name']
                elif 'postal_code' in component['types']:
                    post_code = component['long_name']
            google_counter += 1
            return pd.Series([freguesia, concelho, post_code])

        return pd.Series(['', '', ''])
    except Exception as e:
        return pd.Series(['', '', ''])

In [None]:
mapping_dict = {"4410001-4410999": {"concelho": "Vila Nova de Gaia", "freguesia": "Arcozelo"},
    "4430001-4430999": {"concelho": "Vila Nova de Gaia", "freguesia": "Avintes"},
    "4410001-4410999": {"concelho": "Vila Nova de Gaia", "freguesia": "Canelas"},
    "4400001-4400999": {"concelho": "Vila Nova de Gaia", "freguesia": "Canidelo"},
    "4415001-4415999": {"concelho": "Vila Nova de Gaia", "freguesia": ["Crestuma", "União das Freguesias de Grijó e Sermonde", "União das Freguesias de Pedroso e Seixezelo", "União das Freguesias de Sandim, Olival, Lever e Crestuma", "União das Freguesias de Serzedo e Perosinho"]},
    "4405001-4405999": {"concelho": "Vila Nova de Gaia", "freguesia": ["União das Freguesias de Gulpilhares e Valadares", "Madalena"]},
    "4430001-4430999": {"concelho": "Vila Nova de Gaia", "freguesia": ["União das Freguesias de Mafamude e Vilar do Paraíso", "Oliveira do Douro"]},
    "4400001-4400999": {"concelho": "Vila Nova de Gaia", "freguesia": "União das Freguesias de Santa Marinha e São Pedro da Afurada"},
    "4430001-4430999": {"concelho": "Vila Nova de Gaia", "freguesia": "Vilar de Andorinho"}}

In [None]:
def map_postal_code(postal_code, mapping):
    postal_code_num = int(postal_code.replace('-', ''))
    for key, value in mapping.items():
        start, end = map(int, key.split('-'))
        if start <= postal_code_num <= end:
            return pd.Series([value['concelho'], value['freguesia']])
    return pd.Series([None, None])

In [None]:
# Function to geocode and extract freguesia and concelho using street address using Geopy Nominatim
def get_location_info_address(address):
    geolocator = Nominatim(user_agent="get_location_address")
    geocode = RateLimiter(geolocator.geocode, min_delay_seconds=1)
    try:
        location = geocode(address, exactly_one=True)
        if location:
            address_detail = location.raw.get('address', {})
            freguesia = address_detail.get('suburb', '')
            concelho = address_detail.get('city', '')
            post_code = address_detail.get('postcode', '')
            return pd.Series([freguesia, concelho, post_code])
        else:
            return pd.Series(['', '', ''])
    except Exception as e:
        return pd.Series(['', '', ''])

In [None]:
mapping_dict = {"4410001-4410999": {"concelho": "Vila Nova de Gaia", "freguesia": "Arcozelo"},
    "4430001-4430999": {"concelho": "Vila Nova de Gaia", "freguesia": "Avintes"},
    "4410001-4410999": {"concelho": "Vila Nova de Gaia", "freguesia": "Canelas"},
    "4400001-4400999": {"concelho": "Vila Nova de Gaia", "freguesia": "Canidelo"},
    "4415001-4415999": {"concelho": "Vila Nova de Gaia", "freguesia": ["Crestuma", "União das Freguesias de Grijó e Sermonde", "União das Freguesias de Pedroso e Seixezelo", "União das Freguesias de Sandim, Olival, Lever e Crestuma", "União das Freguesias de Serzedo e Perosinho"]},
    "4405001-4405999": {"concelho": "Vila Nova de Gaia", "freguesia": ["União das Freguesias de Gulpilhares e Valadares", "Madalena"]},
    "4430001-4430999": {"concelho": "Vila Nova de Gaia", "freguesia": ["União das Freguesias de Mafamude e Vilar do Paraíso", "Oliveira do Douro"]},
    "4400001-4400999": {"concelho": "Vila Nova de Gaia", "freguesia": "União das Freguesias de Santa Marinha e São Pedro da Afurada"},
    "4430001-4430999": {"concelho": "Vila Nova de Gaia", "freguesia": "Vilar de Andorinho"}}

In [None]:
def map_postal_code(postal_code, mapping):
    postal_code_num = int(postal_code.replace('-', ''))
    for key, value in mapping.items():
        start, end = map(int, key.split('-'))
        if start <= postal_code_num <= end:
            return pd.Series([value['concelho'], value['freguesia']])
    return pd.Series([None, None])

In [None]:

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def get_postcode_info_latlon_google(lat, lon, google_api_key):
    try:
        logging.info(f"Processing lat: {lat}, lon: {lon}")
        google_geolocator = GoogleV3(api_key=google_api_key)
        google_geocode = RateLimiter(google_geolocator.reverse, min_delay_seconds=1)

        location = google_geocode((lat, lon))
        logging.info(f"API response for lat: {lat}, lon: {lon} - {location}")

        if location and isinstance(location, list) and len(location) > 0:
            address_components = location[0].raw['address_components']
            freguesia = ''
            concelho = ''
            post_code = ''
            for component in address_components:
                if 'administrative_area_level_3' in component['types'] and 'political' in component['types']:
                    freguesia = component['long_name']
                elif 'administrative_area_level_2' in component['types'] and 'political' in component['types']:
                    concelho = component['long_name']
                elif 'postal_code' in component['types']:
                    post_code = component['long_name']
            logging.info(f"Found address: freguesia={freguesia}, concelho={concelho}, post_code={post_code}")
            return pd.Series([freguesia, concelho, post_code], index=['freguesia', 'concelho', 'post_code'])
        else:
            logging.warning(f"No valid location found for lat: {lat}, lon: {lon}")
            return pd.Series(['', '', ''], index=['freguesia', 'concelho', 'post_code'])
    except Exception as e:
        logging.error(f"Error processing lat: {lat}, lon: {lon} - {e}")
        return pd.Series(['', '', ''], index=['freguesia', 'concelho', 'post_code'])

## Spark tests

In [10]:
json_data[0]

{'id': 'urn:ngsi-ld:OnStreetParking:porto:cmp:7cfee26e-b86c-4814-aadc-4b7d11e6010d',
 'type': 'OnStreetParking',
 'address': {'type': 'PostalAddress',
  'value': {'addressCountry': None,
   'addressLocality': None,
   'addressRegion': None,
   'postOfficeBoxNumber': None,
   'postalCode': None,
   'streetAddress': 'Pc Mouzinho De Albuquerque 56'},
  'metadata': {}},
 'allowedVehicleType': {'type': 'Array',
  'value': ['twoWheeledVehicle'],
  'metadata': {}},
 'availableSpotNumber': {'type': 'Number', 'value': 10, 'metadata': {}},
 'dataProvider': {'type': 'Text', 'value': 'cmp', 'metadata': {}},
 'description': {'type': 'Text',
  'value': 'Praça de Mouzinho de Albuquerque',
  'metadata': {}},
 'location': {'type': 'geo:json',
  'value': {'type': 'Point', 'coordinates': [-8.628683966, 41.158887288]},
  'metadata': {}},
 'occupiedSpotNumber': {'type': 'Number', 'value': 0, 'metadata': {}},
 'totalSpotNumber': {'type': 'Number', 'value': 10, 'metadata': {}},
 'vehicle_ids': {'type': 'Arra

In [11]:
transformed_data = []
for entry in json_data:
    transformed_entry = {
        'id': entry['id'],
        'type': entry['type'],
        'address': json.dumps(entry.get('address', {}).get('value', {})),
        'allowedVehicleType': entry.get('allowedVehicleType', {}).get('value', []),
        'availableSpotNumber': entry.get('availableSpotNumber', {}).get('value', None),
        'dataProvider': entry.get('dataProvider', {}).get('value', None),
        'description': entry.get('description', {}).get('value', ''),
        'location': json.dumps(entry.get('location', {}).get('value', {})),
        'occupiedSpotNumber': entry.get('occupiedSpotNumber', {}).get('value', None),
        'totalSpotNumber': entry.get('totalSpotNumber', {}).get('value', None),
        'vehicle_ids': entry.get('vehicle_ids', {}).get('value', [])
        }
    transformed_data.append(transformed_entry)

In [12]:
transformed_data

[{'id': 'urn:ngsi-ld:OnStreetParking:porto:cmp:7cfee26e-b86c-4814-aadc-4b7d11e6010d',
  'type': 'OnStreetParking',
  'address': '{"addressCountry": null, "addressLocality": null, "addressRegion": null, "postOfficeBoxNumber": null, "postalCode": null, "streetAddress": "Pc Mouzinho De Albuquerque 56"}',
  'allowedVehicleType': ['twoWheeledVehicle'],
  'availableSpotNumber': 10,
  'dataProvider': 'cmp',
  'description': 'Praça de Mouzinho de Albuquerque',
  'location': '{"type": "Point", "coordinates": [-8.628683966, 41.158887288]}',
  'occupiedSpotNumber': 0,
  'totalSpotNumber': 10,
  'vehicle_ids': []},
 {'id': 'urn:ngsi-ld:OnStreetParking:porto:cmp:a7c593b1-1dd2-4a01-bd21-48c35fa0ad93',
  'type': 'OnStreetParking',
  'address': '{"addressCountry": null, "addressLocality": null, "addressRegion": null, "postOfficeBoxNumber": null, "postalCode": null, "streetAddress": "Passeio Das Virtudes 12"}',
  'allowedVehicleType': ['twoWheeledVehicle'],
  'availableSpotNumber': 4,
  'dataProvider':

In [13]:
# Define schema for the DataFrame
schema = StructType([
        StructField('id', StringType(), True),
        StructField('type', StringType(), True),
        StructField('address', StringType(), True),
        StructField('allowedVehicleType', ArrayType(StringType()), True),
        StructField('availableSpotNumber', IntegerType(), True),
        StructField('dataProvider', StringType(), True),
        StructField('description', StringType(), True),
        StructField('location', StringType(), True),
        StructField('occupiedSpotNumber', IntegerType(), True),
        StructField('totalSpotNumber', IntegerType(), True),
        StructField('vehicle_ids', ArrayType(StringType()), True)
    ])

NameError: name 'StructType' is not defined

In [None]:
# Define the MinIO endpoint URL and access keys
minio_endpoint = "http://localhost:9000"
access_key = "admin"
secret_key = "password123"

# Define the bucket name and object name
bucket_name = "test"
output_path = f"s3a://{bucket_name}/output_data.parquet"

# Configure Spark session
spark = SparkSession.builder \
    .appName('Write Parquet to MinIO') \
    .config("spark.hadoop.fs.s3a.access.key", access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", secret_key) \
    .config("spark.hadoop.fs.s3a.endpoint", minio_endpoint) \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .getOrCreate()

your 131072x1 screen size is bogus. expect trouble
24/05/25 00:46:44 WARN Utils: Your hostname, Surface-DSO resolves to a loopback address: 127.0.1.1; using 172.31.77.62 instead (on interface eth0)
24/05/25 00:46:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/25 00:46:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Create a Spark DataFrame from JSON data
df = spark.createDataFrame(transformed_data, schema)

# Print schema and data
df.printSchema()
df.show()


root
 |-- id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- address: string (nullable = true)
 |-- allowedVehicleType: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- availableSpotNumber: integer (nullable = true)
 |-- dataProvider: string (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)
 |-- occupiedSpotNumber: integer (nullable = true)
 |-- totalSpotNumber: integer (nullable = true)
 |-- vehicle_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)



                                                                                

+--------------------+---------------+--------------------+-------------------+-------------------+------------+--------------------+--------------------+------------------+---------------+--------------------+
|                  id|           type|             address| allowedVehicleType|availableSpotNumber|dataProvider|         description|            location|occupiedSpotNumber|totalSpotNumber|         vehicle_ids|
+--------------------+---------------+--------------------+-------------------+-------------------+------------+--------------------+--------------------+------------------+---------------+--------------------+
|urn:ngsi-ld:OnStr...|OnStreetParking|{"addressCountry"...|[twoWheeledVehicle]|                 10|         cmp|Praça de Mouzinho...|{"type": "Point",...|                 0|             10|                  []|
|urn:ngsi-ld:OnStr...|OnStreetParking|{"addressCountry"...|[twoWheeledVehicle]|                  2|         cmp|Passeio das Virtudes|{"type": "Point",...|  

In [None]:
df.select("description", "location", "availableSpotNumber", "occupiedSpotNumber","totalSpotNumber").show()

[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+--------------------+-------------------+------------------+---------------+
|         description|            location|availableSpotNumber|occupiedSpotNumber|totalSpotNumber|
+--------------------+--------------------+-------------------+------------------+---------------+
|Praça de Mouzinho...|{"type": "Point",...|                 10|                 0|             10|
|Passeio das Virtudes|{"type": "Point",...|                  2|                 8|             10|
|Campo dos Mártire...|{"type": "Point",...|                  3|                 7|             10|
|     Pátio do Bolhão|{"type": "Point",...|                  8|                 2|             10|
|Rua do Engenheiro...|{"type": "Point",...|                  8|                 2|             10|
|Rua de D. João Co...|{"type": "Point",...|                  9|                 1|             10|
|  Rua de Júlio Dinis|{"type": "Point",...|                  5|                 5|             10|
|Rua da In

                                                                                

In [None]:
# Write the DataFrame to MinIO in Parquet format
df.write.mode('overwrite').parquet(output_path)

24/05/24 00:01:41 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
24/05/24 00:01:41 WARN BasicProfileConfigLoader: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code.
24/05/24 00:01:42 WARN BasicProfileConfigLoader: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code.
24/05/24 00:01:44 WARN BasicProfileConfigLoader: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code.
                                         

## Testing spark to read the parquet file created in the bucket

In [None]:
### USING THE SAME SESSION AS ABOVE FOR SAVING ###
# Read Parquet file from MinIO
df = spark.read.schema(schema).parquet(output_path)

# Show the DataFrame schema and content
df.printSchema()
df.show()

24/05/25 00:49:07 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
24/05/25 00:49:07 WARN BasicProfileConfigLoader: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code.
24/05/25 00:49:08 WARN BasicProfileConfigLoader: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code.
24/05/25 00:49:10 WARN BasicProfileConfigLoader: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code.


root
 |-- id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- address: string (nullable = true)
 |-- allowedVehicleType: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- availableSpotNumber: integer (nullable = true)
 |-- dataProvider: string (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)
 |-- occupiedSpotNumber: integer (nullable = true)
 |-- totalSpotNumber: integer (nullable = true)
 |-- vehicle_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)



                                                                                

+--------------------+---------------+--------------------+-------------------+-------------------+------------+--------------------+--------------------+------------------+---------------+--------------------+
|                  id|           type|             address| allowedVehicleType|availableSpotNumber|dataProvider|         description|            location|occupiedSpotNumber|totalSpotNumber|         vehicle_ids|
+--------------------+---------------+--------------------+-------------------+-------------------+------------+--------------------+--------------------+------------------+---------------+--------------------+
|urn:ngsi-ld:OnStr...|OnStreetParking|{"addressCountry"...|[twoWheeledVehicle]|                  3|         cmp|Avenida de Montev...|{"type": "Point",...|                 7|             10|[urn:ngsi-ld:Vehi...|
|urn:ngsi-ld:OnStr...|OnStreetParking|{"addressCountry"...|[twoWheeledVehicle]|                  7|         cmp| Rua da Constituição|{"type": "Point",...|  

## Transformations

In [None]:

location_schema = StructType([
    StructField('type', StringType(), True),
    StructField('coordinates', ArrayType(StringType()), True),
    StructField('metadata', StructType([]), True)
])

# Extract the JSON from the 'location' column
df = df.withColumn('location_json', from_json(col('location'), location_schema))

# Select and create new columns from the extracted JSON
df = df.withColumn('latitude', col('location_json.coordinates').getItem(0)) \
       .withColumn('longitude', col('location_json.coordinates').getItem(1))

df = df.drop('location')

# Strip spaces from the description column
df = df.withColumn('description', trim(col('description')))


In [None]:
# Show the DataFrame schema and content
df_to_agg = df.select("description", 'latitude', 'longitude', "availableSpotNumber", "occupiedSpotNumber","totalSpotNumber")

In [None]:
# Aggregate spots by location, round means
df_agg = df_to_agg.groupBy('description', 'latitude', 'longitude').agg(
    mean('availableSpotNumber').alias('mean_available_spots'),
    mean('occupiedSpotNumber').alias('mean_occupied_spots'),
    median('availableSpotNumber').alias('median_available_spots'),
    median('occupiedSpotNumber').alias('median_occupied_spots'),
)
df_agg.show(50, truncate=False)

[Stage 3:>                                                          (0 + 1) / 1]

+-----------------------------------+------------+------------+--------------------+-------------------+----------------------+---------------------+
|description                        |latitude    |longitude   |mean_available_spots|mean_occupied_spots|median_available_spots|median_occupied_spots|
+-----------------------------------+------------+------------+--------------------+-------------------+----------------------+---------------------+
|                                   |-8.662276771|41.150084663|10.0                |0.0                |10.0                  |0.0                  |
|Alameda da Cruz Vermelha Portuguesa|-8.574213738|41.173690905|10.0                |0.0                |10.0                  |0.0                  |
|Alameda das Antas                  |-8.58632669 |41.163312325|10.0                |0.0                |10.0                  |0.0                  |
|Alameda de Eça de Queirós          |-8.593701737|41.162499468|7.0                 |3.0             

                                                                                

In [None]:
# Create a postgres connection
import psycopg2

# PostgreSQL connection details
host = 'localhost'
port = 5432
database = 'postgres'
user = 'postgres'
password = 'password123'

# Create a connection
conn = psycopg2.connect(
    host=host,
    port=port,
    database=database,
    user=user,
    password=password
)

# Test the connection
if conn.status == psycopg2.extensions.STATUS_READY:
    print("Connected to PostgreSQL successfully!")
else:
    print("Failed to connect to PostgreSQL.")
c

Connected to PostgreSQL successfully!


In [None]:
# Create a sql table matching spark dataframe with autoincremental ID
create_table_query = """
CREATE TABLE IF NOT EXISTS parking_spots (
    id SERIAL PRIMARY KEY,
    description VARCHAR(255),
    latitude VARCHAR(255),
    longitude VARCHAR(255),
    mean_available_spots FLOAT,
    mean_occupied_spots FLOAT,
    median_available_spots FLOAT,
    median_occupied_spots FLOAT
);
"""

In [None]:
# Close connection to database
conn.close()

In [None]:
spark.stop()

In [None]:
from pyspark.sql import SparkSession

# Path to the PostgreSQL JDBC driver JAR file
jdbc_jar_path = '/opt/spark-3.5.1-bin-hadoop3/jars/postgresql-42.7.3.jar'

# Create Spark session with the correct path to the JAR file
spark = SparkSession.builder \
    .appName('PostgreSQL Connection') \
    .config('spark.jars', jdbc_jar_path) \
    .getOrCreate()

# Configure PostgreSQL connection details
host = 'localhost'
port = 5432
database = 'postgres'
user = 'postgres'
password = 'password123'

# Define the JDBC URL
url = f'jdbc:postgresql://{host}:{port}/{database}'

# Define the table name
table_name = 'parking_spots'

# Assuming df_agg is already defined somewhere in your script.
# Example: df_agg = spark.read.csv('path_to_csv_file').groupBy(...).agg(...)

# Write the df_agg DataFrame to the specified PostgreSQL table
df_agg.write \
    .format('jdbc') \
    .option('url', url) \
    .option('dbtable', table_name) \
    .option('user', user) \
    .option('password', password) \
    .option('driver', 'org.postgresql.Driver') \
    .mode('overwrite')\
    .save()

Py4JJavaError: An error occurred while calling o301.save.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:254)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:258)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
