# **Generation SG Junior Data Engineer Programme**
### **Interim Project presented by DPPS Team (5)**<br><span style="color:darkblue; font-weight:bold;">Members: Daniel | Pin Pin, Yvonne | Pin Yean, Erica | Shawn</span>


### <span style="color:darkblue; font-weight:bold;">API Data Extraction - PM2.5 across Singapore</span>
<div>This document demonstrates how the team sent requests to, and worked with, APIs in Python using common libraries to extract data for our project from a single source of truth, the Data.Gov.SG open API. Using the API offers the following benefits:</div>

- **Real-time data access**: retrieve up-to-date data on demand
- **Large datasets**: integrate data from multiple sources
- **Pre-processed data**: saves significant time and resources


### **Understanding API Documentation**
When working with APIs, consulting the documentation is crucial for making successful requests. It includes:
- available endpoints
- required parameters
- authentication methods
- expected response formats
- licence terms of the open data (free or paid, personal or commercial use, etc.)


### **Evaluating The Options Available**
<div>We accessed several APIs on Data.Gov.SG and noticed that they are <em>REST APIs</em> (Representational State Transfer Application Programming Interfaces): a widely used kind of web API that follows the <em>REST</em> architectural style. Applications communicate by exchanging data through standardized HTTP methods (such as GET) to access and manipulate resources on a server.</div><br>

**API request and response models:**
- **Response body**: JSON (JavaScript Object Notation) format
- **200 OK**: everything went well and the result (if any) has been returned
- **400 Bad Request**: the server rejected the request because of a client-side error, such as sending incorrect or malformed data
- **404 Not Found**: the resource we tried to access was not found on the server
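
As a small illustration of these codes, the sketch below uses the standard library's `http.HTTPStatus` enum to turn a numeric code into its standard reason phrase and to flag client-side (4xx) errors. The helper names `describe_status` and `is_client_error` are hypothetical; the script later in this document simply compares `response.status_code` with 200.

```python
from http import HTTPStatus


def describe_status(status_code: int) -> str:
    """Return a label such as '200 OK' or '404 Not Found' for a status code."""
    try:
        phrase = HTTPStatus(status_code).phrase  # standard reason phrase
    except ValueError:
        phrase = "Unknown status"  # code not defined in HTTPStatus
    return f"{status_code} {phrase}"


def is_client_error(status_code: int) -> bool:
    """True for 4xx codes: the request itself must be fixed before retrying."""
    return 400 <= status_code < 500


for code in (200, 400, 404):
    print(describe_status(code), "| client error:", is_client_error(code))
```

Separating 4xx from other failures is useful because retrying a bad request unchanged will fail again, whereas a server-side (5xx) error may succeed on a later attempt.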


### **Making GET Request To The API**
<div>We sent a <b>GET request</b> to the Singapore government's open data API endpoint for real-time PM2.5 readings. PM2.5 refers to fine particles and droplets in the air measuring 2.5 micrometres or less in diameter. The JSON response contains readings for several regions of Singapore, and the dataset enables analysis of:</div>

1. The regions covered (five in the sample response below: west, east, central, south and north)
2. The label coordinates (latitude and longitude) of each region
3. The hourly PM2.5 reading (`pm25_one_hourly`) for each region
4. The timestamp associated with each reading

Examining this JSON lets us compare PM2.5 levels across Singapore's regions and track how they change over time.

**[Link](https://api-open.data.gov.sg/v2/real-time/api/pm25)**
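
A GET request like this is just the endpoint URL plus an optional query string. The sketch below builds that URL with the standard library's `urllib.parse.urlencode`, which for a simple `date` value gives the same query string as `requests.get(url, params={"date": ...})`, and fetches it with `urllib.request.urlopen`. The `date` parameter follows the script later in this document; the helper names `build_pm25_url` and `fetch_json` are hypothetical, and the behaviour without a date is an assumption based on the single-item sample response.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

API_URL = "https://api-open.data.gov.sg/v2/real-time/api/pm25"


def build_pm25_url(date=None):
    """Return the endpoint URL, optionally with a ?date=YYYY-MM-DD query string."""
    if date is None:
        return API_URL  # assumed to return the most recent readings
    return f"{API_URL}?{urlencode({'date': date})}"


def fetch_json(url, timeout=30):
    """Send a GET request and decode the JSON body.

    urlopen raises urllib.error.HTTPError for 4xx/5xx responses and
    urllib.error.URLError for network problems.
    """
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)


print(build_pm25_url("2024-11-30"))
# payload = fetch_json(build_pm25_url("2024-11-30"))  # needs network access
```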


<span style="color:darkgreen; font-weight:bold;">Sample JSON response</span>

In [None]:
{
  "code": 0,
  "data": {
    "regionMetadata": [
      {
        "name": "west",
        "labelLocation": {
          "latitude": 1.35735,
          "longitude": 103.7
        }
      },
      {
        "name": "east",
        "labelLocation": {
          "latitude": 1.35735,
          "longitude": 103.94
        }
      },
      {
        "name": "central",
        "labelLocation": {
          "latitude": 1.35735,
          "longitude": 103.82
        }
      },
      {
        "name": "south",
        "labelLocation": {
          "latitude": 1.29587,
          "longitude": 103.82
        }
      },
      {
        "name": "north",
        "labelLocation": {
          "latitude": 1.41803,
          "longitude": 103.82
        }
      }
    ],
    "items": [
      {
        "date": "2024-11-30",
        "updatedTimestamp": "2024-11-30T21:15:41+08:00",
        "timestamp": "2024-11-30T21:00:00+08:00",
        "readings": {
          "pm25_one_hourly": {
            "west": 6,
            "east": 11,
            "central": 24,
            "south": 12,
            "north": 9
          }
        }
      }
    ]
  },
  "errorMsg": ""
}
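
To see how this nested structure maps to table rows, the sketch below flattens an abbreviated copy of the response (two regions only) into one record per region and timestamp using the standard `json` module. `flatten_pm25` is a hypothetical helper: it mirrors what `process_items` does in the script further down, but returns plain dictionaries instead of DataFrames.

```python
import json

# Abbreviated copy of the sample response (two regions only)
sample = json.loads("""
{
  "code": 0,
  "data": {
    "regionMetadata": [
      {"name": "west", "labelLocation": {"latitude": 1.35735, "longitude": 103.7}},
      {"name": "east", "labelLocation": {"latitude": 1.35735, "longitude": 103.94}}
    ],
    "items": [
      {
        "timestamp": "2024-11-30T21:00:00+08:00",
        "readings": {"pm25_one_hourly": {"west": 6, "east": 11}}
      }
    ]
  },
  "errorMsg": ""
}
""")


def flatten_pm25(payload):
    """Turn the nested response into one dict per (timestamp, region) reading."""
    data = payload.get("data", {})
    # Map each region name to its label coordinates for quick lookup
    coords = {r["name"]: r.get("labelLocation", {}) for r in data.get("regionMetadata", [])}
    rows = []
    for item in data.get("items", []):
        hourly = item.get("readings", {}).get("pm25_one_hourly", {})
        for region, value in hourly.items():
            loc = coords.get(region, {})
            rows.append({
                "timestamp": item.get("timestamp"),
                "region": region,
                "pm25_value": value,
                "latitude": loc.get("latitude"),
                "longitude": loc.get("longitude"),
            })
    return rows


for row in flatten_pm25(sample):
    print(row)
```

Looking regions up in a dictionary keeps the join between `readings` and `regionMetadata` to a single pass over each list.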

### **Python Libraries Used**
We selected these libraries because together they provide a toolkit for data processing, web interaction, database operations and parallel execution in Python.
- **Requests**: used for making HTTP requests in Python
- **Pandas**: used for manipulating and analysing structured (tabular) data
- **Datetime and Timedelta**: classes from the `datetime` module for working with dates, times and time intervals
- **SQLAlchemy**: a SQL toolkit and Object-Relational Mapper (ORM) for connecting to relational databases
- **Logging**: provides a flexible framework for generating log messages
- **Concurrent.futures**: used for running tasks in parallel, for example with a thread pool
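
As an example of how `datetime` and `timedelta` work together here, the sketch below yields the `YYYY-MM-DD` strings for every day in an inclusive range, the same stepping the main script performs in its `while` loop. The generator name `daterange` is hypothetical.

```python
from datetime import datetime, timedelta


def daterange(start, end):
    """Yield 'YYYY-MM-DD' strings from start to end, inclusive, one day apart."""
    current = start
    while current <= end:
        yield current.strftime("%Y-%m-%d")
        current += timedelta(days=1)  # step forward one calendar day


print(list(daterange(datetime(2024, 11, 29), datetime(2024, 12, 2))))
# ['2024-11-29', '2024-11-30', '2024-12-01', '2024-12-02']
```

Because `timedelta` arithmetic handles month and year boundaries, the loop needs no special cases for the end of a month.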

In [None]:
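# Install the following packages in your Anaconda Prompt or Terminal:
conda install requests
conda install pandas
conda install sqlalchemy
conda install psycopg2   # PostgreSQL driver used by SQLAlchemy's 'postgresql://' connection URLs

# Datetime, Logging and Concurrent Futures are part of the Python standard library, so no installation is needed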
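
Before running the script, one quick way to confirm the environment is ready is to look each package up with the standard library's `importlib.util.find_spec`, which locates a module without importing it. The helper `missing_packages` is hypothetical, and listing `psycopg2` assumes it is the PostgreSQL driver in use.

```python
from importlib.util import find_spec

# Import names of the third-party packages the script relies on
# (psycopg2 is assumed to be the driver behind 'postgresql://' URLs)
REQUIRED = ["requests", "pandas", "sqlalchemy", "psycopg2"]


def missing_packages(names):
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in names if find_spec(name) is None]


missing = missing_packages(REQUIRED)
print("All packages available" if not missing else f"Missing: {', '.join(missing)}")
```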

### **Accessing Data**
For the next phase of the project, we wrote a script that ingests PM2.5 data from the API into PostgreSQL. It is a core component of our data pipeline: it fetches readings day by day, reshapes them into tables and loads them into the database.

Key design features:
1. **Focused Data Collection**: retrieves hourly PM2.5 readings for every region across the chosen dates
2. **Parallel Processing**: `concurrent.futures.ThreadPoolExecutor` can fetch several dates concurrently to cut overall run time; the script below fetches one date at a time
3. **Error Handling**: wraps API calls and database operations in try-except blocks and prints a descriptive message on failure, so a failed request or non-200 response for one date does not stop the run
4. **Data Integrity**: removes duplicate (timestamp, region) readings before insertion to keep the dataset clean
5. **Modular Design**: separate, single-purpose functions for fetching, processing, loading and verifying keep the code readable and maintainable
6. **Database Integration**: stores the processed data in PostgreSQL through SQLAlchemy and pandas `to_sql`
7. **Data Validation**: counts the rows in the table after loading to confirm the data arrived
8. **Configurable Date Range**: `START_DATE` and `END_DATE` define the collection period
9. **Data Transformation and Normalization**: pandas reshapes the nested JSON into flat, consistently structured rows and converts timestamps before insertion
10. **Scalability**: each day is fetched and loaded independently, so a longer date range only means more iterations
11. **Adaptability**: changing `API_URL` and the parsing logic adapts the script to other Data.Gov.SG endpoints
12. **Automation**: the script can be scheduled to run unattended for regular data collection
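
The script below fetches one date at a time. As a minimal sketch of the concurrent approach named in the list, the code fans dates out over a `concurrent.futures.ThreadPoolExecutor`, logs failures with the `logging` module instead of `print`, and keeps only the first reading per `(timestamp, region)`. To stay runnable offline, `fake_fetch` is a hypothetical stand-in for the real API call (one date fails on purpose); in practice it would wrap something like `fetch_pm25_data_for_date`. The names `fetch_all` and `pm25_ingest` are also hypothetical.

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("pm25_ingest")


def fake_fetch(date_str):
    """Hypothetical stand-in for an API call: (timestamp, region, value) readings for one date."""
    if date_str == "2024-11-03":
        raise ConnectionError("simulated network failure")
    reading = (f"{date_str}T21:00:00+08:00", "west", 6)
    return [reading, reading]  # deliberate duplicate


def fetch_all(dates, fetch, max_workers=4):
    """Fetch every date concurrently; log failures and drop duplicate (timestamp, region) readings."""
    rows, seen = [], set()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fetch, d): d for d in dates}  # map each future back to its date
        for future in as_completed(futures):  # results are handled in the main thread
            date_str = futures[future]
            try:
                readings = future.result()  # re-raises any exception from the worker thread
            except Exception:
                logger.exception("Failed to fetch %s", date_str)
                continue
            for timestamp, region, value in readings:
                key = (timestamp, region)
                if key not in seen:  # keep the first reading for each (timestamp, region)
                    seen.add(key)
                    rows.append((timestamp, region, value))
    return sorted(rows)  # completion order varies, so sort for a stable result


result = fetch_all(["2024-11-01", "2024-11-02", "2024-11-03"], fake_fetch)
print(len(result), "unique readings")
```

Threads suit this workload because most of the time is spent waiting on HTTP responses; keeping `max_workers` small avoids flooding a public API with simultaneous requests.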


In [None]:
import requests
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text

# Constants Definition
API_URL = "https://api-open.data.gov.sg/v2/real-time/api/pm25"
DB_USER = 'postgres'            # Database username
DB_PASS = 'password'            # Database password
DB_HOST = 'localhost'           # Database server address
DB_PORT = '5432'                # Database port number, default for PostgreSQL
DB_NAME = 'data_gov_project_2'  # Name of database for connection
START_DATE = datetime(2024, 11, 1)  # First date to fetch (yyyy, mm, dd)
END_DATE = datetime(2024, 11, 7)    # Last date to fetch, inclusive (yyyy, mm, dd)

def fetch_pm25_data_for_date(api_url, date):
    """Fetch PM2.5 data for a specific date."""
    try: # Handle potential errors during data fetching
        response = requests.get(api_url, params={"date": date}, timeout=30) # GET request with the date as a query parameter; give up after 30 seconds
        if response.status_code == 200: # Checks the response status
            json_data = response.json() # If the request is successful, the response is parsed as JSON
            items = json_data.get("data", {}).get("items", []) # Retrieves 'items' from the 'data' section; empty list if it doesn't exist
            region_metadata = json_data.get("data", {}).get("regionMetadata", []) # Retrieves region metadata (names and label coordinates)
            return items, region_metadata # On success, return a tuple of items and region metadata
        # If the status code is not 200, print an error message and return None for both values
        else:
            print(f"Failed to fetch data for {date}. Status code: {response.status_code}")
            return None, None
    # Catch any exception raised during the request or JSON parsing, print an error message and return None for both values
    except Exception as e:
        print(f"Error fetching data for {date}: {e}")
        return None, None

def process_items(items, region_metadata):
    """Process the fetched items and store PM2.5 readings in a DataFrame."""
    data_frames = [] # An empty list to store DataFrames created for each item
    for item in items: # Iterates through each item in the items list
        timestamp = item.get('timestamp') # Timestamp for each item is extracted
        readings = item.get('readings', {}).get('pm25_one_hourly', {}) # Hourly readings are extracted, defaulting to an empty dictionary if they don't exist

        for region, value in readings.items(): # Loop processes each region's PM2.5 reading
            # Find the matching region metadata by name, case-insensitively; default to an empty dict
            region_info = next((r for r in region_metadata if r.get('name', '').lower() == region.lower()), {})
            latitude = region_info.get('labelLocation', {}).get('latitude') # Extract latitude
            longitude = region_info.get('labelLocation', {}).get('longitude') # Extract longitude

            # Create a one-row DataFrame for the reading
            df = pd.DataFrame({ # A new DataFrame is created with the following columns:
                'region': [region],
                'pm25_value': [value],
                'timestamp': [timestamp],
                'latitude': [latitude],
                'longitude': [longitude]
            })
            data_frames.append(df) # The new DataFrame is appended to the data_frames list

    if data_frames: # Checks if any DataFrames were created
        combined = pd.concat(data_frames, ignore_index=True) # Combine into a single DataFrame with a fresh index
        return combined.drop_duplicates(subset=['timestamp', 'region'], ignore_index=True) # Drop repeated (timestamp, region) readings
    else:
        return pd.DataFrame() # If no DataFrames were created, an empty DataFrame is returned

def load_data_to_postgres(data_frame):
    """Load the provided DataFrame into the PostgreSQL database."""
    engine = create_engine(f'postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}') # Engine to connect to the PostgreSQL database
    try: # Handle potential errors that might occur during the data loading process
        data_frame.to_sql('pm25_readings', engine, if_exists='append', index=False) # Use the pandas method to append the DataFrame's rows to the table
        print(f"Successfully loaded {len(data_frame)} records to PostgreSQL.") # Report how many records were loaded
    except Exception as e: # Catch any exception that may occur during the loading process
        print(f"Error loading data into PostgreSQL: {e}")

def verify_data_in_db():
    """
    Retrieves number of rows from 'pm25_readings' table to verify data was loaded successfully.
    """
    engine = create_engine(f'postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}') # Engine to connect to the PostgreSQL database
    try: # Handle potential errors that might occur while querying the table
        with engine.connect() as connection: # Ensure the connection is properly closed after use
            result = connection.execute(text("SELECT COUNT(*) FROM pm25_readings")) # Executes SQL query to count the total number of rows
            count = result.fetchone()[0] # Retrieves the single value in the first result row
            print(f"Total records in 'pm25_readings' table: {count}")  # Show count of rows
    except Exception as e: # Catch any exception raised while querying
        print(f"Error verifying data in PostgreSQL: {e}")

def verify_database_connection():
    """Verifies the database connection by printing a message if successful."""
    engine = create_engine(f'postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}') # Engine to connect to the PostgreSQL database
    try: # Handle potential errors that might occur while connecting
        with engine.connect() as connection: # Opening the connection is the test; it is closed automatically after use
            print("Database connection successful")
    except Exception as e: # Catch any exception raised while connecting
        print(f"Error connecting to PostgreSQL: {e}")

# Creates the table in case it does not already exist in the database
def create_table():
    """Create the 'pm25_readings' table in the database."""
    engine = create_engine(f'postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}') # Engine to connect to the PostgreSQL database
    try: # Handle potential errors that might occur while creating the table
        # engine.begin() commits on success; with engine.connect() in SQLAlchemy 2.x the DDL would be rolled back on close
        with engine.begin() as connection:
            connection.execute(text("""
                CREATE TABLE IF NOT EXISTS pm25_readings (
                    id SERIAL PRIMARY KEY,
                    timestamp TIMESTAMPTZ,  -- stores the absolute time, so the API's +08:00 offset is not lost
                    region VARCHAR(255),
                    pm25_value REAL,
                    latitude REAL,
                    longitude REAL
                )
            """))
            print("Table 'pm25_readings' is ready.")
    except Exception as e: # Catch any exception raised while creating the table
        print(f"Error creating table: {e}")

def main():
    print("Starting the script...")

    verify_database_connection()  # Verify database connection
    create_table()  # Ensure the table exists

    current_date = START_DATE # Start iterating from START_DATE
    while current_date <= END_DATE: # Iterate over each date up to and including END_DATE
        date_str = current_date.strftime("%Y-%m-%d") # Format the date as a 'YYYY-MM-DD' string for the API
        print(f"Fetching data for {date_str}...")

        # Process the retrieved data
        items, region_metadata = fetch_pm25_data_for_date(API_URL, date_str)
        if items and region_metadata: # Checks if both items and region metadata were successfully retrieved
            data_frame = process_items(items, region_metadata) # Process items and region metadata into a pandas DataFrame
            if not data_frame.empty: # Checks if the resulting DataFrame is not empty
                data_frame['timestamp'] = pd.to_datetime(data_frame['timestamp']) # Convert to datetime format
                # Sort the DataFrame by timestamp in chronological order
                data_frame = data_frame.sort_values(by='timestamp', ascending=True)
                load_data_to_postgres(data_frame)
            else: # If the DataFrame is empty
                print(f"No data collected for {date_str}.")
        else: # If data fetching fails or returns no items
            print(f"Failed to fetch data for {date_str}.")

        current_date += timedelta(days=1) # Move on to the next day

    verify_data_in_db()  # Confirm the load by counting the rows in the table
    print("Script completed.")

if __name__ == "__main__": # Running main function
    main()
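
Appending with `to_sql` means that re-running the script for the same dates inserts the same readings again. One common remedy, sketched here under assumptions, is a unique constraint on `(timestamp, region)` combined with `INSERT ... ON CONFLICT DO NOTHING`, which PostgreSQL supports. To keep the example self-contained it uses Python's built-in `sqlite3` with an in-memory database as a stand-in (SQLite 3.24 and later accept the same `ON CONFLICT DO NOTHING` clause); the table name `pm25_readings_demo` and the helper `insert_readings` are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # in-memory stand-in for the PostgreSQL database
conn.execute("""
    CREATE TABLE pm25_readings_demo (
        id INTEGER PRIMARY KEY,
        timestamp TEXT,
        region TEXT,
        pm25_value REAL,
        UNIQUE (timestamp, region)  -- one reading per region per hour
    )
""")


def insert_readings(connection, rows):
    """Insert (timestamp, region, value) rows, silently skipping ones already stored."""
    with connection:  # commits on success, rolls back on error
        connection.executemany(
            "INSERT INTO pm25_readings_demo (timestamp, region, pm25_value) "
            "VALUES (?, ?, ?) ON CONFLICT DO NOTHING",
            rows,
        )


batch = [("2024-11-30T21:00:00+08:00", "west", 6), ("2024-11-30T21:00:00+08:00", "east", 11)]
insert_readings(conn, batch)
insert_readings(conn, batch)  # simulated re-run of the same date
count = conn.execute("SELECT COUNT(*) FROM pm25_readings_demo").fetchone()[0]
print(count)  # 2: the re-run added nothing
```

In PostgreSQL the same idea means adding `UNIQUE (timestamp, region)` to the `CREATE TABLE` statement; since `to_sql` has no built-in "skip duplicates" mode, its `method` argument (which accepts a callable) is one place such an insert could live.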

### **PM2.5 Output Result**
<img src="https://raw.githubusercontent.com/YvonneLipLim/Images/main/PM25_Output.png" alt="PM2.5 script output" width="800">