In [1]:
from argparse import ArgumentParser
import os
import sys
import time

import dask.dataframe as dd
import dask_geopandas
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import rasterio
import shapely
import xarray as xr

# from matplotlib import colors
from pandarallel import pandarallel
from scipy import stats

sys.path.insert(1, '../scripts/')
from reaches import *
from utils import *

### Run parameters (hard-coded for now; argument parsing is not wired up yet)

In [2]:
# FOR NOW, SET
width_set = 'mean'

# Map each width option to its (width column, bin column) pair.
WIDTH_OPTIONS = {
    'mean': ('WidthM', 'Bin'),
    'min': ('WidthM_Min', 'Bin_Min'),
    'max': ('WidthM_Max', 'Bin_Max'),
}

# Fail loudly on a bad option. Previously this only printed a message and kept
# running with `width`/`binn` undefined, which surfaced later as a NameError.
if width_set not in WIDTH_OPTIONS:
    raise ValueError(f'Invalid width option specified: {width_set!r}; '
                     f'expected one of {sorted(WIDTH_OPTIONS)}')
width, binn = WIDTH_OPTIONS[width_set]

In [3]:
huc2 = '15' # '01' ### SET THIS
data_path = '/nas/cee-water/cjgleason/data/SWOT/PIXC_v2_0_HUC2_' + huc2
# save_dir =

### Pixel Cloud

In [4]:
# Get job index
# slurm = int(os.environ['SLURM_ARRAY_TASK_ID'])
index = 2517 # 106

In [5]:
mdata_path = '/nas/cee-water/cjgleason/fiona/narrow_rivers_PIXC/data/' ## HERE
file_path = os.path.join(mdata_path, 'PIXC_v2_0_HUC2_' + huc2 + '_filtered.json') ## HERE
data = open_json(file_path)

In [6]:
file_name = data[index]

In [20]:
# Get data for this tile
granule_name = file_name[:-3]
tile_name = file_name[20:28]
pass_num = int(file_name[20:23])

print(granule_name)

SWOT_L2_HR_PIXC_009_511_219L_20240122T092851_20240122T092902_PGC0_01


#### Read in PIXC

In [7]:
# Set PIXC filepath
pixc_path = os.path.join(data_path, file_name)

In [8]:
# Read in pixel group
ds_PIXC = xr.open_dataset(filename_or_obj=pixc_path,
                          group='pixel_cloud', engine='h5netcdf')

In [9]:
# Set desired data vars

## HERE CLEAN UP

variables = ['cross_track', 'water_frac',
             'pixel_area', 'height', 'geoid', 'solid_earth_tide', ## HERE
             'load_tide_fes', 'pole_tide', 'prior_water_prob', ## HERE
             'phase_noise_std', 'dheight_dphase', ## HERE
             'classification_qual', 'geolocation_qual', ## HERE
             'classification']

In [10]:
def bitwiseMask(ds, min_cross_track=10000, max_cross_track=60000,
                qual_threshold=2**16):
    '''
    Return the indices of pixels in a PIXC granule that pass the quality screen.

    A pixel is kept only when ALL of the following hold:
      * classification > 1 (removes land-classified pixels)
      * interferogram_qual, classification_qual, geolocation_qual and
        sig0_qual are each strictly below ``qual_threshold``
      * min_cross_track < |cross_track| < max_cross_track

    See page 65 of PIXC PDD: https://podaac.jpl.nasa.gov/SWOT?tab=datasets-information&sections=about%2Bdata

    Parameters
    ----------
    ds : xarray.Dataset, or any object exposing the variables above as
        array-likes of equal length.
    min_cross_track, max_cross_track : float
        Exclusive bounds on the absolute cross-track value (same units as
        ``ds.cross_track``; presumably meters -- confirm against the PDD).
    qual_threshold : int
        Exclusive upper bound applied to each quality-flag variable.

    Returns
    -------
    numpy.ndarray
        1-D array of integer indices of the kept pixels (may be empty).
    '''
    abs_xtrack = np.abs(ds.cross_track)
    keep = ((ds.classification > 1) &
            (ds.interferogram_qual < qual_threshold) &
            (ds.classification_qual < qual_threshold) &
            (ds.geolocation_qual < qual_threshold) &
            (ds.sig0_qual < qual_threshold) &
            (abs_xtrack > min_cross_track) & (abs_xtrack < max_cross_track))
    mask = np.where(keep)[0]

    print(mask.shape)
    return mask

