# In [None]:
"""Example Python script that processes hotel reservation data using src/hotel_reservations/data_preprocessor.py.

This was developed for Deliverable 1 of the MarvelousMLOps project.
"""
import pandas as pd
from loguru import logger
from marvelous.logging import setup_logging
from pyspark.sql import SparkSession

from hotel_reservations.config import ProjectConfig
from hotel_reservations.data_preprocessor import DataProcessor


def main(config_path: str, data_path: str = "../data/data.csv", env: str = "dev") -> None:
    """Process hotel cancellation data.

    Loads the project configuration and the raw CSV, then drives the
    ``DataProcessor`` steps in order: preprocess, split, save to the catalog
    and enable the change data feed. A failure while loading the
    configuration or the data is reported on stdout and ends the run early
    without raising; errors from the processing steps propagate to the caller.
    The Spark session is stopped on every exit path.

    :param config_path: Path to the YAML configuration file
    :param data_path: Path to the CSV file with the raw data
    :param env: Environment to use (dev, acc, prd)
    """
    # Set up logging
    setup_logging(log_file="logs/marvelous-1da.log")  # Assuming this is required and works.

    # Initialize Spark session
    spark = SparkSession.builder.getOrCreate()

    # Everything below runs under try/finally so the session is released even
    # when a processing step raises.
    try:
        # Load and validate the configuration from the caller-supplied
        # path/environment (not from hard-coded values).
        try:
            config = ProjectConfig.from_yaml(config_path=config_path, env=env)
        except Exception as e:  # top-level boundary: report and bail out
            print(f"Error loading configuration: {e}")
            return
        print(f"Configuration loaded for environment: {env}")

        # Load the data
        try:
            raw_data = pd.read_csv(data_path)
        except Exception as e:  # top-level boundary: report and bail out
            print(f"Error loading data: {e}")
            return
        print(f"Data loaded from {data_path}, shape: {raw_data.shape}")

        # Initialize the DataProcessor class that handles the data processing
        processor = DataProcessor(raw_data, config, spark)

        # Preprocess the data
        print("Starting data preprocessing...")
        processor.preprocess()

        # Split the data into train and test sets
        print("Splitting data into train and test sets...")
        train_set, test_set = processor.split_data()
        # loguru interpolates with str.format-style braces; "%s" placeholders
        # would be emitted literally and the shapes dropped.
        logger.info("Training set shape: {}", train_set.shape)
        logger.info("Test set shape: {}", test_set.shape)

        # Save the data to Databricks tables
        print("Saving data to Databricks catalog...")
        processor.save_to_catalog(train_set, test_set)

        # Enable change data feed
        print("Enabling Change Data Feed...")
        processor.enable_change_data_feed()

        print("Processing completed successfully!")
    finally:
        # Stop Spark session
        spark.stop()


# Run settings for this cell: configuration file, raw data file and target
# environment. They stay module-level names so later cells can reuse them.
config_path, data_path, env = (
    "../project_config.yml",
    "../data/data.csv",
    "dev",
)

# Kick off the processing run with the settings above.
main(
    config_path=config_path,
    data_path=data_path,
    env=env,
)