In [1]:
import re
import os
import shutil
import requests
import polars as pl
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
import rasterio
import numpy as np

from rasterio.transform import Affine

In [2]:
def fetch_file_links(year_url: str, file_type: str) -> list[str]:
    """
    Fetches file links from a given URL that match a specific file type and filter by additional criteria.
    
    Parameters:
    - year_url: The base URL to search for file links.
    - file_type: The file extension to filter links by.
    
    Returns:
    A list of URLs that match the specified criteria.
    """
    session = requests.Session()
    try:
        response = session.get(year_url)
        response.raise_for_status()  # Raises a HTTPError if the status is 4xx, 5xx
        soup = BeautifulSoup(response.text, 'html.parser')
        links = soup.find_all('a', href=True)
        file_links = [f"{year_url}{link['href']}" for link in links if link['href'].endswith('.asc')]
        filtered_links = [link for link in file_links if file_type in link]
        return filtered_links
    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
    except requests.exceptions.RequestException as err:
        print(f"Request error occurred: {err}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    return []

In [3]:
def download_file(url, destination_folder):
    filename = url.split("/")[-1]
    destination_path = os.path.join(destination_folder, filename)
    try:
        response = requests.get(url)
        response.raise_for_status()
        with open(destination_path, 'wb') as f:
            f.write(response.content)
        print(f"File downloaded successfully: {filename}")
    except requests.exceptions.RequestException as e:
        print(f"Failed to download the file: {filename}. Error: {e}")

In [4]:
def append_and_process_files(source_dir, transform, row_start, row_end, col_start, col_end):
    master_df = pl.DataFrame()
    for filename in os.listdir(source_dir):
        if filename.endswith('.asc'):
            file_path = os.path.join(source_dir, filename)
            with rasterio.open(file_path) as src:
                data = src.read(1)  # Read the first band
                filtered_data = data[row_start:row_end, col_start:col_end]
            df = create_polars_dataframe(file_path, filtered_data, row_start, row_end, col_start, col_end, transform)
            master_df = master_df.vstack(df) if not master_df.is_empty() else df
    return master_df


In [5]:
def create_polars_dataframe(file_name: str, data: np.ndarray, row_start: int, row_end: int, col_start: int, col_end: int, transform: Affine):
    """
    Creates a Polars DataFrame from filtered raster data with columns for time, latitude, longitude, and EDDI values.

    Parameters:
    - file_name (str): The name of the file, used to extract the time information.
    - data (numpy.ndarray): The filtered raster data array.
    - row_start (int): The starting index for rows of the filtered data.
    - row_end (int): The ending index for rows of the filtered data.
    - col_start (int): The starting index for columns of the filtered data.
    - col_end (int): The ending index for columns of the filtered data.
    - transform (Affine): The affine transformation used to convert indices to geographic coordinates.

    Returns:
    - pl.DataFrame: A Polars DataFrame with columns for time, latitude, longitude, and EDDI values.
    """
    # Extract the date from the file name using regular expressions
    date_match = re.search(r'\d{8}', file_name)
    if date_match:
        date = date_match.group(0)
        # Transform the date to 'yyyy-mm-dd' format
        formatted_date = f"{date[:4]}-{date[4:6]}-{date[6:]}"
    else:
        raise ValueError("Date not found in file name.")

    # Generate latitude and longitude arrays
    lon_arr = []
    lat_arr = []
    for row in range(row_start, row_end):
        for col in range(col_start, col_end):
            lon, lat = transform * (col, row)
            lon_arr.append(lon)
            lat_arr.append(lat)

    # Flatten the data array to match the latitude and longitude arrays
    eddi_values = data.flatten()

    # Create a Polars DataFrame
    df = pl.DataFrame({
        "time": [formatted_date] * len(eddi_values),
        "lat": lat_arr,
        "lon": lon_arr,
        "eddi": eddi_values
    })

    return df

In [6]:
def geographic_to_index(lon, lat, transform):
    """
    Convert geographic coordinates to raster indices.

    Parameters:
    - lon (float): The longitude to convert.
    - lat (float): The latitude to convert.
    - transform (Affine): The affine transform associated with the raster, defining how geographic coordinates are transformed into raster indices.

    Returns:
    - (int, int): The zero-based column index and row index corresponding to the given longitude and latitude.
    """
    col, row = ~transform * (lon, lat)
    return int(col), int(row)

In [7]:
# Main function for dowloading data 
def download_and_process_raster_data(base_url, start_year, end_year, file_type, destination_folder):
    # Define the affine transform for a raster where each pixel represents a 0.5x0.5 degree area
    transform = Affine(0.5, 0.0, -180, 0.0, -0.5, 90)

    # Define geographic bounds and calculate index ranges
    lat_range = (-12, 22)  # South to North
    lon_range = (23, 52)  # West to East
    col_start, row_end = geographic_to_index(lon_range[0], lat_range[0], transform)
    col_end, row_start = geographic_to_index(lon_range[1], lat_range[1], transform)

    for year in range(start_year, end_year + 1):
        year_url = f"{base_url}{year}/"
        year_destination_folder = os.path.join(destination_folder, str(year))
        
        if not os.path.exists(year_destination_folder):
            os.makedirs(year_destination_folder)
        
        file_links = fetch_file_links(year_url, file_type)
        with ThreadPoolExecutor(max_workers=8) as executor:
            executor.map(lambda url: download_file(url, year_destination_folder), file_links)
        
        # Process and append raster data to a Polars DataFrame
        master_df = append_and_process_files(year_destination_folder, transform, row_start, row_end, col_start, col_end)
        
        # Save the DataFrame as a Parquet file
        parquet_path = os.path.join(destination_folder, f"EDDI_{file_type}_{year}.parquet")
        master_df.write_parquet(parquet_path)
        print(f"Data for year {year} processed and saved as Parquet at {parquet_path}.")
        
        shutil.rmtree(year_destination_folder)  # Optional: remove the directory after processing

In [8]:
def merge_parquet_files(directory_path: str, output_file_path: str) -> None:
    """
    Merges multiple Parquet files located in a specified directory into a single Parquet file.

    Args:
    directory_path (str): The path to the directory containing the Parquet files.
    output_file_path (str): The path where the merged Parquet file will be saved.

    Returns:
    None
    """

    # List all Parquet files in the directory
    parquet_files = [file for file in os.listdir(directory_path) if file.endswith('.parquet')]

    # Read each Parquet file into a DataFrame and store in a list
    dataframes = [pl.read_parquet(os.path.join(directory_path, file)) for file in parquet_files]

    # Concatenate all DataFrames vertically
    merged_df = pl.concat(dataframes)

    # Write the merged DataFrame to a new Parquet file
    merged_df.write_parquet(output_file_path)

    print(f"Merged file saved to {output_file_path}")


In [9]:
import shutil
import os

def delete_directory(dir_path: str) -> None:
    """
    Deletes a directory and all of its contents.
    
    Args:
    dir_path (str): The path to the directory to be deleted.
    
    Raises:
    FileNotFoundError: If the directory does not exist.
    PermissionError: If the deletion is not allowed due to permission restrictions.
    """
    if os.path.exists(dir_path):
        shutil.rmtree(dir_path)
        print(f"Directory '{dir_path}' has been deleted.")
    else:
        raise FileNotFoundError(f"The directory {dir_path} does not exist.")

# Specify the directory to delete
# directory_path = "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/eddi"

# Delete the directory
# try:
#     delete_directory(directory_path)
# except Exception as e:
#     print(f"An error occurred: {e}")


In [13]:
    base_url = "https://downloads.psl.noaa.gov/Projects/EDDI/global_archive/NCEP/"
    destination_folder = "/teamspace/studios/this_studio/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/eddi"
    start_year = 1981
    end_year = 2024
    file_types = ["01wk", "02wk", "01mn", "02mn", "03mn", "06mn", "09mn", "12mn"]

    for file_type in file_types:
        download_and_process_raster_data(base_url, start_year, end_year, file_type, destination_folder)

        # Construct the directory and output file paths
        directory_path = '/teamspace/studios/this_studio/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/eddi'
        output_file_path = f'/teamspace/studios/this_studio/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_{file_type}.parquet'

        # Call the function to merge the files
        merge_parquet_files(directory_path, output_file_path)

        # Delete the directory
        try:
            delete_directory(directory_path)
        except Exception as e:
            print(f"An error occurred: {e}")

File downloaded successfully: EDDI_01wk_19810101.asc
File downloaded successfully: EDDI_01wk_19810105.asc
File downloaded successfully: EDDI_01wk_19810108.asc
File downloaded successfully: EDDI_01wk_19810107.asc
File downloaded successfully: EDDI_01wk_19810106.asc
File downloaded successfully: EDDI_01wk_19810102.asc
File downloaded successfully: EDDI_01wk_19810104.asc
File downloaded successfully: EDDI_01wk_19810103.asc
File downloaded successfully: EDDI_01wk_19810109.asc
File downloaded successfully: EDDI_01wk_19810111.asc
File downloaded successfully: EDDI_01wk_19810113.asc
File downloaded successfully: EDDI_01wk_19810116.asc
File downloaded successfully: EDDI_01wk_19810114.asc
File downloaded successfully: EDDI_01wk_19810112.asc
File downloaded successfully: EDDI_01wk_19810110.asc
File downloaded successfully: EDDI_01wk_19810117.asc
File downloaded successfully: EDDI_01wk_19810120.asc
File downloaded successfully: EDDI_01wk_19810119.asc
File downloaded successfully: EDDI_01wk_198101

KeyboardInterrupt: 

File downloaded successfully: EDDI_01wk_19840811.asc
File downloaded successfully: EDDI_01wk_19840812.asc
File downloaded successfully: EDDI_01wk_19840813.asc
File downloaded successfully: EDDI_01wk_19840814.asc
File downloaded successfully: EDDI_01wk_19840816.asc
File downloaded successfully: EDDI_01wk_19840817.asc
File downloaded successfully: EDDI_01wk_19840815.asc
File downloaded successfully: EDDI_01wk_19840818.asc
File downloaded successfully: EDDI_01wk_19840820.asc
File downloaded successfully: EDDI_01wk_19840819.asc
File downloaded successfully: EDDI_01wk_19840821.asc
File downloaded successfully: EDDI_01wk_19840822.asc
File downloaded successfully: EDDI_01wk_19840823.asc
File downloaded successfully: EDDI_01wk_19840824.asc
File downloaded successfully: EDDI_01wk_19840825.asc
File downloaded successfully: EDDI_01wk_19840826.asc
File downloaded successfully: EDDI_01wk_19840803.asc
File downloaded successfully: EDDI_01wk_19840827.asc
File downloaded successfully: EDDI_01wk_198408

# Testing functions

In [14]:
import polars as pl
# Define the path to your Parquet file
parquet_file_path_1 = "/teamspace/studios/this_studio/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw//merged_eddi_data_01wk.parquet"
parquet_file_path_2 = "/teamspace/studios/this_studio/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw//merged_eddi_data_02wk.parquet"


# Use Polars to read the Parquet file
df_1 = pl.read_parquet(parquet_file_path_1)
df_2 = pl.read_parquet(parquet_file_path_2)

In [7]:
df_1

time,lat,lon,eddi
str,f64,f64,f32
"""1981-01-04""",22.0,23.0,0.000288
"""1981-01-04""",22.0,23.5,-0.115403
"""1981-01-04""",22.0,24.0,-0.29267
"""1981-01-04""",22.0,24.5,-0.29267
"""1981-01-04""",22.0,25.0,-0.415945
…,…,…,…
"""2024-04-15""",-11.5,49.5,-999.0
"""2024-04-15""",-11.5,50.0,-999.0
"""2024-04-15""",-11.5,50.5,-999.0
"""2024-04-15""",-11.5,51.0,-999.0


In [8]:
df_2

time,lat,lon,eddi
str,f64,f64,f32
"""1981-01-07""",22.0,23.0,-0.761296
"""1981-01-07""",22.0,23.5,-0.479941
"""1981-01-07""",22.0,24.0,-0.479941
"""1981-01-07""",22.0,24.5,-0.353626
"""1981-01-07""",22.0,25.0,-0.232805
…,…,…,…
"""2024-04-29""",-11.5,49.5,-999.0
"""2024-04-29""",-11.5,50.0,-999.0
"""2024-04-29""",-11.5,50.5,-999.0
"""2024-04-29""",-11.5,51.0,-999.0


In [16]:
import polars as pl

# Define the path to your Parquet files
parquet_file_path_1 = "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_01wk.parquet"
parquet_file_path_2 = "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_02wk.parquet"
parquet_file_path_3 = "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_01mn.parquet"

# Read the Parquet files
df_1 = pl.read_parquet(parquet_file_path_1)
df_2 = pl.read_parquet(parquet_file_path_2)
df_3 = pl.read_parquet(parquet_file_path_3)

# Rename 'eddi' column
df_1 = df_1.rename({"eddi": "eddi_01wk"})
df_2 = df_2.rename({"eddi": "eddi_02wk"})
df_3 = df_3.rename({"eddi": "eddi_01mn"})

# Convert 'time' column from string to date format
df_1 = df_1.with_columns(pl.col("time").str.strptime(pl.Date, "%Y-%m-%d"))
df_2 = df_2.with_columns(pl.col("time").str.strptime(pl.Date, "%Y-%m-%d"))
df_3 = df_3.with_columns(pl.col("time").str.strptime(pl.Date, "%Y-%m-%d"))

# Merge the dataframes based on 'time', 'lat', and 'lon'
merged_df = df_1.join(df_2, on=["time", "lat", "lon"])
merged_df = merged_df.join(df_3, on=["time", "lat", "lon"])

# Display the merged dataframe
print(merged_df)


shape: (62_374_360, 6)
┌────────────┬───────┬──────┬───────────┬───────────┬───────────┐
│ time       ┆ lat   ┆ lon  ┆ eddi_01wk ┆ eddi_02wk ┆ eddi_01mn │
│ ---        ┆ ---   ┆ ---  ┆ ---       ┆ ---       ┆ ---       │
│ date       ┆ f64   ┆ f64  ┆ f32       ┆ f32       ┆ f32       │
╞════════════╪═══════╪══════╪═══════════╪═══════════╪═══════════╡
│ 1981-01-06 ┆ 22.0  ┆ 23.0 ┆ -0.353626 ┆ -0.17379  ┆ 0.05802   │
│ 1981-01-06 ┆ 22.0  ┆ 23.5 ┆ -0.353626 ┆ -0.232805 ┆ -0.057442 │
│ 1981-01-06 ┆ 22.0  ┆ 24.0 ┆ -0.353626 ┆ -0.232805 ┆ -0.115403 │
│ 1981-01-06 ┆ 22.0  ┆ 24.5 ┆ -0.415945 ┆ -0.115403 ┆ -0.115403 │
│ 1981-01-06 ┆ 22.0  ┆ 25.0 ┆ -0.614502 ┆ -0.115403 ┆ -0.17379  │
│ …          ┆ …     ┆ …    ┆ …         ┆ …         ┆ …         │
│ 2024-04-29 ┆ -11.5 ┆ 49.5 ┆ -999.0    ┆ -999.0    ┆ -999.0    │
│ 2024-04-29 ┆ -11.5 ┆ 50.0 ┆ -999.0    ┆ -999.0    ┆ -999.0    │
│ 2024-04-29 ┆ -11.5 ┆ 50.5 ┆ -999.0    ┆ -999.0    ┆ -999.0    │
│ 2024-04-29 ┆ -11.5 ┆ 51.0 ┆ -999.0    ┆ -999.0    ┆

In [29]:
import polars as pl

# Define the paths to your Parquet files
parquet_files = [
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_01mn.parquet",
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_01wk.parquet",
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_02mn.parquet",
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_02wk.parquet",
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_03mn.parquet",
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_06mn.parquet",
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_09mn.parquet",
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_12_mn.parquet"
]

# Initialize an empty list to store the dataframes
dataframes = []

# Iterate over the Parquet files
for file_path in parquet_files:
    # Read the Parquet file
    df = pl.read_parquet(file_path)
    
    # Extract the time period from the file name
    time_period = file_path.split("_")[-1].split(".")[0]
    
    # Rename the 'eddi' column with the time period
    df = df.rename({"eddi": f"eddi_{time_period}"})
    
    # Check if 'time' column is already in Date format
    if df.dtypes[df.columns.index("time")] != pl.Date:
        # Convert 'time' column to Date format
        df = df.with_columns(pl.col("time").cast(pl.Date))
    
    # Append the dataframe to the list
    dataframes.append(df)

# Merge all the dataframes based on 'time', 'lat', and 'lon'
merged_df = dataframes[0]
for df in dataframes[1:]:
    merged_df = merged_df.join(df, on=["time", "lat", "lon"])


shape: (62_374_360, 11)
┌────────────┬───────┬──────┬───────────┬───┬───────────┬───────────┬───────────┬───────────┐
│ time       ┆ lat   ┆ lon  ┆ eddi_01mn ┆ … ┆ eddi_03mn ┆ eddi_06mn ┆ eddi_09mn ┆ eddi_mn   │
│ ---        ┆ ---   ┆ ---  ┆ ---       ┆   ┆ ---       ┆ ---       ┆ ---       ┆ ---       │
│ date       ┆ f64   ┆ f64  ┆ f32       ┆   ┆ f32       ┆ f32       ┆ f32       ┆ f32       │
╞════════════╪═══════╪══════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪═══════════╡
│ 1981-01-01 ┆ 22.0  ┆ 23.0 ┆ 0.546651  ┆ … ┆ -0.545979 ┆ -0.479941 ┆ -0.115403 ┆ -0.115403 │
│ 1981-01-01 ┆ 22.0  ┆ 23.5 ┆ 0.416576  ┆ … ┆ -0.545979 ┆ -0.479941 ┆ -0.115403 ┆ -0.057442 │
│ 1981-01-01 ┆ 22.0  ┆ 24.0 ┆ 0.416576  ┆ … ┆ -0.545979 ┆ -0.614502 ┆ -0.115403 ┆ -0.115403 │
│ 1981-01-01 ┆ 22.0  ┆ 24.5 ┆ 0.416576  ┆ … ┆ -0.479941 ┆ -0.686047 ┆ -0.115403 ┆ -0.17379  │
│ 1981-01-01 ┆ 22.0  ┆ 25.0 ┆ 0.416576  ┆ … ┆ -0.415945 ┆ -0.686047 ┆ -0.232805 ┆ -0.17379  │
│ …          ┆ …     ┆ …    ┆ …     

In [1]:
import polars as pl
from typing import List

def merge_parquet_files(parquet_files: List[str]) -> pl.DataFrame:
    """
    Merge multiple Parquet files into a single DataFrame.

    Args:
        parquet_files (List[str]): List of file paths to the Parquet files.

    Returns:
        pl.DataFrame: Merged DataFrame containing data from all the Parquet files.
    """
    # Initialize an empty list to store the dataframes
    dataframes = []

    # Iterate over the Parquet files
    for file_path in parquet_files:
        try:
            # Read the Parquet file
            df = pl.read_parquet(file_path)
            
            # Extract the time period from the file name
            time_period = file_path.split("_")[-1].split(".")[0]
            
            # Rename the 'eddi' column with the time period
            df = df.rename({"eddi": f"eddi_{time_period}"})
            
            # Check if 'time' column is already in Date format
            if df.dtypes[df.columns.index("time")] != pl.Date:
                # Convert 'time' column to Date format
                df = df.with_columns(pl.col("time").cast(pl.Date))
            
            # Append the dataframe to the list
            dataframes.append(df)
        except Exception as e:
            print(f"Error processing file: {file_path}")
            print(f"Error message: {str(e)}")
            continue

    # Check if any dataframes were successfully loaded
    if not dataframes:
        raise ValueError("No valid dataframes found in the provided Parquet files.")

    # Merge all the dataframes based on 'time', 'lat', and 'lon'
    merged_df = dataframes[0]
    for df in dataframes[1:]:
        try:
            merged_df = merged_df.join(df, on=["time", "lat", "lon"])
        except Exception as e:
            print(f"Error merging dataframe: {str(e)}")
            raise

    return merged_df

In [3]:
# Define the paths to your Parquet files
parquet_files = [
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_01mn.parquet",
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_01wk.parquet",
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_02mn.parquet",
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_02wk.parquet",
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_03mn.parquet",
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_06mn.parquet",
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_09mn.parquet",
    "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_12mn.parquet"
]

merged_df = merge_parquet_files(parquet_files)

In [None]:
merged_df

time,lat,lon,eddi_01mn,eddi_01wk,eddi_02mn,eddi_02wk,eddi_03mn,eddi_06mn,eddi_09mn,eddi_12mn
date,f64,f64,f32,f32,f32,f32,f32,f32,f32,f32
1981-01-01,22.0,23.0,0.546651,-0.545979,-0.232805,0.546651,-0.545979,-0.479941,-0.115403,-0.115403
1981-01-01,22.0,23.5,0.416576,-0.686047,-0.17379,0.48059,-0.545979,-0.479941,-0.115403,-0.057442
1981-01-01,22.0,24.0,0.416576,-0.761296,-0.17379,0.48059,-0.545979,-0.614502,-0.115403,-0.115403
1981-01-01,22.0,24.5,0.416576,-0.686047,-0.17379,0.416576,-0.479941,-0.686047,-0.115403,-0.17379
1981-01-01,22.0,25.0,0.416576,-0.614502,-0.17379,0.546651,-0.415945,-0.686047,-0.232805,-0.17379
…,…,…,…,…,…,…,…,…,…,…
2024-04-29,-11.5,49.5,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0
2024-04-29,-11.5,50.0,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0
2024-04-29,-11.5,50.5,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0
2024-04-29,-11.5,51.0,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0


In [4]:
# import polars as pl
# # Define the path to your Parquet file
# parquet_file_path = "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data_02wk.parquet"
# # parquet_file_path = "/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data.parquet"


# # Use Polars to read the Parquet file
# df = pl.read_parquet(parquet_file_path)
# df

In [None]:
output_file_path = '/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/merged_eddi_data.parquet'

In [None]:
merged_df.write_parquet(output_file_path)

In [66]:
def regrid_polars_df(df, start_lat=-12.0, start_lon=23.125, new_lat_res=0.5, new_lon_res=0.625):
    """
    Regrids a Polars DataFrame with latitude and longitude to a common specified grid, starting from specified latitude and longitude.

    Parameters:
    - df (pl.DataFrame): Polars DataFrame to be regridded.
    - start_lat (float): Starting latitude for the grid.
    - start_lon (float): Starting longitude for the grid.
    - new_lat_res (float): The new resolution for latitude.
    - new_lon_res (float): The new resolution for longitude.

    Returns:
    - pl.DataFrame: A DataFrame containing the regridded data.

    Example of usage:
    regridded_df = regrid_polars_df(polars_df)
    """
    # Calculate global min and max of latitude and longitude
    lat_min = df.select(pl.min('lat')).to_numpy().item()
    lat_max = df.select(pl.max('lat')).to_numpy().item()
    lon_min = df.select(pl.min('lon')).to_numpy().item()
    lon_max = df.select(pl.max('lon')).to_numpy().item()

    # Adjust the start points if they are outside the current data range
    start_lat = max(lat_min, start_lat)
    start_lon = max(lon_min, start_lon)

    # Create common latitude and longitude grids starting from the specified points
    common_lat = np.arange(start_lat, lat_max + new_lat_res, new_lat_res)
    common_lon = np.arange(start_lon, lon_max + new_lon_res, new_lon_res)

    # Function to find nearest grid point
    def find_nearest(array, value):
        idx = (np.abs(array - value)).argmin()
        return array[idx]

    # Map each lat and lon in the DataFrame to the nearest grid point
    df = df.with_columns([
        df['lat'].map_elements(lambda x: find_nearest(common_lat, x), return_dtype=float).alias('new_lat'),
        df['lon'].map_elements(lambda x: find_nearest(common_lon, x), return_dtype=float).alias('new_lon')
    ]).drop(['lat', 'lon']).rename({'new_lat': 'lat', 'new_lon': 'lon'})

    # Optional: Aggregate data if necessary
    df = df.group_by(['time', 'lat', 'lon']).agg(pl.col('eddi').mean().alias('eddi'))

    # Sort by time, latitude, and longitude for better organization
    df = df.sort(['time', 'lat', 'lon'])

    return df

# Usage example (assuming df is your DataFrame loaded with data)
regridded_df = regrid_polars_df(eddi_clean_df)


In [67]:
regridded_df

time,lat,lon,eddi
datetime[μs],f64,f64,f32
2001-01-01 00:00:00,-12.0,23.125,-0.761296
2001-01-01 00:00:00,-12.0,23.75,-0.88392
2001-01-01 00:00:00,-12.0,24.375,-0.926714
2001-01-01 00:00:00,-12.0,25.0,-1.019687
2001-01-01 00:00:00,-12.0,25.625,-1.019687
…,…,…,…
2024-04-13 00:00:00,21.5,38.75,0.567209
2024-04-13 00:00:00,21.5,39.375,0.437543
2024-04-13 00:00:00,21.5,40.0,0.199132
2024-04-13 00:00:00,21.5,43.125,-0.500068


In [68]:
regridded_df.write_parquet("/workspace/ml-drought-forecasting/ml-modeling-pipeline/data/01_raw/eddi.parquet")