In [11]:
# Make mask
mask = bitwiseMask(ds_PIXC)

if mask.shape[0] == 0:
    print('This granule has no pixels after masking, exiting.')
    # sys.exit(1)    

(597484,)


In [12]:
# Convert PIXC to GeoDataFrame
gdf_PIXC = makeGDF(ds=ds_PIXC, mask=mask, data_vars=variables)

In [13]:
del ds_PIXC

In [14]:
## Flag as in RiverSP ## HERE
# A pixel is "suspect" when its quality word is >= 2**7. Each flag is computed
# from its OWN quality column. The previous version fell back to
# geo_qual_wse_suspect for the area flag, so geolocation-suspect pixels were
# wrongly marked area-suspect as well.
gdf_PIXC['geo_qual_wse_suspect'] = (gdf_PIXC['geolocation_qual'] >= 2**7).astype('int64')
gdf_PIXC['class_qual_area_suspect'] = (gdf_PIXC['classification_qual'] >= 2**7).astype('int64')

gdf_PIXC = gdf_PIXC.drop(columns=['geolocation_qual', 'classification_qual'])

In [16]:
## Make indicator for wse aggregation
# Only these classes contribute to the wse aggregate
# (land_near_water and dark_water are deliberately left out).
wse_klass = [3.0, 4.0, 6.0, 7.0]

gdf_PIXC['agg_wse'] = gdf_PIXC['klass'].isin(wse_klass).astype('int64')

In [33]:
# Calculate wse
gdf_PIXC['wse'] = gdf_PIXC.height - gdf_PIXC.geoid - gdf_PIXC.solid_earth_tide - gdf_PIXC.load_tide_fes - gdf_PIXC.pole_tide
gdf_PIXC = gdf_PIXC.drop(columns=['height', 'geoid', 'solid_earth_tide', 'load_tide_fes', 'pole_tide'])

### Find correct HUC4s

In [18]:
### NHDPlus HR
## Find correct HUC4s
# Read in tile and HUC4 intersection data
dtype_dic= {'tile': str, 'huc4': str, 'coverage': float}
tile_huc4 = pd.read_csv(os.path.join(mdata_path,
                                    'huc4_swot_science_tiles.csv'),
                        dtype=dtype_dic)

In [21]:
# Make list of HUC4s that intersect the tile
huc4s = list(tile_huc4[(tile_huc4['tile'] == tile_name)]['huc4'])
# Limit to the current HUC2
huc4s = [x for x in huc4s if x.startswith(huc2)]

In [23]:
huc4s

['1501']

### Read in buffered flowlines (with extra 32 m on each side to capture full pseudo-pixels)

In [24]:
data_path = '/nas/cee-water/cjgleason/fiona/narrow_rivers_PIXC_data/NHD_prepped_buffered/HUC2_' + huc2 + '/'
# file_paths = getFilepaths(data_path, huc2, huc4s, width_set)

In [25]:
file_paths = []

for huc in huc4s:
    file_path = data_path + 'NHDPLUS_H_' + huc + '_HU4_GDB_prepped_buffered_' + width_set + '.parquet'
    file_paths.append(file_path)

In [26]:
reach_mask = dask_geopandas.read_parquet(path=file_paths, columns=['NHDPlusID',
                                                                   #'Bin',
                                                                   #'Slope',
                                                                   'buffers'])

In [27]:
reach_mask = reach_mask.compute()

In [28]:
# Clip masked pixels to buffered reaches with extra width
gdf_PIXC = gpd.sjoin(gdf_PIXC, reach_mask, how='inner', predicate='within').reset_index().drop(columns=['index', 'index_right'])

In [32]:
if gdf_PIXC.shape[0] == 0:
    print('This granule has no pixels that intersect reaches, exiting.')
    # sys.exit() 

In [34]:
del reach_mask

### Nadir track (a quick detour so we can delete the flowlines ASAP)

In [36]:
# Get single pixel for selecting correct nadir segment
pixel_pt = gdf_PIXC.iloc[0].geometry

In [37]:
# Find correct nadir segment and return its geometry
nadir_segment_ln = findNadir(pass_num=pass_num, pixel_pt=pixel_pt)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  super().__setitem__(key, value)


### Find alignment

In [38]:
az_nadir = calcAzimuth(line=nadir_segment_ln)

In [40]:
del pixel_pt, nadir_segment_ln

#### Read in flowlines

