## Working with components

Line by line rerun of the aggregate components rule

In [1]:
import xarray
import rasterio
import pandas as pd
import geopandas as gpd
import numpy as np
import hydra
import logging  
import matplotlib.pyplot as plt
import pathlib


In [2]:
from hydra.core.hydra_config import HydraConfig
import sys
sys.path.append("..")
from utils.faster_zonal_stats import polygon_to_raster_cells

logger = logging.getLogger(__name__)

In [3]:
def available_shapefile_year(year, shapefile_years_list: list):
    """
    Given a list of shapefile years,
    return the latest year in the shapefile_years_list that is less than or equal to the given year
    """
    for shapefile_year in sorted(shapefile_years_list, reverse=True):
        if year >= shapefile_year:
            return shapefile_year
 
    return min(shapefile_years_list)  # Returns the last element if year is greater than the last element

In [4]:
from hydra import initialize, compose
from omegaconf import OmegaConf

# unfortunately, we have to use the initialize function to load the config file
# this is because the @hydra decorator does not work with Notebooks very well
# this is a known issue with Hydra: https://gist.github.com/bdsaglam/586704a98336a0cf0a65a6e7c247d248
# 
# just use the relative path from the notebook to the config dir
with initialize(version_base=None, config_path="../conf"):
    cfg = compose(config_name='config.yaml')

hydra_cfg = cfg

An interesting question I've been asking myself:

When you have a hydra config and snakemake running a "composed" workflow, how do you
debug a single configuration?

In [5]:
print(OmegaConf.to_yaml(hydra_cfg))

temporal_freq: monthly
year: 2022
polygon_name: zcta
shapefile_year: 2020
show_progress: false
plot_output: false
component:
- no3
- pm25
- so4
- ss
- nh4
- dust
- bc
- om
datapaths:
  input:
    satellite_components:
      yearly: /n/netscratch/dominici_lab/Lab/pm25__washu__raw/yearly/
      monthly: /n/netscratch/dominici_lab/Lab/pm25__washu__raw/monthly/
    shapefiles: /n/dominici_lab/lab/data_processing/jonathan_pm25_washu_raster2polygon/pm25_washu_raster2polygon/data/input/shapefiles
  output:
    satelite_components:
      zcta_yearly: /n/dominici_lab/lab/lego/environmental/pm25__washu/zcta_yearly
      zcta_monthly: /n/dominici_lab/lab/lego/environmental/pm25__washu/zcta_monthly
      county_yearly: /n/dominici_lab/lab/lego/environmental/pm25__washu/county_yearly
      county_monthly: /n/dominici_lab/lab/lego/environmental/pm25__washu/county_monthly
shapefiles:
  census_tract:
    2020:
      url: https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_tract_500k.zip
      id

The dry run command is useful for this; maybe the best way is to parse the output of a dry run command and extract the
configuration for a single component.


In [6]:
import subprocess
import re
import random
from hydra import compose, initialize


def get_snakemake_jobs(snakefile="Snakefile", configfile="conf/snakemake.yaml"):
    # use the dry run command to get the jobs
    result = subprocess.run(
        ["snakemake", "--dry-run", "--printshellcmds", f"--snakefile={snakefile}", f"--configfile={configfile}"],
        capture_output=True,
        text=True,
    )

    if result.returncode != 0:
        raise RuntimeError("Snakemake dry run failed:\n" + result.stderr)

    job_blocks = re.findall(r"(localrule .*?resources:.*?)(?:\n\n|\Z)", result.stdout, flags=re.DOTALL)
    jobs = []
    for block in job_blocks:
        rule_match = re.search(r"localrule (\w+):", block)
        wildcards = dict(re.findall(r"(\w+)=([^\s,\n]+)", block))

        if rule_match:
            jobs.append({
                "rule": rule_match.group(1),
                "wildcards": wildcards,
                "raw": block
            })

    return jobs

In [7]:
#jobs = get_snakemake_jobs("../Snakefile", "../conf/snakemake.yaml")

Interesting. I'll think about this more later

Diving into the main function

In [8]:
shapefile_years_list = list(cfg.shapefiles[cfg.polygon_name].keys())
#use previously available shapefile
shapefile_year = available_shapefile_year(cfg.year, shapefile_years_list)

In [9]:
shapefile_year

2020

In [10]:
shape_path = f'../data/input/shapefiles/shapefile_{cfg.polygon_name}_{shapefile_year}/shapefile.shp'
polygon = gpd.read_file(shape_path)
polygon_ids = polygon[cfg.shapefiles[cfg.polygon_name][shapefile_year].idvar].values


