In [None]:
import pandas as pd
import numpy as np
from pyart.io import read_nexrad_archive
from datetime import datetime, timedelta
import boto3
from scipy.spatial import cKDTree
from concurrent.futures import ThreadPoolExecutor, as_completed
import bisect
import asyncio
from aiobotocore.session import get_session

def get_file_time(filename, date):
    """
        Purpose: Gets the timestamp encoded in a nexrad file's name and returns it as a datetime
        Arguments:
            filename: An S3 object entry (a dict with a 'Key') for a nexrad file, or
                the key string itself. The key's basename carries _HHMMSS_ after the
                site code and date, e.g. "2024/12/05/KAKQ/KAKQ20241205_001256_V06"
            date: A date (or datetime) supplying the year, month, and day of the file
        Returns: A datetime for the time encoded in the filename (presumably the
            volume scan time of this nexrad file)
    """
    key = filename['Key'] if isinstance(filename, dict) else filename
    # Key layout: "YEAR/MONTH/DAY/SITE_CODE/{SITE_CODE}{YEAR}{MONTH}{DAY}_{HHMMSS}[_suffix]"
    # e.g.        "2024/12/05/KAKQ/KAKQ20241205_001256_V06"
    basename = key.rsplit("/", 1)[-1]
    filetime = basename.split("_")[1]
    # Slice exactly six digits so trailing text (e.g. ".gz" on older keys) cannot
    # leak into the seconds field and break int().
    hour, minute, second = int(filetime[0:2]), int(filetime[2:4]), int(filetime[4:6])
    return datetime(year=date.year, month=date.month, day=date.day, hour=hour, minute=minute, second=second)


In [None]:
def get_pireps(filename: str) -> pd.DataFrame:
    """
        Purpose:
            Loads the pireps whose nexrad data we want to look up
        Arguments:
            filename: Path (or file-like object) of the CSV holding those pireps
        Return: A pandas df with one row per pirep, columns as stored in the CSV
    """
    pireps = pd.read_csv(filename)
    return pireps


In [None]:
# Define a helper function to find the closest sites
def find_5_closest_sites(row, nexrad_tree, site_codes):
    """
        Purpose: Finds the five nexrad sites closest to a single pirep
        Arguments:
            row: A pirep record exposing 'LAT' and 'LON' (degrees)
            nexrad_tree: cKDTree to query; expected to be built on radian
                (lat, lon) pairs so it matches the query point below
            site_codes: Array of site codes aligned with the tree's points
        Returns: A tuple of the five site codes, nearest first
    """
    # Query in the same radian (lat, lon) space the tree is assumed to use.
    query_point = np.radians([row['LAT'], row['LON']])
    _, nearest_idx = nexrad_tree.query(query_point, k=5)
    return tuple(site_codes[nearest_idx])

In [None]:
pireps_df = get_pireps("clean_pirep_data/2025/01_turb_pireps.csv")
# Quick sanity check that the coordinate columns loaded
print(pireps_df.loc[:, ['LAT', 'LON']].iloc[:5])

In [None]:
def get_closest_sites(pireps_df, sites_path="nexrad_sites.csv", k=5):
    """
        Purpose: Adds a 'nexrad_sites' column holding the k nexrad sites nearest
            to each pirep, ordered nearest first
        Arguments:
            pireps_df: DataFrame with 'LAT' and 'LON' columns in degrees; the new
                column is added to this frame in place
            sites_path: CSV of nexrad sites with 'Site Code', 'Latitude', and
                'Longitude' columns (degrees)
            k: How many sites to keep per pirep (capped at the number of sites)
        Returns: pireps_df (the same object) with the 'nexrad_sites' column added
    """
    nexrad_sites = pd.read_csv(sites_path)
    site_codes = nexrad_sites['Site Code'].to_numpy()

    def _unit_vectors(lat_deg, lon_deg):
        # Map (lat, lon) onto the unit sphere. Straight-line distance between these
        # points grows monotonically with great-circle distance, so KD-tree
        # neighbours are the true geographic neighbours. Raw radian (lat, lon)
        # pairs overstate east-west distance away from the equator and ignore
        # longitude wrap-around.
        lat = np.radians(np.asarray(lat_deg, dtype=float))
        lon = np.radians(np.asarray(lon_deg, dtype=float))
        cos_lat = np.cos(lat)
        return np.column_stack((cos_lat * np.cos(lon), cos_lat * np.sin(lon), np.sin(lat)))

    nexrad_tree = cKDTree(_unit_vectors(nexrad_sites['Latitude'], nexrad_sites['Longitude']))
    # Asking for more neighbours than there are sites would yield out-of-range indices.
    k = min(k, len(site_codes))

    closest = []
    if len(pireps_df) > 0:
        # One vectorised query for every pirep instead of a per-row apply.
        _, indices = nexrad_tree.query(_unit_vectors(pireps_df['LAT'], pireps_df['LON']), k=k)
        # With k == 1 cKDTree returns a 1-D index array; normalise to (n_pireps, k).
        indices = np.asarray(indices).reshape(len(pireps_df), -1)
        closest = [tuple(site_codes[row]) for row in indices]
    pireps_df['nexrad_sites'] = pd.Series(closest, index=pireps_df.index, dtype=object)
    return pireps_df

