In [1]:
import pandas as pd
from pathlib import Path
from tqdm import tqdm

In [2]:
def read_dist_hls_ts(site_id: str):
    base_url = 'https://raw.githubusercontent.com/OPERA-Cal-Val/DIST-Validation/refs/heads/main/mapLabelsv1sample'
    df = pd.read_csv(f'{base_url}/{site_id}_DIST-ALERT_v1sample.csv')
    df['site_id'] = site_id
    return df

In [3]:
df = read_dist_hls_ts('40')
df.head()

Unnamed: 0,granuleID,SensingTime,ProductionTime,VEG-DIST-STATUS,VEG-ANOM,VEG-IND,VEG-HIST,VEG-ANOM-MAX,VEG-DIST-CONF,VEG-DIST-DATE,...,VEG-LAST-DATE,GEN-DIST-STATUS,GEN-ANOM,GEN-ANOM-MAX,GEN-DIST-CONF,GEN-DIST-DATE,GEN-DIST-COUNT,GEN-DIST-DUR,GEN-LAST-DATE,site_id
0,OPERA_L3_DIST-ALERT-HLS_T10TFK_20211001T185119...,20211001,2023-11-01 12:04:49,1,45,25,70,45,45,20211001,...,274,1,38,38,38,20211001.0,1,1,274,40
1,OPERA_L3_DIST-ALERT-HLS_T10TFK_20211004T190239...,20211004,2023-11-01 12:06:01,1,255,255,70,45,45,20211001,...,274,1,-1,38,38,20211001.0,1,1,274,40
2,OPERA_L3_DIST-ALERT-HLS_T10TFK_20211006T185151...,20211006,2023-11-01 12:07:16,1,255,255,70,45,45,20211001,...,274,1,-1,38,38,20211001.0,1,1,274,40
3,OPERA_L3_DIST-ALERT-HLS_T10TFK_20211006T185251...,20211006,2023-11-01 12:08:30,1,255,255,70,45,45,20211001,...,274,1,-1,38,38,20211001.0,1,1,274,40
4,OPERA_L3_DIST-ALERT-HLS_T10TFK_20211008T183930...,20211008,2023-11-01 12:09:27,1,255,255,70,45,45,20211001,...,274,1,-1,38,38,20211001.0,1,1,274,40


In [4]:
FINISHED_STATES = [7, 8]
CHANGE_STATES_AGGRESSIVE = [1, 2, 3, 4, 5, 6]
CHANGE_STATES_CONSERVATIVE = [4, 5, 6]
NO_CHANGE_STATES = [0]

In [5]:
def mark_val_change(obs:str, conservative=True):
    change_vals = ['VLmaj', 'OCmaj', 'OCtotal', 'VLtotal']
    no_change_vals = ['noChange']
    if not conservative:
        change_vals += ['VLmin', 'OCmin']
    else:
        no_change_vals += ['VLmin', 'OCmin']
    
    if obs in change_vals:
        return 1
    elif obs in no_change_vals:
        return 0
    elif obs in ['noObs', '0', 0]:
        return 255
    else:
        print(obs)
        raise ValueError

In [50]:
def confirm_changes(arr, min_length=3):
    result = []
    current_run = []
    num_255 = 0

    for num in arr:
        if num == 1:
            current_run.append(1)
        elif num == 255:
            if len(current_run) > 0 and num_255 < 4:
                current_run.append(255)  # Allow up to 4 '255's between '1's
                num_255 += 1
            else:
                # Too many '255's or no valid run of 1's to append to
                if len([x for x in current_run if x == 1]) >= min_length:
                    result.extend(current_run)
                else:
                    result.extend([0] * len(current_run))  # Replace short run with 0's
                current_run = []
                result.append(255)  # Keep the '255' in the result
                num_255 = 0
        else:  # num == 0
            # A '0' means we finalize the current run
            if len([x for x in current_run if x == 1]) >= min_length:
                result.extend(current_run)
            else:
                result.extend([0] * len(current_run))  # Replace short run with 0's
            result.append(0)  # Keep the '0' in the result
            current_run = []
            num_255 = 0

    # Handle the last run of 1's and 255's after the loop
    if len([x for x in current_run if x == 1]) >= min_length or (len(current_run) > 0 and num_255 <= 2):
        result.extend(current_run)
    else:
        result.extend([0] * len(current_run))

    return result

