In [0]:
import json
import requests
import logging
import time
from datetime import datetime
from azure.storage.blob import BlobServiceClient

In [0]:
'''
Logging configuration and initialization, reduced for external libraries.
'''

# Configure the root logger: timestamped records at INFO level, emitted through a stream handler.
# NOTE(review): logging.basicConfig does nothing when the root logger already has handlers
# (common in hosted notebook kernels) — confirm the format below actually takes effect here.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s: %(message)s",
    level=logging.INFO,
    handlers=[logging.StreamHandler()]
)

# Raise the threshold for chatty third-party loggers so only warnings and errors come through.
for lib in ["azure", "urllib3", "py4j"]:
    logging.getLogger(lib).setLevel(logging.WARNING)

# The notebook logs through the root logger; the level is set explicitly in addition to basicConfig.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

In [0]:
'''
Fetching access data from Databricks secrets.
'''

# `dbutils` is not imported in this notebook; it is expected to be provided by the Databricks runtime.
# Both values are read from the "dev_secrets" scope so no credential is hard-coded here.
AZURE_CONNECTION_STRING = dbutils.secrets.get("dev_secrets", "storageconnection")
API_KEY_OPENAQ = dbutils.secrets.get("dev_secrets", "openaq")

In [0]:
'''
Initializing the client for handling Azure Blob Storage.
'''

# Name of the blob container that holds every file this notebook reads or writes.
CONTAINER_NAME = "data"
# Service-level client built from the secret connection string.
blob_service = BlobServiceClient.from_connection_string(AZURE_CONNECTION_STRING)
# Container-scoped client shared (as a module-level global) by all functions below.
container_client = blob_service.get_container_client(CONTAINER_NAME)

In [0]:
'''
Definition of paths, API endpoints and query parameters.
'''

# Blob name prefixes under which the per-location and per-sensor JSON files are stored.
LOCATIONS_PATH = "openaq-locations-data"
SENSORS_PATH = "openaq-sensors-data"
# OpenAQ v3 endpoints.
API_LOCATIONS_URL = "https://api.openaq.org/v3/locations"
# NOTE(review): API_SENSORS_URL is not referenced by any code visible in this notebook.
API_SENSORS_URL = "https://api.openaq.org/v3/sensors"
# Sent as the `iso` and `limit` query parameters of the locations listing request.
COUNTRY_CODE = "PL"
PAGE_LIMIT = 1000
# Passed as `timeout` to every session.get call.
REQUEST_TIMEOUT = 10

In [0]:
'''
Supported locations and air quality parameters.
'''

# Allow-list of OpenAQ location IDs handled by this notebook.
# IDs are kept as strings because the code compares them against str(...)-converted IDs
# coming from the API and from blob file names.
location_ids = {
    "7253", "10646", "9635", "7250", "7961", "9701", "9343", "10691", "7261", "10679", "10760", "10659", "9273", "10707", "10570", "9692", "10625", "10605", "7210", "7239", "9505", "7211", "9739", "9324", "10743", "10744", "10575", "7264", "10631", "6387", "7251", "10720", "7785", "10573", "10587", "10589", "9699", "6384", "9715", "9725", "7263", "9743", "41727", "9782", "7245", "6386", "10511", "6382", "9784", "10454", "7252", "7207", "9042", "10618", "6351", "10593"
}
# Lower-case parameter names a sensor must report to be saved; sensor parameter names are
# lower-cased before being checked against this set.
air_quality_parameters = {"pm10", "no2", "o3"}

