if __name__ == "__main__":
    import platform, sys, os
    import xarray as xr
    import numpy as np
    import pandas as pd
    import warnings
    warnings.filterwarnings('ignore')

    # plotting modules
    from IPython.display import Image

    from pygmtsar import S1, Stack, tqdm_dask, NCubeVTK, ASF, XYZTiles
    import rasterio
    import rioxarray
    import zipfile
    import glob
    import shutil
    import argparse
    import geopandas as gpd
    from dotenv import dotenv_values
    import matplotlib.pyplot as plt
    import matplotlib
    import seaborn as sns
    import json
    import time
    import re

    # the default chunk size (512) is suitable for processing a single subswath at 15 m resolution;
    # select a higher chunk size (1024) to process multiple subswaths and scenes at 15 m resolution
    from pygmtsar import datagrid
    datagrid.chunksize = 1024

    parser = argparse.ArgumentParser(
        description="Process InSAR data using the PyGMTSAR library"
    )
    parser.add_argument(
        "-d", "--dir",
        help="Path of the directory containing extracted Sentinel-1 SLC imagery (for all data)",
        required=True, dest="dir", metavar="DIRECTORY", type=str
    )
    parser.add_argument(
        "-c", "--csv",
        help="CSV containing new SLC images not already contained within the main data directory",
        required=True, dest="csv", metavar="CSV", type=str
    )
    parser.add_argument(
        "-a", "--shapefile",
        help="Shapefile used to clip TIFF files (using a predefined buffer).",
        required=True, dest="shapefile", metavar="SHAPEFILE"
    )
    parser.add_argument(
        "-s", "--subswath",
        help="Integer containing one (or several) of the three subswaths (1, 2, 3)",
        required=True, dest="subswath", metavar="SUBSWATH", type=int
    )
    parser.add_argument(
        "-n", "--name",
        help="Name of the project (will be used in folder names).",
        required=True, dest="name", metavar="NAME", type=str
    )
    args = parser.parse_args()

    ORBIT = 'D'
    SUBSWATH = args.subswath
    WORKDIR = args.name
    SPILLDIR = "dask_spill"
    CSV = args.csv
    SHAPEFILE = args.shapefile
    TEMPDIR = "temp_data"
    DATADIR = args.dir
    #DEMFILE = 'dem/brum_dem.nc'
    DEMFILE = None
    BASEDAYS = 300
    BASEMETERS = 150
    RESOLUTION = 60
    POLARIZATION = 'VV'
    #RESOLUTION = 30
    new = False
    CORRLIMIT = 0.15

    displacement_dir = WORKDIR + "_displacement_" + str(CORRLIMIT)
    if not os.path.exists(displacement_dir):
        os.mkdir(displacement_dir)
    if not os.path.exists(SPILLDIR):
        os.mkdir(SPILLDIR)
    # if not os.path.exists(DATADIR):
    #     os.mkdir(DATADIR)

    import dask, dask.distributed
    # increase timeouts to work with the default chunk size (512) even on large areas
    dask.config.set({'distributed.comm.timeouts.tcp': '60s'})
    #print(dask.config.get("distributed.comm.timeouts.tcp"))
    dask.config.set({'distributed.comm.timeouts.connect': '60s'})
    #print(dask.config.get("distributed.comm.timeouts.connect"))

    from dask.distributed import Client, LocalCluster
    cluster = LocalCluster(
        memory_limit='28GB',
        n_workers=1,
        local_directory=f'{SPILLDIR}',  # directory for spilling
        config={
            'distributed.worker.memory.target': 0.6,      # begin spilling at 60% memory load
            'distributed.worker.memory.spill': 0.7,       # spill more aggressively at 70%
            'distributed.worker.memory.pause': 0.8,       # pause worker at 80% load
            'distributed.worker.memory.terminate': 0.95,  # restart worker at 95% load
        }
    )
    client = Client(cluster)
    # NOTE: a second, unconfigured cluster was created here; it would silently replace the
    # memory-tuned cluster above, so it is left disabled.
    #cluster = LocalCluster(n_workers=1)
    #client = Client(cluster)

    #MASTER = '2018-02-15'
    #sbas = SBAS(DATADIR, DEMFILE, basedir=WORKDIR).set_master(MASTER)
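    # Illustrative usage notes (not part of the pipeline): the script name, paths, project
    # name, and credential values below are placeholders, not values from this project.
    #
    # Example invocation (assuming this file is saved as process_insar.py):
    #   python process_insar.py -d slc_data -c new_scenes.csv -a aoi/aoi.shp -s 2 -n myproject
    #
    # The ASF download step further below reads credentials from a .env file in the working
    # directory via dotenv_values(".env"); it is expected to contain keys like:
    #   asf-user=YOUR_EARTHDATA_USERNAME
    #   asf-password=YOUR_EARTHDATA_PASSWORD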
    # read the AOI shapefile and buffer it to a single clipping geometry
    AOI = gpd.read_file(SHAPEFILE)
    print(type(AOI))
    geometry = AOI.geometry.buffer(0.1).unary_union
    AOI = next(AOI.iterfeatures())
    AOI['geometry'] = geometry
    #print(AOI)
    #print(AOI['geometry'])
    #AOI = gpd.GeoDataFrame.from_features(AOI)
    AOI = gpd.GeoDataFrame(AOI)
    AOI.to_file("test.json", driver="GeoJSON")
    print(type(AOI))
    #AOI['geometry'] = geometry
    #print(AOI)

    # establish key variables tested in if/pass interrogators
    pairs = None
    dates = None
    sbas = None
    intf_sbas = None
    corr_sbas = None
    unwraps = None
    phasefilts = None
    disps = None
    unwraps_detrend = None

    time_start = time.time()
    #print(f'TIME STARTED: {time_start}')

    # SCENES will be the list from ASF that is inspected to determine whether or not
    # all relevant scenes have already been downloaded.
    df = pd.read_csv(CSV)
    data_list = glob.glob(f'{DATADIR}/*.tiff')
    for index, data in enumerate(data_list):
        result = re.split("[-|t]", data)
        data_list[index] = result[5]
    SCENES = df['Scene Names'].to_list()
    # crop for testing
    SCENES = SCENES[100:106]

    # filter dates and times of scenes to compare against the downloaded data
    scene_dates = []
    for scene in SCENES:
        result = re.split("[_|T]", scene)
        #print(result)
        comp_date = result[5]
        #print(comp_date)
        if comp_date not in data_list:
            print(f"Date {comp_date} not downloaded, preparing to download.")
            scene_dates.append(scene)

    # check the CSV list against the data list
    if len(scene_dates) != 0:
        ENV_VALS = dotenv_values(".env")
        asf_user = ENV_VALS.get('asf-user')
        asf_password = ENV_VALS.get('asf-password')
        asf = ASF(asf_user, asf_password)
        asf.download(TEMPDIR, scene_dates, 123, POLARIZATION)
        scenes = S1.scan_slc(TEMPDIR)
        print("create stack...")
        sbas = Stack(WORKDIR, drop_if_exists=True).set_scenes(scenes)
        sbas.to_dataframe()
        sbas.backup(DATADIR)
        shutil.rmtree(TEMPDIR)
    else:
        print("NO NEW DATA PRODUCTS TO DOWNLOAD, REPROCESSING...")
        scenes = S1.scan_slc(DATADIR, subswath=123)
        sbas = Stack(WORKDIR, drop_if_exists=True).set_scenes(scenes)

    sbas.download_dem()

    print("ALIGN IMAGES...")
    sbas.compute_align(n_jobs=6)

    print("CREATE BASELINE PAIRS...")
    baseline_pairs = sbas.baseline_pairs(days=BASEDAYS, meters=BASEMETERS)
    baseline_pairs = sbas.sbas_pairs_limit(baseline_pairs, limit=2, iterations=2)
    print(baseline_pairs)

    print("COMPUTE GEOCODE...")
    sbas.compute_geocode()
    topo = sbas.get_topo()

    print("CALCULATE BASELINE PAIRS...")
    pairs, dates = sbas.get_pairs(baseline_pairs, dates=True)

    # load Sentinel-1 data
    print("GENERATE INTERFEROGRAMS...")
    # Gaussian filtering with an N-meter cut-off wavelength, with multilooking on Sentinel-1 intensities
    decimator = sbas.decimator(resolution=RESOLUTION)
    print("open data...")
    data = sbas.open_data(dates)
    print("calculate intensity...")
    intensity = sbas.multilooking(np.square(np.abs(data)), wavelength=400, coarsen=(1,4))

    # calculate phase difference with topography correction
    print("calculate phase diff...")
    phase = sbas.phasediff(pairs, data, topo)
    # Gaussian filtering with an N-meter cut-off wavelength, with multilooking
    #phase = sbas.multilooking(phase, weight=psfunction, wavelength=30)
    print("filter phase...")
    phase = sbas.multilooking(phase, wavelength=400, coarsen=(1,4))
    print("calculate correlation...")
    corr = sbas.correlation(phase, intensity)
    #phase_goldstein = sbas.goldstein(phase, corr, 32)

    # convert the complex phase difference to an interferogram
    print("run interferogram command...")
    intf = sbas.interferogram(sbas.goldstein(phase, corr, 32))
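    # Optional sanity check (sketch only, not part of the original pipeline): preview a single
    # interferogram before the heavy persist/unwrap steps below. This assumes the lazy `intf`
    # object is an xarray array with a leading 'pair' dimension; uncomment to use.
    #intf_preview = decimator(intf).isel(pair=0).compute()
    #intf_preview.plot.imshow(cmap='twilight')
    #plt.savefig('intf_preview.png')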
    print("EXPORT DS SBAS INTERFEROGRAMS...")
    # compute correlation and interferogram together because the correlation depends on the phase,
    # and the filtered phase depends on the correlation
    tqdm_dask(result := dask.persist(decimator(corr), decimator(intf)), desc='Compute Phase and Correlation')
    # unpack results
    corr_sbas, intf_sbas = result

    print("UNWRAP INTERFEROGRAMS...")
    tqdm_dask(unwrap_sbas := sbas.unwrap_snaphu(intf_sbas, corr_sbas.where(corr_sbas >= CORRLIMIT)).persist(),
              desc='SNAPHU Unwrapping')

    print("DETREND INTERFEROGRAMS...")
    trend_sbas = sbas.gaussian(unwrap_sbas.phase, wavelength=10000)
    trend_sbas = sbas.sync_cube(trend_sbas, 'trend_sbas')

    print("QUALITY CHECK SBAS UNWRAPPED INTERFEROGRAMS...")
    hist_sbas = (unwrap_sbas.phase - trend_sbas).std(['y','x'])
    hist_sbas = sbas.sync_cube(hist_sbas, 'hist_sbas')
    pairs = sbas.get_pairs(baseline_pairs)
    pairs['outliers'] = hist_sbas.values
    pairs_best = sbas.sbas_pairs_covering(pairs, 'outliers', 3)
    trend_sbas = trend_sbas.sel(pair=pairs_best.pair.values)
    unwrap_sbas = unwrap_sbas.sel(pair=pairs_best.pair.values)
    intf_sbas = intf_sbas.sel(pair=pairs_best.pair.values)
    corr_sbas = corr_sbas.sel(pair=pairs_best.pair.values)

    print("CALCULATE DISPLACEMENT...")
    # calculate phase displacement in radians and convert to LOS displacement in millimeters
    disp_sbas = sbas.los_displacement_mm(sbas.lstsq(unwrap_sbas.phase - trend_sbas, corr_sbas))
    # optionally, materialize to disk and open
    disp_sbas = sbas.sync_cube(disp_sbas, 'disp_sbas')
    # geocode to lat/lon
    disp_sbas_ll = sbas.cropna(sbas.ra2ll(disp_sbas))
    disp_sbas_ll = sbas.sync_cube(disp_sbas_ll, 'disp_sbas_ll')

    print("CALCULATE STL TREND...")
    stl_sbas = sbas.stl(disp_sbas_ll)
    stl_sbas = sbas.sync_cube(stl_sbas, 'stl_sbas')

    print("EXPORT DISPLACEMENT AND STL...")
    for date in disp_sbas_ll.date:  # adjust to access the dates in your dataset
        slice_for_date = disp_sbas_ll.sel(date=date)
        slice_for_date.to_netcdf(f'{displacement_dir}/{date.values}.nc')
    for date in stl_sbas.date:
        slice_for_date = stl_sbas.sel(date=date)
        slice_for_date.to_netcdf(f'{displacement_dir}/{date.values}_stl.nc')
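    # Optional verification (sketch only, assumptions noted): re-open the per-date displacement
    # slices written above and stack them into a single cube for quick inspection. Assumes each
    # exported file keeps its scalar 'date' coordinate; uncomment to use.
    #disp_files = sorted(f for f in glob.glob(f'{displacement_dir}/*.nc') if not f.endswith('_stl.nc'))
    #disp_check = xr.open_mfdataset(disp_files, combine='nested', concat_dim='date')
    #print(disp_check)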