def format_ts(df: pd.DataFrame) -> pd.DataFrame:
    df.columns = [c.lower() if '-' in c else c for c in df.columns]

    # This is the important part - we do not have any data in this pixel when there is no observation there!!!
    # will assign to change indication
    no_obs_mask = df['veg-anom'] == 255
    
    
    df_f = df[['site_id', 'granuleID', 'SensingTime', 'veg-dist-status', 'gen-dist-status']].copy()
    
    df_f.loc[no_obs_mask, 'veg-dist-status'] = [255] * no_obs_mask.sum()
    df_f.loc[no_obs_mask, 'gen-dist-status'] = [255] * no_obs_mask.sum()

    df_f.rename(columns={'granuleID': 'granule_id', 'SensingTime': 'sensing_time', 'veg-dist-status': 'veg_status', 'gen-dist-status': 'gen_status'}, inplace=True)
    df_f['sensing_time'] = df_f.sensing_time.astype(str)
    df_f['date_str'] = df_f.sensing_time.map(lambda ts: f'{ts[:4]}-{ts[4:6]}-{ts[6:]}')
    df_f['sensing_time'] = df_f.sensing_time.map(lambda ts: pd.Timestamp(f'{ts[:4]}-{ts[4:6]}-{ts[6:]}'))

    site_id = df_f['site_id'].tolist()[0]
    
    val_data_dir = Path('val_timeseries_by_site')
    df_site_ts = pd.read_csv(val_data_dir/f'site_{site_id}.csv')
    n_before = df_f.shape[0]
    df_f = pd.merge(df_f, df_site_ts[['date', 'obs']], left_on='date_str', right_on='date', how='inner')
    assert n_before == df_f.shape[0]

    df_f.drop(columns='date_str', inplace=True)
    df_f.rename(columns={'obs': 'val_obs'}, inplace=True)

    df_f['val_change_conservative'] = df_f.val_obs.map(mark_val_change)
    df_f['val_change_aggressive'] = df_f.val_obs.map(lambda obs: mark_val_change(obs, conservative=False))
    
    df_f['dist-hls-veg_change_conservative'] = df_f.veg_status.map(lambda status: int(status in CHANGE_STATES_CONSERVATIVE))
    df_f['dist-hls-veg_change_aggressive'] = df_f.veg_status.map(lambda status: int(status in CHANGE_STATES_AGGRESSIVE))

    df_f['dist-hls-gen_change_conservative'] = df_f.gen_status.map(lambda status: int(status in CHANGE_STATES_CONSERVATIVE))
    df_f['dist-hls-gen_change_aggressive'] = df_f.gen_status.map(lambda status: int(status in CHANGE_STATES_AGGRESSIVE))

    
    
    for col in df_f.columns:
        if 'change' in col:
            df_f[col + '_confirmed'] = confirm_changes(df_f[col].tolist())

    return df_f

In [51]:
df_ts = format_ts(df)
df_ts.head()

Unnamed: 0,site_id,granule_id,sensing_time,veg_status,gen_status,date,val_obs,val_change_conservative,val_change_aggressive,dist-hls-veg_change_conservative,dist-hls-veg_change_aggressive,dist-hls-gen_change_conservative,dist-hls-gen_change_aggressive,val_change_conservative_confirmed,val_change_aggressive_confirmed,dist-hls-veg_change_conservative_confirmed,dist-hls-veg_change_aggressive_confirmed,dist-hls-gen_change_conservative_confirmed,dist-hls-gen_change_aggressive_confirmed
0,228,OPERA_L3_DIST-ALERT-HLS_T36SVD_20211003T082849...,2021-10-03,255,255,2021-10-03,noChange,0,0,0,0,0,0,0,0,0,0,0,0
1,228,OPERA_L3_DIST-ALERT-HLS_T36SVD_20211003T083811...,2021-10-03,255,255,2021-10-03,noChange,0,0,0,0,0,0,0,0,0,0,0,0
2,228,OPERA_L3_DIST-ALERT-HLS_T36SVD_20211005T082739...,2021-10-05,255,255,2021-10-05,noChange,0,0,0,0,0,0,0,0,0,0,0,0
3,228,OPERA_L3_DIST-ALERT-HLS_T36SVD_20211008T083749...,2021-10-08,255,255,2021-10-08,noChange,0,0,0,0,0,0,0,0,0,0,0,0
4,228,OPERA_L3_DIST-ALERT-HLS_T36SVD_20211010T082851...,2021-10-10,255,255,2021-10-10,noChange,0,0,0,0,0,0,0,0,0,0,0,0


# Automate and Serialize

In [52]:
data_dir = Path('dist_hls_timeseries_by_site')
data_dir.mkdir(exist_ok=True)

In [53]:
def retreive_and_serialize_one(site_id: str):
    df = read_dist_hls_ts(site_id)
    df = format_ts(df)
    out_path = data_dir / f'site_{site_id}.parquet'
    df.to_parquet(out_path)
    return out_path

In [54]:
paths = list(map(retreive_and_serialize_one, tqdm(range(1, 301))))
paths[:3]

100%|███████████████████████████████████████| 300/300 [00:36<00:00,  8.23it/s]


[PosixPath('dist_hls_timeseries_by_site/site_1.parquet'),
 PosixPath('dist_hls_timeseries_by_site/site_2.parquet'),
 PosixPath('dist_hls_timeseries_by_site/site_3.parquet')]