In [0]:
def get_latest_blob_data(path: str):
    '''
    Downloads and parses the most recently modified JSON blob under a prefix.

    Args:
        path (str): Blob name prefix to search for in the container.

    Returns:
        Parsed JSON content of the newest blob, or None when no blob matches
        the prefix or when any error occurs (the error is logged).
    '''
    try:
        candidates = list(container_client.list_blobs(name_starts_with=path))

        if len(candidates) == 0:
            logger.info(f"No existing data found in Azure Blob Storage for path: {path}")
            return None

        # Linear scan for the newest blob; on equal timestamps the first one listed wins.
        newest = candidates[0]
        for candidate in candidates[1:]:
            if candidate.last_modified > newest.last_modified:
                newest = candidate

        # Download the raw bytes first, then decode them as JSON.
        raw_content = container_client.get_blob_client(newest.name).download_blob().readall()
        parsed = json.loads(raw_content)

        logger.info(f"Latest data file retrieved: {newest.name}")
        return parsed

    except Exception as error:
        logger.exception(f"Error retrieving latest blob data from {path}: {error}")
        return None

In [0]:
def save_location_to_blob(location, is_active=True):
    '''
    Saves a single location as an individual JSON file in Azure Blob Storage.

    The blob is written to "<LOCATIONS_PATH>/location_<ID>.json" and overwrites
    any existing blob with that name. Failures are logged and swallowed, so the
    function never raises.

    Args:
        location (dict): Location data to be saved. Must contain an "id" key.
            The dict is modified in place: its "is_active" key is set.
        is_active (bool, optional): Value stored under "is_active". Default=True.
    '''
    # Bound before the try block so the error message below can always be formatted,
    # even when reading location["id"] is the step that fails.
    location_id = "<unknown>"
    try:
        location_id = str(location["id"])
        location["is_active"] = is_active

        # Format of the file name: "openaq-locations-data/location_<ID>.json"
        blob_name = f"{LOCATIONS_PATH}/location_{location_id}.json"
        container_client.upload_blob(blob_name, json.dumps(location, indent=2), overwrite=True)

        logger.debug(f"Saved location {location_id} to Azure Blob Storage: {blob_name}")
    except Exception as e:
        logger.error(f"Error saving location {location_id} to Azure Blob Storage: {e}")

In [0]:
def save_sensor_to_blob(sensor, is_active=True):
    '''
    Saves a single sensor as an individual JSON file in Azure Blob Storage.

    The stored document is a normalized record with exactly four keys:
    "sensor_id" and "location_id" (as strings), "parameter" (lower-cased) and
    "is_active". The blob is written to
    "<SENSORS_PATH>/sensor_<parameter>_<sensor_id>.json" and overwrites any
    existing blob with that name. Failures are logged and swallowed, so the
    function never raises.

    Args:
        sensor (dict): Sensor data to be saved. Must contain "sensor_id",
            "parameter" and "location_id".
        is_active (bool, optional): Indicates if the sensor is active. Default=True.
    '''
    # Bound before the try block so the error message below can always be formatted,
    # even when one of the required keys is missing.
    sensor_id = "<unknown>"
    parameter = "<unknown>"
    try:
        sensor_id = str(sensor["sensor_id"])
        parameter = sensor["parameter"].lower()
        location_id = str(sensor["location_id"])

        sensor_data = {
            "sensor_id": sensor_id,
            "location_id": location_id,
            "parameter": parameter,
            "is_active": is_active
        }

        blob_name = f"{SENSORS_PATH}/sensor_{parameter}_{sensor_id}.json"
        # Upload the normalized record (not the raw input) so "is_active" is always persisted.
        container_client.upload_blob(blob_name, json.dumps(sensor_data, indent=2), overwrite=True)

        logger.debug(f"Saved sensor {sensor_id} ({parameter}) to Azure Blob Storage: {blob_name}")

    except Exception as e:
        logger.error(f"Error saving sensor {sensor_id} ({parameter}) to Azure Blob Storage: {e}")


In [0]:
'''
Creating a global HTTP session for connection optimization.
'''

# Single module-level session reused by every session.get call in the fetch functions below.
session = requests.Session()

