In [None]:
## Neue imports
import os
import s3fs
import fsspec
import random
import numpy as np
import pandas as pd
import xarray as xr
from scripts.sentinel_1_processing import find_global_veg_clipping_values, clip_s1_data, fast_lee_filter_optimized, apply_lee_to_ds, normalize_s1_vars, calculate_SAR_index #, aggregate_s1_causal_nearest 
from scripts.sentinel_2_processing import get_s2_quality_masks, get_vegetation_mask, clean_and_normalize_bands, calculate_s2_index, filter_static_vegetation_outliers, integrate_veg_and_wrongly_classified_mask
from scripts.plot_helpers_new import plot_acquisition_timelines, find_cloud_free_indices, plot_rgb, plot_statistical_outliers
from scripts.era_5_processing import subset_era5_spatial, subset_era5_time, aggregate_era5_metrics, create_uniform_era5_features, verify_era5_alignment 
from scripts.cube_processing  import add_event_metadata
from scripts.aggregation_5_day_interval import  get_aggregation_information, align_all_to_5d, plot_acquisition_and_bins

In [None]:
# Anonymous (public) access to the CloudFerro S3 endpoint that hosts the ARCEME datacubes.
S3_ENDPOINT = "https://s3.waw3-2.cloudferro.com"
S3_BASE_URL = f"{S3_ENDPOINT}/swift/v1/"  # HTTPS prefix used to build per-cube URLs
fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': S3_ENDPOINT})

# Bucket to read from (older bucket: "ARCEME-DATACUBES/FIRSTBATCH")
BUCKET_NAME = 'ARCEME-DC-1'

# Top-level listing of the bucket (fs.ls does not recurse); keep only the keys of .zarr stores.
bucket_entries = fs.ls(BUCKET_NAME, detail=True)
zarr_files = [entry['Key'] for entry in bucket_entries if entry['Key'].endswith('.zarr')]

## Get additional data

In [None]:
# Event table (comma-separated); passed to add_event_metadata for every loaded cube.
df_data = pd.read_csv("data/1_max_precipitation_grid_cells.csv")

## Load random cubes and add metadata

In [None]:
# Load a reproducible pseudo-random selection of cubes from the bucket and attach event metadata.
N_CUBES = 11             # number of cubes to load (dictionary keys ds_1 ... ds_11)
MIN_FILE_INDEX = 20      # files with a listing index below this are never drawn (reason not documented here)
SEED_BASE = 300_000_000  # cube i is drawn with seed SEED_BASE + i

cubes = {}
selected_files = set()

for i in range(1, N_CUBES + 1):
    # A private Random instance yields exactly the same draw as random.seed(...) followed by
    # random.randrange(...), but without resetting the global `random` state as a side effect.
    nn = random.Random(SEED_BASE + i).randrange(MIN_FILE_INDEX, len(zarr_files))
    file_path = zarr_files[nn]

    # Independent seeds can hit the same file; make that visible instead of silently duplicating a cube.
    if file_path in selected_files:
        print(f"⚠️ Cube {i}: {file_path} was already selected for another key and is loaded again.")
    selected_files.add(file_path)

    url = S3_BASE_URL + file_path
    cube_id = file_path.split('/DC__')[-1].replace('.zarr', '')
    print(cube_id)

    # Open the remote store over HTTPS; consolidated=True reads the store's consolidated metadata.
    ds = xr.open_zarr(fsspec.get_mapper(url), consolidated=True)

    # Keep the source cube id with the data.
    ds.attrs['cube_id'] = cube_id

    # Attach event metadata (precip_dates, etc.) from df_data.
    ds = add_event_metadata(ds, df_data, cube_id)

    # Store under a positional key.
    cubes[f"ds_{i}"] = ds
    print(f"Cube {i} geladen: {file_path}")


## Examine Cubes
**TODO:** Adapt the plotting functions to the current processing logic.

## Sentinel 2 processing

In [None]:
# Sentinel-2 processing per cube. The helper chain is:
#   quality masks -> vegetation mask -> band cleaning/normalisation -> NDVI (strict, basic)
#   -> static outlier filtering -> outlier diagnostic plot -> mask integration.
# Helper-internal variables that are no longer needed are dropped at the end.
NDVI_VARIANTS = ("strict", "basic")

for cube_key in list(cubes):
    print(cube_key)
    cube = cubes[cube_key]

    for prepare in (get_s2_quality_masks, get_vegetation_mask, clean_and_normalize_bands):
        cube = prepare(cube)

    for variant in NDVI_VARIANTS:
        cube = calculate_s2_index(cube, "NDVI", variant)
    for variant in NDVI_VARIANTS:
        cube = filter_static_vegetation_outliers(cube, f"NDVI_{variant}")

    # Diagnostic plot on the first index returned by find_cloud_free_indices (skipped when none).
    cloud_free = find_cloud_free_indices(cube)
    if len(cloud_free) > 0:
        plot_statistical_outliers(cube, "NDVI_basic", cloud_free[0])

    cube = integrate_veg_and_wrongly_classified_mask(cube, "NDVI_basic")
    cubes[cube_key] = cube.drop_vars(["NDVI_strict_valid_mask_static", "mask_phys_basic", "mask_phys_strict"])

## Sentinel 1 processing

In [None]:
# Plot the S1 and S2 acquisition time steps of every cube.
for cube_key, cube in cubes.items():
    print("##############################" , cube_key, "###################")
    plot_acquisition_timelines(cube)

