In [1]:
import pandas as pd
import numpy as np
from scipy.spatial import KDTree
import os
from joblib import Parallel, delayed
from typing import Tuple, List
import warnings

In [2]:
try:
    from google.colab import drive
    drive.mount('/content/drive')
    COLAB_ENV = True
except:
    COLAB_ENV = False
    pass

In [3]:
def idw_interpolation(latitude, longitude, df_temp_without_gauge, kdtree, p=2):
    row = [latitude, longitude]
    distances, indices = kdtree.query(row, k=5)
    weights = 1 / (distances + 1e-6) ** p
    values = df_temp_without_gauge.iloc[indices]['rain_mm'].values
    return (np.sum(weights * values) / np.sum(weights))

In [4]:
def process_date(ref_date, df_total, crossvalidation_path):
    df_temp = df_total[(df_total['datetime'] == ref_date)]
    gauges_loop = list(df_temp['gauge_code'].unique())
    result_list = []
    for gauge_code in gauges_loop[:]:
        df_result = pd.DataFrame(columns=['gauge_code','datetime', "lat", "long",  'interpolated_rain_mm', 'rain_mm'])

        index = (df_temp[df_temp['gauge_code']==gauge_code]).index[0]
        latitude = df_temp.loc[index, "lat"]
        longitude = df_temp.loc[index, "long"]
        ground_value = df_temp.loc[index, "rain_mm"]

        df_temp_without_gauge = df_temp[df_temp['gauge_code'] != gauge_code]

        locations = df_temp_without_gauge[['lat', 'long']].values
        kdtree = KDTree(locations)        

        interpolated_value = idw_interpolation(latitude, longitude, df_temp_without_gauge, kdtree)
        del df_temp_without_gauge, locations, kdtree
        
        df_result.loc[len(df_result)] = [gauge_code, ref_date, latitude, longitude, interpolated_value, ground_value]
        result_list.append(df_result)
    del df_temp
    
    df_final_result = pd.concat(result_list, ignore_index=True).drop_duplicates(ignore_index=True).sort_values('datetime', ignore_index=True)
    del result_list
    output_path = os.path.join(crossvalidation_path, f"{ref_date.date().strftime('%Y_%m_%d')}_crossvalidation.h5")
    print(output_path)
    df_final_result.to_hdf(output_path
                            , key = 'table_crossvalidation'
                            , mode = 'w'
                            , append = False
                            , complevel = 9
                            , encoding="utf-8")
    del df_final_result

In [5]:
def load_data_in_chunks(file_path: str, chunk_size: int = 11_000_000) -> pd.DataFrame:
    """Load HDF5 data in chunks with memory optimization"""
    dfs = []
    with pd.HDFStore(file_path, mode='r') as store:
        total_rows = store.get_storer('table_data').nrows
        print(f"Rows in table_data: {total_rows:,}")
        
        for chunk in store.select('table_data', chunksize=chunk_size):
            dfs.append(chunk)
            print(f"Processed chunk {len(dfs)} (approx. {len(dfs)*chunk_size:,}/{total_rows:,} rows)")
    
    return pd.concat(dfs, ignore_index=True)

In [6]:
def filter_and_prepare_data(df_data: pd.DataFrame, df_info: pd.DataFrame, 
                          start_date: str, end_date: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Filter data by date range and prepare date lists"""
    # Filter by date range
    mask = (df_data['datetime'] >= start_date) & (df_data['datetime'] <= end_date)
    df_data = df_data.loc[mask].sort_values('datetime', ignore_index=True)
    
    # Create date list dataframe
    df_date_list = (df_data[['datetime']]
                   .drop_duplicates()
                   .assign(year=lambda x: x['datetime'].dt.year)
                   .sort_values('datetime', ignore_index=True))
    
    return df_data, df_date_list

In [7]:
def process_year(year: int, df_year: pd.DataFrame, crossvalidation_path: str) -> None:
    """Process data for a single year"""
    date_ls = df_year['datetime'].unique().tolist()
    date_ls.sort()
    
    print(f'\nProcessing year: {year}')
    print(f'Number of dates: {len(date_ls):,}')
    print(f'Number of rows: {len(df_year):,}')
    
    Parallel(n_jobs=-2)(delayed(process_date)(ref_date, df_year, crossvalidation_path) 
                       for ref_date in date_ls)

In [8]:
def main(file_path: str, crossvalidation_path: str) -> None:
    """Main processing pipeline"""
    # Constants
    START_DATE = '1963-04-14'
    END_DATE = '1963-04-14'
    
    # Suppress HDF5 warnings
    warnings.filterwarnings('ignore', category=pd.errors.PerformanceWarning)
    
    try:
        # Load metadata
        df_info = pd.read_hdf(file_path, key='table_info')
        
        # Load main data in chunks
        print("\nLoading data...")
        df_data = load_data_in_chunks(file_path)
        
        # Filter and prepare data
        print("\nFiltering data...")
        df_data, df_date_list = filter_and_prepare_data(df_data, df_info, START_DATE, END_DATE)
        
        # Process by year
        print("\nStarting parallel processing...")
        for year, year_group in df_date_list.groupby('year'):
            df_year_data = df_data[df_data['datetime'].isin(year_group['datetime'])]
            df_year_data = pd.merge(df_year_data, df_info, on='gauge_code', how='inner')
            process_year(year, df_year_data, crossvalidation_path)
            
        print('\n\nAll done!')
        
    except Exception as e:
        print(f"\nError occurred: {str(e)}")
        raise

In [9]:
if __name__ == "__main__":
    # Define paths
    if COLAB_ENV:
        BASE_PATH = '/content/drive/MyDrive/BRain-D/Scripts e Dados/1 - Organized data gauge/BRAZIL'
    else:
        BASE_PATH = './1 - Organized data gauge/BRAZIL'
    
    FILE_PATH = os.path.join(BASE_PATH, 'DATASETS/BRAZIL_DAILY_1961_2024_QC.h5')
    CROSSVAL_PATH = os.path.join(BASE_PATH, 'CROSSVALIDATION')
    
    main(FILE_PATH, CROSSVAL_PATH)


Loading data...
Rows in table_data: 106,295,715
Processed chunk 1 (approx. 11,000,000/106,295,715 rows)
Processed chunk 2 (approx. 22,000,000/106,295,715 rows)
Processed chunk 3 (approx. 33,000,000/106,295,715 rows)
Processed chunk 4 (approx. 44,000,000/106,295,715 rows)
Processed chunk 5 (approx. 55,000,000/106,295,715 rows)
Processed chunk 6 (approx. 66,000,000/106,295,715 rows)
Processed chunk 7 (approx. 77,000,000/106,295,715 rows)
Processed chunk 8 (approx. 88,000,000/106,295,715 rows)
Processed chunk 9 (approx. 99,000,000/106,295,715 rows)
Processed chunk 10 (approx. 110,000,000/106,295,715 rows)

Filtering data...

Starting parallel processing...

Processing year: 1963
Number of dates: 1
Number of rows: 3,696


All done!
