In [20]:
import requests
import pandas as pd
import os
from datetime import datetime, timedelta
from tqdm.notebook import tqdm  # Use tqdm from the notebook module
import asyncio
import aiohttp
import re

In [5]:
def check_missing_data(csv_file_path):
    # Load the CSV into a DataFrame
    df = pd.read_csv(csv_file_path)

    # Convert the 'Timestamp' column to datetime
    df['Timestamp'] = pd.to_datetime(df['Timestamp'])

    # Create a new column for the date part of the timestamp
    df['Date'] = df['Timestamp'].dt.date

    # Group by the date and count the number of rows per day
    daily_counts = df.groupby('Date').size()

    # Check for any days with missing data (less than 288 rows)
    missing_days = daily_counts[daily_counts < 288].index.tolist()

    return missing_days

In [15]:
async def get_data_async(date_time, data_set_name, session):
    base_url = f"https://api-open.data.gov.sg/v2/real-time/api/{data_set_name}?date={date_time}"
    all_data = []

    while True:
        # Asynchronously fetch the data
        async with session.get(base_url) as response:
            data = await response.json()
            readings = data.get('data', {}).get('readings', [])

            if readings:
                for reading in readings:
                    reading_data = reading.get('data', [])
                    for entry in reading_data:
                        entry['Timestamp'] = reading.get('timestamp')  # Add Timestamp for each entry
                        all_data.append(entry)
            else:
                break  # No readings, exit loop

            # Check for paginationToken and update URL for next request
            pagination_token = data.get('data', {}).get('paginationToken')
            if pagination_token:
                base_url = f"https://api-open.data.gov.sg/v2/real-time/api/{data_set_name}?date={date_time}&paginationToken={pagination_token}"
            else:
                break  # No more pages

    # Convert the collected data into a DataFrame
    df = pd.DataFrame(all_data)

    # Pivot table if data is available
    if not df.empty:
        pivot_table = df.pivot_table(index='Timestamp', columns='stationId', values='value', aggfunc='first')
        return pivot_table
    else:
        return pd.DataFrame()  # Return empty DataFrame if no data

In [None]:
async def check_missing_month(data_set_name, year, month):
    # Format the CSV file path properly
    csv_file_path = f"../data/data_gov_sg/{data_set_name}_data/{data_set_name}_{year}-{month}.csv"

    # Check for missing data days
    missing_days = check_missing_data(csv_file_path)
    print("Missing data on the following days:", missing_days)

    # Load the existing CSV data
    old_data = pd.read_csv(csv_file_path)

    # Total number of days with missing data
    total_days = len(missing_days)
    new_data = []
    
    async with aiohttp.ClientSession() as session:
        with tqdm(total=total_days) as pbar:
            # Iterate over each missing day and attempt to fetch the data
            for missing_day in missing_days:
                timestamp_str = missing_day.strftime("%Y-%m-%d")
                try:
                    # Fetch the data asynchronously
                    df = await get_data_async(timestamp_str, data_set_name, session)
                    if not df.empty:
                        new_data.append(df)
                    pbar.update(1)  # Update progress bar for each day

                except Exception as e:
                    print(f"Error fetching {timestamp_str}: {e}. Retrying...")
                    await asyncio.sleep(5)  # Wait before retrying
                    try:
                        df = await get_data_async(timestamp_str, data_set_name, session)
                        if not df.empty:
                            new_data.append(df)
                        pbar.update(1)
                    except Exception as e2:
                        print(f"Failed to fetch {timestamp_str}: {e2}")
                        await asyncio.sleep(8)  # Longer delay for retry failure

                await asyncio.sleep(2)  # Introduce a 2-second delay between requests

        # If new data was successfully fetched, concatenate it with old data and save
        if new_data:
            # Concatenate old and new data
            combined = pd.concat([old_data] + new_data)
            
            # Remove duplicates based on the 'Timestamp' and 'stationId' columns
            combined = combined.drop_duplicates(subset=['Timestamp'])
            
            # Sort by 'Timestamp' in ascending order
            combined = combined.sort_values(by='Timestamp', ascending=True)
            
            # Save the combined data to CSV
            combined.to_csv(csv_file_path, index=False)

    print(f"Finished processing missing data for {year}-{month}.")


In [27]:
async def check_dataset(data_set_name):
    folder_path = f"../data/data_gov_sg/{data_set_name}_data/"

    # Get all CSV files in the folder
    try:
        files = [f for f in os.listdir(folder_path) if f.endswith('.csv')]
    except Exception as e:
        print(f"Error reading directory {folder_path}: {e}")
        return

    # List to hold tasks for concurrent execution
    tasks = []

    # Extract year and month from each filename and check for missing data
    for file in files:
        # Regex to strictly match filenames like "rainfall_2020-01.csv", "rainfall_2020-12.csv"
        match = re.match(rf"{data_set_name}_(\d{{4}})-(\d{{2}})\.csv", file)
        if match:
            year = int(match.group(1))
            month = match.group(2)  # Keep the month as a two-digit string
            
            # Add the task to the list (running check_missing_month concurrently)
            tasks.append(check_missing_month(data_set_name, year, month))
        else:
            print(f"Skipping file with invalid format: {file}")
    
    # Run all tasks concurrently
    if tasks:
        await asyncio.gather(*tasks)
    else:
        print("No valid files to process")

In [28]:
await check_dataset("rainfall")

Missing data on the following days: []


0it [00:00, ?it/s]

Missing data on the following days: []


0it [00:00, ?it/s]

Missing data on the following days: [datetime.date(2020, 6, 8), datetime.date(2020, 6, 11)]


  0%|          | 0/2 [00:00<?, ?it/s]

Missing data on the following days: [datetime.date(2020, 7, 1), datetime.date(2020, 7, 2), datetime.date(2020, 7, 29), datetime.date(2020, 7, 30), datetime.date(2020, 7, 31)]


  0%|          | 0/5 [00:00<?, ?it/s]

Missing data on the following days: [datetime.date(2020, 8, 1), datetime.date(2020, 8, 2), datetime.date(2020, 8, 3), datetime.date(2020, 8, 4), datetime.date(2020, 8, 5), datetime.date(2020, 8, 6), datetime.date(2020, 8, 7)]


  0%|          | 0/7 [00:00<?, ?it/s]

Missing data on the following days: [datetime.date(2020, 9, 1)]


  0%|          | 0/1 [00:00<?, ?it/s]

Missing data on the following days: []


0it [00:00, ?it/s]

Missing data on the following days: []


0it [00:00, ?it/s]

Missing data on the following days: []


0it [00:00, ?it/s]

Finished processing missing data for 2020-04.
Finished processing missing data for 2020-05.
Finished processing missing data for 2020-10.
Finished processing missing data for 2020-11.
Finished processing missing data for 2020-12.


KeyError: Index(['stationId'], dtype='object')