In [None]:
import os
import pandas as pd
from arosics import COREG, COREG_LOCAL
from zipfile import ZipFile
import shutil
import glob

In [3]:
def get_scenes_dict(data_df: pd.DataFrame, product: str = "", is_s1: bool = True) -> dict:
    """Pick one scene path per grouping key for every ID in ``data_df``.

    Rows are processed per unique ``ID``.  For each path, the file name is
    split on ``"_"`` and the first six characters of one field form the
    grouping key: field 5 when ``is_s1`` is true, field 2 otherwise
    (presumably the ``YYYYMM`` prefix of the acquisition timestamp -- confirm
    against the product naming convention).  For each distinct key the first
    path in original row order is kept.

    Args:
        data_df: Frame with at least the columns ``ID`` and ``Path``.
        product: Substring a path must contain to be considered.  The empty
            string disables this filter.
        is_s1: Selects the underscore-separated file-name field used as the
            grouping key (5 if true, 2 if false).

    Returns:
        Dict mapping each unique ID to a list of paths, one per distinct key,
        ordered by ascending key.  IDs without any matching path map to ``[]``.

    Raises:
        IndexError: If a file name has too few underscore-separated fields.
    """
    key_field = 5 if is_s1 else 2

    def _group_key(path: str) -> str:
        # Only the file name matters; the directory part may contain underscores.
        return os.path.split(path)[1].split("_")[key_field][0:6]

    scenes_dict = {}
    for scene_id in data_df.ID.unique():
        paths = data_df.loc[data_df.ID == scene_id, "Path"].reset_index(drop=True)
        if product != "":
            paths = paths[[product in path for path in paths]].reset_index(drop=True)

        if paths.empty:
            scenes_dict[scene_id] = []
            continue

        # Group once (sorted by key) and take the first path of every group,
        # instead of re-materialising the whole groupby for each group index.
        scenes_dict[scene_id] = paths.groupby(paths.map(_group_key)).first().tolist()
    return scenes_dict

In [4]:
# Folder holding the CSV listings of scene files.
DATA_DIR = "../data/files_list"

S1_PRODUCTS = ["SLC", "GRD"]
S2_PRODUCTS = ["L1C", "L2A"]

# The listings carry no header row, so the two column names are supplied here.
# Files are read in the order they are named below.
s1_au_df, s1_an_df, s2_au_df, s2_an_df = (
    pd.read_csv(os.path.join(DATA_DIR, csv_name), names=["ID", "Path"])
    for csv_name in ("s1_au.csv", "s1_an.csv", "s2_au.csv", "s2_an.csv")
)

# Sentinel-1 listings: one dictionary per product and region.
s1_au_slc_dict = get_scenes_dict(s1_au_df, product="SLC", is_s1=True)
s1_au_grd_dict = get_scenes_dict(s1_au_df, product="GRD", is_s1=True)

s1_an_slc_dict = get_scenes_dict(s1_an_df, product="SLC", is_s1=True)
s1_an_grd_dict = get_scenes_dict(s1_an_df, product="GRD", is_s1=True)

# Sentinel-2 listings: is_s1=False switches the file-name field used for grouping.
s2_au_l1c_dict = get_scenes_dict(s2_au_df, product="L1C", is_s1=False)
s2_au_l2a_dict = get_scenes_dict(s2_au_df, product="L2A", is_s1=False)

s2_an_l1c_dict = get_scenes_dict(s2_an_df, product="L1C", is_s1=False)
s2_an_l2a_dict = get_scenes_dict(s2_an_df, product="L2A", is_s1=False)


In [5]:
# Start from empty input/output folders: remove both trees first, then
# recreate them.  ignore_errors=True keeps a missing folder (or any other
# removal error) from stopping the cell.
scratch_dirs = ("../data/inputs/", "../data/outputs/")
for scratch_dir in scratch_dirs:
    shutil.rmtree(scratch_dir, ignore_errors=True)
for scratch_dir in scratch_dirs:
    os.makedirs(scratch_dir)