In [41]:
data_path = '/nas/cee-water/cjgleason/fiona/narrow_rivers_PIXC_data/NHD_prepped/HUC2_' + huc2 + '/'

In [42]:
file_paths = []

for huc in huc4s:
    file_path = data_path + 'NHDPLUS_H_' + huc + '_HU4_GDB_prepped.parquet'
    file_paths.append(file_path)

In [43]:
fields = ['NHDPlusID', 'GNIS_Name', 'LengthKM',
          # 'WidthM', 'WidthM_Min',
          # 'WidthM_Max', 'Bin', 'Bin_Min', 'Bin_Max', 'StreamOrde',
          # 'Slope', 
          'geometry']

In [44]:
flowlines = dask_geopandas.read_parquet(path=file_paths, columns=fields)

In [None]:
flowlines = flowlines.compute()

In [48]:
flowlines.loc[:,'geometry'] = flowlines.geometry.explode().force_2d()

### Calculate azimuth and sinuosity (should move sinuosity to static)

In [50]:
# Use the SLURM allocation when running as a batch job. Outside SLURM, fall
# back to the local CPU count, because int(None) would raise TypeError.
n_workers = int(os.environ.get('SLURM_CPUS_PER_TASK', os.cpu_count() or 1))
pandarallel.initialize(nb_workers=n_workers)

INFO: Pandarallel will run on 4 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.


In [51]:
flowlines['temp'] = flowlines.parallel_apply(user_defined_function=calcAzSin, axis=1)

In [52]:
flowlines[['alignment', 'sinuosity']] = pd.DataFrame(flowlines['temp'].tolist(), index=flowlines.index)
flowlines = flowlines.drop(columns='temp')

In [53]:
# Rename geometry to avoid conflicts with heights
flowlines = flowlines.rename_geometry('flowlines')

### Calculate distance along reaches

In [54]:
gdf_PIXC = pd.merge(left=gdf_PIXC, right=flowlines[['NHDPlusID', 'flowlines']], on='NHDPlusID')

In [55]:
def project_point(line, point):
    '''
    Return the distance along ``line`` to the position nearest ``point``.

    Thin wrapper that delegates to ``line.project(point)``; presumably both
    are shapely geometries, in which case the distance is measured from the
    line's start in the CRS units.
    '''
    distance_along = line.project(point)
    return distance_along

In [57]:
gdf_PIXC['distance'] = gdf_PIXC.apply(lambda x: project_point(x['flowlines'], x['geometry']), axis=1)

In [58]:
gdf_PIXC = gdf_PIXC.drop(columns='flowlines')

### For heights

In [62]:
# Multiply the correct columns for the weight denominator
gdf_PIXC['pre_denom'] = gdf_PIXC['phase_noise_std']**-2 * gdf_PIXC['dheight_dphase']**-2 * gdf_PIXC['agg_wse']

In [None]:
len(np.unique(gdf_PIXC['NHDPlusID']))

1673

In [64]:
# Sum the weight denominators by reach
denoms = gdf_PIXC.groupby(by='NHDPlusID')['pre_denom'].sum()
denoms = denoms.rename('denom')

In [65]:
gdf_PIXC = pd.merge(left=gdf_PIXC, right=denoms, how='left', on='NHDPlusID')
del denoms

In [116]:
# gdf_PIXC[gdf_PIXC['NHDPlusID'] == gdf_PIXC.iloc[[0]]['NHDPlusID'][0]]

In [69]:
gdf_PIXC['weight'] = (gdf_PIXC['phase_noise_std'] * gdf_PIXC['dheight_dphase'])**-2 * gdf_PIXC['agg_wse'] / gdf_PIXC['denom']

In [72]:
gdf_PIXC = gdf_PIXC.drop(columns=['pre_denom', 'denom'])

### Calculate slopes

In [74]:
ids = gdf_PIXC['NHDPlusID'].unique()

In [93]:
# Fit a least-squares line of wse against along-reach distance for each reach,
# using only pixels flagged for wse aggregation (agg_wse == 1).
# A reach gets NaN when it has fewer than 2 such pixels, or when all of them
# share one distance value (scipy's linregress raises ValueError in that case).
slope_by_reach = {}
wse_pixels = gdf_PIXC.loc[gdf_PIXC['agg_wse'] == 1, ['NHDPlusID', 'distance', 'wse']]

