# Download/Pre-process CFS forecast data
Lindsay Fitzpatrick
ljob@umich.edu
08/28/2024

This script:
1. Downloads CFS forecast data from AWS as grib2 files.
2. Opens the grib2 files and calculates the total precipitation, total evaporation, and average 2m air temperature over the basin, lake, and land area of each of the Great Lakes.
3. These calculations are then added to the CSV files. 

This script needs the following files:

- GL_mask.nc
- CFS_EVAP_forecasts_Sums_CMS.csv
- CFS_PCP_forecasts_Sums_CMS.csv
- CFS_TMP_forecasts_Avgs_K.csv

In [1]:
from datetime import datetime, timedelta
import os
import sys
import boto3
from botocore import UNSIGNED
from botocore.config import Config
import cfgrib
import pandas as pd
import netCDF4 as nc
import numpy as np
import shutil

## User Inputs

In [2]:
# Root directory for downloads and data files.
# Keep the trailing slash: the paths below are built by appending to it.
# NOTE: this name shadows the builtin dir(); it is kept because other cells read it.
# dir = 'C:/Users/fitzpatrick/Desktop/'
dir = './'

# Great Lakes mask file
mask_file = f'{dir}input/GL_mask.nc'

# CSV files to extend if they already exist, or path/name of the CSV files to create
tmp_csv = f'{dir}data/CFS_TMP_forecasts_Avgs_K.csv'
evap_csv = f'{dir}data/CFS_EVAP_forecasts_Sums_CMS.csv'
pcp_csv = f'{dir}data/CFS_PCP_forecasts_Sums_CMS.csv'

# Forecast dates to pull, as YYYY-MM-DD.
# - Creating new CSV files: these dates define the range to download.
# - Adding to existing CSV files: these dates are ignored; the script picks up
#   after the last date found in the existing CSV files.
start_date = '2024-08-31'
end_date = '2024-08-31'

Presets

These should not need to change unless the location of the CFS data changes, the user wants different files (`products` specifies the prefixes of the files; different files contain different variables), or the user wants specific forecasts (`utc` specifies the forecast times).

In [3]:
## Presets ##
# File-name prefixes of the CFS products to download
products = ['pgb', 'flx']
# Forecast times (UTC) to download for each day
utc = ['00', '06', '12', '18']

# Define mask variables: basin, lake and land for each lake, grouped by lake
mask_variables = [
    f'{lake}_{region}'
    for lake in ('eri', 'hur', 'ont', 'mic', 'sup')
    for region in ('basin', 'lake', 'land')
]

# AWS bucket name to locate the CFS forecast
bucket_name = 'noaa-cfs-pds'

## Define Functions

This function goes to the AWS site and downloads the needed CFS files for a given forecast day.

In [4]:
def download_grb2_aws(product, bucket_name, folder_path, download_dir):
    """
    Download the CFS forecast files of one product from AWS.

    Every object under ``folder_path`` is listed (following pagination). Each
    object whose key contains ``product`` and ends with 'grib.grb2' is saved
    under ``download_dir``, keeping its path relative to ``folder_path``.

    Parameters:
    - product: 'flx' or 'pgb'
    - bucket_name: for CFS data it is 'noaa-cfs-pds'
    - folder_path: the key prefix (folder) in the bucket that holds the data
    - download_dir: location to download data to

    Returns:
    - Number of files downloaded (0 when nothing under the prefix matches).
    """
    # Requests are sent unsigned, i.e. the bucket is read anonymously
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

    # A listing comes back in pages: keep asking for the next page, passing the
    # continuation token, until the response is no longer marked as truncated
    objects = []
    list_objects_args = {'Bucket': bucket_name, 'Prefix': folder_path}
    while True:
        list_objects_response = s3.list_objects_v2(**list_objects_args)
        objects.extend(list_objects_response.get('Contents', []))

        if not list_objects_response.get('IsTruncated', False):
            break

        list_objects_args['ContinuationToken'] = list_objects_response.get('NextContinuationToken')

    num_files_downloaded = 0
    for obj in objects:
        key = obj['Key']
        # Only the requested product's 'grib.grb2' files are wanted
        if product not in key or not key.endswith('grib.grb2'):
            continue

        local_file_path = os.path.join(download_dir, os.path.relpath(key, folder_path))

        # Ensure the directory structure exists
        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

        # Download the file
        s3.download_file(bucket_name, key, local_file_path)
        num_files_downloaded += 1

        print(f"Downloaded: {key}")

    return num_files_downloaded

In [5]:
def initialize_dataframes(tmp_csv, evap_csv, pcp_csv):
    """
    Load the three forecast tables, creating empty ones where no CSV exists.

    Parameters:
    - tmp_csv: path of the temperature CSV
    - evap_csv: path of the evaporation CSV
    - pcp_csv: path of the precipitation CSV

    Returns:
    - (df_tmp_forecasts, df_evap_forecasts, df_pcp_forecasts). A table whose CSV
      does not exist is an empty DataFrame with the columns 'cfs_run',
      'forecast_year', 'forecast_month' followed by the mask variables.
    """
    def load_or_create(csv_path):
        if not os.path.exists(csv_path):
            return pd.DataFrame(columns=['cfs_run', 'forecast_year', 'forecast_month'] + mask_variables)
        return pd.read_csv(csv_path)

    return tuple(load_or_create(csv_path) for csv_path in (tmp_csv, evap_csv, pcp_csv))

Function to grab a specific list of files based on the prefix or suffix of the file name (e.g., 'pgb' or '.grb2').

In [6]:
def get_files(directory, affix, identifier):
    """
    List the files in a directory whose names start or end with an identifier.

    Parameters:
    - directory (str): Path to the directory containing files.
    - affix (str): 'prefix' to match the start of the file name, 'suffix' to
      match the end. Any other value matches nothing.
    - identifier (str): Text to match (e.g. 'pgb', 'flx', '.grb2', or '.nc')
    Returns:
    - List of paths (directory joined with file name) of the matching files.
    """
    names = os.listdir(directory)

    if affix == 'suffix':  # ends with
        selected = [name for name in names if name.endswith(identifier)]
    elif affix == 'prefix':  # begins with
        selected = [name for name in names if name.startswith(identifier)]
    else:
        selected = []

    return [os.path.join(directory, name) for name in selected]

Function to delete the directory with CFS grb2 files because they are not needed after calculations are saved in the CSVs.

In [7]:
def delete_directory(directory_path):
    """
    Delete a directory and everything inside it.

    Prints a message and does nothing when the path is not an existing
    directory. An error raised during deletion is printed, not propagated.
    """
    if os.path.isdir(directory_path):
        try:
            # Remove the entire directory tree
            shutil.rmtree(directory_path)
            print(f"Successfully deleted the directory and all its contents: {directory_path}")
        except Exception as e:
            print(f"Error deleting {directory_path}: {e}")
    else:
        print(f"The directory {directory_path} does not exist.")

Function to calculate the grid cell areas [m2] based on the mask file. This is needed to calculate total precipitation and evaporation because the units are [kg/m2].

In [8]:
def calculate_grid_cell_areas(lon, lat):
    """
    Calculate the area [m2] of every cell of a regular latitude/longitude grid.

    Each cell area is R^2 * dlat * dlon * cos(lat), with the grid spacing taken
    from the first two entries of each coordinate array (so the spacing is
    assumed to be uniform). The spacing is used as a magnitude, so the areas
    are positive whether the coordinates are stored ascending or descending.

    Parameters:
    - lon: 1D array of longitudes in degrees (at least two values)
    - lat: 1D array of latitudes in degrees (at least two values)

    Returns:
    - 2D float array of shape (len(lat), len(lon)) with the cell areas in
      square meters.
    """
    R = 6371000.0  # Radius of Earth in meters

    # Work in double precision on plain arrays
    lat = np.asarray(lat, dtype=np.float64)
    lon = np.asarray(lon, dtype=np.float64)

    # Grid cell height and width in radians
    dlat = abs(np.radians(lat[1] - lat[0]))
    dlon = abs(np.radians(lon[1] - lon[0]))

    # The area depends on latitude only, so compute one value per latitude
    # row and repeat it across all longitudes
    row_area = R**2 * dlat * dlon * np.cos(np.radians(lat))
    return np.repeat(row_area[:, np.newaxis], len(lon), axis=1)

Function to calculate evaporation based on the 2m air temperature and latent heat flux.

In [9]:
# ET = kg/(m^2*time^1) or 1 mm
# LE = MJ/(m^2*time^1)
# λ  = MJ/kg

# The latent heat of vaporization varies slightly with temperature. Allen et al. (1998)
# provides an equation for calculating λ as a function of air temperature. The temperature
# in that equation must be in degrees Celsius.

# λ = 2.501 − (2.361×10^−3) × Temp [Celsius]

# so for our data with Temp in Kelvin...

# λ = 2.501 − ((2.361×10^−3) × (Temp − 273.15))

# Our variable_lhf is in W/m^2 or J/(m^2*time^1). In order to convert to MJ we must multiply
# by 10^-6 or 0.000001. Now we have lambda and variable_lhf both in terms of MJ.

# The equation below provides an evaporation rate in kg/m2 per s.

def calculate_evaporation(temperature_K, latent_heat):
    """
    Convert a latent heat flux into an evaporation rate.

    Parameters:
    - temperature_K: 2m air temperature in Kelvin
    - latent_heat: latent heat flux in W/m^2

    Returns:
    - Evaporation rate in kg/m2 per s.
    """
    temperature_C = temperature_K - 273.15
    # Latent heat of vaporization [MJ/kg] at this temperature
    latent_heat_of_vaporization = 2.501 - (0.002361 * temperature_C)
    # Latent heat flux expressed in MJ
    latent_heat_MJ = latent_heat * 0.000001

    return latent_heat_MJ / latent_heat_of_vaporization  # kg/m2 per s

Function to open each of the grib2 files and calculate the total precipitation, total evaporation, and average 2m air temperature over an entire basin, land, or lake for each of the Great Lakes. This uses the mask file to calculate each of these.

In [10]:
def _remap_to_mask(field, mask_lat, mask_lon):
    """Cut a global field to the mask domain and interpolate it onto the mask grid."""
    # The latitude slice runs from the mask's maximum to its minimum
    # NOTE(review): that selects data only if the field stores latitude
    # north-to-south; presumably true for the CFS files, confirm
    field_cut = field.sel(
        latitude=slice(mask_lat.max(), mask_lat.min()),
        longitude=slice(mask_lon.min(), mask_lon.max())
    )
    # Remap and upscale the variable to match the mask domain
    return field_cut.interp(latitude=mask_lat, longitude=mask_lon, method='linear')


def _set_run_info(df, index, cfs_run, forecast_year, forecast_month):
    """Write the identifying columns of row `index` (the row is added if it does not exist)."""
    df.loc[index, 'cfs_run'] = cfs_run
    df.loc[index, 'forecast_year'] = forecast_year
    df.loc[index, 'forecast_month'] = forecast_month


def process_grib_files(download_dir, df_tmp_forecasts, df_evap_forecasts, df_pcp_forecasts, mask_lat, mask_lon, mask_ds, mask_variables, area, calculate_evaporation):
    """
    Add one row per downloaded grib2 file to the forecast tables.

    Every '.grb2' file in `download_dir` is opened. For each 'flxf' file a row
    is added to the temperature table (mean 2m air temperature) and to the
    evaporation table (total evaporation in m3/s). For each 'pgbf' file a row
    is added to the precipitation table (total precipitation in m3/s). Every
    row holds the run and forecast month parsed from the file name, plus one
    value per mask variable.

    Parameters:
    - download_dir: directory containing the downloaded grib2 files
    - df_tmp_forecasts, df_evap_forecasts, df_pcp_forecasts: tables to extend.
      They are modified in place. New rows are labelled len(table), so the
      tables are expected to carry the default 0..n-1 index.
    - mask_lat, mask_lon: latitude and longitude arrays of the mask grid
    - mask_ds: open mask dataset; mask_ds.variables[name][:] is the mask array
    - mask_variables: names of the mask variables to evaluate
    - area: grid cell areas [m2] on the mask grid
    - calculate_evaporation: function(temperature_K, latent_heat) returning
      the evaporation rate in kg/m2 per s

    Returns:
    - None. Results are written into the three DataFrames.
    """
    # The masks do not change from file to file, so read each one only once
    masks = {mask_var: mask_ds.variables[mask_var][:] for mask_var in mask_variables}

    # Find all the .grb2 files in the directory. Sorted so that rows are added
    # in the same order on every run, whatever order the file system lists them
    file_list = sorted(get_files(download_dir, 'suffix', '.grb2'))

    for grib2_file in file_list:

        # File names look like flxf.01.2024111300.202411.avrg.grib.grb2
        filename = os.path.basename(grib2_file)
        parts = filename.split('.')
        cfs_run = parts[2]
        date_part = parts[3]  # forecast month in the format YYYYMM
        forecast_year = date_part[:4]
        forecast_month = date_part[4:6]

        if filename.startswith('flxf'):

            # New rows go at the end of each table
            index_tmp = len(df_tmp_forecasts)
            index_evap = len(df_evap_forecasts)

            # Open the flx file at the 2m level to pull the 2m air temperature.
            # The 'with' blocks close each dataset once this file is done.
            with cfgrib.open_dataset(grib2_file, engine='cfgrib', filter_by_keys={'typeOfLevel': 'heightAboveGround', 'level': 2}) as flx_2mabove:
                _set_run_info(df_tmp_forecasts, index_tmp, cfs_run, forecast_year, forecast_month)
                mean2t_remap = _remap_to_mask(flx_2mabove['mean2t'], mask_lat, mask_lon)

                # Calculate mean2t for each of the mask variables (i.e., eri_lake, eri_basin, etc.)
                for mask_var in mask_variables:
                    # Take the mean over the mask area
                    # NOTE(review): this averages over every grid cell of the
                    # product. If the mask is 0 (not NaN/masked) outside the
                    # region, those cells pull the average down. Confirm how
                    # GL_mask.nc marks cells outside each region.
                    tmp_avg = np.mean(mean2t_remap * masks[mask_var])
                    df_tmp_forecasts.loc[index_tmp, mask_var] = tmp_avg.data

                ###############################################################################

                # Open the flx file again but at the surface level to pull the latent heat flux
                with cfgrib.open_dataset(grib2_file, engine='cfgrib', filter_by_keys={'typeOfLevel': 'surface'}) as flx_surface:
                    _set_run_info(df_evap_forecasts, index_evap, cfs_run, forecast_year, forecast_month)
                    mslhf_remap = _remap_to_mask(flx_surface['mslhf'], mask_lat, mask_lon)

                    # Calculate evaporation across the entire domain using air temp and latent heat flux
                    evap = calculate_evaporation(mean2t_remap, mslhf_remap)

                    # Calculate evaporation for each of the mask variables (i.e., eri_lake, eri_basin, etc.)
                    for mask_var in mask_variables:
                        total_evap = np.sum(evap * area * masks[mask_var])  # Converts kg/s/m2 to kg/s
                        # Convert kg/s to m³/s (assuming density of water ≈ 1000 kg/m³)
                        evap_cms = total_evap / 1000.0

                        df_evap_forecasts.loc[index_evap, mask_var] = evap_cms.data

        ###############################################################################

        elif filename.startswith('pgbf'):

            index_pcp = len(df_pcp_forecasts)

            # Open the pgb file at the surface level to pull the precipitation
            with cfgrib.open_dataset(grib2_file, engine='cfgrib', filter_by_keys={'typeOfLevel': 'surface'}) as pgb_surface:
                _set_run_info(df_pcp_forecasts, index_pcp, cfs_run, forecast_year, forecast_month)
                pcp_remap = _remap_to_mask(pgb_surface['tp'], mask_lat, mask_lon)  # Total precipitation

                # Convert precipitation from kg/m² per 6 hours to kg/m² per second.
                # Done once per file: it does not depend on the mask.
                pcp_per_s = pcp_remap / 21600.0  # seconds in 6hrs

                for mask_var in mask_variables:
                    total_pcp_kg_per_s = np.sum(pcp_per_s * area * masks[mask_var])  # kg/s

                    # Convert kg/s to m³/s (assuming density of water ≈ 1000 kg/m³)
                    total_pcp_cms = total_pcp_kg_per_s / 1000.0
                    df_pcp_forecasts.loc[index_pcp, mask_var] = total_pcp_cms.data

        print(f'Done with {filename}')

## Begin Script

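Open the existing CSV files (or create empty tables for new ones) and determine the range of forecast dates to process. Then open the mask file and pull its latitude and longitude, which are used to cut the global variables down to just the Great Lakes domain and upscale them. The area of each of the grid cells is also calculated.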

In [11]:
# Load the existing CSVs, or start from empty dataframes to save to new CSVs
df_tmp_forecasts, df_evap_forecasts, df_pcp_forecasts = initialize_dataframes(tmp_csv, evap_csv, pcp_csv)

if not df_tmp_forecasts.empty:
    # Adding to an existing CSV: the user dates are ignored. Resume on the day
    # after the last recorded run (the first 8 characters of cfs_run are YYYYMMDD)
    last_cfs = df_tmp_forecasts['cfs_run'].astype(str).iloc[-1][:8]
    start_date = datetime.strptime(last_cfs, '%Y%m%d') + timedelta(days=1)
    # Pull all the forecast days up to yesterday (the most complete forecast)
    end_date = datetime.now() - timedelta(days=1)
else:
    # Starting a new CSV: the dates come from the user inputs above
    print("Creating new files.")
    start_date = datetime.strptime(start_date, "%Y-%m-%d") # User input above
    end_date = datetime.strptime(end_date, "%Y-%m-%d") # User input above

# Nothing to do when the first day to pull is later than the last one
if start_date > end_date:
    print("The files are up-to-date.")
    sys.exit()  # Stop the script

print(f"Starting from: {start_date.strftime('%Y-%m-%d')} and continuing through: {end_date.strftime('%Y-%m-%d')}")

# One entry per day from start to end, as integers in the format YYYYMMDD
date_range = pd.date_range(start=start_date, end=end_date)
dates_array = date_range.strftime('%Y%m%d').astype(int)

Starting from: 2024-11-13 and continuing through: 2024-11-13


In [12]:
# Open the mask file; it stays open because the masks are read from it later
mask_ds = nc.Dataset(mask_file)

# Coordinates of the mask grid
mask_lat, mask_lon = (mask_ds.variables[name][:] for name in ('latitude', 'longitude'))

# Area of every grid cell of the mask grid
area = calculate_grid_cell_areas(mask_lon, mask_lat)

Begin the loop over the dates to process. For each date, the loop creates a directory for the CFS grib files, runs the download_grb2_aws function to download them, and then runs process_grib_files to do the calculations. It then saves the calculations to the CSV files and moves on to the next date. Deleting the grib2 files after each date is available through delete_directory, but that call is currently commented out.

In [13]:
for date in dates_array:
    print(f"Beginning {date}.")
    download_dir = f'{dir}{date}/CFS/'
    # exist_ok makes a separate existence check unnecessary
    os.makedirs(download_dir, exist_ok=True)

    # Uses AWS to download the grib2 files
    for utc_time in utc:
        # The folder depends only on the date and forecast time, not on the product
        folder_path = f'cfs.{date}/{utc_time}/monthly_grib_01/'
        for product in products:
            download_grb2_aws(product, bucket_name, folder_path, download_dir)

    # Adds this date's rows to the three DataFrames in place
    process_grib_files(download_dir, df_tmp_forecasts, df_evap_forecasts, df_pcp_forecasts, mask_lat, mask_lon, mask_ds, mask_variables, area, calculate_evaporation)

    # Save the updated DataFrames to CSV files after every date, so finished
    # dates are kept even if a later date fails
    df_tmp_forecasts.to_csv(tmp_csv, sep=',', index=False)
    df_evap_forecasts.to_csv(evap_csv, sep=',', index=False)
    df_pcp_forecasts.to_csv(pcp_csv, sep=',', index=False)

    # Delete downloaded grib2 files
    # NOTE: cleanup is currently disabled, so the downloaded files stay on disk
    #delete_directory(download_dir)

    print(f"Done with {date}.")

Beginning 20241113.
Downloaded: cfs.20241113/00/monthly_grib_01/pgbf.01.2024111300.202411.avrg.grib.grb2
Downloaded: cfs.20241113/00/monthly_grib_01/pgbf.01.2024111300.202412.avrg.grib.grb2
Downloaded: cfs.20241113/00/monthly_grib_01/pgbf.01.2024111300.202501.avrg.grib.grb2
Downloaded: cfs.20241113/00/monthly_grib_01/pgbf.01.2024111300.202502.avrg.grib.grb2
Downloaded: cfs.20241113/00/monthly_grib_01/pgbf.01.2024111300.202503.avrg.grib.grb2
Downloaded: cfs.20241113/00/monthly_grib_01/pgbf.01.2024111300.202504.avrg.grib.grb2
Downloaded: cfs.20241113/00/monthly_grib_01/pgbf.01.2024111300.202505.avrg.grib.grb2
Downloaded: cfs.20241113/00/monthly_grib_01/pgbf.01.2024111300.202506.avrg.grib.grb2
Downloaded: cfs.20241113/00/monthly_grib_01/pgbf.01.2024111300.202507.avrg.grib.grb2
Downloaded: cfs.20241113/00/monthly_grib_01/pgbf.01.2024111300.202508.avrg.grib.grb2
Downloaded: cfs.20241113/00/monthly_grib_01/flxf.01.2024111300.202411.avrg.grib.grb2
Downloaded: cfs.20241113/00/monthly_grib_01/f

  df_tmp_forecasts.loc[index_tmp, 'cfs_run'] = cfs_run
  df_tmp_forecasts.loc[index_tmp, 'forecast_year'] = forecast_year
  df_tmp_forecasts.loc[index_tmp, 'forecast_month'] = forecast_month
  df_evap_forecasts.loc[index_evap, 'cfs_run'] = cfs_run
  df_evap_forecasts.loc[index_evap, 'forecast_year'] = forecast_year
  df_evap_forecasts.loc[index_evap, 'forecast_month'] = forecast_month


4211 2024111300 2024 11
4211 2024111300 2024 11
4211 2024111300 2024 11
4211 2024111300 2024 11
4211 2024111300 2024 11
4211 2024111300 2024 11
4211 2024111300 2024 11
4211 2024111300 2024 11
4211 2024111300 2024 11
4211 2024111300 2024 11
4211 2024111300 2024 11
4211 2024111300 2024 11
4211 2024111300 2024 11
4211 2024111300 2024 11
4211 2024111300 2024 11
Done with flxf.01.2024111300.202411.avrg.grib.grb2
4212 2024111300 2024 12
4212 2024111300 2024 12
4212 2024111300 2024 12
4212 2024111300 2024 12
4212 2024111300 2024 12
4212 2024111300 2024 12
4212 2024111300 2024 12
4212 2024111300 2024 12
4212 2024111300 2024 12
4212 2024111300 2024 12
4212 2024111300 2024 12
4212 2024111300 2024 12
4212 2024111300 2024 12
4212 2024111300 2024 12
4212 2024111300 2024 12
Done with flxf.01.2024111300.202412.avrg.grib.grb2
4213 2024111300 2025 01
4213 2024111300 2025 01
4213 2024111300 2025 01
4213 2024111300 2025 01
4213 2024111300 2025 01
4213 2024111300 2025 01
4213 2024111300 2025 01
4213 20241

  df_pcp_forecasts.loc[index_pcp, 'cfs_run'] = cfs_run
  df_pcp_forecasts.loc[index_pcp, 'forecast_year'] = forecast_year
  df_pcp_forecasts.loc[index_pcp, 'forecast_month'] = forecast_month


4211 2024111300 2024 11 eri_basin 4176.939915450957
4211 2024111300 2024 11 eri_lake 700.7214005868984
4211 2024111300 2024 11 eri_land 3979.326887141073
4211 2024111300 2024 11 hur_basin 5271.191430716874
4211 2024111300 2024 11 hur_lake 1243.3170682140974
4211 2024111300 2024 11 hur_land 4186.8785053874335
4211 2024111300 2024 11 ont_basin 2884.279557942299
4211 2024111300 2024 11 ont_lake 338.8964169074853
4211 2024111300 2024 11 ont_land 2955.880780867348
4211 2024111300 2024 11 mic_basin 4637.692595367197
4211 2024111300 2024 11 mic_lake 1410.540252964442
4211 2024111300 2024 11 mic_land 3575.6782113188215
4211 2024111300 2024 11 sup_basin 4055.922609337765
4211 2024111300 2024 11 sup_lake 1011.0971491189979
4211 2024111300 2024 11 sup_land 3353.0386568671165
Done with pgbf.01.2024111300.202411.avrg.grib.grb2
4212 2024111300 2024 12 eri_basin 5277.023560390684
4212 2024111300 2024 12 eri_lake 819.6792312083737
4212 2024111300 2024 12 eri_land 5211.25239683757
4212 2024111300 2024 

Close any open files before finishing script.

In [14]:
# Release the mask file now that all dates have been processed
mask_ds.close()