In [None]:
# You do not need to run this cell; this code is kept for presentation only.
import dask.dataframe as dd
import os
import glob

# Commit-info parquet inputs for the v1 API. The first entry is peer_auth,
# mirroring the v1beta1 / v1alpha1 lists; each input appears exactly once
# (a duplicate would just be processed and overwritten twice).
files_v1 = [
    "../data/commitinfo/peer_auth_v1_commit.parquet",
    "../data/commitinfo/req_auth_v1_commit.parquet",
    "../data/commitinfo/http_authz_v1_commit.parquet",
    "../data/commitinfo/tcp_authz_v1_commit.parquet",
    "../data/commitinfo/jwt_authz_v1_commit.parquet",
    "../data/commitinfo/provider_authz_v1_commit.parquet",
    "../data/commitinfo/ingress_authz_ip_v1_commit.parquet",
    "../data/commitinfo/ingress_authz_remote_ip_v1_commit.parquet",
    "../data/commitinfo/mtls_strict_v1_commit.parquet",
    "../data/commitinfo/mtls_permissive_v1_commit.parquet",
    "../data/commitinfo/mtls_disable_v1_commit.parquet",
]

# Commit-info parquet inputs for the v1beta1 API, one per policy/feature stem,
# all following the "<stem>_v1beta1_commit.parquet" naming pattern.
files_v1beta1 = [
    f"../data/commitinfo/{stem}_v1beta1_commit.parquet"
    for stem in (
        "peer_auth",
        "req_auth",
        "http_authz",
        "tcp_authz",
        "jwt_authz",
        "provider_authz",
        "ingress_authz_ip",
        "ingress_authz_remote_ip",
        "mtls_strict",
        "mtls_permissive",
        "mtls_disable",
    )
]

# Commit-info parquet inputs for the v1alpha1 API, one per policy/feature stem,
# all following the "<stem>_v1alpha1_commit.parquet" naming pattern.
files_v1alpha1 = [
    f"../data/commitinfo/{stem}_v1alpha1_commit.parquet"
    for stem in (
        "peer_auth",
        "req_auth",
        "any_authz",
        "mtls_strict",
        "mtls_permissive",
    )
]


# Output locations keyed by API version: per-input-file results go under
# processed/<version>, the combined per-version result under final/<version>.
processed_dirs = {
    version: f"../data/processed/{version}"
    for version in ("v1", "v1beta1", "v1alpha1")
}

final_dirs = {
    version: f"../data/final/{version}"
    for version in ("v1", "v1beta1", "v1alpha1")
}

def _author_date_partition(df):
    """Reduce one partition to ``sha``, ``author_date`` and ``repository_full_name``.

    ``author_date`` is taken from ``commit['author']['date']`` on each row.
    The ``commit`` column is assumed to hold nested mappings (how a parquet
    struct column is materialized) -- TODO confirm against the input schema.
    ``assign`` is used so the incoming partition is not mutated in place.
    """
    author_date = df['commit'].apply(lambda commit: commit['author']['date'])
    return df.assign(author_date=author_date)[['sha', 'author_date', 'repository_full_name']]


def extract_and_save(file_list, out_dir):
    """Extract (sha, author_date, repository_full_name) rows from commit-info files.

    For every path in *file_list*: read the ``sha``, ``commit`` and
    ``repository_full_name`` columns, derive ``author_date`` from
    ``commit['author']['date']``, keep one row per ``sha`` across the whole
    file, and write the result to ``<out_dir>/<basename>_processed``
    (overwriting any previous output). The saved dataset is then read back
    to confirm it still has a ``sha`` column.

    Processing is best-effort per file: any exception is printed as an
    ``[ERROR]`` line and the loop continues with the next file.

    Args:
        file_list: Iterable of parquet file paths to process.
        out_dir: Directory for the processed outputs; created if missing.
    """
    os.makedirs(out_dir, exist_ok=True)
    meta = {'sha': 'object', 'author_date': 'object', 'repository_full_name': 'object'}

    for f in file_list:
        print(f"Processing {f}...")
        try:
            ddf = dd.read_parquet(f, columns=['sha', 'commit', 'repository_full_name'], blocksize="128MB")
            ddf = ddf.map_partitions(_author_date_partition, meta=meta)

            # Deduplicate across the whole file, not just inside each partition:
            # rows sharing a sha can live in different partitions. split_out
            # hash-partitions on the subset so the result is not forced into a
            # single output partition.
            ddf = ddf.drop_duplicates(subset='sha', split_out=ddf.npartitions)

            name = os.path.splitext(os.path.basename(f))[0]
            out_path = os.path.join(out_dir, f"{name}_processed")
            ddf.to_parquet(out_path, overwrite=True)

            # Sanity check: re-read what was written and confirm the key column survived.
            ddf_check = dd.read_parquet(out_path)
            if 'sha' not in ddf_check.columns:
                print(f"[MISSING] 'sha' column missing in saved file: {out_path}")
            else:
                print(f"[OK] 'sha' column exists in saved file: {out_path}")

        except Exception as e:
            print(f"[ERROR] Failed to process {f}: {e}")

def combine_and_save(dir_path, final_out):
    """Merge all per-file processed datasets in *dir_path* into one, one row per sha.

    Scans *dir_path* for ``*_processed`` entries, gathers the ``*.parquet``
    part files inside each, reads their ``sha`` / ``author_date`` /
    ``repository_full_name`` columns, keeps the first row seen for each
    ``sha`` and writes the result to *final_out* (overwriting it).

    Returns early with a printed message when no processed entries, or no
    parquet part files inside them, are found.
    """
    processed_entries = glob.glob(os.path.join(dir_path, "*_processed"))
    if not processed_entries:
        print(f"[WARNING] No processed files found in {dir_path}")
        return

    part_files = []
    for entry in processed_entries:
        parts_here = glob.glob(os.path.join(entry, "*.parquet"))
        if not parts_here:
            print(f"[SKIP] No parquet files found in {entry}")
            continue
        part_files += parts_here

    if not part_files:
        print(f"[ERROR] No valid parquet files to combine in {dir_path}")
        return

    wanted_columns = ['sha', 'author_date', 'repository_full_name']
    combined = dd.read_parquet(part_files, columns=wanted_columns)

    # set_index repartitions by sorted sha ranges, so all rows sharing a sha
    # end up in one partition and the per-partition index dedup is global.
    # drop=False keeps sha as a column, so the index can simply be discarded.
    combined = (
        combined
        .set_index('sha', sorted=False, drop=False)
        .map_partitions(lambda part: part[~part.index.duplicated(keep='first')])
        .reset_index(drop=True)
    )

    os.makedirs(final_out, exist_ok=True)
    combined.to_parquet(final_out, overwrite=True)
    print(f"Combined data saved to {final_out}")

# Run the pipeline once per API version, in the order v1, v1beta1, v1alpha1:
# per-file extraction into processed_dirs[version], then a combined,
# sha-deduplicated write into final_dirs[version].
for _version, _version_files in (
    ("v1", files_v1),
    ("v1beta1", files_v1beta1),
    ("v1alpha1", files_v1alpha1),
):
    extract_and_save(_version_files, processed_dirs[_version])
    combine_and_save(processed_dirs[_version], final_dirs[_version])