In [6]:
# Test two S2 L2A images for AU

# Work with the first ID present in the AU L2A dictionary.
id = list(s2_au_l2a_dict)[0]
s2_au_l2a_secne_files = s2_au_l2a_dict[id]

# First and last entries of the list (noted by the author as the
# Jan 2023 and Dec 2024 files).
s2_au_l2a_secne_pair = [s2_au_l2a_secne_files[position] for position in (0, -1)]

# Unpack each archive into its own numbered sub-folder: sub0 for the first
# scene, sub1 for the last.
for i, zip_file_path in enumerate(s2_au_l2a_secne_pair):
    extract_dir = f"../data/inputs/{id}/sub{i}"
    with ZipFile(zip_file_path) as archive:
        archive.extractall(extract_dir)

In [7]:
def _find_tci_image(image_dir: str) -> str:
    """Return the path of the TCI image inside an extracted product folder.

    Searches ``<image_dir>/*/GRANULE/*/IMG_DATA/`` recursively for entries
    whose file name contains ``"TCI"``.  A single hit is returned as-is; when
    several are found, the first one ending in ``"_10m.jp2"`` is chosen.

    Args:
        image_dir: Folder an archive was extracted into.

    Returns:
        Path of the selected TCI file.

    Raises:
        FileNotFoundError: If no TCI file is found, or several are found but
            none ends in ``"_10m.jp2"``.
    """
    tci_files = [
        path
        for path in glob.glob(f"{image_dir}/*/GRANULE/*/IMG_DATA/**", recursive=True)
        # Test the file name only, so "TCI" elsewhere in the path cannot match.
        if "TCI" in os.path.basename(path)
    ]
    if not tci_files:
        raise FileNotFoundError(f"No TCI image found under {image_dir!r}")
    if len(tci_files) == 1:
        return tci_files[0]

    ten_metre_files = [path for path in tci_files if path.endswith("_10m.jp2")]
    if not ten_metre_files:
        raise FileNotFoundError(
            f"{len(tci_files)} TCI images found under {image_dir!r}, "
            "but none ends with '_10m.jp2'"
        )
    return ten_metre_files[0]


# sub0 / sub1 are the folders the two archives were extracted into
# (first scene -> reference, last scene -> target).
ref_image_dir = os.path.join("../data/inputs/", id, f"sub{0}")
ref_image = _find_tci_image(ref_image_dir)

tgt_image_dir = os.path.join("../data/inputs/", id, f"sub{1}")
tgt_image = _find_tci_image(tgt_image_dir)


In [None]:
%%time
# Output raster of the COREG run, placed in a per-ID folder under outputs.
global_out_path = os.path.join("../data/outputs/", id, f"{id}_global.tiff")

# Settings handed to arosics COREG; see the arosics documentation for their
# exact meaning.
global_settings = dict(
    path_out=global_out_path,
    fmt_out="GTIFF",
    v=True,
    nodata=(0.0,0.0),
    # r_b4match=2,
    # s_b4match=2,
    align_grids=True,
    max_iter=10,
    max_shift=100,
)

coreg_global = COREG(im_ref=ref_image, im_tgt=tgt_image, **global_settings)
res = coreg_global.correct_shifts()

In [None]:
%%time
# Output raster of the COREG_LOCAL run, placed in a per-ID folder under outputs.
local_out_path = os.path.join("../data/outputs/", id, f"{id}_local.tiff")

# Settings handed to arosics COREG_LOCAL; see the arosics documentation for
# their exact meaning.
local_settings = dict(
    grid_res=2000,
    # max_points=200,
    path_out=local_out_path,
    fmt_out="GTIFF",
    v=True,
    nodata=(0.0,0.0),
    # r_b4match=2,
    # s_b4match=2,
    align_grids=True,
    max_iter=10,
    max_shift=10,
)

coreg_local = COREG_LOCAL(im_ref=ref_image, im_tgt=tgt_image, **local_settings)
res = coreg_local.correct_shifts()