# Goal
We have downloaded the data for the focus regions and now want to filter it.
- We keep only the data inside the focus-region bounding box plus a buffer zone around it
- We keep only data that raised no quality flags
- We keep only data in the "open_water" class (class 4) and the "water_near_land" class (class 3)

In [1]:
# Standard library imports
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import json
from pathlib import Path
import os
import glob
import zipfile
from functools import partial
from functools import lru_cache

# Third-party library imports
import geopandas as gpd
import pandas as pd
import contextily as ctx
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt
import netCDF4
import earthaccess

# Matplotlib inline magic command
%matplotlib inline

# Read the settings written to config.json by the previous notebook step.
config = json.loads(Path('config.json').read_text())

In [2]:
# This dictionary explicitly outlines the data columns and their data types.
# It is mainly intended to be read and understood by the user, but it is also
# used at runtime: its keys select which variables are loaded from each file,
# and "classification_str"/"value_map" supplies the class labels written to
# the output. Labels therefore must not carry stray whitespace.

data_columns = {
    "classification": {
        "type": int,
        "null_value": 255,
        "description": "Flags indicating water detection results.",
    },
    "classification_str": {
        "type": str,
        "null_value": "unknown",
        "value_map": {
            1: "land",
            2: "land_near_water",
            3: "water_near_land",
            4: "open_water",
            5: "dark_water",
            6: "low_coh_water_near_land",
            7: "open_low_coh_water",
        },
        "description": "Flags indicating water detection results as string.",
    },
    "layover_impact": {
        "type": float,
        "null_value": 9.969209968386869e+36,
        "description": "Estimate of the height error caused by layover, which may not be reliable on a pixel by pixel basis, but may be useful to augment aggregated height uncertainties. ",
    },
    "height": {
        "type": float,
        "null_value": 9.96921e+36,
        "description": "Height of the pixel above the reference ellipsoid.",
    },
    "illumination_time": {
        "type": datetime,
        "null_value": 9.969209968386869e+36,
        "description": "Time of measurement in seconds in the UTC time scale since 1 Jan 2000 00:00:00 UTC. [tai_utc_difference] is the difference between TAI and UTC reference time (seconds) for the first measurement of the data set. If a leap second occurs within the data set, the attribute leap_second is set to the UTC time at which the leap second occurs. ",
    },
    "geolocation_qual": {
        "type": int,
        "null_value": 4294967295,
        "description": "Quality flag for the geolocation quantities in the pixel cloud data",
        # Bit values of the individual quality flags (combined bitwise).
        "flag_masks": {
            "layover_significant": 1,
            "phase_noise_suspect": 2,
            "phase_unwrapping_suspect": 4,
            "model_dry_tropo_cor_suspect": 8,
            "model_wet_tropo_cor_suspect": 16,
            "iono_cor_gim_ka_suspect": 32,
            "xovercal_suspect": 64,
            "suspect_karin_telem": 1024,
            "medium_phase_suspect": 4096,
            "tvp_suspect": 8192,
            "sc_event_suspect": 16384,
            "small_karin_gap": 32768,
            "specular_ringing_degraded": 524288,
            "model_dry_tropo_cor_missing": 1048576,
            "model_wet_tropo_cor_missing": 2097152,
            "iono_cor_gim_ka_missing": 4194304,
            "xovercal_missing": 8388608,
            "geolocation_is_from_refloc": 16777216,
            "no_geolocation_bad": 33554432,
            "medium_phase_bad": 67108864,
            "tvp_bad": 134217728,
            "sc_event_bad": 268435456,
            "large_karin_gap": 536870912,
        },
    },
}

---

In [4]:
# Thread-safe per-file processing with (approximate) progress reporting.
@lru_cache(maxsize=None)
def _load_bbox_bounds(bbox_path):
    """Return ``(minx, miny, maxx, maxy)`` of the first feature in a bbox GeoJSON.

    Cached so the file is read and parsed once per path rather than once per
    processed data file. Re-run this cell if the bbox file changes on disk.
    """
    with open(bbox_path, 'r') as f:
        bbox_data = json.load(f)
    bounds = gpd.GeoDataFrame.from_features(bbox_data['features']).bounds
    return bounds.minx[0], bounds.miny[0], bounds.maxx[0], bounds.maxy[0]


