In [None]:
# Enable IPython's autoreload extension in mode 2, so that edits to imported
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import glob
import os
from pathlib import Path
from tqdm import tqdm
import gzip
import holoviews as hv
import hdf5storage

import unfoc

from utig_radar_loading import file_util, stream_util, geo_util, segment_splits, opr_gps_file_generation, opr_header_generation, preprocessing

In [None]:
# Notebook-wide library configuration
pd.options.mode.copy_on_write = True  # opt in to pandas copy-on-write behaviour
tqdm.pandas()  # register tqdm's progress_* methods on pandas objects
hv.extension('bokeh')  # render HoloViews output with the Bokeh backend

In [None]:
# Build (or load) the index of files found under base_path.
# use_cache is forwarded as read_cache=; presumably it makes load_file_index_df
# read the CSV below instead of rescanning base_path — confirm in file_util.
use_cache = True
cache_dir = "outputs/file_index.csv"  # NOTE(review): despite the name this is a CSV file path, not a directory
base_path = "/kucresis/scratch/data/UTIG"  # NOTE(review): hard-coded absolute path; adjust for your environment

df_files = file_util.load_file_index_df(base_path, cache_dir, read_cache=use_cache)
df_artifacts = file_util.create_artifacts_df(df_files) # df_artifacts is a dataframe with one row per stream file

df_artifacts.head()

In [None]:
# Group by transect, selecting the stream types that are needed

# For each artifact category (gps / radar / imu): the stream types and the file
# names that are accepted for that category.
# NOTE(review): the order inside "stream_types" may express a preference —
# confirm in file_util.arrange_by_transect.
usable_artifact_types = {
    "gps": {"stream_types": ["GPSnc1", "GPStp2", "GPSap3"], "file_names": ["xds.gz"]},
    "radar": {"stream_types": ["RADnh5", "RADnh3", "RADnh2", "RADnh4", "RADjh1"], "file_names": ["bxds"]},
    "imu": {"stream_types": ["AVNnp1"], "file_names": ["bxds"]},
}

df_transects = file_util.arrange_by_transect(df_artifacts, usable_artifact_types)
df_transects.head()

In [None]:
# Label every transect with its season; the 'season' column is what the
# following cells list and filter on.
df_all_seasons = file_util.assign_seasons(df_transects)
df_all_seasons.head()

In [None]:
# Show every distinct season present in the index, in ascending order
print("The following seasons were found in the dataset:")
seasons = np.sort(np.array(df_all_seasons['season'].unique()))
print(seasons)

### Select a single season to extract

In [None]:
# Season to extract; compared against the 'season' column of df_all_seasons
season_year = 2017

In [None]:
# Create df_season (filtered to selected season) and check for missing data

df_season = df_all_seasons[df_all_seasons['season'] == season_year]
df_season = df_season.sort_values(by='start_timestamp')

# Transects without a radar file are set aside in df_season_missing_data;
# df_season keeps only the transects that do have one.
df_season_missing_data = df_season[df_season['radar_path'].isnull()]
df_season = df_season[df_season['radar_path'].notnull()]

if len(df_season_missing_data) > 0:
    print(f"[WARNING] Missing radar data for {len(df_season_missing_data)} transects out of {len(df_season)+len(df_season_missing_data)}")

df_season_missing_imu = df_season[df_season['imu_path'].isnull()]
if len(df_season_missing_imu) > 0:
    print(f"[WARNING] Missing IMU data for {len(df_season_missing_imu)} transects")

# Fail early with a clear message: the aircraft identifier below is read from
# the first remaining transect, which would otherwise raise an opaque IndexError.
if df_season.empty:
    raise ValueError(f"No transects with radar data found for season {season_year}")

# Display information about this season

# - Types of stream files:
print(f"GPS stream types: {df_season['gps_stream_type'].unique()}")
print(f"Radar stream types: {df_season['radar_stream_type'].unique()}")
print(f"IMU stream types: {df_season['imu_stream_type'].unique()}")

# 'set' and 'prj' are read after reset_index(); flatten once and reuse the result
df_season_flat = df_season.reset_index()

# - Sets
print(f"Sets: {df_season_flat['set'].unique()}")

# - Projects
print(f"Projects: {df_season_flat['prj'].unique()}")

# - Aircraft
# First three characters of the first transect's 'set' value.
# NOTE(review): assumes every transect of the season was flown by the same aircraft.
ac_ident = df_season_flat['set'].iloc[0][:3]
print(f"Aircraft identifier: {ac_ident}")

