## Data Cleaning
**Data Cleaning Function (`clean_isd_lite_data`)**:
   - Iterates through JSON records and extracts relevant data such as year, month, day, hour, temperature, dew point temperature, pressure, wind direction, wind speed, cloud cover, rain 1-hour, and rain 6-hour.
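   - Scales temperature, dew point temperature, pressure, wind speed, rain 1-hour, and rain 6-hour values by a factor of 1/10, replaces missing values (`-9999`) with random values drawn from predefined plausible ranges, and records trace precipitation (`-1`) as 0.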

In [2]:
'''STEP1: Data Cleaning'''
import os
import json
import numpy as np
import time
import random

# Define reasonable ranges for each meteorological indicator based on the month.
# clean_isd_lite_data draws a random value from these (low, high) ranges whenever
# a reading is missing, and stores the drawn number directly as the cleaned value,
# so the bounds are expressed on the cleaned (already scaled) scale.
# 'Temperature' and 'DewPointTemperature' are keyed by month number (1-12);
# every other indicator uses one range for all months.
# NOTE(review): units are presumably degrees C, hPa, m/s, oktas and mm - confirm
# against the ISD-Lite format description.
ranges = {
    'Temperature': {
        1: (-10, 5), 2: (-5, 10), 3: (0, 15), 4: (5, 20), 5: (10, 25), 6: (15, 30), 
        7: (20, 35), 8: (20, 35), 9: (15, 30), 10: (10, 25), 11: (0, 15), 12: (-5, 10)
    },
    'DewPointTemperature': {
        1: (-15, 0), 2: (-10, 5), 3: (-5, 10), 4: (0, 15), 5: (5, 20), 6: (10, 25), 
        7: (15, 30), 8: (15, 30), 9: (10, 25), 10: (5, 20), 11: (-5, 10), 12: (-10, 5)
    },
    'Pressure': (950, 1050),  # Unified pressure range
    'WindSpeed': (0, 20),     # Unified wind speed range
    'CloudCover': (0, 8),     # Unified cloud cover range
    'Rain1h': (0, 10),        # Unified 1-hour precipitation range
    'Rain6h': (0, 60)         # Unified 6-hour precipitation range
}

# Function to determine event_id based on weather data
def determine_event_id(record):
    """Classify one cleaned observation into a weather event code.

    The rules are evaluated in priority order and the first match wins:

        1  Heat Wave       Temperature >= 30
        2  Cold Wave       Temperature <= -10
        3  High Wind       WindSpeed >= 20
        5  Heavy Rainfall  Rain6h >= 10 or Rain1h >= 50
        7  Thunderstorm    CloudCover > 6   (placeholder assumption)
        9  Pressure Drop   Pressure < 960   (placeholder assumption)
        0  No specific event

    Args:
        record: Mapping that provides the keys 'Temperature', 'Pressure',
            'WindSpeed', 'CloudCover', 'Rain1h' and 'Rain6h'. The dew point
            is not part of any rule, so it is not required.

    Returns:
        The integer event code of the first matching rule, or 0.

    Raises:
        KeyError: If one of the six required keys is absent.
    """
    # Read every field the rules use up front so that a malformed record
    # fails immediately, regardless of which rule would have matched.
    temperature = record['Temperature']
    pressure = record['Pressure']
    wind_speed = record['WindSpeed']
    cloud_cover = record['CloudCover']
    rain1h = record['Rain1h']
    rain6h = record['Rain6h']

    if temperature >= 30:
        return 1  # Heat Wave
    if temperature <= -10:
        return 2  # Cold Wave
    if wind_speed >= 20:
        return 3  # High Wind
    if rain6h >= 10 or rain1h >= 50:
        return 5  # Heavy Rainfall
    # Placeholder logic for other events; update based on actual conditions
    if cloud_cover > 6:
        return 7  # Thunderstorm (assumption)
    if pressure < 960:
        return 9  # Pressure Drop (assumption)
    return 0  # No specific event

# Data cleaning function
def clean_isd_lite_data(json_data):
    """Turn raw ISD-Lite records into cleaned observation dictionaries.

    For every input record the date/time fields are copied unchanged, the
    temperature, dew point ('Td' in the input), pressure, wind speed and
    precipitation readings are divided by 10, and wind direction and cloud
    cover are copied as they are. A reading equal to -9999 is treated as
    missing and replaced by a random value (one decimal place) drawn from
    the module-level ``ranges`` table, or from 0-360 for wind direction.
    A precipitation reading of -1 (trace) is stored as 0. Finally the
    event code from ``determine_event_id`` is attached as 'event_id'.

    Args:
        json_data: Iterable of raw record dictionaries.

    Returns:
        A list with one cleaned dictionary per input record, in input order.
    """
    missing = -9999
    trace = -1

    def draw(bounds):
        # Random stand-in for a missing reading, rounded to one decimal place.
        return round(random.uniform(*bounds), 1)

    cleaned_data = []
    for raw in json_data:
        month = raw['Month']
        entry = {
            'Year': raw['Year'],
            'Month': month,
            'Day': raw['Day'],
            'Hour': raw['Hour'],
        }

        # Month-dependent fallbacks: the range table is consulted only when
        # the reading is actually missing.
        for target, source in (('Temperature', 'Temperature'),
                               ('DewPointTemperature', 'Td')):
            reading = raw[source]
            if reading == missing:
                entry[target] = draw(ranges[target][month])
            else:
                entry[target] = reading / 10.0

        pressure = raw['Pressure']
        if pressure == missing:
            entry['Pressure'] = draw(ranges['Pressure'])
        else:
            entry['Pressure'] = pressure / 10.0

        # Wind direction is kept unscaled; wind speed is scaled.
        direction = raw['WindDirection']
        if direction == missing:
            entry['WindDirection'] = draw((0, 360))
        else:
            entry['WindDirection'] = direction

        speed = raw['WindSpeed']
        if speed == missing:
            entry['WindSpeed'] = draw(ranges['WindSpeed'])
        else:
            entry['WindSpeed'] = speed / 10.0

        # Cloud cover is kept unscaled.
        cover = raw['CloudCover']
        if cover == missing:
            entry['CloudCover'] = draw(ranges['CloudCover'])
        else:
            entry['CloudCover'] = cover

        # Precipitation: trace amounts become 0, missing amounts are drawn
        # at random, everything else is scaled.
        for key in ('Rain1h', 'Rain6h'):
            amount = raw[key]
            if amount == trace:
                entry[key] = 0
            elif amount == missing:
                entry[key] = draw(ranges[key])
            else:
                entry[key] = amount / 10.0

        entry['event_id'] = determine_event_id(entry)
        cleaned_data.append(entry)

    return cleaned_data

# Function to read and clean JSON files
def read_and_clean_json_files(base_directory):
    """Clean every raw JSON file found directly in ``base_directory``.

    Each ``*.json`` file whose name does not start with ``cleaned_`` is
    loaded, passed through ``clean_isd_lite_data``, written back next to it
    as ``cleaned_<original name>``, and then the original file is deleted.

    Args:
        base_directory: Directory that holds the raw JSON files.
    """
    for name in os.listdir(base_directory):
        # Skip non-JSON files and outputs of an earlier cleaning pass.
        if name.startswith('cleaned_') or not name.endswith('.json'):
            continue

        source_path = os.path.join(base_directory, name)
        with open(source_path, 'r') as source:
            raw_records = json.load(source)

        cleaned_records = clean_isd_lite_data(raw_records)

        print(f"Processed {name}")
        target_path = os.path.join(base_directory, f"cleaned_{name}")
        with open(target_path, 'w') as target:
            json.dump(cleaned_records, target, indent=4)

        # The raw file is dropped only after the cleaned copy was written.
        os.remove(source_path)

# Main program
if __name__ == '__main__':
    # Start the wall-clock timer for the whole cleaning pass
    start_time = time.time()
    # Specify the base directory path; the raw JSON files in it are replaced
    # by cleaned_<name> copies and the originals are deleted
    base_directory = '/Users/a1234/Desktop/workspace/CS779/WeatherDB/dataset/2020_2024_US'
    read_and_clean_json_files(base_directory)
    # Record the end time
    end_time = time.time()
    # Calculate and print the runtime
    runtime1 = end_time - start_time
    print(f"Runtime: {runtime1} seconds")

Processed 997990-99999-2020-2024.json
Processed 721012-99999-2020-2024.json
Processed 722704-99999-2020-2024.json
Processed 997787-99999-2020-2024.json
Processed 720907-99999-2020-2024.json
Processed 720642-99999-2020-2024.json
Processed 994082-99999-2020-2023.json
Processed 997272-99999-2020-2021.json
Processed 720449-99999-2020-2024.json
Processed 994061-99999-2023-2024.json
Processed 722158-99999-2020-2023.json
Processed 998223-99999-2020-2024.json
Processed 998274-99999-2020-2023.json
Processed 997988-99999-2020-2024.json
Processed 997262-99999-2020-2024.json
Processed 720451-99999-2020-2024.json
Processed 724015-99999-2020-2024.json
Processed 997255-99999-2020-2024.json
Processed 722592-99999-2020-2024.json
Processed 720129-99999-2020-2024.json
Processed 720995-99999-2020-2024.json
Processed 747960-99999-2020-2024.json
Processed 994971-99999-2020-2024.json
Processed 720409-99999-2020-2024.json
Processed 997802-99999-2020-2024.json
Processed 721013-99999-2020-2024.json
Processed 72

In [3]:
'''STEP2: create station_id，observer_id，observation_device_id'''
import os
import json
import random
import time

# Function to extract station ID from filename
def extract_station_id(filename):
    """Derive the numeric station ID from a ``cleaned_*`` file name.

    The name (without extension) must consist of exactly two
    underscore-separated pieces, the first being ``cleaned``. The second
    piece is split on hyphens and its first two fields are concatenated and
    converted to an integer, e.g. ``cleaned_720507-99999-2020-2024.json``
    gives ``72050799999``.

    Args:
        filename: File name (not a full path is required) to parse.

    Returns:
        The station ID as an int, or None when the name does not follow the
        expected pattern or the ID fields are not numeric.
    """
    stem, _extension = os.path.splitext(filename)
    pieces = stem.split('_')
    if len(pieces) != 2 or pieces[0] != 'cleaned':
        return None

    id_fields = pieces[1].split('-')
    if len(id_fields) < 2:
        return None

    try:
        return int(id_fields[0] + id_fields[1])
    except ValueError:
        return None

# Function to add station ID and random fields to data
def add_fields_to_data(json_data, station_id):
    """Attach the station ID and two randomly chosen IDs to every record.

    Each output record starts with 'station_id', followed by all fields of
    the input record, followed by 'observer_id' (picked at random from
    1, 7, 13, 19, 25, 31) and 'observation_device_id' (random integer from
    1 to 10 inclusive). The input records are not modified.

    Args:
        json_data: Iterable of record dictionaries.
        station_id: Value stored under 'station_id' in every output record.

    Returns:
        A new list with one extended dictionary per input record.
    """
    def enrich(record):
        # Build the row with station_id first so it leads the field order.
        row = {'station_id': station_id}
        row.update(record)
        row['observer_id'] = random.choice([1, 7, 13, 19, 25, 31])
        row['observation_device_id'] = random.randint(1, 10)
        return row

    return [enrich(record) for record in json_data]

def update_json_files(base_directory):
    """Add station and observer fields to every cleaned JSON file.

    Each ``cleaned_*.json`` file in ``base_directory`` whose name yields a
    station ID is loaded, extended through ``add_fields_to_data``, written
    as ``updated_<original name>`` in the same directory, and then the
    cleaned file is deleted. Files without a parsable station ID are left
    untouched.

    Args:
        base_directory: Directory that holds the cleaned JSON files.
    """
    for name in os.listdir(base_directory):
        if not (name.startswith('cleaned_') and name.endswith('.json')):
            continue

        station_id = extract_station_id(name)
        if station_id is None:
            continue

        source_path = os.path.join(base_directory, name)
        with open(source_path, 'r') as source:
            records = json.load(source)

        extended_records = add_fields_to_data(records, station_id)

        print(f"Processed {name}")
        target_path = os.path.join(base_directory, f"updated_{name}")
        with open(target_path, 'w') as target:
            json.dump(extended_records, target, indent=4)

        # The cleaned file is dropped only after the updated copy was written.
        os.remove(source_path)

if __name__ == '__main__':
    # Start the wall-clock timer for the whole update pass
    start_time = time.time()
    # Specify the base_directory path; cleaned_* files in it are replaced by
    # updated_cleaned_* copies and the cleaned files are deleted
    base_directory = '/Users/a1234/Desktop/workspace/CS779/WeatherDB/dataset/2020_2024_US'
    update_json_files(base_directory)
    # Record the end time
    end_time = time.time()
    # Calculate and print the runtime
    runtime = end_time - start_time
    print(f"Runtime: {runtime} seconds")

Processed cleaned_720507-99999-2020-2024.json
Processed cleaned_997290-99999-2020-2024.json
Processed cleaned_998409-99999-2020-2024.json
Processed cleaned_720329-99999-2020-2024.json
Processed cleaned_998436-99999-2024-2024.json
Processed cleaned_997266-99999-2020-2024.json
Processed cleaned_997687-99999-2020-2024.json
Processed cleaned_720994-99999-2020-2024.json
Processed cleaned_997254-99999-2020-2024.json
Processed cleaned_994006-99999-2020-2022.json
Processed cleaned_997994-99999-2020-2024.json
Processed cleaned_720511-99999-2021-2024.json
Processed cleaned_721013-99999-2020-2024.json
Processed cleaned_997802-99999-2020-2024.json
Processed cleaned_720409-99999-2020-2024.json
Processed cleaned_994971-99999-2020-2024.json
Processed cleaned_747960-99999-2020-2024.json
Processed cleaned_722592-99999-2020-2024.json
Processed cleaned_720129-99999-2020-2024.json
Processed cleaned_720995-99999-2020-2024.json
Processed cleaned_997255-99999-2020-2024.json
Processed cleaned_720451-99999-202

In [1]:
import os
import json
import time
from collections import OrderedDict

def extract_time_data(json_data):
    """Collect the timestamp fields of every fully dated record.

    Args:
        json_data: Iterable of record dictionaries.

    Returns:
        A list of dictionaries holding only 'Year', 'Month', 'Day' and
        'Hour', one per record in which none of the four is absent or None.
        Records with an incomplete timestamp are skipped.
    """
    fields = ('Year', 'Month', 'Day', 'Hour')
    stamps = ({name: record.get(name) for name in fields} for record in json_data)
    # A value of 0 (e.g. hour 0) is valid; only None marks a missing field.
    return [
        stamp for stamp in stamps
        if all(value is not None for value in stamp.values())
    ]

def determine_quarter(month):
    """Map a month number to its calendar quarter label.

    Args:
        month: Month number; 1-3 is Q1, 4-6 is Q2, 7-9 is Q3, 10-12 is Q4.

    Returns:
        'Q1', 'Q2', 'Q3' or 'Q4', or None when the value lies outside all
        four inclusive ranges.
    """
    quarter_bounds = (
        ('Q1', 1, 3),
        ('Q2', 4, 6),
        ('Q3', 7, 9),
        ('Q4', 10, 12),
    )
    for label, first, last in quarter_bounds:
        if first <= month <= last:
            return label
    return None

def determine_day_session(hour):
    """Map an hour of the day to a named part of the day.

    Args:
        hour: Hour value; [0, 6) is Night, [6, 12) is Morning,
            [12, 18) is Afternoon and [18, 24) is Evening.

    Returns:
        'Night', 'Morning', 'Afternoon' or 'Evening', or None when the hour
        is below 0 or not below 24.
    """
    session_bounds = (
        ('Night', 0, 6),
        ('Morning', 6, 12),
        ('Afternoon', 12, 18),
        ('Evening', 18, 24),
    )
    for label, start, stop in session_bounds:
        if start <= hour < stop:
            return label
    return None

def generate_time_json(base_directory, output_file):
    """Build the time dimension file from all ``updated_*.json`` files.

    Every distinct (Year, Month, Day, Hour) combination found in the
    updated files is kept once, the combinations are sorted
    chronologically, and each is written with a 1-based 'time_id', its
    'quarter' and its 'day_session'.

    Args:
        base_directory: Directory that holds the ``updated_*.json`` files.
        output_file: Path of the JSON file to write.
    """
    distinct_stamps = set()

    for name in os.listdir(base_directory):
        if not (name.startswith('updated_') and name.endswith('.json')):
            continue

        with open(os.path.join(base_directory, name), 'r') as source:
            records = json.load(source)

        for stamp in extract_time_data(records):
            # Tuples are hashable, so the set removes duplicates across files.
            distinct_stamps.add(
                (stamp['Year'], stamp['Month'], stamp['Day'], stamp['Hour'])
            )

    # Tuple ordering is exactly the Year, Month, Day, Hour ordering.
    time_rows = []
    for time_id, (year, month, day, hour) in enumerate(sorted(distinct_stamps), start=1):
        time_rows.append({
            'Year': year,
            'Month': month,
            'Day': day,
            'Hour': hour,
            'time_id': time_id,
            'quarter': determine_quarter(month),
            'day_session': determine_day_session(hour),
        })

    with open(output_file, 'w') as out_file:
        json.dump(time_rows, out_file, indent=4)

if __name__ == '__main__':
    # Start the wall-clock timer for building the time table
    start_time = time.time()
    # Specify the base_directory path and output file path; the output file is
    # written outside base_directory
    base_directory = '/Users/a1234/Desktop/workspace/CS779/WeatherDB/dataset/2020_2024_US'
    output_file = '/Users/a1234/Desktop/workspace/CS779/WeatherDB/dataset/time.json'
    generate_time_json(base_directory, output_file)
    # Record the end time
    end_time = time.time()
    # Calculate and print the runtime
    runtime = end_time - start_time
    print(f"Runtime: {runtime} seconds")

Runtime: 5.526558876037598 seconds


In [2]:
'''STEP4: Merge all observations into a single json file'''
import os
import json
import time

def load_time_mapping(time_file):
    """Load the time table and index it by timestamp.

    Args:
        time_file: Path of a JSON file containing a list of entries, each
            with 'Year', 'Month', 'Day', 'Hour' and 'time_id'.

    Returns:
        A dict mapping each (Year, Month, Day, Hour) tuple to its time_id.
        If a timestamp occurs more than once, the last entry wins.
    """
    with open(time_file, 'r') as source:
        rows = json.load(source)

    return {
        (row['Year'], row['Month'], row['Day'], row['Hour']): row['time_id']
        for row in rows
    }

def update_observations(base_directory, time_mapping, output_file):
    """Merge all ``updated_*.json`` files into one observation file.

    In every record the 'Year', 'Month', 'Day' and 'Hour' fields are
    removed and replaced by a leading 'time_id' looked up in
    ``time_mapping``. Records with an incomplete timestamp, or whose
    timestamp has no entry in the mapping, are left out of the result.

    Args:
        base_directory: Directory that holds the ``updated_*.json`` files.
        time_mapping: Dict from (Year, Month, Day, Hour) tuples to time_id.
        output_file: Path of the merged JSON file to write.
    """
    time_fields = ('Year', 'Month', 'Day', 'Hour')
    merged = []

    for name in os.listdir(base_directory):
        if not (name.startswith('updated_') and name.endswith('.json')):
            continue

        with open(os.path.join(base_directory, name), 'r') as source:
            records = json.load(source)

        for record in records:
            # Popping strips the date fields from the record in one step.
            stamp = tuple(record.pop(field, None) for field in time_fields)
            if any(part is None for part in stamp):
                continue

            time_id = time_mapping.get(stamp)
            if time_id is None:
                continue

            # time_id goes first; the remaining fields keep their order.
            merged.append({'time_id': time_id, **record})

        print(f"Processed {name}")

    with open(output_file, 'w') as out_file:
        json.dump(merged, out_file, indent=4)

if __name__ == '__main__':
    # Start the wall-clock timer for the merge step
    start_time = time.time()
    # Specify the base_directory path and the paths for time and observation files
    base_directory = '/Users/a1234/Desktop/workspace/CS779/WeatherDB/dataset/2020_2024_US'
    time_file = '/Users/a1234/Desktop/workspace/CS779/WeatherDB/dataset/time.json'
    output_file = '/Users/a1234/Desktop/workspace/CS779/WeatherDB/dataset/observation.json'
    
    print("Loading time mapping...")
    # Load the time mapping from time.json: (Year, Month, Day, Hour) -> time_id
    time_mapping = load_time_mapping(time_file)
    
    print("Updating observations with time_id...")
    # Update the observations with the time_id and write them all to output_file
    update_observations(base_directory, time_mapping, output_file)
    
    # Record the end time
    end_time = time.time()
    # Calculate and print the runtime
    runtime = end_time - start_time
    print(f"Runtime: {runtime} seconds")
    print(f"Updated observations saved to: {output_file}")

Loading time mapping...
Updating observations with time_id...
Processed updated_cleaned_998274-99999-2020-2023.json
Processed updated_cleaned_997988-99999-2020-2024.json
Processed updated_cleaned_997262-99999-2020-2024.json
Processed updated_cleaned_994061-99999-2023-2024.json
Processed updated_cleaned_722158-99999-2020-2023.json
Processed updated_cleaned_998223-99999-2020-2024.json
Processed updated_cleaned_720995-99999-2020-2024.json
Processed updated_cleaned_722592-99999-2020-2024.json
Processed updated_cleaned_720129-99999-2020-2024.json
Processed updated_cleaned_724015-99999-2020-2024.json
Processed updated_cleaned_720451-99999-2020-2024.json
Processed updated_cleaned_997255-99999-2020-2024.json
Processed updated_cleaned_721012-99999-2020-2024.json
Processed updated_cleaned_997990-99999-2020-2024.json
Processed updated_cleaned_722704-99999-2020-2024.json
Processed updated_cleaned_997272-99999-2020-2021.json
Processed updated_cleaned_720449-99999-2020-2024.json
Processed updated_cl