Installing the packages and libraries needed to set up the environment. I had problems installing certain packages, especially xarray, so I install most of them one at a time. That way I can confirm each one installs successfully and troubleshoot a specific library if I run into problems.

In [0]:
%pip install numpy==1.24.4

Python interpreter will be restarted.
Collecting numpy==1.24.4
  Downloading numpy-1.24.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.3 MB)
Installing collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 1.21.5
    Not uninstalling numpy at /databricks/python3/lib/python3.9/site-packages, outside environment /local_disk0/.ephemeral_nfs/envs/pythonEnv-15f369a0-800c-49da-967e-bc2e05bb9fd5
    Can't uninstall 'numpy'. No files were found to uninstall.
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
scipy 1.7.3 requires numpy<1.23.0,>=1.16.5, but you have numpy 1.24.4 which is incompatible.
Successfully installed numpy-1.24.4
Python interpreter will be restarted.


In [0]:
%pip install xarray==2023.12.0 --no-deps





Python interpreter will be restarted.
Collecting xarray==2023.12.0
  Downloading xarray-2023.12.0-py3-none-any.whl (1.1 MB)
Installing collected packages: xarray
Successfully installed xarray-2023.12.0
Python interpreter will be restarted.


In [0]:
%pip install s3fs==2023.6.0

Python interpreter will be restarted.
Collecting s3fs==2023.6.0
  Downloading s3fs-2023.6.0-py3-none-any.whl (28 kB)
Collecting aiohttp!=4.0.0a0,!=4.0.0a1
  Downloading aiohttp-3.11.17-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.6 MB)
Collecting aiobotocore~=2.5.0
  Downloading aiobotocore-2.5.4-py3-none-any.whl (73 kB)
Collecting fsspec==2023.6.0
  Downloading fsspec-2023.6.0-py3-none-any.whl (163 kB)
Collecting botocore<1.31.18,>=1.31.17
  Downloading botocore-1.31.17-py3-none-any.whl (11.1 MB)
Collecting wrapt<2.0.0,>=1.10.10
  Downloading wrapt-1.17.2-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (82 kB)
Collecting aioitertools<1.0.0,>=0.5.1
  Downloading aioitertools-0.12.0-py3-none-any.whl (24 kB)
Collecting frozenlist>=1.1.1
  Downloading frozenlist-1.6.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (288 kB)
Collecting multidict<7.0,>=4.5
  Downloading multidict-6

In [0]:
%pip install h5netcdf

Python interpreter will be restarted.
Collecting h5netcdf
  Downloading h5netcdf-1.6.1-py3-none-any.whl (49 kB)
Collecting h5py
  Downloading h5py-3.13.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.6 MB)
Installing collected packages: h5py, h5netcdf
Successfully installed h5netcdf-1.6.1 h5py-3.13.0
Python interpreter will be restarted.


In [0]:
%pip install cftime


Python interpreter will be restarted.
Collecting cftime
  Downloading cftime-1.6.4.post1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
Installing collected packages: cftime
Successfully installed cftime-1.6.4.post1
Python interpreter will be restarted.


In [0]:
import pandas as pd
import time
import os



In [0]:
import xarray as xr
import s3fs
import numpy as np
from datetime import datetime, timedelta

print(np.__version__)
print(xr.__version__)


1.24.4
2023.12.0
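
Because numpy, xarray, and s3fs are pinned in the install cells above, it can help to confirm all the pins in one place after the interpreter restarts. The following is a minimal sketch, not part of the original notebook: the `EXPECTED` dictionary and the `check_versions` helper are hypothetical names, and the pins are simply copied from the `%pip install` lines (packages installed without a pin map to `None`).

```python
from importlib.metadata import PackageNotFoundError, version

# Pins copied from the %pip install cells; None means "any installed version".
EXPECTED = {
    "numpy": "1.24.4",
    "xarray": "2023.12.0",
    "s3fs": "2023.6.0",
    "h5netcdf": None,
    "cftime": None,
}


def check_versions(expected):
    """Return {package: (installed_version_or_None, status)} for each entry."""
    report = {}
    for name, pin in expected.items():
        try:
            installed = version(name)
        except PackageNotFoundError:
            installed = None
        if installed is None:
            status = "missing"
        elif pin is None or installed == pin:
            status = "ok"
        else:
            status = "mismatch"
        report[name] = (installed, status)
    return report


for name, (installed, status) in check_versions(EXPECTED).items():
    print(f"{name}: {installed} ({status})")
```

A `mismatch` or `missing` line points at the specific install cell to re-run, which fits the one-package-at-a-time approach used above.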


Setting some parameters: the S3 bucket name, and the start and end dates for the files that have been pulled into the S3 bucket. Then creating a list of the dates for use in the for loop.

In [0]:
# S3 config
bucket = "s3-dle-930613e9-20bb-49c5-a6ae-0c9aab0ecba3"  # name of the S3 bucket you created and copied the files into
s3 = s3fs.S3FileSystem(anon=True)

# Date range
start = datetime.strptime("20240101", "%Y%m%d")  # full range starts at 20240101; note: cluster terminated after saving file 20240126
end = datetime.strptime("20240131", "%Y%m%d")
dates = [(start + timedelta(days=i)).strftime("%Y%m%d") for i in range((end - start).days + 1)]

print(dates)


['20240118', '20240119', '20240120', '20240121', '20240122', '20240123', '20240124', '20240125', '20240126', '20240127', '20240128', '20240129', '20240130', '20240131']


Setting the location where the files pulled from the S3 bucket are written. The for loop opens the files by date, then filters the contents of each file to keep only data that falls within latitudes and longitudes that encompass Minnesota and parts of its neighbors. Each day's subset is saved as a Parquet file, and a log of successfully saved files, failures, and missing files appears in the output. This is very helpful because my cluster would typically time out before all 31 files were read in and saved. In those cases I would go back to the section above, change the dates to cover only the files that did not process, then run this section again.
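
The longitude bounds in the loop below (262.5 to 270.5) are written in the 0 to 360 degree convention, which is why a western-hemisphere region shows up as large positive numbers. As a rough sketch of how those values relate to the more familiar negative-west convention, assuming the bounds correspond to about -97.5 and -89.5 degrees (the helper names `to_0_360` and `in_box` are hypothetical and not part of the notebook):

```python
def to_0_360(lon_deg):
    """Convert a longitude in degrees from the -180..180 convention to 0..360."""
    return lon_deg % 360.0


def in_box(lat, lon, lat_min=43.5, lat_max=49.5, lon_min=262.5, lon_max=270.5):
    """Return True if a point lies inside the box used by the loop.

    `lon` may be given in either convention; it is normalized to 0..360 first.
    """
    lon = to_0_360(lon)
    return lat_min <= lat <= lat_max and lon_min <= lon <= lon_max


# Assumed west/east edges in the negative-west convention.
print(to_0_360(-97.5), to_0_360(-89.5))  # 262.5 270.5

# Approximate coordinates for Minneapolis (inside) and New York City (outside).
print(in_box(44.98, -93.27))  # True
print(in_box(40.71, -74.00))  # False
```

This is only a sanity check on the bounds; the notebook itself applies the same comparison to whole coordinate arrays with `ds.where(..., drop=True)`.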

In [0]:
# Output location for the Parquet files; the loop below filters each file to Minnesota
output_dir = "/dbfs/"
os.makedirs(output_dir, exist_ok=True)

for date in dates:
    path = f"{bucket}/{date}.gfs.t12z.sfcf001.nc"
    print(f" Processing {date}...")

    try:
        if s3.exists(path):
            with s3.open(path, mode="rb") as f:
                ds = xr.open_dataset(f, decode_times=False)  # skip time decode for speed
                subset = ds.where(
                    (ds.lat >= 43.5) & (ds.lat <= 49.5) &
                    (ds.lon >= 262.5) & (ds.lon <= 270.5), # longitude is in positive only
                    drop=True
                )

                # Convert to Pandas then Spark DataFrame
                pandas_df = subset.to_dataframe().reset_index()
                spark_df = spark.createDataFrame(pandas_df)

                # Save each day's subset as Parquet
                output_path = f"{output_dir}{date}.parquet"
                spark_df.write.mode("overwrite").parquet(output_path)

                print(f"Saved {output_path}")
        else:
            print(f"File not found: {path}")
    except Exception as e:
        print(f"Failed to process {date}: {e}")

    time.sleep(0.5)


 Processing 20240118...
Saved /dbfs/20240118.parquet
 Processing 20240119...
Saved /dbfs/20240119.parquet
 Processing 20240120...
Saved /dbfs/20240120.parquet
 Processing 20240121...
Saved /dbfs/20240121.parquet
 Processing 20240122...
Saved /dbfs/20240122.parquet
 Processing 20240123...
Saved /dbfs/20240123.parquet
 Processing 20240124...
Saved /dbfs/20240124.parquet
 Processing 20240125...
Saved /dbfs/20240125.parquet
 Processing 20240126...
Saved /dbfs/20240126.parquet
 Processing 20240127...
Saved /dbfs/20240127.parquet
 Processing 20240128...
Saved /dbfs/20240128.parquet
 Processing 20240129...
Saved /dbfs/20240129.parquet
 Processing 20240130...
Saved /dbfs/20240130.parquet
 Processing 20240131...
Saved /dbfs/20240131.parquet
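
The manual re-run step described above (editing the start date after a cluster timeout) could be automated by skipping dates whose output already exists. The following is a minimal sketch under stated assumptions, not the notebook's method: `build_dates` and `remaining_dates` are hypothetical helpers, each day's output is assumed to be a folder named `<date>.parquet`, and a finished write is assumed to contain a `_SUCCESS` marker file, which Spark writes by default. On Databricks, the path Spark writes to and the local path seen by `os` can differ, so `output_dir` here must be the local view of wherever the Parquet folders actually landed.

```python
import os
import tempfile
from datetime import datetime, timedelta


def build_dates(start, end):
    """Return every date from start to end (inclusive) as YYYYMMDD strings."""
    first = datetime.strptime(start, "%Y%m%d")
    last = datetime.strptime(end, "%Y%m%d")
    return [
        (first + timedelta(days=i)).strftime("%Y%m%d")
        for i in range((last - first).days + 1)
    ]


def remaining_dates(dates, output_dir, marker="_SUCCESS"):
    """Return the dates whose Parquet folder lacks the completion marker."""
    todo = []
    for date in dates:
        marker_path = os.path.join(output_dir, f"{date}.parquet", marker)
        if not os.path.exists(marker_path):
            todo.append(date)
    return todo


# Small demonstration in a temporary folder instead of the real output location.
with tempfile.TemporaryDirectory() as tmp:
    finished = os.path.join(tmp, "20240101.parquet")
    os.makedirs(finished)
    open(os.path.join(finished, "_SUCCESS"), "w").close()  # completed write
    os.makedirs(os.path.join(tmp, "20240102.parquet"))  # interrupted write
    print(remaining_dates(build_dates("20240101", "20240103"), tmp))
    # ['20240102', '20240103']
```

With something like this in place, `dates = remaining_dates(build_dates("20240101", "20240131"), output_dir)` would replace the hand-edited start date. If the marker assumption turns out not to hold in a given workspace, every date is simply reprocessed, which is safe because the loop writes with `mode("overwrite")`.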


Once all files have been processed successfully, you can move on to the NOAA Data Transformation notebook.