In [None]:
import os

import xarray as xr
import matplotlib.pyplot as plt
import pandas as pd

import regionmask

import numpy as np
import pandas as pd
import geopandas as gpd
import dask_geopandas as dgpd
from shapely.geometry import Point
from dask import dataframe as dd
from mpl_toolkits.basemap import Basemap

import polars as pl
import gc 
import pyarrow as pa

from typing import List


# Convert file format 

In [None]:
# Convert the raw POWER NASA NetCDF file to Parquet.
# The context manager releases the NetCDF file handle as soon as the data has
# been materialised as a pandas DataFrame.
with xr.open_dataset('/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/power_nasa_data.nc') as ds:
    # reset_index() turns the index levels into regular columns
    df_pandas = ds.to_dataframe().reset_index()

# Convert the Pandas DataFrame to a Polars DataFrame
df_polars = pl.from_pandas(df_pandas)

df_polars.write_parquet("/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/02_intermediate/power_nasa_data.parquet")

# Merge raw files

## POWER NASA 
Already merged

## EDDI

In [None]:
def merge_parquet_files(parquet_files: List[str]) -> pl.DataFrame:
    """
    Merge multiple EDDI Parquet files into a single DataFrame.

    Every file is expected to provide 'time', 'lat', 'lon' and 'eddi' columns.
    The 'eddi' column of each file is renamed to 'eddi_<period>', where
    <period> is the text between the last underscore of the file name and the
    first following dot (e.g. 'merged_eddi_data_01mn.parquet' -> 'eddi_01mn').
    The frames are then inner-joined on 'time', 'lat' and 'lon'.

    Loading is best-effort: a file that cannot be read or prepared is reported
    and skipped. Skipped files are summarised once more before merging, because
    their 'eddi_<period>' column will be absent from the result.

    Args:
        parquet_files (List[str]): List of file paths to the Parquet files.

    Returns:
        pl.DataFrame: Merged DataFrame containing data from all the Parquet
        files that could be loaded.

    Raises:
        ValueError: If none of the files could be loaded.
        Exception: Any error raised by a join is printed and re-raised.
    """
    join_keys = ["time", "lat", "lon"]
    dataframes = []
    skipped_files = []

    for file_path in parquet_files:
        try:
            df = pl.read_parquet(file_path)

            # Parse the period tag from the file name only, so that directory
            # names can never leak into the generated column name.
            time_period = os.path.basename(file_path).split("_")[-1].split(".")[0]

            df = df.rename({"eddi": f"eddi_{time_period}"})

            # Normalise 'time' to Date so the join keys have the same dtype in every frame
            if df.schema["time"] != pl.Date:
                df = df.with_columns(pl.col("time").cast(pl.Date))

            dataframes.append(df)
        except Exception as e:
            print(f"Error processing file: {file_path}")
            print(f"Error message: {str(e)}")
            skipped_files.append(file_path)

    # Check if any dataframes were successfully loaded
    if not dataframes:
        raise ValueError("No valid dataframes found in the provided Parquet files.")

    # Make partial results visible: the caller gets a frame, but not a complete one
    if skipped_files:
        print(
            f"Warning: {len(skipped_files)} file(s) were skipped and are missing "
            f"from the merged result: {skipped_files}"
        )

    # Merge all the dataframes based on 'time', 'lat', and 'lon'
    merged_df = dataframes[0]
    for df in dataframes[1:]:
        try:
            merged_df = merged_df.join(df, on=join_keys, how="inner")
        except Exception as e:
            print(f"Error merging dataframe: {str(e)}")
            raise

    return merged_df

In [None]:
# Raw EDDI inputs: one Parquet file per time-period tag (months 'mn', weeks 'wk')
eddi_period_tags = ["01mn", "01wk", "02mn", "02wk", "03mn", "06mn", "09mn", "12mn"]
parquet_files = [
    f"/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_{tag}.parquet"
    for tag in eddi_period_tags
]

# Combine the per-period files into one table
merged_df = merge_parquet_files(parquet_files)

In [None]:
# Persist the merged EDDI table to the intermediate data folder (02_intermediate)
output_file_path = '/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/02_intermediate/merged_eddi_data.parquet'
merged_df.write_parquet(output_file_path)

In [None]:
import os

def delete_files(file_list):
    """
    Remove every file named in ``file_list`` and report the outcome.

    Deletion is attempted for each path in turn. A failure on one path
    (missing file, insufficient permissions, or any other error) is reported
    and does not stop the remaining paths from being processed.

    Args:
    file_list (list): File paths (strings) to delete.

    Returns:
    None. One status line per path is printed.
    """
    for target in file_list:
        try:
            os.remove(target)
        except FileNotFoundError:
            status = f"File not found: {target}"
        except PermissionError:
            status = f"Permission denied: {target}"
        except Exception as err:
            status = f"Error deleting {target}: {str(err)}"
        else:
            status = f"Successfully deleted: {target}"
        print(status)

# Remove the raw per-period EDDI files, but only once the merged output is on
# disk: running this cell before (or after a failed) write would otherwise
# destroy the only copy of the data.
if os.path.exists(output_file_path):
    delete_files(parquet_files)
else:
    print(f"Merged output not found at {output_file_path}; raw files were kept.")

## Nino

In [None]:
# Open the four daily Nino index files from the raw data folder
nino12, nino3, nino34, nino4 = (
    xr.open_dataset(f'/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/inino{region}_daily.nc')
    for region in ('12', '3', '34', '4')
)