In [None]:
# get_closest_sites adds the column to pireps_df in place and returns that same frame
pireps_df_w_sites = get_closest_sites(pireps_df)
print(pireps_df_w_sites.loc[:, ['LAT', 'LON', 'nexrad_sites']].iloc[:5])

In [None]:
async def s3_list_nexrad_files(date, site, session):
    """
    List the nexrad Level II files for one (date, site) pair and return their times.

    Arguments:
        date: date (or datetime) whose year/month/day select the bucket prefix
        site: nexrad site code used as the next prefix component, e.g. "KAKQ"
        session: aiobotocore session used to create the S3 client for this call
    Returns:
        ((date, site), times), where times is a sorted list of datetimes parsed by
        get_file_time; the list is empty when no matching files exist.
    """
    # The trailing "/" stops one site code from also matching any code it prefixes.
    prefix = f"{date.year}/{date.month:02}/{date.day:02}/{site}/"
    times = []
    async with session.create_client('s3', region_name='us-east-1') as s3:
        # A single list_objects_v2 call returns at most 1000 keys; walk every page
        # so busy days are not silently truncated.
        paginator = s3.get_paginator('list_objects_v2')
        async for page in paginator.paginate(Bucket='noaa-nexrad-level2', Prefix=prefix):
            for file in page.get("Contents", []):
                # Downstream code rebuilds "_V06" paths from these times, so skip
                # the "_MDM" companion objects rather than listing them as scans.
                if file['Key'].endswith('_MDM'):
                    continue
                times.append(get_file_time(file, date))
    # Callers bisect these times, so guarantee ascending order explicitly.
    return ((date, site), sorted(times))

In [None]:
async def batch_list_nexrad_times(unique_requests):
    """
    Concurrently list the available nexrad file times for many (date, site) pairs.

    Arguments:
        unique_requests: iterable of (date, site) pairs to look up
    Returns:
        dict mapping each (date, site) pair to the list of datetimes reported for it
    """
    session = get_session()
    # One listing coroutine per pair; gather runs them concurrently and hands
    # back the ((date, site), times) pairs each one produced.
    listings = await asyncio.gather(
        *(s3_list_nexrad_files(day, site, session) for day, site in unique_requests)
    )
    return dict(listings)


In [None]:
def generate_unique_requests(pireps_df, window=timedelta(minutes=30)):
    """
        Purpose: Builds the set of (day, site) S3 listings needed to match every
            pirep to nexrad files
        Arguments:
            pireps_df: DataFrame with a 'nexrad_sites' column (iterable of site
                codes) and a datetime-like 'datetime' column
            window: A neighbouring day is requested only when the pirep lies
                within this distance of midnight (matches the look-up window in
                get_closest_nexrad_files)
        Returns: A set of (datetime.date, site_code) tuples
    """
    unique_requests = set()
    for sites, dt in zip(pireps_df['nexrad_sites'], pireps_df['datetime']):
        # Usually a single day; near midnight the adjacent day is included too.
        # Requesting +/- 1 day for every pirep would triple the S3 list calls for
        # listings that are never read.
        days = {(dt - window).date(), dt.date(), (dt + window).date()}
        unique_requests.update((day, site) for site in sites for day in days)
    print(f"About to perform {len(unique_requests)} list_objects_v2 requests")
    return unique_requests
    

