In [None]:
import warnings
from easydict import EasyDict as edict
import pandas as pd
# cudf
import cudf
from utils.data_preprocess_utils import get_config, get_all_files
warnings.filterwarnings("ignore")

In [None]:
import utils.data_preprocess_utils as pre
import concurrent.futures

def preprocess_csv(file_path, cfg):
    """Clean one AIS CSV file, derive per-vessel motion features, split it into
    trips and save the result as a Feather file.

    Args:
        file_path: Path to the input CSV file.
        cfg: Config mapping (an EasyDict in this notebook). Keys read directly
            here: ``colums_to_extract``, ``colums_to_drop_na``, ``LAT_MIN``,
            ``LAT_MAX``, ``LON_MIN``, ``LON_MAX``, ``traj_points_threshold``,
            ``SOG_threshold``, ``trip_points_threshold`` and ``out_dir``. The
            whole ``cfg`` is also passed to ``pre.rename_columns``,
            ``pre.complete_missing_value`` and ``pre.split_trips``.

    Returns:
        None. The frame is written to
        ``os.path.join(cfg.out_dir, <stem> + '.feather')``, where ``<stem>`` is
        the input file name (without directory) up to its first ``.``.
    """
    import os

    # --- Load on the GPU --------------------------------------------------
    df = cudf.read_csv(file_path, usecols=cfg.colums_to_extract)
    df['# Timestamp'] = df['# Timestamp'].astype('datetime64[ns]')

    # --- Data cleaning (cuDF, via utils.data_preprocess_utils helpers) -----
    # NOTE(review): later steps use 'MMSI', 'time', 'SOG', 'COG'; presumably
    # rename_columns maps the raw CSV headers onto these names — confirm.
    df = pre.rename_columns(df, cfg)
    df = pre.drop_duplicates(df)
    df = pre.filter_missing_value(df, cfg.colums_to_drop_na)
    df = pre.filter_mmsi(df)
    df = pre.rmove_outlinears(df=df, LAT_MIN=cfg.LAT_MIN, LAT_MAX=cfg.LAT_MAX, LON_MIN=cfg.LON_MIN, LON_MAX=cfg.LON_MAX)
    df = pre.filter_minority(df=df, threshold=cfg.traj_points_threshold, column='MMSI')
    df = pre.filter_SOG(df=df, SOG_threshold=cfg.SOG_threshold)

    # --- Data completion (pandas from here on) -----------------------------
    df = df.to_pandas()
    df = pre.complete_missing_value(cfg=cfg, df=df)

    # Make each vessel's points contiguous and chronological. The
    # groupby(...).apply(...).reset_index(drop=True) assignments below rely on
    # this order to line results up with df's rows positionally (assumes the
    # helpers return one value per row of the group).
    df = df.sort_values(by=['MMSI', 'time']).reset_index(drop=True)

    # Nautical distance, computed per vessel by pre.haversine_distance.
    df['distance'] = df.groupby('MMSI').apply(pre.haversine_distance).reset_index(drop=True)
    # SOG change from the vessel's previous point (0 for its first point).
    df['SOG_diff'] = df.groupby('MMSI')['SOG'].diff(periods=1).fillna(0)
    # COG change folded the short way round a turn through north:
    # |d| if |d| < 180 else 360 - |d|  (within [0, 180] assuming COG in [0, 360]).
    cog_abs_diff = df.groupby('MMSI')['COG'].diff(periods=1).fillna(0).abs()
    df['COG_diff'] = cog_abs_diff.where(cog_abs_diff < 180, 360 - cog_abs_diff)
    # Time since the vessel's previous point. Unlike the other diffs this is
    # not filled, so each vessel's first point keeps a missing value.
    df['time_diff'] = df.groupby('MMSI')['time'].diff(periods=1)
    # Absolute change in 'distance' from the vessel's previous point.
    df['distance_diff'] = df.groupby('MMSI')['distance'].diff(periods=1).fillna(0).abs()

    # --- Split tracks into trips and drop trips with too few points --------
    df['trips_id'] = df.groupby('MMSI').apply(pre.split_trips, cfg=cfg).reset_index(drop=True)
    df = df.sort_values(by=['trips_id', 'time']).reset_index(drop=True)
    df = pre.filter_minority(df=df, threshold=cfg.trip_points_threshold, column='trips_id')

    # --- Save ----------------------------------------------------------------
    # os.path.join is correct whether or not cfg.out_dir ends with a separator;
    # plain concatenation wrote beside the directory when it did not.
    stem = os.path.basename(file_path).split('.')[0]
    preprocessed_file_path = os.path.join(cfg.out_dir, stem + '.feather')
    # to_feather expects a default RangeIndex; the filtering above may leave gaps.
    df = df.reset_index(drop=True)
    df.to_feather(preprocessed_file_path)
    print('Preprocessed file saved: ', preprocessed_file_path)