In [0]:
def fetch_locations(existing_locations):
    '''
    Fetches new location data from the OpenAQ API.

    A single page of up to PAGE_LIMIT locations for COUNTRY_CODE is requested.
    A location counts as "new" when its ID is listed in `location_ids` and is
    not yet present in `existing_locations`. HTTP 429 responses are retried
    (up to 3 requests in total, sleeping 10 seconds between them); any other
    request error aborts immediately.

    Args:
        existing_locations (set): Set of stored location IDs (as strings).

    Returns:
        list: List of new locations (raw API records). Always a list: empty
        when nothing is new, when the request fails, or when the rate-limit
        retries are exhausted.
    '''
    retry_attempts = 3
    new_locations = []

    while retry_attempts > 0:
        try:
            logger.info("Fetching new location data from OpenAQ API.")

            # Constructing the request URL for the OpenAQ API.
            url = f"{API_LOCATIONS_URL}?iso={COUNTRY_CODE}&limit={PAGE_LIMIT}"
            # Sending a GET request to the API with an authorization key.
            response = session.get(url, headers={"X-API-Key": API_KEY_OPENAQ}, timeout=REQUEST_TIMEOUT)

            if response.status_code == 429:
                retry_attempts -= 1
                if retry_attempts == 0:
                    logger.error("Rate limit (429) while fetching locations exceeded max retries. Skipping.")
                    break
                logger.warning("Rate limit hit (429) while fetching locations. Retrying in 10 seconds...")
                time.sleep(10)
                continue

            response.raise_for_status()

            # `or []` also covers a payload whose "results" field is present but null.
            results = response.json().get("results") or []

            # Report every configured ID that the API did not return.
            api_location_ids = {str(loc.get("id")) for loc in results}
            for loc_id in location_ids - api_location_ids:
                logger.warning(f"Location {loc_id} is in `location_ids` but not in API response!")

            for loc in results:
                loc_id = str(loc.get("id"))
                if loc_id in location_ids and loc_id not in existing_locations:
                    new_locations.append(loc)

            logger.info(f"Retrieved {len(new_locations)} new locations.")
            return new_locations

        except requests.exceptions.RequestException as e:
            logger.exception(f"Error fetching location data: {e}")
            return []

    # Reached only when the retry loop gives up; return a list so callers can iterate the result.
    return new_locations

In [0]:
def process_locations(existing_locations):
    '''
    Synchronizes stored locations with `location_ids`.

    1. Every stored location whose ID is no longer in `location_ids` is marked
       inactive (its JSON file is rewritten with "is_active": false; nothing
       is deleted). Locations already marked inactive are left untouched.
    2. New locations are fetched from the API and saved as active.

    Errors for individual locations are logged and do not stop the run.

    Args:
        existing_locations (set): Set of location IDs currently stored in Azure Blob Storage.
    '''
    removed_locations = existing_locations - location_ids
    saved_locations = []

    for loc_id in removed_locations:
        blob_name = f"{LOCATIONS_PATH}/location_{loc_id}.json"
        blob_client = container_client.get_blob_client(blob_name)
        try:
            location_data = json.loads(blob_client.download_blob().readall())

            # Already inactive: skip the rewrite so the blob is not touched again.
            if not location_data.get("is_active", True):
                continue

            location_data["is_active"] = False
            save_location_to_blob(location_data, is_active=False)
            logger.warning(f"Marked location {loc_id} as inactive in Azure Blob Storage.")
        except Exception as e:
            logger.error(f"Error marking location {loc_id} as inactive: {e}")

    # Treat a missing (None) result as "nothing new" so a failed fetch cannot crash the loop below.
    new_locations = fetch_locations(existing_locations) or []

    for location in new_locations:
        save_location_to_blob(location, is_active=True)
        saved_locations.append(location["id"])

    if saved_locations:
        logger.info(f"Saved {len(saved_locations)} new locations to Azure Blob Storage: {sorted(saved_locations)}")
    else:
        logger.info("No new locations saved.")