def process_file(filepath, file_index, total_files):
    """Load one pixel-cloud NetCDF file and keep only in-bbox water pixels.

    Args:
        filepath: Path to a NetCDF file with a ``pixel_cloud`` group.
        file_index: 1-based position of the file in submission order; only
            used for the progress percentage.
        total_files: Total number of files being processed.

    Returns:
        A DataFrame with the ``data_columns`` variables present in the file
        (plus the latitude/longitude coordinates), restricted to pixels inside
        the bounding box whose ``classification`` is 3 or 4, with an added
        ``classification_str`` label column. On any error the message is
        printed and an empty DataFrame is returned so one bad file does not
        abort the whole run.
    """
    # Approximate only: with threads, files finish out of submission order.
    percent_complete = (file_index / total_files) * 100
    try:
        bbox_path = Path(config['data_dir']) / 'bbox.geojson'
        minx, miny, maxx, maxy = _load_bbox_bounds(bbox_path)

        # The context manager closes the NetCDF file handle as soon as the
        # selected variables have been materialised into a DataFrame.
        with xr.open_dataset(filepath, group='pixel_cloud') as ds:
            cols = [col for col in data_columns if col in ds.variables]
            df = ds[cols].to_dataframe()
        full_len = len(df)

        # Spatial filter; Series.between is inclusive on both edges.
        in_bbox = (df['latitude'].between(miny, maxy)
                   & df['longitude'].between(minx, maxx))
        df = df[in_bbox]
        spatial_filtered = len(df)

        # Keep water_near_land (3) and open_water (4). .copy() yields an
        # independent frame so the column assignment below does not raise
        # SettingWithCopyWarning.
        df = df[df['classification'].isin([3, 4])].copy()
        class_filtered = len(df)

        # NOTE(review): the notebook goal also says to drop pixels with raised
        # quality flags, but no geolocation_qual filter is applied here --
        # confirm whether one is intended.
        df['classification_str'] = df['classification'].replace(
            data_columns['classification_str']['value_map'])

        print(f"[{percent_complete:.1f}%] Processed {filepath.name}: {class_filtered}/{spatial_filtered}/{full_len} records")
        return df
    except Exception as e:
        print(f"[{percent_complete:.1f}%] ERROR: {e} - Skipping file {filepath}")
        return pd.DataFrame()

# Main processing: files are handled concurrently with a thread pool.
pixel_cloud_dir = Path(config['pixel_cloud_dataset_folder'])
# Sorted so the processing and concatenation order is deterministic
# (Path.glob yields files in arbitrary filesystem order).
file_list = sorted(pixel_cloud_dir.glob('*.nc'))
total_files = len(file_list)
print(f"[INFO] Found {total_files} files to process - reported percentage only approximate due to threading.")

# os.cpu_count() can return None when the count is undeterminable.
max_workers = min((os.cpu_count() or 1) * 2, 16)

results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    # Pass the 1-based file index so each worker can report progress.
    futures = [executor.submit(process_file, filepath, index, total_files)
               for index, filepath in enumerate(file_list, start=1)]

    # Collect in submission order; workers report their own per-file errors.
    for future in futures:
        try:
            results.append(future.result())
        except Exception as e:
            print(f"[ERROR] Unexpected error in worker: {e}")

# Combine results. ignore_index=True because each per-file frame carries its
# own per-file point index, which would otherwise collide across files.
# pd.concat raises on an empty list, so handle "no files found" explicitly.
if results:
    final_df = pd.concat(results, ignore_index=True)
else:
    final_df = pd.DataFrame()
print(f"[INFO] Processing complete. Final dataset contains {len(final_df)} entries.")

[INFO] Found 252 files to process
[6.3%] Processed SWOT_L2_HR_PIXC_005_165_154R_20231018T133659_20231018T133710_PGC0_01.nc: 14920/963036/5893583 records
[5.2%] Processed SWOT_L2_HR_PIXC_005_012_156R_20231013T022551_20231013T022601_PGC0_01.nc: 1/1/1394952 records
[5.6%] Processed SWOT_L2_HR_PIXC_005_165_153R_20231018T133649_20231018T133700_PGC0_01.nc: 20557/659434/7099617 records
[2.8%] Processed SWOT_L2_HR_PIXC_002_012_155R_20230811T121029_20230811T121040_PGC0_01.nc: 761/84312/4186769 records
[3.6%] Processed SWOT_L2_HR_PIXC_004_318_155L_20231003T040325_20231003T040336_PGC0_01.nc: 162950/3385268/5452658 records
[1.2%] Processed SWOT_L2_HR_PIXC_001_165_154R_20230727T023640_20230727T023651_PGC0_01.nc: 13738/882775/4906060 records
[3.2%] Processed SWOT_L2_HR_PIXC_002_012_156R_20230811T121039_20230811T121050_PGC0_01.nc: 0/0/4781870 records
[4.4%] Processed SWOT_L2_HR_PIXC_005_012_154R_20231013T022531_20231013T022542_PGC0_01.nc: 0/0/333143 records
[0.8%] Processed SWOT_L2_HR_PIXC_001_165_15

