In [9]:
import json
import os
from urllib.parse import urlencode

import psycopg2
import requests
from psycopg2.extras import execute_values

In [None]:
# 1. Fetch data from Dati Lombardia API
def fetch_data_from_api(api_url, limit=1000, order="Data DESC", timeout=30):
    """
    Fetch records from the API and return the decoded JSON body.

    Parameters:
    - api_url: Base URL for the API
    - limit: Number of records to return (default: 1000)
    - order: Field and direction to sort by (default: "Data DESC")
    - timeout: Seconds to wait for the server before giving up (default: 30)

    Returns:
    - Whatever `response.json()` decodes from the response body.

    Raises:
    - RuntimeError: if the response status code is not 200.
    - Any exception raised by `requests.get` itself (connection errors,
      timeouts) propagates unchanged.
    """
    # Construct query parameters
    params = {
        "$limit": limit,
        "$order": order,
    }

    # Append the URL-encoded parameters to the base URL
    full_url = f"{api_url}?{urlencode(params)}"
    print(f"Requesting data from: {full_url}")

    # requests applies no timeout unless one is passed, so an unresponsive
    # server would otherwise block the notebook kernel indefinitely.
    response = requests.get(full_url, timeout=timeout)
    if response.status_code != 200:
        raise RuntimeError(f"API request failed with status code {response.status_code}")
    return response.json()

In [3]:
# 2. Process API data and filter out problematic columns
def process_api_data(data_list):
    """
    Return a cleaned copy of each record in `data_list`.

    For every record (a dict):
    - keys listed in the exclusion list are dropped;
    - values that are dicts are serialised to JSON strings with `json.dumps`;
    - every other value is copied through unchanged.

    The input records are not modified; a new list of new dicts is returned.
    """
    # Keys that must not be carried over into the processed records
    excluded_keys = [":@computed_region_6hky_swhk"]

    return [
        {
            field: json.dumps(raw) if isinstance(raw, dict) else raw
            for field, raw in record.items()
            if field not in excluded_keys
        }
        for record in data_list
    ]

In [None]:
# 3. Connect to PostgreSQL database
def connect_to_postgres():
    """
    Open and return a new psycopg2 connection to the air-quality database.

    Connection settings are read from the environment variables PGHOST,
    PGDATABASE, PGUSER and PGPASSWORD. When a variable is not set, the
    previous hard-coded local value is used, so existing local setups keep
    working without any configuration.

    Returns:
    - The connection object returned by `psycopg2.connect`. The caller is
      responsible for closing it.
    """
    conn = psycopg2.connect(
        host=os.environ.get("PGHOST", "localhost"),
        database=os.environ.get("PGDATABASE", "lombardia_air_quality"),
        user=os.environ.get("PGUSER", "airdata_user"),
        # NOTE(review): the fallback password is kept only for backward
        # compatibility with the local dev database; set PGPASSWORD rather
        # than relying on a credential stored in the notebook.
        password=os.environ.get("PGPASSWORD", "user"),
    )
    return conn

In [5]:
# 4. (Re)create the table: any existing table with this name is dropped first
def _infer_column_type(key, value):
    """Map one sample field (name and value) to a PostgreSQL column type."""
    # bool must be tested before int: bool is a subclass of int in Python,
    # so an int-first check would classify True/False as INTEGER.
    if isinstance(value, bool):
        return "BOOLEAN"
    if isinstance(value, int):
        return "INTEGER"
    if isinstance(value, float):
        return "NUMERIC"
    if isinstance(value, dict):
        return "JSONB"  # Use JSONB for nested structures
    # Special case for timestamp fields
    if key in ("datastart", "datastop"):
        return "TIMESTAMP"
    return "TEXT"  # Default type