In [0]:
def load_locations_from_blob():
    '''
    Loads location IDs from Azure Blob Storage by reading every JSON file
    under LOCATIONS_PATH.

    A file may contain either a single JSON object (the shape written by
    save_location_to_blob) or a list of such objects. Entries that are not
    objects, or that have no "id" key, are ignored.

    Returns:
        set: Set of location IDs (as strings). On error, the IDs collected
        so far are returned and the error is logged.
    '''
    all_locations = set()

    try:
        blob_list = list(container_client.list_blobs(name_starts_with=LOCATIONS_PATH))

        if not blob_list:
            logger.info(f"No location data found in Azure Blob Storage for path: {LOCATIONS_PATH}")
            return all_locations

        for blob in blob_list:
            blob_client = container_client.get_blob_client(blob.name)
            blob_data = json.loads(blob_client.download_blob().readall())

            # Normalize to a list of records; iterating a dict directly would walk its keys.
            records = blob_data if isinstance(blob_data, list) else [blob_data]

            for loc in records:
                if isinstance(loc, dict) and "id" in loc:
                    all_locations.add(str(loc["id"]))

        logger.info(f"Loaded {len(all_locations)} location IDs from all JSON files.")
        return all_locations

    except Exception as e:
        logger.exception(f"Error retrieving all location data from {LOCATIONS_PATH}: {e}")
        return all_locations

In [0]:
def load_sensors_from_blob():
    """
    Loads sensor IDs from Azure Blob Storage by reading every JSON file
    under SENSORS_PATH.

    A file may contain either a single JSON object (the shape written by
    save_sensor_to_blob) or a list of such objects. Entries that are not
    objects, or that have no "sensor_id" key, are ignored.

    Returns:
        set: Set of sensor IDs (as strings). On error, the IDs collected
        so far are returned and the error is logged.
    """
    all_sensors = set()

    try:
        blob_list = list(container_client.list_blobs(name_starts_with=SENSORS_PATH))

        if not blob_list:
            logger.info(f"No sensor data found in Azure Blob Storage for path: {SENSORS_PATH}")
            return all_sensors

        for blob in blob_list:
            blob_client = container_client.get_blob_client(blob.name)
            blob_data = json.loads(blob_client.download_blob().readall())

            # Normalize to a list of records; iterating a dict directly would walk its keys.
            records = blob_data if isinstance(blob_data, list) else [blob_data]

            for sensor in records:
                if isinstance(sensor, dict) and "sensor_id" in sensor:
                    all_sensors.add(str(sensor["sensor_id"]))

        logger.info(f"Loaded {len(all_sensors)} sensor IDs from all JSON files.")
        return all_sensors

    except Exception as e:
        logger.exception(f"Error retrieving all sensor data from {SENSORS_PATH}: {e}")
        return all_sensors

In [0]:
def fetch_sensors_for_all_locations(existing_sensors):
    '''
    Fetches new sensor data for every location in `location_ids`.

    One request is sent per location. A sensor is returned when its parameter
    name (case-insensitive) is in `air_quality_parameters` and its ID is not
    in `existing_sensors`. Sensor entries without an "id" or without a usable
    parameter name are skipped instead of aborting the run. HTTP 429 responses
    are retried (up to 3 requests per location, sleeping 60 seconds between
    them); any other request error skips that location.

    Args:
        existing_sensors (set): Set of stored sensor IDs (any iterable of IDs is accepted).

    Returns:
        list: List of new sensors, each a dict with the keys "sensor_id"
        (as returned by the API), "parameter" (lower-cased) and "location_id".
    '''
    all_sensors = []
    existing_sensor_ids = {str(s) for s in existing_sensors}

    for loc_id in location_ids:
        retry_attempts = 3

        while retry_attempts > 0:
            try:
                url = f"{API_LOCATIONS_URL}/{loc_id}"
                response = session.get(url, headers={"X-API-Key": API_KEY_OPENAQ}, timeout=REQUEST_TIMEOUT)

                if response.status_code == 429:
                    retry_attempts -= 1
                    if retry_attempts == 0:
                        logger.error(f"Rate limit (429) for location {loc_id} exceeded max retries. Skipping.")
                        break
                    logger.warning(f"Rate limit hit (429) for location {loc_id}. Retrying in 60 seconds...")
                    time.sleep(60)
                    continue

                response.raise_for_status()
                # `or []` also covers a payload whose "results" field is present but null.
                location_data = response.json().get("results") or []
                if not location_data:
                    logger.warning(f"No data found for location {loc_id}.")
                    break

                for s in location_data[0].get("sensors") or []:
                    sensor_id = s.get("id")
                    # Tolerate a missing/null "parameter" object or "name" field.
                    parameter_name = ((s.get("parameter") or {}).get("name") or "").lower()

                    if sensor_id is None or parameter_name not in air_quality_parameters:
                        continue
                    if str(sensor_id) in existing_sensor_ids:
                        continue

                    all_sensors.append({
                        "sensor_id": sensor_id,
                        "parameter": parameter_name,
                        "location_id": loc_id
                    })

                # Location handled; leave the retry loop.
                break

            except requests.exceptions.RequestException as e:
                logger.error(f"Error fetching sensors for location {loc_id}: {e}")
                break

    logger.info(f"Retrieved {len(all_sensors)} new sensors.")
    return all_sensors