def _parse_file_date(file_path):
    """Return the date encoded in an AIS file name like ``aisdk-2019-01-01.csv``.

    The name (directory removed, cut at its first ``.``) is split on ``-`` and
    the parts after the first are read as ``YYYY-MM-DD`` (daily file) or
    ``YYYY-MM`` (monthly file, mapped to the first day of that month).

    Raises:
        ValueError: If the name does not carry two or three date parts, or they
            do not form a valid date.
    """
    import os

    stem = os.path.basename(file_path).split('.')[0]
    date_parts = stem.split('-')[1:]
    if len(date_parts) not in (2, 3):
        raise ValueError(f"cannot parse a date from file name {stem!r}")
    fmt = '%Y-%m-%d' if len(date_parts) == 3 else '%Y-%m'
    return pd.to_datetime('-'.join(date_parts), format=fmt)


def process_file(file_path, start_date, end_date, cfg):
    """Run ``preprocess_csv`` on ``file_path`` if its file-name date lies in
    ``[start_date, end_date]`` (inclusive).

    Both daily (``prefix-YYYY-MM-DD.ext``) and monthly (``prefix-YYYY-MM.ext``)
    names are accepted. Previously, unpacking into three names made the
    monthly branch unreachable, so monthly files were reported as errors.

    Args:
        file_path: Path of the AIS file.
        start_date, end_date: Inclusive bounds; anything ``pd.to_datetime`` accepts.
        cfg: Preprocessing config forwarded to ``preprocess_csv``.

    Any exception (unparseable name, preprocessing failure) is printed rather
    than raised, so one bad file does not abort the other worker threads.
    """
    try:
        date = _parse_file_date(file_path)
        if pd.to_datetime(start_date) <= date <= pd.to_datetime(end_date):
            preprocess_csv(file_path, cfg)
    except Exception as e:
        # Deliberate best-effort boundary for thread-pool workers.
        print(f"Error processing {file_path}: {e}")
    

def preprocess_csvs(data_dir, start_date, end_date, cfg, max_workers=2):
    """Preprocess, on a thread pool, every file under ``data_dir`` whose
    file-name date lies in ``[start_date, end_date]``.

    Each path from ``get_all_files(data_dir)`` is handed to ``process_file``,
    which applies the date filter and prints its own per-file errors.

    Args:
        data_dir: Directory passed to ``get_all_files``.
        start_date, end_date: Inclusive bounds; anything ``pd.to_datetime`` accepts.
        cfg: Preprocessing config forwarded to each worker.
        max_workers: Thread-pool size (default 2, the previously hard-coded value).

    Raises:
        Whatever a worker lets escape, re-raised after all workers finish
        instead of being silently discarded.
    """
    file_paths = get_all_files(data_dir)
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)
    print('Start Preprocessing...')
    print('Start Date: ', start_date)
    print('End Date: ', end_date)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(process_file, file_path, start_date, end_date, cfg)
            for file_path in file_paths
        ]
        # result() re-raises a worker's exception; concurrent.futures.wait()
        # alone would leave it unretrieved. Leaving the `with` still waits for
        # all remaining workers before anything propagates.
        for future in concurrent.futures.as_completed(futures):
            future.result()



if __name__ == "__main__":
    # Preprocess every file in cfg.in_dir whose file-name date falls between
    # 2019-01-01 and 2019-03-31 (inclusive). The guard keeps this long GPU run
    # from starting on import; in a notebook __name__ is "__main__", so it runs.
    cfg = edict(get_config('./cfg/data_preprocess_cfg.yaml'))
    preprocess_csvs(cfg.in_dir, '2019-01-01', '2019-03-31', cfg)