In [None]:
# Clipping thresholds for vv and vh, computed once over all cubes (the helper is described as a
# global 99.5 percentile) so every cube is clipped and normalised identically.
global_vv_max, global_vh_max = find_global_veg_clipping_values(cubes)
print(global_vv_max, global_vh_max)

# Sentinel-1 steps, applied in order; each result is written back to `cubes` immediately.
s1_steps = (
    lambda c: clip_s1_data(c, global_vv_max, global_vh_max),                  # clip to global maxima
    lambda c: apply_lee_to_ds(c, bands=['vv', 'vh'], win_size=7, cu=0.25),   # Lee filter, 7x7 window
    lambda c: normalize_s1_vars(c, global_vv_max, global_vh_max),            # normalise with the same maxima
)

for cube_key in list(cubes):
    print(cube_key)
    for step in s1_steps:
        cubes[cube_key] = step(cubes[cube_key])

In [None]:
# Align every cube with align_all_to_5d using the "basic" setting
# (presumably a 5-day temporal grid, per the helper's name — confirm in scripts.aggregation_5_day_interval).
for cube_key in list(cubes):
    banner = f"################################ Cube: {cube_key} ################################"
    print(banner)
    cubes[cube_key] = align_all_to_5d(cubes[cube_key], "basic")

## Optional: additional SAR indices (worth testing)

In [None]:
# Add each SAR index as a new variable on every cube (the Dataset is modified in place).
SAR_INDICES = ("DpRVIVV", "VVVHS", "VHVVR")

for cube_key, cube in cubes.items():
    for index_name in SAR_INDICES:
        print(f"Calculating index: {index_name}")
        cube[index_name] = calculate_SAR_index(cube, index_name)

# ERA-5 climate data

In [None]:
# ERA5-Land source stores on a shared network drive (absolute path — adjust on other machines).
path = "/net/data/arceme/era5_land/"
pei_cube_name, t2_cube_name, tp_cube_name = (
    "PEICube_era5land.zarr",
    "t2_ERA5land.zarr",
    "tp_ERA5land.zarr",
)


def _open_era5_store(store_name):
    """Open one ERA5-Land Zarr store below `path` without consolidated metadata."""
    return xr.open_zarr(os.path.join(path, store_name), consolidated=False)


pei_cube = _open_era5_store(pei_cube_name)
t2_cube = _open_era5_store(t2_cube_name)
tp_cube = _open_era5_store(tp_cube_name)

In [None]:
# For every processed cube, build aligned ERA5 features from each ERA5-Land store and merge them in.
era5_cubes = [pei_cube, t2_cube, tp_cube]

for key in cubes.keys():
    ds = cubes[key]
    era5_features = []  # aligned ERA5 datasets collected for this cube

    for era5_cube_raw in era5_cubes:
        # 1. Temporal, then spatial subset to this cube
        temp_cube = subset_era5_time(era5_cube_raw, ds)
        temp_cube = subset_era5_spatial(temp_cube, ds, True)

        era5_vars = list(temp_cube.data_vars)

        # 2. Aggregation & alignment.
        # NOTE(review): this previously called `aggregate_era5_metrics_new`, which is never imported
        # and raised NameError on a fresh kernel. It now calls the imported `aggregate_era5_metrics`;
        # confirm that it accepts (era5_subset, ds, var_names) like the former call.
        temp_cube = aggregate_era5_metrics(temp_cube, ds, era5_vars)
        temp_cube = create_uniform_era5_features(ds, temp_cube)

        # 3. Safeguard: snap coordinates exactly onto the cube's coordinates (nearest neighbour)
        temp_cube = temp_cube.reindex_like(ds, method='nearest')

        era5_features.append(temp_cube)

    # 4. Add all ERA5 variables to the cube in a single merge
    ds = xr.merge([ds] + era5_features)
    cubes[key] = ds
    print(f"✅ Integrated all ERA5 variables into cube {key}")

## Check data types, variables to drop and dimensions, then save

In [None]:
# Candidate variables to remove before saving: spectral bands B01–B09, B11, B12 (no B10) plus quality masks.
S2_BAND_NUMBERS = (*range(1, 10), 11, 12)
to_drop = [f"B{band:02d}" for band in S2_BAND_NUMBERS] + [
    'quality_mask_basic',
    'quality_mask_strict',
    's1_quality_mask',
]

In [None]:
# Display the first cube (last-expression rich repr) to inspect variables, dtypes and dims before casting/saving.
cubes["ds_1"]

In [None]:
# Cast the Sentinel-1 bands vv and vh to float32 before writing.
# Dropping `to_drop` is currently disabled; to enable it: cubes[key] = cubes[key].drop_vars(to_drop)
for cube_key in list(cubes):
    for s1_band in ("vv", "vh"):
        cubes[cube_key][s1_band] = cubes[cube_key][s1_band].astype("float32")
   

In [None]:

# Write every processed cube to its own Zarr store below output_dir.
output_dir = "../processed_data_final_new"
# Create the target directory explicitly (the original comment promised this but never did it).
os.makedirs(output_dir, exist_ok=True)

for key, ds in cubes.items():
    # One separate store per cube, named after its dictionary key (ds_1, ds_2, ...).
    # The source cube id was stored in ds.attrs['cube_id'] at load time (presumably still present — verify).
    cube_path = os.path.join(output_dir, key)

    # mode='w' overwrites an existing store; compute defaults to True, so dask-backed data is
    # computed and written now.
    ds.to_zarr(cube_path, mode='w', consolidated=True)
    # Only vv/vh were cast to float32, so the log no longer claims the whole cube is float32.
    print(f"✅ Cube {key} successfully saved to {cube_path} (vv/vh as float32).")