In [0]:
def process_sensors(existing_sensors, existing_locations):
    '''
    Fetches and saves only new sensors to Azure Blob Storage.
    Deletes only sensors that belong to removed locations.

    Args:
        existing_sensors (dict): Dictionary mapping sensor IDs to their respective Azure Blob Storage file paths.
        existing_locations (set): Set of all location IDs currently stored in Azure Blob Storage.
    '''
    removed_sensors = []
    saved_sensors = []

    for sensor_id, sensor_file in list(existing_sensors.items()):
        blob_client = container_client.get_blob_client(sensor_file)
        try:
            sensor_data = json.loads(blob_client.download_blob().readall())
            sensor_location_id = str(sensor_data.get("location_id", ""))

            if sensor_location_id not in location_ids:
                if not sensor_data.get("is_active", True):
                    continue
                
                sensor_data["is_active"] = False
                save_sensor_to_blob(sensor_data, is_active=False)
                removed_sensors.append(sensor_id)
                logger.warning(f"Marked sensor {sensor_id} as inactive in Azure Blob Storage.")
        except Exception as e:
            logger.error(f"Error marking sensor {sensor_id} as inactive: {e}")

    new_sensors = fetch_sensors_for_all_locations(existing_sensors.keys())

    for sensor in new_sensors:
        save_sensor_to_blob(sensor, is_active=True)
        saved_sensors.append(sensor["sensor_id"])

    if saved_sensors:
        logger.info(f"Saved {len(saved_sensors)} new sensors: {sorted(saved_sensors)}")
    else:
        logger.info("No new sensors saved.")

In [0]:
def main():
    '''
    Runs the full sync: locations first, then sensors.

    The IDs of already stored locations and sensors are derived from blob
    names: the text after the last "_" and before the first following "."
    (for example ".../location_7253.json" gives "7253").
    '''
    def _id_from_blob_name(blob_name):
        # Same parsing for both location and sensor blobs.
        return blob_name.split("_")[-1].split(".")[0]

    # Snapshot of stored locations, taken before any new location is saved.
    stored_location_ids = set()
    for location_blob in list(container_client.list_blobs(name_starts_with=LOCATIONS_PATH)):
        stored_location_ids.add(_id_from_blob_name(location_blob.name))

    process_locations(stored_location_ids)

    # Sensor ID -> blob name; sensor blobs are listed only after the location step has run.
    stored_sensor_files = {}
    for sensor_blob in list(container_client.list_blobs(name_starts_with=SENSORS_PATH)):
        stored_sensor_files[_id_from_blob_name(sensor_blob.name)] = sensor_blob.name

    process_sensors(stored_sensor_files, stored_location_ids)

In [0]:
# Entry point: executing this cell runs the location sync followed by the sensor sync.
main()