In [11]:
polygon_ids

array(['15301', '15658', '15601', ..., '40475', '99738', '74820'],
      dtype=object)

In [12]:
cfg.temporal_freq

'monthly'

In [13]:
cfg.satellite_component

{'yearly': None, 'monthly': {'url': {'no3': 'https://wustl.app.box.com/s/tfyt4uyuzbt4hbnw7bhos16aep9b5u7g/folder/257359597790', 'pm25': 'https://wustl.app.box.com/s/tfyt4uyuzbt4hbnw7bhos16aep9b5u7g/folder/251944780084', 'so4': 'https://wustl.app.box.com/s/tfyt4uyuzbt4hbnw7bhos16aep9b5u7g/folder/257342958891', 'ss': 'https://wustl.app.box.com/s/tfyt4uyuzbt4hbnw7bhos16aep9b5u7g/folder/257399814162', 'nh4': 'https://wustl.app.box.com/s/tfyt4uyuzbt4hbnw7bhos16aep9b5u7g/folder/257348671048', 'dust': 'https://wustl.app.box.com/s/tfyt4uyuzbt4hbnw7bhos16aep9b5u7g/folder/257380016906', 'bc': 'https://wustl.app.box.com/s/tfyt4uyuzbt4hbnw7bhos16aep9b5u7g/folder/257377372368', 'om': 'https://wustl.app.box.com/s/tfyt4uyuzbt4hbnw7bhos16aep9b5u7g/folder/257368204252'}, 'zipname': 'Monthly', 'file_prefix': 'V5GL04.HybridPM25c_0p10.NorthAmerica'}, 'latitude_layer': 'lat', 'longitude_layer': 'lon', 'component': {'no3': {'file_prefix': 'V5NA04.02.HybridNO3-NO3.NorthAmerica', 'layer': 'GWRNO3'}, 'pm25': {

In [14]:
cfg.component

['no3', 'pm25', 'so4', 'ss', 'nh4', 'dust', 'bc', 'om']

In [39]:
component_index = 7

In [40]:
cfg.satellite_component.component[cfg.component[component_index]]

{'file_prefix': 'V5NA04.02.HybridOM-OM.NorthAmerica', 'layer': 'GWROM'}

The filenames are not going to be consistent enough across components to create fillable fstrings, so I think it is safe to just list the files in the downloaded directory directly.

In [41]:
filenames = pathlib.Path(f"../data/input/satellite_components/{cfg.temporal_freq}/{cfg.component[component_index]}/").glob("*.nc")
filenames = [str(f) for f in filenames if f.is_file()]

In [42]:
filenames.sort()
filenames

['../data/input/satellite_components/monthly/om/V5NA04.02.HybridOM-OM.NorthAmerica.2000-2000-APR.nc',
 '../data/input/satellite_components/monthly/om/V5NA04.02.HybridOM-OM.NorthAmerica.2000-2000-AUG.nc',
 '../data/input/satellite_components/monthly/om/V5NA04.02.HybridOM-OM.NorthAmerica.2000-2000-DEC.nc',
 '../data/input/satellite_components/monthly/om/V5NA04.02.HybridOM-OM.NorthAmerica.2000-2000-FEB.nc',
 '../data/input/satellite_components/monthly/om/V5NA04.02.HybridOM-OM.NorthAmerica.2000-2000-JAN.nc',
 '../data/input/satellite_components/monthly/om/V5NA04.02.HybridOM-OM.NorthAmerica.2000-2000-JUL.nc',
 '../data/input/satellite_components/monthly/om/V5NA04.02.HybridOM-OM.NorthAmerica.2000-2000-JUN.nc',
 '../data/input/satellite_components/monthly/om/V5NA04.02.HybridOM-OM.NorthAmerica.2000-2000-MAR.nc',
 '../data/input/satellite_components/monthly/om/V5NA04.02.HybridOM-OM.NorthAmerica.2000-2000-MAY.nc',
 '../data/input/satellite_components/monthly/om/V5NA04.02.HybridOM-OM.NorthAmerica

This will hopefully print all of the NC files to be aggregated in the downloaded directory.

In [43]:
ds = xarray.open_dataset(filenames[0])

In [44]:
ds

The layer in no3 is called "GWRNO3"

In [86]:
layer = getattr(ds, cfg.satellite_component.component[cfg.component].layer)

In [87]:
layer

In [88]:
dims = layer.dims
assert len(dims) == 2, "netcdf coordinates must be 2d"
lon = layer[cfg.satellite_component.longitude_layer].values
lat = layer[cfg.satellite_component.latitude_layer].values
transform = rasterio.transform.from_origin(
    lon[0], lat[-1], lon[1] - lon[0], lat[1] - lat[0]
)

In [89]:
# compute mapping
poly2cells = polygon_to_raster_cells(
    polygon,
    layer.values[::-1],
    affine=transform,
    all_touched=True,
    nodata=np.nan,
    verbose=cfg.show_progress,
)

In [90]:
poly2cells

[(array([2771, 2771, 2772, 2772, 2772, 2772, 2772, 2772, 2772, 2773, 2773,
         2773, 2773, 2773, 2773, 2773, 2773, 2773, 2773, 2773, 2773, 2774,
         2774, 2774, 2774, 2774, 2774, 2774, 2774, 2774, 2774, 2774, 2774,
         2774, 2774, 2774, 2775, 2775, 2775, 2775, 2775, 2775, 2775, 2775,
         2775, 2775, 2775, 2775, 2775, 2775, 2775, 2776, 2776, 2776, 2776,
         2776, 2776, 2776, 2776, 2776, 2776, 2776, 2776, 2776, 2776, 2776,
         2776, 2776, 2776, 2776, 2776, 2777, 2777, 2777, 2777, 2777, 2777,
         2777, 2777, 2777, 2777, 2777, 2777, 2777, 2777, 2777, 2777, 2777,
         2777, 2777, 2777, 2777, 2777, 2778, 2778, 2778, 2778, 2778, 2778,
         2778, 2778, 2778, 2778, 2778, 2778, 2778, 2778, 2778, 2778, 2778,
         2778, 2778, 2778, 2778, 2778, 2779, 2779, 2779, 2779, 2779, 2779,
         2779, 2779, 2779, 2779, 2779, 2779, 2779, 2779, 2779, 2779, 2779,
         2779, 2779, 2779, 2779, 2779, 2780, 2780, 2780, 2780, 2780, 2780,
         2780, 2780, 2780

So this seems to generally work. Now the aggregation can be tested:

In [92]:
i=1
filename=filenames[i]
ds = xarray.open_dataset(filename)
layer = getattr(ds, cfg.satellite_component.component[cfg.component].layer)


In [93]:
stats = []
for indices in poly2cells:
    if len(indices[0]) == 0:
        # no cells found for this polygon
        stats.append(np.nan)
        continue
    cells = layer.values[::-1][indices]
    stats.append(np.nanmean(cells))

In [94]:
df = pd.DataFrame(
    {cfg.component: stats, "year": cfg.year},
    index=pd.Index(polygon_ids, name=cfg.polygon_name)
)

In [95]:
df

Unnamed: 0_level_0,no3,year
zcta,Unnamed: 1_level_1,Unnamed: 2_level_1
15301,0.615648,2022
15658,0.631761,2022
15601,0.847231,2022
17720,1.094118,2022
18843,0.409091,2022
...,...,...
79837,0.235978,2022
88338,0.168182,2022
40475,0.673516,2022
99738,0.000000,2022


In [98]:
# == save output file
if cfg.temporal_freq == "yearly":
    # ignore month since len(filenames) == 1
    output_filename = f"satellite_{cfg.component}_{cfg.polygon_name}_{cfg.temporal_freq}__{cfg.year}.parquet"

elif cfg.temporal_freq == "monthly":
    # use month in filename since len(filenames) = 12
    month = f"{i + 1:02d}"
    df["month"] = month
    output_filename = f"satellite_{cfg.component}_{cfg.polygon_name}_{cfg.temporal_freq}__{cfg.year}_{month}.parquet"

output_path = f"..data/output/satellite_components/{cfg.component}_{cfg.polygon_name}_{cfg.temporal_freq}/{output_filename}"
df.to_parquet(output_path)

# plot aggregation map using geopandas
if cfg.plot_output:
    LOGGER.info("Plotting result...")
    gdf = gpd.GeoDataFrame(df, geometry=polygon.geometry.values, crs=polygon.crs)
    png_path = f"{logging_dir}/{output_filename}.png"
    gdf.plot(column=cfg.component, legend=True)
    plt.savefig(png_path)


OSError: Cannot save file into a non-existent directory: '..data/output/satellite_components/no3_zcta_monthly'

What we need to do is make sure it can propagate to the various components.