In [None]:
# Parse timestamps first: generate_unique_requests calls .date() and does timedelta arithmetic on them
pireps_df['datetime'] = pireps_df['datetime'].pipe(pd.to_datetime)
unique_requests = generate_unique_requests(pireps_df)

In [None]:
# Takes around 5 minutes to process
nexrad_times_dict = await batch_list_nexrad_times(unique_requests)
print("Processing complete")
print(len(nexrad_times_dict))
print(nexrad_times_dict)

In [None]:
def nearest_time(times: list, pirep_dt: datetime, nearest: bool = False) -> datetime:
    """
        Purpose: Picks the nexrad file time to pair with a pirep
        Arguments:
            times: Ascending list of nexrad file times (previous day, current
                day, and/or next day as assembled by the caller)
            pirep_dt: The actual time of the pirep
            nearest: When False (default) return the latest time at or before
                pirep_dt; when True return whichever time is closest to pirep_dt
                (ties go to the earlier time)
        Returns: The selected time from times. If pirep_dt precedes every time,
            the earliest time is returned.
        Raises: ValueError if times is empty
    """
    if not times:
        raise ValueError("times must contain at least one datetime")
    # Index of the first time strictly after pirep_dt; the entry just before it is
    # the latest time at or before the pirep.
    after = bisect.bisect_right(times, pirep_dt)
    if after == 0:
        return times[0]
    before_time = times[after - 1]
    if not nearest or after == len(times):
        return before_time
    after_time = times[after]
    return before_time if pirep_dt - before_time <= after_time - pirep_dt else after_time

In [None]:
def get_closest_nexrad_files(pireps_df, nexrad_times=None, window=timedelta(minutes=30)):
    """
        Purpose: Attaches, for every pirep, the S3 URI of the selected nexrad
            Level II file at each of its nearby sites
        Arguments:
            pireps_df: DataFrame with a datetime-like 'datetime' column and a
                'nexrad_sites' column (iterable of site codes); modified in place
            nexrad_times: Mapping (date, site_code) -> ascending list of file
                datetimes. Defaults to the notebook-global nexrad_times_dict.
            window: A neighbouring day's files are also considered when the
                pirep lies within this distance of midnight
        Returns: None; adds an 'aws_files' column holding a list of URIs per pirep
            (sites with no listed files are skipped and counted as missing)
    """
    if nexrad_times is None:
        nexrad_times = nexrad_times_dict

    all_radars = []
    missing = 0
    for pirep_dt, sites in zip(pireps_df['datetime'], pireps_df['nexrad_sites']):
        day = pirep_dt.date()
        earlier_day = (pirep_dt - window).date()
        later_day = (pirep_dt + window).date()
        radars = []
        for site_code in sites:
            # Concatenate days in chronological order so the combined list stays
            # sorted for the bisect inside nearest_time. Missing keys count as no files.
            times = []
            if earlier_day != day:
                times += nexrad_times.get((earlier_day, site_code), [])
            times += nexrad_times.get((day, site_code), [])
            if later_day != day:
                times += nexrad_times.get((later_day, site_code), [])

            if not times:
                missing += 1
                continue

            nexrad_dt = nearest_time(times, pirep_dt)
            day_path = nexrad_dt.strftime("%Y/%m/%d")
            stamp = nexrad_dt.strftime("%Y%m%d_%H%M%S")
            radars.append(f"s3://noaa-nexrad-level2/{day_path}/{site_code}/{site_code}{stamp}_V06")

        all_radars.append(radars)
    print(f"There were {missing} sites missing data")
    pireps_df['aws_files'] = all_radars

In [None]:
get_closest_nexrad_files(pireps_df)
# Save the enriched pirep table to CSV for ease of access later
pireps_df.to_csv(path_or_buf="pireps_w_closest_radars.csv", index=False)
pireps_df.loc[:, ['datetime', 'nexrad_sites', 'aws_files']]