In [None]:
def process_and_convert(dataset, variable_name, start_date='1981-09-01'):
    """
    Trim one variable of an xarray Dataset in time and return it as a Polars DataFrame.

    The time dimension may be called either 'time' or 'TIME'. The data is cut
    to the window from ``start_date`` up to the last time step at which the
    variable still has at least one non-NaN value.

    Args:
    dataset (xarray.Dataset): The input dataset.
    variable_name (str): The name of the variable to extract.
    start_date (str): Lower bound of the time window. Default is '1981-09-01'.

    Returns:
    polars.DataFrame: The selected variable, with the former index levels
    (including the time coordinate) as regular columns.
    """
    # Pick whichever spelling of the time dimension this dataset uses
    if 'time' in dataset.dims:
        time_dim = 'time'
    else:
        time_dim = 'TIME'

    # Time steps where the variable is entirely NaN are ignored when looking
    # for the upper bound of the window
    non_empty_steps = dataset[variable_name].dropna(dim=time_dim, how='all')
    max_valid_date = non_empty_steps[time_dim].max().values

    # Restrict the dataset to [start_date, max_valid_date]
    window = {time_dim: slice(start_date, max_valid_date)}
    trimmed = dataset.sel(window)

    # xarray -> pandas (index flattened into columns) -> Polars
    return pl.from_pandas(
        trimmed[variable_name].to_dataframe(name=variable_name).reset_index()
    )

# Turn every Nino index dataset into a Polars frame (default start date applies)
pl_nino12 = process_and_convert(dataset=nino12, variable_name='Nino12')
pl_nino3 = process_and_convert(dataset=nino3, variable_name='Nino3')
pl_nino34 = process_and_convert(dataset=nino34, variable_name='Nino34')
pl_nino4 = process_and_convert(dataset=nino4, variable_name='Nino4')


In [None]:
def merge_datasets(*datasets):
    """
    Merge multiple Polars DataFrames on their time column with an outer join.

    The time column may be called either 'time' or 'TIME'; the name used by
    the first DataFrame is the one kept in the result, and later frames are
    renamed to match before joining.

    After each join every column ending in '_right' is dropped. Before that
    happens, a right-hand copy of the join key ('<time column>_right') is
    folded back into the time column, so rows that exist only in the newly
    joined frame keep their timestamp instead of ending up with a null key.
    Note that a value column whose name clashes with an existing one also
    receives the '_right' suffix and is therefore discarded.

    Args:
    *datasets: Variable number of Polars DataFrames to merge.

    Returns:
    polars.DataFrame: The merged DataFrame, or None when called without
    arguments.
    """
    if not datasets:
        return None

    # Determine the time column name from the first dataset
    time_col = 'time' if 'time' in datasets[0].columns else 'TIME'
    right_time_col = f"{time_col}_right"

    # Initial dataset to start merging from
    merged_df = datasets[0]

    # Iterate over remaining datasets and merge them one by one
    for data in datasets[1:]:
        # Ensure the joining column has the same name in both DataFrames
        if time_col not in data.columns:
            data = data.rename({'TIME': 'time'} if 'TIME' in data.columns else {'time': 'TIME'})

        # Use Polars' join function with a custom suffix to prevent name clashes
        merged_df = merged_df.join(data, on=time_col, how="outer", suffix="_right")

        # Depending on the Polars version, the outer join may keep the right-hand
        # key as a separate '<key>_right' column rather than coalescing it. Rows
        # found only in `data` then carry their timestamp in that column alone,
        # so merge it into the key before the '_right' columns are removed.
        if right_time_col in merged_df.columns:
            merged_df = merged_df.with_columns(
                pl.coalesce([pl.col(time_col), pl.col(right_time_col)]).alias(time_col)
            )

        # Drop the remaining '_right' columns created by the join
        right_cols = [col for col in merged_df.columns if col.endswith('_right')]
        merged_df = merged_df.drop(right_cols)

    return merged_df

# Merge the Nino dataframes on their time column
nino_frames = (pl_nino12, pl_nino3, pl_nino34, pl_nino4)
pl_merged_nino = merge_datasets(*nino_frames)

In [None]:
# Show the merged Nino frame as the cell output for a quick visual check
pl_merged_nino


In [None]:
# Persist the merged Nino indices to the intermediate data folder
nino_output_path = "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/02_intermediate/merged_nino_data.parquet"
pl_merged_nino.write_parquet(nino_output_path)

## IOD

In [None]:
# Open the four IOD-related index files from the raw data folder
wtio, setio, dmi, swio = (
    xr.open_dataset(f'/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/{index_name}.nc')
    for index_name in ('wtio', 'setio', 'dmi', 'swio')
)

In [None]:
# Turn every IOD index dataset into a Polars frame (default start date applies)
pl_wtio = process_and_convert(dataset=wtio, variable_name='WTIO')
pl_setio = process_and_convert(dataset=setio, variable_name='SETIO')
pl_dmi = process_and_convert(dataset=dmi, variable_name='DMI')
pl_swio = process_and_convert(dataset=swio, variable_name='SWIO')

In [None]:
# Merge the IOD dataframes on their time column
iod_frames = (pl_wtio, pl_setio, pl_dmi, pl_swio)
pl_merged_iod = merge_datasets(*iod_frames)

In [None]:
# Show the merged IOD frame as the cell output for a quick visual check
pl_merged_iod

In [None]:
# Persist the merged IOD indices to the intermediate data folder
iod_output_path = "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/02_intermediate/merged_iod_data.parquet"
pl_merged_iod.write_parquet(iod_output_path)