def create_table_if_not_exists(conn, table_name, data_sample, drop_existing=True):
    """
    Create `table_name` with one column per key of `data_sample`.

    The table always gets an `id SERIAL PRIMARY KEY` column first and a
    `created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP` column last. Column
    types for the sample keys are inferred from the sample values.

    Parameters:
    - conn: open database connection (provides cursor(), commit(), rollback())
    - table_name: name of the table; interpolated into the SQL verbatim, so
      it must come from trusted code, never from user or API input
    - data_sample: dict whose keys become column names and whose values
      drive type inference
    - drop_existing: when True (the default, matching the previous
      behaviour) any existing table with this name is dropped first, which
      DELETES ITS DATA. Pass False to leave an existing table untouched.

    Raises:
    - Whatever the cursor raises while executing the DDL; the transaction is
      rolled back and the cursor closed before the exception propagates.
    """
    # Building the full definition list up front keeps the SQL valid even
    # when data_sample is empty (no dangling comma between the fixed columns).
    column_defs = ["id SERIAL PRIMARY KEY"]
    for key, value in data_sample.items():
        # Double any embedded double quote so the key stays one identifier
        quoted_key = '"' + str(key).replace('"', '""') + '"'
        column_defs.append(f"{quoted_key} {_infer_column_type(key, value)}")
    column_defs.append("created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP")

    statements = []
    if drop_existing:
        statements.append(f"DROP TABLE IF EXISTS {table_name};")
    joined_defs = ",\n        ".join(column_defs)
    statements.append(
        f"CREATE TABLE IF NOT EXISTS {table_name} (\n        {joined_defs}\n    );"
    )
    create_table_sql = "\n    ".join(statements)

    cursor = conn.cursor()
    try:
        cursor.execute(create_table_sql)
        conn.commit()
    except Exception:
        # Leave the connection usable instead of stuck in a failed transaction
        conn.rollback()
        raise
    finally:
        cursor.close()

In [8]:
# 5. Insert data into table
def insert_data(conn, table_name, data_list):
    """
    Bulk-insert the records in `data_list` into `table_name`.

    The column list is taken from the keys of the FIRST record. Keys that
    appear only in later records are not inserted, and a record missing one
    of the columns contributes None for it.

    Parameters:
    - conn: open database connection (provides cursor(), commit(), rollback())
    - table_name: target table; interpolated into the SQL verbatim, so it
      must come from trusted code
    - data_list: list of dicts, one per row; an empty list is a no-op

    Raises:
    - Whatever `execute_values` or `commit` raises; the transaction is rolled
      back and the cursor closed before the exception propagates.
    """
    # Check for empty input before opening a cursor, so the early return
    # cannot leave an unclosed cursor behind.
    if not data_list:
        print("No data to insert")
        return

    # Get column names from the first data item
    columns = list(data_list[0].keys())

    # Prepare values for insertion, one list per row in column order
    values = [[item.get(col) for col in columns] for item in data_list]

    # Quote each column name, doubling embedded double quotes so a key can
    # never terminate its own identifier.
    column_sql = ", ".join('"' + str(col).replace('"', '""') + '"' for col in columns)
    insert_query = f"""
    INSERT INTO {table_name} ({column_sql})
    VALUES %s
    """

    cursor = conn.cursor()
    try:
        # Execute the query with all values
        execute_values(cursor, insert_query, values)
        conn.commit()
    except Exception:
        # Do not leave the connection stuck in a failed transaction
        conn.rollback()
        raise
    finally:
        cursor.close()

    print(f"Inserted {len(data_list)} records into {table_name}")

In [16]:

# API URL
api_url = "https://www.dati.lombardia.it/resource/g2hp-ar79.json"

# Define the table name for your data
table_name = "station"

# Bound before the try block so the finally clause can tell whether a
# connection was ever opened.
conn = None

try:
    # Fetch data from API (latest 1000 records)
    print("Fetching latest 1000 records from API...")
    raw_data = fetch_data_from_api(api_url, limit=1000, order="Data DESC")

    # Debug: Inspect the data structure and count
    print(f"Received {len(raw_data)} records from API")
    print("Sample data item structure:")
    if raw_data:
        print(json.dumps(raw_data[0], indent=2))

    # Process the data to handle nested structures and filter out problematic columns
    print("Processing data...")
    processed_data = process_api_data(raw_data)

    # Connect to PostgreSQL
    print("Connecting to PostgreSQL...")
    conn = connect_to_postgres()

    # (Re)create the table, using the first record to infer the schema
    print("Creating table if it doesn't exist...")
    if processed_data:
        create_table_if_not_exists(conn, table_name, processed_data[0])

    # Insert data into table
    print("Inserting data into table...")
    insert_data(conn, table_name, processed_data)

    print("Process completed successfully!")

except Exception as e:
    # Best-effort notebook run: report the failure instead of raising
    print(f"Error: {str(e)}")

finally:
    # Close the connection on every path; previously a failure in table
    # creation or insertion left it open for the life of the kernel.
    if conn is not None:
        conn.close()


Fetching latest 1000 records from API...
Requesting data from: https://www.dati.lombardia.it/resource/g2hp-ar79.json?%24limit=1000&%24order=Data+DESC
Received 1000 records from API
Sample data item structure:
{
  "idsensore": "30166",
  "data": "2025-01-01T00:00:00.000",
  "valore": "2.5",
  "stato": "VA",
  "idoperatore": "1"
}
Processing data...
Connecting to PostgreSQL...
Creating table if it doesn't exist...
Inserting data into table...
Inserted 1000 records into station
Process completed successfully!