# - Season name
season_name = f"{season_year}_Antarctica_Basler{ac_ident}"
print(f"Season name: {season_name}")

In [None]:
# Assign each transect to a segment

# df_season is rebound to the result of assign_segments; the 'segment_path' and
# 'segment_number' columns read just below come from that result.
# NOTE(review): re-running this cell feeds the already-segmented frame back in —
# confirm assign_segments tolerates that.
df_season = segment_splits.assign_segments(df_season, timestamp_field='tim', parse_ct=False, timestamp_split_threshold=2000) # Split based on the 'tim' counter (in ms)
#df_season = segment_splits.assign_segments(df_season, timestamp_field='TIMESTAMP', parse_ct=True, timestamp_split_threshold=pd.Timedelta(milliseconds=2000)) # OR, split based on the 'TIMESTAMP' field

# Summary numbers for the printout below
n_segments = len(df_season['segment_path'].unique())
max_segments_per_day = df_season['segment_number'].max()

print(f"Created {n_segments} segments. Maximum segment number on a single day is {max_segments_per_day}.")
df_season.head()

In [None]:
# Create map of segments

segment_dfs = geo_util.load_gps_data(df_season)
missing_data_dfs = geo_util.load_gps_data(df_season_missing_data)

paths = []

# Transects without radar data go in first, drawn as a thick red line
if len(missing_data_dfs) == 0:
    print("No missing data to display.")
else:
    _, missing_path = geo_util.create_path(missing_data_dfs)
    paths.append(missing_path.opts(color='red', line_width=3).relabel('Missing Radar Data'))

# One labelled path per segment, built from the GPS frames whose first row
# carries that segment's path
for segment_path in df_season['segment_path'].unique():
    dfs_in_segment = [gps_df for gps_df in segment_dfs if gps_df['segment_path'].iloc[0] == segment_path]
    _, segment_line = geo_util.create_path(dfs_in_segment)
    paths.append(segment_line.relabel(f"Segment {segment_path}"))

# Overlay all paths on the Antarctica basemap; `p` is the figure object that gets displayed
basemap = stream_util.create_antarctica_basemap()
p = basemap * hv.Overlay(paths)
p = p.opts(aspect='equal', frame_width=800, frame_height=800, tools=['hover'])
p = p.opts(title=season_name, legend_position='right')
p

In [None]:
# Write the interactive map to a standalone HTML file.
# Create the output folder first so the save also works on a fresh checkout
# where outputs/maps does not exist yet.
Path("outputs/maps").mkdir(parents=True, exist_ok=True)
hv.save(p, f"outputs/maps/{season_name}.html")

### Create GPS support files for each segment

In [None]:
# gps_df = stream_util.load_xds_stream_file(df_season.iloc[5]['gps_path'])
# imu_df = stream_util.parse_binary_AVNnp1(df_season.iloc[5]['imu_path'])

# print(gps_df.columns)
# print(imu_df.columns)

# gps_time_keys = ['TIMESTAMP', 'GPS_TIME']
# imu_time_keys = ['GPS_TIME', 'unix_time']

# for key in gps_time_keys:
#     if key in gps_df.columns:
#         raw_range = gps_df[key].max() - gps_df[key].min()
#         print(f"GPS {key}: min={gps_df[key].min()}, max={gps_df[key].max()}")
#         print(f"  type: {gps_df[key].dtype}, raw range: {raw_range}")
#         if np.issubdtype(gps_df[key].dtype, np.number):
#             print(f"   {raw_range} seconds = {raw_range/60} minutes")

#             date_since_gps_epoch = pd.to_datetime('1980-01-06 00:00:00') + pd.to_timedelta(gps_df[key], unit='s')
#             print(f"   which is {date_since_gps_epoch.min()} to {date_since_gps_epoch.max()} UTC (assuming GPS epoch)")
#             date_since_unix_epoch = pd.to_datetime(gps_df[key], unit='s', origin='unix')
#             print(f"   which is {date_since_unix_epoch.min()} to {date_since_unix_epoch.max()} UTC (assuming Unix epoch)")
#             date_since_ni_epoch = pd.to_datetime('1904-01-01 00:00:00') + pd.to_timedelta(gps_df[key], unit='s')
#             print(f"   which is {date_since_ni_epoch.min()} to {date_since_ni_epoch.max()} UTC (assuming NI epoch)")

