In [1]:
import pandas as pd
import numpy as np
from scipy.spatial.distance import cdist
from multiprocessing import Pool
from fill_nulls_multiprocess import process_date_subset  # Import the function

In [2]:
# Read the combined district dataset; the CSV's first column becomes the row index.
df = pd.read_csv(
    "data_gcs_combine_update_district.csv",
    index_col=0,
)

In [3]:
def precompute_distances_with_numpy(stations, threshold):
    """
    Precompute pairwise great-circle distances and inverse-square weights
    (1/distance^2) between stations, drop self-pairs, and keep only pairs
    whose distance does not exceed the threshold.

    Distances are computed with the haversine formula and are expressed in
    kilometers, so they are directly comparable with ``threshold``. A plain
    Euclidean distance on latitude/longitude would be in degrees and must not
    be compared against a kilometer threshold.

    Parameters:
        stations (pd.DataFrame): DataFrame containing 'region', 'latitude', and
            'longitude' (coordinates in decimal degrees).
        threshold (float): Distance threshold in kilometers.

    Returns:
        pd.DataFrame: Long-format table with columns
            ['station1', 'station2', 'distance', 'weight'], one row per ordered
            pair (both A->B and B->A are present). 'distance' is in kilometers
            and 'weight' is 1/distance^2. Two differently named stations at
            identical coordinates have distance 0 and therefore weight ``inf``.
            Pairs whose distance is NaN (missing coordinates) are dropped.
    """
    earth_radius_km = 6371.0088  # mean Earth radius

    names = stations['region'].to_numpy()
    lat = np.radians(stations['latitude'].to_numpy(dtype=float))
    lon = np.radians(stations['longitude'].to_numpy(dtype=float))

    # Haversine formula, broadcast to an (n, n) matrix of all station pairs.
    half_dlat = (lat[:, None] - lat[None, :]) / 2.0
    half_dlon = (lon[:, None] - lon[None, :]) / 2.0
    hav = (
        np.sin(half_dlat) ** 2
        + np.cos(lat)[:, None] * np.cos(lat)[None, :] * np.sin(half_dlon) ** 2
    )
    # Clip guards against rounding pushing the value marginally outside [0, 1].
    distances = 2.0 * earth_radius_km * np.arcsin(np.sqrt(np.clip(hav, 0.0, 1.0)))

    # Long format in row-major order: station1 varies slowest, station2 fastest.
    n_stations = len(names)
    pairs = pd.DataFrame({
        'station1': np.repeat(names, n_stations),
        'station2': np.tile(names, n_stations),
        'distance': distances.ravel(),
    })

    # Remove self-pairs (by name) and pairs beyond the threshold. NaN distances
    # compare False and are dropped as well.
    keep = (pairs['station1'] != pairs['station2']) & (pairs['distance'] <= threshold)
    # Explicit copy so the column assignment below does not write into a slice.
    pairs = pairs.loc[keep].copy()

    # Vectorized inverse-square weights; a zero distance yields inf rather than
    # raising ZeroDivisionError.
    with np.errstate(divide='ignore'):
        pairs['weight'] = 1.0 / np.square(pairs['distance'].to_numpy())

    return pairs

In [4]:
def parallel_fill_nulls(df, fields, distance_matrix, n_processes=4):
    """
    Fill null values using multiprocessing by splitting the DataFrame by date.

    Every distinct non-null value of the 'date' column becomes one task, the
    tuple ``(date, subset, fields, distance_matrix)``, which is passed to
    ``process_date_subset`` in a worker process. The per-date results are
    concatenated in order of first appearance of each date, with a fresh
    0..n-1 index (the input index is not preserved).

    Parameters:
        df (pd.DataFrame): Input data; must contain a 'date' column.
        fields (list): Column names forwarded to ``process_date_subset``.
        distance_matrix (pd.DataFrame): Station-pair table forwarded unchanged
            to every task.
        n_processes (int): Number of worker processes in the pool.

    Returns:
        pd.DataFrame: Concatenation of the per-date results. If ``df`` has no
            rows with a non-null date, an empty frame with the same columns as
            ``df`` is returned and no worker pool is started.
    """
    # One groupby pass instead of a full boolean-mask scan per date.
    # sort=False keeps dates in first-appearance order. Rows with a null date
    # belong to no group and do not reach the result; an equality filter
    # against NaN would not have matched them either.
    tasks = [
        (date, subset, fields, distance_matrix)
        for date, subset in df.groupby('date', sort=False)
    ]

    # pd.concat raises ValueError on an empty list, so short-circuit here.
    if not tasks:
        return df.iloc[0:0].reset_index(drop=True)

    # NOTE(review): process_date_subset is presumably expected to return one
    # DataFrame per task - confirm against fill_nulls_multiprocess.
    with Pool(n_processes) as pool:
        results = pool.map(process_date_subset, tasks)

    return pd.concat(results, ignore_index=True)

In [5]:
# One row per distinct (region, latitude, longitude) combination found in df.
stations = (
    df.loc[:, ['region', 'latitude', 'longitude']]
    .drop_duplicates()
)

In [6]:
# Measurement columns passed to parallel_fill_nulls as the fields to fill.
fields_to_fill = [
    'Mean Pressure (hPa)',
    'Total Rainfall (mm)',
    'Mean Relative Humidity (%)',
    'Maximum Temperature (°C)',
    'Minimum Temperature (°C)',
    'Mean Temperature (°C)',
    'Prevailing Wind Direction (°)',
    'Mean Wind Speed (km/h)',
]

In [7]:
threshold = 30  # Maximum station-to-station distance passed to precompute_distances_with_numpy, whose docstring specifies kilometers

In [8]:
# Build the table of neighbouring station pairs and their weights.
distance_matrix = precompute_distances_with_numpy(
    stations=stations,
    threshold=threshold,
)

In [9]:
# Run the per-date fill across 4 worker processes.
filled_df = parallel_fill_nulls(
    df,
    fields_to_fill,
    distance_matrix,
    n_processes=4,
)

In [10]:
# Write the filled dataset to disk (the row index is written as the first column).
filled_df.to_csv(
    path_or_buf="data_gcs_combine_update_district_fillna.csv",
)