# In [None]:
import os
import time
import pandas as pd
import yaml
import modin.pandas as mpd
import dask.dataframe as dd

# Ensure required backends for Modin and Dask are initialized.
# Each backend is started in its own guarded block so that a failure in one
# (e.g. Ray not installed) does not prevent the other from starting.
# NOTE(review): main() below only uses plain pandas; neither backend is
# exercised by the pipeline in this file. Consider dropping them if unused.
client = None  # Dask client handle; stays None if the Dask client fails to start

try:
    import ray
    # ignore_reinit_error avoids a RuntimeError when this code runs a second
    # time in the same process (e.g. re-executing the notebook cell).
    ray.init(ignore_reinit_error=True)  # Initialize Ray for Modin
except Exception as e:
    print(f"Initialization error for Modin (Ray) backend: {e}")

try:
    from dask.distributed import Client
    client = Client()  # Start Dask client
except Exception as e:
    print(f"Initialization error for Dask backend: {e}")

# File paths.
# Each can be overridden through an environment variable so the script runs on
# machines other than the author's; an unset or empty variable falls back to the
# original default.
file_path = (
    os.environ.get("FLIGHTS_CSV_PATH")
    or '/Users/vaweng02/Desktop/Combined_Flights_2021.csv'
)
yaml_path = os.environ.get("FLIGHTS_SCHEMA_PATH") or 'file.yaml'
output_path = os.environ.get("FLIGHTS_OUTPUT_PATH") or 'output_file.csv.gz'

# Function to validate YAML schema
def validate_yaml_schema(yaml_file):
    """Load a YAML schema file with ``yaml.safe_load``.

    Despite the name, no validation beyond successful parsing is performed.
    The schema is optional for the pipeline, so every failure is reported on
    stdout and turned into a ``None`` return instead of an exception.

    Args:
        yaml_file: Path to the YAML file.

    Returns:
        The parsed YAML document, or None if the file is missing, cannot be
        opened/decoded, is not valid YAML, or is empty.
    """
    try:
        # YAML files are UTF-8 by spec; don't depend on the platform locale.
        with open(yaml_file, 'r', encoding='utf-8') as f:
            schema = yaml.safe_load(f)
    except FileNotFoundError:
        print("Error: YAML schema file not found.")
        return None
    except (OSError, UnicodeDecodeError) as e:
        # e.g. the path is a directory, permission denied, or not UTF-8 text.
        print(f"Error opening YAML file: {e}")
        return None
    except yaml.YAMLError as e:
        print(f"Error reading YAML file: {e}")
        return None

    # safe_load returns None for an empty document; don't report that as success.
    if schema is None:
        print("Warning: YAML schema file is empty.")
        return None
    print("YAML schema loaded successfully.")
    return schema

# Function to load a CSV file
def load_csv(file_path, **read_csv_kwargs):
    """Load a CSV file into a pandas DataFrame.

    Args:
        file_path: Path to the CSV file.
        **read_csv_kwargs: Extra keyword arguments forwarded unchanged to
            ``pandas.read_csv`` (e.g. ``usecols``, ``dtype``, ``low_memory``)
            to limit memory on large inputs. With none given, the call is
            identical to ``pd.read_csv(file_path)``.

    Returns:
        The DataFrame produced by ``pandas.read_csv``.

    Raises:
        FileNotFoundError: If nothing exists at ``file_path``.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found at path: {file_path}")
    print("Loading dataset...")
    return pd.read_csv(file_path, **read_csv_kwargs)

# Function to clean column names
def clean_column_names(df):
    """Normalise the DataFrame's column labels in place and return it.

    Every run of one or more characters outside ``[a-zA-Z0-9]`` is collapsed
    into a single underscore. The same DataFrame object is mutated and then
    returned. Assumes all labels are strings, since the ``.str`` accessor
    requires that.
    """
    print("Cleaning column names...")
    non_alnum_run = r'[^a-zA-Z0-9]+'
    cleaned_labels = df.columns.str.replace(non_alnum_run, '_', regex=True)
    df.columns = cleaned_labels
    return df

# Function to process dataframe
def process_dataframe(df, subset_columns):
    """Return a DataFrame containing only the requested columns.

    Args:
        df: Source DataFrame.
        subset_columns: Iterable of column labels to keep, in output order.
            Lists, tuples and generators are all accepted.

    Returns:
        A DataFrame holding just ``subset_columns``.

    Raises:
        ValueError: If any requested column is absent; the message lists them.
    """
    print("Processing dataframe...")
    # Materialise once: a generator would otherwise be exhausted by the
    # membership check, and pandas treats a tuple as a single key rather than
    # a list of columns.
    wanted = list(subset_columns)
    missing = [col for col in wanted if col not in df.columns]
    if missing:
        raise ValueError(
            "One or more subset columns are not present in the dataframe: "
            f"{missing}"
        )
    return df[wanted]

# Main workflow
def main(subset_columns=None):
    """Run the load -> clean -> subset -> save pipeline.

    Reads the CSV at the module-level ``file_path``, normalises column names,
    keeps only ``subset_columns``, and writes the result pipe-delimited and
    gzip-compressed to ``output_path``.

    Args:
        subset_columns: Column names (after cleaning) to keep. Defaults to the
            placeholder ``['Column1', 'Column2', 'Column3']``, which must be
            replaced with real column names for the pipeline to succeed.

    Returns:
        True if the output file was written, False if any step failed. Errors
        are reported on stdout (and a traceback on stderr for unexpected
        errors) rather than raised.
    """
    import traceback

    try:
        # Load the YAML schema.
        # NOTE(review): the schema is loaded but never applied below, so no
        # validation actually happens in either branch.
        schema = validate_yaml_schema(yaml_path)
        if schema is None:
            print("Skipping YAML validation.")

        # Load dataset
        df = load_csv(file_path)

        # Clean column names
        df = clean_column_names(df)

        # Columns to subset (customize as needed)
        if subset_columns is None:
            subset_columns = ['Column1', 'Column2', 'Column3']  # Replace with actual column names

        # Process dataframe
        subset_df = process_dataframe(df, subset_columns)

        # Save processed data to compressed file (pipe-delimited despite the
        # .csv extension)
        print("Saving processed data to compressed file...")
        subset_df.to_csv(output_path, sep='|', compression='gzip', index=False)
        print(f"Processed data saved to: {output_path}")
        return True

    except FileNotFoundError as fnf_error:
        print(fnf_error)
    except ValueError as val_error:
        print(val_error)
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        # Keep the stack trace: the message alone hides where the failure was.
        traceback.print_exc()
    return False

if __name__ == "__main__":
    # perf_counter is a monotonic, high-resolution clock meant for measuring
    # durations; time.time() can jump if the system clock is adjusted.
    start_time = time.perf_counter()
    try:
        main()
    finally:
        # Report elapsed time even if main() is interrupted (e.g. Ctrl-C).
        print(f"Execution time: {time.perf_counter() - start_time:.2f} seconds")
