Processamento dei dati JSON scaricati in CSV

In [4]:
import json
import psycopg2
import datetime
import logging
import os
from dotenv import load_dotenv
from azure.storage.blob import BlobServiceClient
from psycopg2.extras import execute_values

def safe_int(value):
    """Convert ``value`` to ``int``, or return ``None`` if it cannot be converted.

    Args:
        value: Anything accepted by ``int()`` (numbers, numeric strings, ...).

    Returns:
        The integer value, or ``None`` when the conversion fails.
    """
    try:
        return int(value)
    except (ValueError, TypeError, OverflowError):
        # ValueError: non-numeric strings and float("nan");
        # TypeError: None and other unsupported types;
        # OverflowError: float("inf") / float("-inf").
        return None

def safe_fromtimestamp(ts):
    """Format a Unix timestamp (seconds) as ``'YYYY-MM-DD HH:MM:SS'``.

    The conversion uses ``datetime.datetime.fromtimestamp`` without a
    timezone argument, so the result is expressed in the local timezone of
    the machine running the notebook.

    Args:
        ts: Seconds since the Unix epoch, as an int, float or numeric string.

    Returns:
        The formatted string, or ``None`` when ``ts`` is ``None``, is not
        convertible to a positive integer, or is outside the range supported
        by the platform.
    """
    if ts is None:
        return None
    try:
        # Convert first, then validate: comparing the raw value against 0
        # would raise TypeError for numeric strings and let fractions such as
        # 0.5 through (which truncate to the epoch itself).
        seconds = int(ts)
        if seconds <= 0:
            return None
        return datetime.datetime.fromtimestamp(seconds).strftime('%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError, OverflowError, OSError):
        # int() failures, or a timestamp the platform cannot represent.
        return None

# Configure logging: the root logger emits records at INFO level and above.
logging.basicConfig(level=logging.INFO)
# Load settings from a .env file (python-dotenv) into the process environment;
# the connection code in this notebook reads them back with os.getenv().
load_dotenv()

ModuleNotFoundError: No module named 'dotenv'

In [2]:
# Quick sanity check that the environment was loaded: prints None when
# POSTGRES_HOST is not set.
print(os.getenv("POSTGRES_HOST"))

# PostgreSQL connection.
# Each keyword reads the environment variable that matches it: previously
# dbname/user/password were fed from POSTGRES_USER/POSTGRES_PASSWORD/
# POSTGRES_DB respectively, i.e. shifted by one.
conn = psycopg2.connect(
    host=os.getenv("POSTGRES_HOST"),
    port=os.getenv("POSTGRES_PORT"),
    dbname=os.getenv("POSTGRES_DB"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
)

# Azure Blob Storage connection
connection_string = os.getenv("BLOBSTORAGE_CONNECTIONSTRING")
container_name = os.getenv("BLOBSTORAGE_CONTAINERNAME")

blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)

None


OperationalError: connection to server at "localhost" (::1), port 5432 failed: Connection refused (0x0000274D/10061)
	Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused (0x0000274D/10061)
	Is the server running on that host and accepting TCP/IP connections?


In [None]:
# Empty the three railway tables, one committed DELETE each, in this order:
# timetable first, then train, then station.
for table_name in ("railway.timetable", "railway.train", "railway.station"):
    cursor = conn.cursor()
    cursor.execute(f"DELETE FROM {table_name}")
    conn.commit()
    logging.info(f"Deleted all records from {table_name}")

In [None]:
import time
import psycopg2
from geopy.geocoders import Nominatim

# === Initialise the geocoder ===
geolocator = Nominatim(user_agent="station_mapper")

def get_city_region(station_name):
    """Look up the city and region of a station through Nominatim.

    A single forward-geocoding request is made with ``addressdetails=True``,
    so the address breakdown arrives in the same response. This replaces the
    previous geocode + reverse pair, which issued two requests per station
    (the second one without an explicit timeout) while the caller only
    pauses one second per station.

    Args:
        station_name: Station name; ", Italy" is appended to the query.

    Returns:
        A ``(city, region)`` tuple of strings. ``city`` is the first
        non-empty value among the ``city``, ``town`` and ``village`` address
        fields; ``region`` is the ``state`` field. Both are ``''`` when
        nothing is found or the lookup fails (the error is printed).
    """
    try:
        location = geolocator.geocode(
            f"{station_name}, Italy",
            addressdetails=True,
            language="it",
            timeout=10,
        )
        if location:
            address = location.raw.get('address', {})
            city = address.get('city') or address.get('town') or address.get('village') or ''
            region = address.get('state', '')
            return city, region
    except Exception as e:
        # Best effort: report the failure and let the caller skip the station.
        print(f"Geocoding error for '{station_name}': {e}")
    return '', ''

# === DB connection ===
# Credentials come from the environment (same variables as the main
# connection cell) instead of being hardcoded in the notebook.
# A dedicated connection/cursor name is used so that this cell does not
# rebind and then close the notebook-wide `conn`, which other cells use.
geo_conn = psycopg2.connect(
    host=os.getenv("POSTGRES_HOST"),
    port=os.getenv("POSTGRES_PORT"),
    dbname=os.getenv("POSTGRES_DB"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
)

try:
    with geo_conn.cursor() as geo_cursor:
        # === Fetch stations that have neither a city nor a region ===
        geo_cursor.execute("""
            SELECT id, name FROM railway.station
            WHERE (city IS NULL OR city = '')
              AND (region IS NULL OR region = '');
        """)
        stations_to_update = geo_cursor.fetchall()

        print(f"Found {len(stations_to_update)} station(s) to update.\n")

        # === Update each station ===
        for station_id, name in stations_to_update:
            print(f"Looking up: {name}")
            city, region = get_city_region(name)

            if city or region:
                geo_cursor.execute("""
                    UPDATE railway.station
                    SET city = %s, region = %s
                    WHERE id = %s;
                """, (city, region, station_id))
                # Commit per station so an interrupted run keeps its progress.
                geo_conn.commit()
                print(f"Updated station ID {station_id} with city='{city}', region='{region}'")
            else:
                print(f"Could not find location info for '{name}'")

            time.sleep(1)  # Respect the Nominatim rate limit
finally:
    # Release the connection even when a lookup or update fails.
    geo_conn.close()

print("\nAll done.")


In [None]:
def parse_day_blob(blob_bytes, day_number, blob_name):
    """Parse one downloaded JSON blob into stations, trains and timetable rows.

    Reads the ``"treni"`` list of the document; for each train it uses the
    keys ``"n"`` (number), ``"c"`` (type), ``"dl"`` (cancellation message) and
    ``"fr"`` (stops). For each stop it uses ``"n"`` (station name), ``"ra"`` /
    ``"rp"`` (arrival / departure delay) and ``"oa"`` / ``"op"`` (arrival /
    departure timestamps).

    Args:
        blob_bytes: Raw JSON content of the blob.
        day_number: Value stored as ``day_number`` on every timetable row.
        blob_name: Blob name, used only in the error message.

    Returns:
        ``(stations, trains, timetable)``: two dicts keyed by station name and
        train number, and a list of timetable row dicts. If parsing fails the
        error is printed and whatever was collected up to that point is
        returned.
    """
    stations = {}
    trains = {}
    timetable = []

    try:
        data = json.loads(blob_bytes)

        for train in data.get("treni", []):
            train_number = train.get("n")
            train_type = train.get("c")
            cancellation_message = train.get("dl")

            if train_number and train_number not in trains:
                trains[train_number] = {
                    "id": train_number,
                    "type": train_type
                }

            for stop_number, stop in enumerate(train.get("fr", [])):
                name = stop.get("n")
                if name and name not in stations:
                    stations[name] = {
                        "name": name,
                        "city": "",
                        "region": ""
                    }

                # "N", "S" and "n.d." are stored as NULL delays; 0, "0" and
                # "n.d." are stored as NULL times.
                arrival_delay = None if stop.get("ra") in ["N", "S", "n.d."] else safe_int(stop.get("ra"))
                departure_delay = None if stop.get("rp") in ["N", "S", "n.d."] else safe_int(stop.get("rp"))
                arrival_time = None if stop.get("oa") in [0, "0", "n.d."] else safe_int(stop.get("oa"))
                departure_time = None if stop.get("op") in [0, "0", "n.d."] else safe_int(stop.get("op"))
                # NOTE(review): only the integer 0 counts here, whereas the
                # time fields above also accept the string "0" - confirm
                # against the feed format.
                deleted = bool(stop.get("op") == 0 and cancellation_message)

                timetable.append({
                    "train_id": train_number,
                    "stop_number": stop_number,
                    "day_number": day_number,
                    "station_name": name,
                    "arrival_datetime": safe_fromtimestamp(arrival_time),
                    "departure_datetime": safe_fromtimestamp(departure_time),
                    "arrival_delay": arrival_delay,
                    "departure_delay": departure_delay,
                    "deleted": deleted
                })

    except Exception as e:
        print(f"Error parsing file {blob_name}: {e}")

    return stations, trains, timetable


# The statements do not change between blobs, so they are defined once.
insert_station_sql = """
INSERT INTO railway.station (name, city, region)
VALUES %s
ON CONFLICT (name) DO NOTHING;
"""

insert_train_sql = """
INSERT INTO railway.train (id, type)
VALUES %s
ON CONFLICT (id) DO NOTHING;
"""

insert_timetable_sql = """
INSERT INTO railway.timetable (
    station_id,
    train_id,
    stop_number,
    day_number,
    arrival_datetime,
    departure_datetime,
    arrival_delay,
    departure_delay,
    deleted
) VALUES %s
ON CONFLICT DO NOTHING;
"""

blobName_list = container_client.list_blob_names()

try:
    # The enumeration index of the blob is stored as the row's day_number.
    for i, blobName in enumerate(blobName_list):

        print(f"Processing blob: {blobName}")
        blobClient = blob_service_client.get_blob_client(container=container_name, blob=blobName)
        blobBytes = blobClient.download_blob().readall()

        # === Extract unique station names and train info ===
        stations, trains, timetable = parse_day_blob(blobBytes, i, blobName)

        # One cursor per blob, closed by the context manager (previously three
        # cursors were opened per blob and only the last one was closed).
        with conn.cursor() as cursor:

            # === Insert new stations only ===
            print(f"Stations to insert: {len(stations)}")
            station_to_insert = [(s['name'], s['city'], s['region']) for s in stations.values()]
            execute_values(cursor, insert_station_sql, station_to_insert)
            conn.commit()
            print("Stations commited")

            # === Insert new trains only ===
            print(f"Trains to insert: {len(trains)}")
            train_to_insert = [(t['id'], t['type']) for t in trains.values()]
            execute_values(cursor, insert_train_sql, train_to_insert)
            conn.commit()
            print("Trains commited")

            # === Fetch the station name -> id mapping from the DB ===
            cursor.execute("SELECT id, name FROM railway.station")
            station_name_to_id = {name: sid for sid, name in cursor.fetchall()}

            # === Build the rows for the timetable table ===
            timetable_to_insert = []
            for row in timetable:
                station_id = station_name_to_id.get(row["station_name"])
                # Skip stops whose station is unknown; compare against None so
                # that an id of 0 would not be discarded.
                if station_id is None:
                    continue

                timetable_to_insert.append((
                    station_id,
                    row["train_id"],
                    row["stop_number"],
                    row["day_number"],
                    row["arrival_datetime"],
                    row["departure_datetime"],
                    row["arrival_delay"],
                    row["departure_delay"],
                    row["deleted"]
                ))

            # === Insert into railway.timetable ===
            print(f"Timetable records to insert: {len(timetable_to_insert)}")
            execute_values(cursor, insert_timetable_sql, timetable_to_insert)
            conn.commit()
            print("Timetable commited")
except Exception:
    # Discard the uncommitted statements of the failing blob, then re-raise.
    conn.rollback()
    raise
finally:
    conn.close()

In [None]:
import json
import pandas as pd
from azure.storage.blob import BlobServiceClient
from io import BytesIO
import psycopg2
from psycopg2.extras import execute_values

# === Azure Blob Storage connection ===
# The connection string (which embeds the account key) is read from the
# environment instead of being hardcoded in the notebook.
connection_string = os.getenv("BLOBSTORAGE_CONNECTIONSTRING")
container_name = os.getenv("BLOBSTORAGE_CONTAINERNAME", "datascience")

blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)
blobName_list = container_client.list_blob_names()

# === PostgreSQL connection ===
# Same environment variables as the main connection cell; no credentials in
# the notebook source.
conn = psycopg2.connect(
    host=os.getenv("POSTGRES_HOST"),
    port=os.getenv("POSTGRES_PORT"),
    dbname=os.getenv("POSTGRES_DB"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
)
cursor = conn.cursor()

# === Extract unique station names ===
# Maps station name -> (name, city, region); city and region start empty.
stations = {}

for i, blobName in enumerate(blobName_list):
    # Only the first blob is processed.
    if i >= 1:
        break

    print(f"Processing blob: {blobName}")
    blobClient = blob_service_client.get_blob_client(container=container_name, blob=blobName)
    blobBytes = blobClient.download_blob().readall()

    try:
        data = json.loads(blobBytes)
        trains = data.get("treni", [])
        for train in trains:
            for stop in train.get("fr", []):
                name = stop.get("n")
                if name and name not in stations:
                    stations[name] = (name, "", "")
    except Exception as e:
        print(f"Error parsing file {blobName}: {e}")

# === Insert new stations only ===
# A single "%s" placeholder is expanded by execute_values into the row list.
insert_query = """
INSERT INTO railway.station (name, city, region)
VALUES %s
ON CONFLICT (name) DO NOTHING;
"""
records_to_insert = list(stations.values())

try:
    inserted = 0
    if records_to_insert:
        # Send every row in one statement: cursor.rowcount only describes the
        # last statement executed, so with the default paging the count below
        # would cover just the final page.
        execute_values(cursor, insert_query, records_to_insert, page_size=len(records_to_insert))
        inserted = cursor.rowcount
    conn.commit()

    print(f"{inserted} new station(s) inserted. Existing ones were skipped.")
finally:
    cursor.close()
    conn.close()