In [None]:
# Turn the filtered pixels into point geometries and write them to a
# timestamped GeoJSON file (slow for millions of points).
gdf = gpd.GeoDataFrame(
    final_df,
    geometry=gpd.points_from_xy(final_df.longitude, final_df.latitude),
    crs="EPSG:4326",
)
print(gdf.dtypes)

output_dir = Path(config['data_dir']) / 'output'
output_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
output_file_name = output_dir / f'subset_class3and4_2020to2024_{timestamp}.geojson'

# Workaround for an index dtype problem on export: replace the existing index
# with a "row_id" index whose values are the old index plus one.
gdf = (
    gdf.assign(row_id=gdf.index + 1)
    .reset_index(drop=True)
    .set_index("row_id")
)

gdf.to_file(output_file_name, driver='GeoJSON')
print(f"File saved to {output_file_name}")

KeyboardInterrupt: 

In [None]:
import geopandas as gpd
from pathlib import Path
import pandas as pd
from datetime import datetime
import pyarrow as pa
import pyarrow.parquet as pq
import dask.dataframe as dd
import json
from shapely.geometry import Point
import multiprocessing
import concurrent.futures

def _downcast_dtypes(df):
    """Return a copy of *df* with smaller dtypes to shrink the exported file.

    object -> category; int64 -> int16 when every value fits the int16 range;
    float64 -> float32 except longitude/latitude, which keep full precision.
    """
    out = df.copy()

    for col in out.select_dtypes(include=['object']).columns:
        out[col] = out[col].astype('category')

    # int16 covers -32768..32767 inclusive. For an empty column min()/max()
    # are NaN, the comparison is False, and the column is left untouched.
    for col in out.select_dtypes(include=['int64']).columns:
        values = out[col]
        if values.min() >= -32768 and values.max() <= 32767:
            out[col] = values.astype('int16')

    for col in out.select_dtypes(include=['float64']).columns:
        if col not in ('longitude', 'latitude'):
            out[col] = out[col].astype('float32')

    return out


def _write_geojson_in_chunks(gdf, output_file, chunk_size=10000):
    """Stream *gdf* to *output_file* as a GeoJSON FeatureCollection.

    Features are serialised ``chunk_size`` rows at a time through a single
    open file handle, so peak memory is bounded by one chunk rather than by
    the whole collection.
    """
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write('{"type":"FeatureCollection","features":[')
        for start in range(0, len(gdf), chunk_size):
            chunk = gdf.iloc[start:start + chunk_size]
            # iterfeatures yields the same feature dicts to_json serialises;
            # na='null' writes NaN as JSON null, drop_id omits the index id.
            features_str = ','.join(
                json.dumps(feature)
                for feature in chunk.iterfeatures(na='null', drop_id=True)
            )
            # Separate this chunk from the previous one (slices are non-empty).
            if start > 0:
                f.write(',')
            f.write(features_str)
        f.write(']}')


def optimize_geojson_export(final_df, config, chunk_size=10000):
    """Export *final_df* as a point GeoJSON file under ``<data_dir>/output``.

    Args:
        final_df: DataFrame with ``longitude`` and ``latitude`` columns plus
            any attribute columns to export.
        config: Mapping with a ``data_dir`` entry.
        chunk_size: Number of rows serialised per write chunk.

    Returns:
        Path of the written, timestamped ``.geojson`` file.
    """
    output_dir = Path(config['data_dir']) / 'output'
    output_dir.mkdir(exist_ok=True)
    output_file_name = output_dir / f'subset_class3and4_2020to2024_{datetime.now().strftime("%Y%m%d%H%M%S")}.geojson'

    # Vectorised point construction keeps geometries in row order. The
    # previous ProcessPoolExecutor + as_completed version failed because the
    # nested worker function could not be pickled. Even if it had run, it
    # would have attached chunks to rows in completion order.
    geometries = gpd.points_from_xy(final_df['longitude'], final_df['latitude'])

    # The geometry array is positional, so duplicate index labels in
    # final_df do not affect row/geometry pairing.
    gdf = gpd.GeoDataFrame(_downcast_dtypes(final_df), geometry=geometries, crs="EPSG:4326")
    gdf.reset_index(drop=True, inplace=True)

    _write_geojson_in_chunks(gdf, output_file_name, chunk_size=chunk_size)

    print(f"File saved to {output_file_name}")
    return output_file_name

# Export the combined, filtered dataset to GeoJSON.
output_file = optimize_geojson_export(final_df=final_df, config=config)