for reach_id, reach_px in wse_pixels.groupby('NHDPlusID'):
    if len(reach_px) < 2 or reach_px['distance'].nunique() < 2:
        continue
    slope_by_reach[reach_id] = stats.linregress(x=reach_px['distance'].to_numpy(),
                                                y=reach_px['wse'].to_numpy()).slope

# Emit in the same order as `ids` so the list pairs with it downstream
slope_swot = [slope_by_reach.get(i, np.nan) for i in ids]

In [94]:
slope_swot

[nan,
 np.float64(-0.01893633462839396),
 nan,
 np.float64(-0.004034433038188071),
 np.float64(-0.0048675571622269365),
 np.float64(-0.011553538561968264),
 np.float64(-0.0055969847238479635),
 np.float64(-0.005788412003496933),
 np.float64(-0.004845281505458221),
 nan,
 nan,
 nan,
 np.float64(-0.005474089563290428),
 nan,
 np.float64(-0.0057891369777006855),
 nan,
 nan,
 np.float64(1.158370933583705e-05),
 np.float64(-0.0026929603443676987),
 np.float64(-0.003990838519559602),
 np.float64(-0.003333073110300648),
 nan,
 np.float64(-0.014749099178841905),
 nan,
 np.float64(-0.03090312015974898),
 nan,
 np.float64(-0.009818862176964774),
 nan,
 nan,
 np.float64(-0.011529633125430256),
 np.float64(-0.011457127777005634),
 np.float64(-0.01361168678903416),
 nan,
 nan,
 nan,
 np.float64(-0.005116264517375676),
 np.float64(-0.01330040450372259),
 np.float64(-0.008148858252732675),
 np.float64(-0.0016132632862227592),
 nan,
 np.float64(0.004112384747793625),
 np.float64(-0.03702890761437717),

In [None]:
temp = pd.DataFrame({'NHDPlusID': ids, 'slope_swot': slope_swot})
temp['slope_swot'] = np.abs(temp['slope_swot'])

In [None]:
gdf_wse = pd.merge(left=gdf_wse, right=temp, how='left', on='NHDPlusID')

In [None]:
gdf_wse = gdf_wse.drop_duplicates(subset='NHDPlusID').reset_index()

In [None]:
def checkMag(df):
    '''
    Check whether the SWOT-derived slope and the NHD slope share an order of
    magnitude.

    Parameters
    ----------
    df : mapping-like row (e.g. a pandas Series from ``DataFrame.apply(axis=1)``)
        Must provide ``'slope_swot'`` and ``'Slope'``.

    Returns
    -------
    bool
        True when floor(log10(slope_swot)) == floor(log10(Slope)).
        False when either value is missing (None/NaN), infinite, or not
        strictly positive, since its order of magnitude is undefined.
    '''
    import math  # not imported at the top of the notebook

    slope_swot = df['slope_swot']
    slope_nhd = df['Slope']

    for value in (slope_swot, slope_nhd):
        # math.log10 raises for 0 / negatives; math.floor raises for NaN / inf
        if value is None or not np.isfinite(value) or value <= 0:
            return False

    return math.floor(math.log10(slope_swot)) == math.floor(math.log10(slope_nhd))

In [None]:
gdf_wse['slope_match'] = gdf_wse.apply(func = checkMag, axis=1)

In [None]:
slope_match = gdf_wse[gdf_wse['slope_match'] == True]['NHDPlusID'].tolist()

In [None]:
sj['good_wse'] = np.where(sj['NHDPlusID'].isin(slope_match), True, False)

In [None]:
pd.DataFrame(sj.sort_values(by=['NHDPlusID', 'counter'])[::10].Bin.value_counts()).reset_index()

In [None]:
reaches_min = pd.DataFrame(sj.groupby('NHDPlusID')['coverage'].min()).reset_index()

In [None]:
# Merge on bins
reaches_min = pd.merge(left=reaches_min, right=sj[['NHDPlusID', 'Bin']], how='left', on='NHDPlusID')
# Take every tenth row to get reach-level results
reaches_min = reaches_min.sort_values(by=['NHDPlusID'])[::10].reset_index()

In [None]:
pd.DataFrame(reaches_min[reaches_min['coverage'] > 0.1].Bin.value_counts()).reset_index()

In [None]:
pd.DataFrame(sj[sj['good_wse'] == True].sort_values(by=['NHDPlusID', 'counter'])[::10].Bin.value_counts()).reset_index()

In [None]:
sj[sj['good_wse'] == True].iloc[50:60]

### Find coverage

#### Make pseudo pixels

In [None]:
# Set along-track pixel resolution
azimuth_res = 22 # meters

In [None]:
# Make pseudo pixels
start = time.time()
gdf_PIXC_cov['pseudo_pixel'] = gdf_PIXC_cov.parallel_apply(user_defined_function=makePseudoPixels,
                                                           args=(nadir_segment_ln,
                                                                 azimuth_res),
                                                           axis=1)
end = time.time()
print(end - start)

In [None]:
gdf_PIXC_cov = gdf_PIXC_cov.rename(columns={'geometry': 'pixel_centroid'}).set_geometry('pseudo_pixel').set_crs(epsg=3857)

In [None]:
# Get bounds of PIXC tile
pseudo_bounds = gdf_PIXC_cov.total_bounds

In [None]:
# Copy geometry column as sjoin will discard it
gdf_PIXC_cov['pseudo_geom'] = gdf_PIXC_cov.geometry

#### Read in buffered segments

In [None]:
data_path = '/nas/cee-water/cjgleason/fiona/narrow_rivers_PIXC_data/NHD_prepped_segmented_buffered/HUC2_' + huc2 + '/'

In [None]:
file_paths = []

for huc in huc4s:
    file_path = data_path + 'NHDPLUS_H_' + huc + '_HU4_GDB_prepped_segmented_buffered_' + width_set + '.parquet'
    file_paths.append(file_path)

In [None]:
segments = dask_geopandas.read_parquet(path=file_paths)

In [None]:
segments = segments.compute()

In [None]:
segments = segments.clip(pseudo_bounds)

In [None]:
# Keep only reaches that are fully contained in PIXC granule
segments = segments.groupby('NHDPlusID').filter(lambda x: len(x) == 10)

In [None]:
segments = segments.sort_values(by=['NHDPlusID', 'counter']).reset_index()

In [None]:
segments = segments.drop(columns='index')

In [None]:
# Get number of reaches per bin, using one row per reach.
# Keyed by `binn` (not a hard-coded Bin_Min) so the counts match the bin column
# chosen by width_set and used later in summarizeCoverage.
counts = pd.DataFrame(segments.sort_values(by=['NHDPlusID', 'counter'])
                      .drop_duplicates(subset='NHDPlusID')[binn]
                      .value_counts()).reset_index()

In [None]:
counts

In [None]:
# Calculate segment area
segments['segment_area'] = segments.geometry.area

#### Join and analyze coverage

In [None]:
# Merge the segments and pseudo-pixels by intersection. how='left' keeps every
# segment, including those that no pseudo-pixel intersects.
sj = gpd.sjoin(segments, gdf_PIXC_cov, how='left', predicate='intersects').reset_index()

In [None]:
del gdf_PIXC_cov

In [None]:
sj = sj.drop(columns=['index', 'index_old', 'index_right'])

In [None]:
# # NHDPlusID_left reflects the segment ids, keeping that one
# sj = sj.drop(columns=['NHDPlusID_right', 'points', 'azimuth_index',
#                       'range_index',
#                       # 'height', 'geoid',
#                       # 'klass',
#                       'latitude', 'longitude'])

In [None]:
sj = sj.set_geometry('pseudo_geom')

In [None]:
sj = sj.groupby('NHDPlusID', as_index=False).parallel_apply(user_defined_function=specialDissolve)

In [None]:
sj = sj.reset_index().drop(columns=['level_0', 'level_1', 'points',
                                    # 'azimuth_index', 'range_index',
                                    'cross_track', 'pixel_area',
                                    'prior_water_prob', 'klass', 'pixel_centroid'])

In [None]:
sj['pseudo_geom_clip'] = sj.parallel_apply(user_defined_function=specialClip,
                                           axis=1)

In [None]:
# Calculate the pseudo-pixel area within each node
sj['pseudo_area'] = sj.pseudo_geom_clip.area

In [None]:
sj['coverage'] = sj.pseudo_area/sj.segment_area

In [None]:
sj['coverage'] = sj['coverage'].fillna(0)

In [None]:
# Drop geometry columns
sj = sj.drop(columns=['pseudo_geom', 'buffers', 'pseudo_geom_clip', 'pseudo_area'])

### Do stats

In [None]:
# Bins for the width option in use (binn is Bin / Bin_Min / Bin_Max)
bins = sj[binn].unique()

#### Reaches

In [None]:
reaches_cent, reaches_thresh, reaches_min = summarizeCoverage(df=sj, binn=binn,
                                            bins=bins, counts=counts)

In [None]:
reaches_min

In [None]:
reaches_min.sort_values(by=['NHDPlusID'])[::10]

In [None]:
# d = {}
# # d_q = {}
# for i in range(1, 10):
#     threshold = i/10
#     # print(threshold)
    
#     detected = sj.groupby([binn, 'NHDPlusID'])['coverage'].apply(lambda x: (x > threshold).sum()) / 10
#     reach = detected.reset_index()
    
#     # reach = detected.groupby(binn).quantile(q=[x / 100.0 for x in range(0,100,1)]).reset_index()
        
#     d[threshold] = reach

In [None]:
# Add a column to each per-threshold DataFrame recording its threshold key.
# NOTE(review): `d` is built by the threshold loop in the cell above, which is
# currently commented out; re-enable it before running this cell.
# The loop variable is deliberately not named `data`, so the granule file list
# loaded earlier is not clobbered.
for threshold, reach_df in d.items():
    reach_df['threshold'] = threshold

In [None]:
# Concatenate all DataFrames into one
# reaches_desc = pd.concat(d.values())

reaches_cent = pd.concat(d.values()).rename(columns={'level_1': 'centile'})

In [None]:
reaches_cent

In [None]:
# reaches_cent = pd.merge(left=reaches_cent, right=counts, how='left', on=binn)

In [None]:
reaches_cent

In [None]:
reaches_min = pd.DataFrame(sj.groupby('NHDPlusID')['coverage'].min()).reset_index()

In [None]:
reaches_min = pd.merge(left=reaches_min, right=sj[['NHDPlusID', binn]], how='left', on='NHDPlusID')

In [None]:
min_cov

In [None]:
# reaches = pd.DataFrame(data=d).T

In [None]:
# reaches.columns = bins

### Write out

In [None]:
# Output directory for the HUC2 being processed. It was previously hard-coded
# to HUC2 01 even when `huc2` was set to another region.
save_path = os.path.join('/nas/cee-water/cjgleason/fiona/narrow_rivers_PIXC_data/', 'PIXC_v2_0_HUC2_' + huc2)

In [None]:
# Combine node_desc
node_desc_both = pd.concat([node_desc, node_desc_w_zero], ignore_index=True)
node_desc_both

In [None]:
# Combine node_quant
node_quant_both = pd.concat([node_quant, node_quant_w_zero], ignore_index=True)
node_quant_both

In [None]:
# nodes_desc_both.to_csv(os.path.join(save_path, granule_name + '_nodes_describe.csv'))
# nodes_quant_both.to_csv(os.path.join(save_path, granule_name + '_nodes_quantile.csv'))

In [None]:
# reaches_desc.to_csv(os.path.join(save_path, granule_name + '_reaches_describe.csv'))
# reaches_quant.to_csv(os.path.join(save_path, granule_name + '_reaches_quantile.csv'))

In [None]:
test = pd.read_parquet('/nas/cee-water/cjgleason/fiona/narrow_rivers_PIXC_output/PIXC_v2_0_HUC2_01_2025_03_02_min/SWOT_L2_HR_PIXC_004_242_074L_20230930T103957_20230930T104008_PGC0_01_reaches_thresh.parquet')

In [None]:
test

### Look at heights

#### Read in buffered flowlines

In [127]:
# data_path = '/nas/cee-water/cjgleason/fiona/narrow_rivers_PIXC_data/NHD_prepped_buffered/HUC2_' + huc2 + '/'

In [128]:
# Set the buffered-flowline directory explicitly. `data_path` is reassigned by
# several earlier cells (PIXC, NHD_prepped, segmented buffers), and the cell
# meant to reset it is commented out, so relying on it reads the wrong directory.
buffered_dir = '/nas/cee-water/cjgleason/fiona/narrow_rivers_PIXC_data/NHD_prepped_buffered/HUC2_' + huc2 + '/'

file_paths = [buffered_dir + 'NHDPLUS_H_' + huc + '_HU4_GDB_prepped_buffered_' + width_set + '.parquet'
              for huc in huc4s]

In [129]:
reach_extent = dask_geopandas.read_parquet(path=file_paths, columns=['NHDPlusID', 'Bin', 'Slope', 'buffers'])

In [None]:
reach_extent = reach_extent.compute()

In [None]:
reach_extent['area'] = reach_extent['buffers'].area