# for key in imu_time_keys:
#     if key in imu_df.columns:
#         raw_range = imu_df[key].max() - imu_df[key].min()
#         print(f"IMU {key}: min={imu_df[key].min()}, max={imu_df[key].max()}")
#         print(f"  type: {imu_df[key].dtype}, raw range: {raw_range}")
#         if np.issubdtype(imu_df[key].dtype, np.number):
#             print(f"   {raw_range} seconds = {raw_range/60} minutes")

#             date_since_gps_epoch = pd.to_datetime('1980-01-06 00:00:00') + pd.to_timedelta(imu_df[key], unit='s')
#             print(f"   which is {date_since_gps_epoch.min()} to {date_since_gps_epoch.max()} UTC (assuming GPS epoch)")
#             date_since_unix_epoch = pd.to_datetime(imu_df[key], unit='s', origin='unix')
#             print(f"   which is {date_since_unix_epoch.min()} to {date_since_unix_epoch.max()} UTC (assuming Unix epoch)")
#             date_since_ni_epoch = pd.to_datetime('1904-01-01 00:00:00') + pd.to_timedelta(imu_df[key], unit='s')
#             print(f"   which is {date_since_ni_epoch.min()} to {date_since_ni_epoch.max()} UTC (assuming NI epoch)")

In [None]:
# Forwarded as overwrite= to make_segment_gps_file
overwrite_existing_gps_files = False

# Call make_segment_gps_file once per (segment_date_str, segment_number) group,
# writing under outputs/gps/<season_name>.
# NOTE(review): presumably each call returns the path of the file it wrote, so
# gps_paths is a per-segment Series of GPS file paths — confirm.
gps_paths = df_season.groupby(['segment_date_str', 'segment_number'])[['segment_date_str', 'segment_number', 'gps_path', 'imu_path']].apply(
    opr_gps_file_generation.make_segment_gps_file,
    include_groups=False,
    output_base_dir=f"outputs/gps/{season_name}",
    overwrite=overwrite_existing_gps_files)

gps_paths

### Create parameter spreadsheet starting templates

In [None]:
def radar_paths_ordered(x):
    """Format the radar folders of one segment as a brace-delimited, quoted list.

    The rows of ``x`` are ordered by ``start_timestamp``. For each ``radar_path``
    the last path component is dropped and only the (up to) four components
    directly before it are kept. The resulting folders are single-quoted,
    joined with ``', '`` and wrapped in braces, e.g. ``{'a/b/c/d', 'e/f/g/h'}``.
    """
    ordered_files = x.sort_values('start_timestamp')['radar_path'].tolist()

    folders = []
    for radar_file in ordered_files:
        # Drop the final component and keep the four components before it
        kept_parts = Path(radar_file).parts[-5:-1]
        folders.append(str(Path(*kept_parts)))

    return "{'" + "', '".join(folders) + "'}"

# One formatted folder list per (segment_date_str, segment_number) group
radar_paths = df_season.groupby(['segment_date_str', 'segment_number'])[['radar_path', 'start_timestamp']].apply(radar_paths_ordered)
radar_paths

In [None]:
def first_prj_set_str(x):
    """Return the distinct 'prj' values of one segment, in order of first appearance.

    NOTE(review): despite the name, only 'prj' is read; the 'set' column that
    the caller selects is not used here.
    """
    return list(x['prj'].unique())

def transect_names_ordered(x):
    """Return the 'trn' values of one segment, ordered by 'start_timestamp'."""
    return list(x.sort_values('start_timestamp')['trn'])

# reset_index() makes 'prj', 'set' and 'trn' available as regular columns;
# both aggregations below group by the same segment keys.
segment_groups = df_season.reset_index().groupby(['segment_date_str', 'segment_number'])

mission_names = segment_groups[['prj', 'set']].apply(first_prj_set_str)
# The helper has its own name (transect_names_ordered) so that binding the
# result to `transect_names` does not overwrite the function it was built with.
transect_names = segment_groups[['start_timestamp', 'trn']].apply(transect_names_ordered)

In [None]:
# Per-season defaults, read from the YAML file named after the season
defaults = preprocessing.load_defaults(f'src/utig_radar_loading/defaults/{season_name}.yaml')

# Folder that receives the parameter CSVs; created on first use
base_params_dir = Path(f'outputs/params/{season_name}')
base_params_dir.mkdir(parents=True, exist_ok=True)

def make_parameter_sheet(default_values, segments, overrides=None):
    """Build a parameter table with one row per segment.

    Args:
        default_values: Column data handed to ``pd.DataFrame`` (e.g. a mapping
            of column name to a default value repeated for every row).
        segments: Row labels of the sheet; used as the DataFrame index.
        overrides: Optional mapping of column name to value(s). Each entry is
            assigned as a column after the defaults, replacing a default
            column of the same name or adding a new one. Series values are
            aligned on the index by pandas.

    Returns:
        The assembled ``pd.DataFrame``.
    """
    df = pd.DataFrame(default_values, index=segments)
    # `None` instead of a shared mutable `{}` default; treated as "no overrides"
    for key, value in (overrides or {}).items():
        df[key] = value
    return df

# Each entry: (key into `defaults`, output file name, per-segment column overrides).
# The sheets are written in this order, all indexed by the segments in radar_paths.
parameter_sheets = [
    ('cmd', 'cmd.csv', {
        'mission_names': mission_names,
        'notes': transect_names
    }),
    ('records', 'records.csv', {
        'file.board_folder_name': radar_paths,
        'gps.fn': gps_paths
    }),
    ('qlook', 'qlook.csv', {}),
    ('radar', 'radar.csv', {}),
]

for defaults_key, csv_name, sheet_overrides in parameter_sheets:
    sheet = make_parameter_sheet(defaults[defaults_key], radar_paths.index, overrides=sheet_overrides)
    sheet.to_csv(base_params_dir / csv_name)

### Generate temporary header files

In [None]:
# Transects to generate header files for. Currently the whole season; the
# commented-out .loc[...] selection restricts it to a few transects.
df_season_subset = df_season #.loc[[('PEL', 'JKB2u', 'Y20b'), ('ICP10', 'JKB2u', 'F01T02a'), ('PEL', 'JKB2u', 'X48a')]]
df_season_subset.head()

In [None]:
import dask.dataframe as dd
from dask.distributed import Client, as_completed, progress
from dask import delayed

# Start a Dask client for the parallel header generation.
# NOTE(review): n_workers=10 is hard-coded — adjust to the cores and memory
# available on the machine running the notebook.
print("Setting up Dask LocalCluster for parallel processing...")
client = Client(n_workers=10)
print(f"Dashboard link: {client.dashboard_link}")
client

In [None]:
# Root folder for the generated header files of this season.
# NOTE(review): hard-coded, user-specific absolute path — change before running elsewhere.
header_base_dir = f"/kucresis/scratch/tteisberg_sta/scripts/opr_user_tmp/headers/rds/{season_name}/"

# Get file locations (this is fast, no need to parallelize)
header_file_locations = df_season_subset['radar_path'].apply(opr_header_generation.get_header_file_location, base_dir=header_base_dir)

# Function to get header and save it
def get_and_save_header(path, fn):
    """Extract the header of one radar file and store it as a v7.3 MAT file.

    Args:
        path: Radar file handed to ``opr_header_generation.get_header_information``.
        fn: Destination of the header file; missing parent folders are created.

    Returns:
        The header object that was written to ``fn``.
    """
    header = opr_header_generation.get_header_information(path)

    destination = Path(fn)
    # The target folder may not exist yet
    destination.parent.mkdir(parents=True, exist_ok=True)

    hdf5storage.savemat(str(destination), header, format='7.3')
    print(f"Saved header to {fn}")

    return header

# Parallelized version using Dask
# Create delayed tasks for each file
# Queue one delayed task for every radar file whose header file is not on disk yet
delayed_tasks = []
files_to_process = []

for path, fn in zip(df_season_subset['radar_path'], header_file_locations):
    if not Path(fn).exists():
        files_to_process.append((path, fn))
        delayed_tasks.append(delayed(get_and_save_header)(path, fn))
        continue
    print(f"Header file already exists for {path}, skipping.")

if len(delayed_tasks) == 0:
    print("No header files need to be generated, all files already exist.")
else:
    print(f"Processing and saving {len(delayed_tasks)} header files in parallel...")

    # Submit everything to the Dask client, show a progress bar, then collect
    # the returned headers
    futures = client.compute(delayed_tasks)
    progress(futures)
    headers_list = client.gather(futures)

    print(f"Successfully generated and saved {len(headers_list)} header files.")

In [None]:
# if len(header_file_locations_to_generate) > 0:
#     for header, fn in zip(headers.values, header_file_locations_to_generate.values()):
#         fn = Path(fn)
#         fn.parent.mkdir(parents=True, exist_ok=True)

#         header_tmp = header.copy()
#         header_tmp['offset'] = header_tmp['offset'].astype(np.int64)  # Convert offsets to int64 for saving -- TODO: should have been this type originally

#         print(f"Writing header to {fn}")
#         hdf5storage.savemat(str(fn), header_tmp, format='7.3')