In [None]:
from databricks.connect import DatabricksSession
import pandas as pd

# Build (or reuse) a Databricks Connect session configured with the "marvelmlops" profile.
session_builder = DatabricksSession.builder.profile("marvelmlops")
spark = session_builder.getOrCreate()

# Preview the first five rows of the samples.nyctaxi.trips table.
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

In [71]:
# Load IPython's autoreload extension and switch it to mode 2, so imported modules
# are reloaded before each cell executes (useful while editing the local packages).
%load_ext autoreload
%autoreload 2

In [3]:
from packages.config import ProjectConfig
import numpy as np
from packages.paths import AllPaths
from pandas.api.types import CategoricalDtype

from pyspark.sql import SparkSession
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
#from lightgbm import LGBMClassifier
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import mlflow
from mlflow.models import infer_signature

# Project paths and the YAML-backed project configuration.
ALL_PATHS = AllPaths()
config = ProjectConfig.from_yaml(config_path=ALL_PATHS.filename_config)

# Read the hotel reservations CSV (first line is the header) from the data volume
# through Spark, then pull it into a pandas DataFrame.
reservations_csv = f'{ALL_PATHS.data_volume}/hotel_reservations.csv'
csv_reader = spark.read.option("header", True)
df = csv_reader.csv(reservations_csv).toPandas()

In [4]:
import subprocess

# Function to get the current git SHA
def get_git_sha():
    """Return the SHA of the current git ``HEAD`` commit.

    Runs ``git rev-parse HEAD`` in the current working directory.

    Returns:
        str: The commit hash with surrounding whitespace stripped, or the
        literal ``"unknown"`` when git is unavailable, the directory is not a
        repository, or the output cannot be decoded.
    """
    try:
        raw_sha = subprocess.check_output(
            ['git', 'rev-parse', 'HEAD'],
            # Keep git's own error text out of the notebook output; the
            # "unknown" fallback already signals the failure.
            stderr=subprocess.DEVNULL,
        )
        return raw_sha.decode('ascii').strip()
    except (subprocess.CalledProcessError, OSError, UnicodeDecodeError):
        # CalledProcessError: git exited non-zero (e.g. not a repository).
        # OSError: the git executable could not be launched.
        return "unknown"

In [5]:
# Send run tracking to Databricks.
mlflow.set_tracking_uri("databricks")
# Use the "databricks-uc" registry URI for registered models.
mlflow.set_registry_uri("databricks-uc")

In [None]:
# Databricks notebook source

# Pull the settings used by the following cells out of the project config:
# feature column lists, target column, model parameters, and table location.
num_features, cat_features = config.num_features, config.cat_features
target, parameters = config.target, config.parameters
catalog_name, schema_name = config.catalog_name, config.schema_name

In [None]:
# Load training and testing sets from Databricks tables.
# The Spark handle of the training table is kept (it is logged later as the MLflow
# dataset input); the pandas copy is materialised from that same handle rather than
# resolving the table a second time.
train_set_spark = spark.table(f"{catalog_name}.{schema_name}.train_set")
train_set = train_set_spark.toPandas()
test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas()

# Model inputs are the numeric columns followed by the categorical ones.
feature_columns = num_features + cat_features

X_train = train_set[feature_columns]
y_train = train_set[target]

X_test = test_set[feature_columns]
y_test = test_set[target]

In [None]:
# COMMAND ----------
# Rebind `spark` to the session returned by SparkSession.builder.
# NOTE(review): this replaces the DatabricksSession created in the first cell and sits
# after cells that already used `spark` — looks like a leftover from the Databricks
# notebook source; confirm it is still wanted.
spark = SparkSession.builder.getOrCreate()

In [None]:

# COMMAND ----------
# One-hot encode the categorical columns (categories unseen at fit time are ignored)
# and pass every remaining column through unchanged.
categorical_encoder = OneHotEncoder(handle_unknown='ignore')
preprocessor = ColumnTransformer(
    transformers=[('cat', categorical_encoder, cat_features)],
    remainder='passthrough',
)

# Chain the preprocessing with a LightGBM classifier built from the configured parameters.
# The step keeps its historical name 'regressor' even though the estimator is a classifier.
# NOTE(review): LGBMClassifier is not imported in the visible cells (the lightgbm import
# is commented out) — confirm it is in scope before running this cell.
pipeline_steps = [
    ('preprocessor', preprocessor),
    ('regressor', LGBMClassifier(**parameters)),
]
pipeline = Pipeline(steps=pipeline_steps)


# COMMAND ----------
# `mlflow.set_experiment` takes a single `experiment_name` string; the plural
# `experiment_names=[...]` keyword belongs to `mlflow.search_runs`.
mlflow.set_experiment(experiment_name="/Shared/hotel-reservations-cremerf")
git_sha = "fb7d5ec632172615cb88ee17ceef29ee57702a73"

# Start an MLflow run to track the training process; the tags record which
# commit and branch produced it.
with mlflow.start_run(
    tags={"git_sha": git_sha, "branch": "week2"},
) as run:
    run_id = run.info.run_id

    # Fit on the training split and predict on the held-out split.
    pipeline.fit(X_train, y_train)
    y_pred = pipeline.predict(X_test)

    # Evaluate the model performance.
    # NOTE(review): these are regression metrics while the estimator is a
    # classifier — confirm they are the intended measures.
    mse = mean_squared_error(y_test, y_pred)
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    print(f"Mean Squared Error: {mse}")
    print(f"Mean Absolute Error: {mae}")
    print(f"R2 Score: {r2}")

    # Log parameters, metrics, and the model to MLflow
    mlflow.log_param("model_type", "LightGBM with preprocessing")
    mlflow.log_params(parameters)
    mlflow.log_metric("mse", mse)
    mlflow.log_metric("mae", mae)
    mlflow.log_metric("r2_score", r2)
    signature = infer_signature(model_input=X_train, model_output=y_pred)

    # Record the training table as the run's input dataset.
    dataset = mlflow.data.from_spark(
        train_set_spark,
        table_name=f"{catalog_name}.{schema_name}.train_set",
        version="0",
    )
    mlflow.log_input(dataset, context="training")
    mlflow.sklearn.log_model(
        sk_model=pipeline,
        artifact_path="lightgbm-pipeline-model",
        signature=signature
    )


# COMMAND ----------
# Register the model artifact logged by the training run under a
# catalog.schema-qualified name, tagging the new version with the commit SHA.
logged_model_uri = f'runs:/{run_id}/lightgbm-pipeline-model'
registered_name = f"{catalog_name}.{schema_name}.house_prices_model_basic"
model_version = mlflow.register_model(
    model_uri=logged_model_uri,
    name=registered_name,
    tags={"git_sha": f"{git_sha}"},
)

# COMMAND ----------
# Fetch the run again and reload the first dataset that was logged as its input.
run = mlflow.get_run(run_id)
first_dataset_input = run.inputs.dataset_inputs[0]
dataset_info = first_dataset_input.dataset
dataset_source = mlflow.data.get_source(dataset_info)
dataset_source.load()

# COMMAND ----------

In [None]:



# Instantiate LGBMClassifier with the hyperparameters from the project config.
# `config` is the whole ProjectConfig object; the estimator keyword arguments live under
# `config.parameters`, the same mapping the training pipeline unpacks.
# NOTE(review): `lgb` is not imported in the visible cells — confirm
# `import lightgbm as lgb` has run before this cell.
model = lgb.LGBMClassifier(**config.parameters)



In [6]:
# Resolve the SHA of the current checkout ("unknown" when git cannot provide it).
git_sha = get_git_sha()

In [7]:
# Display the resolved commit SHA.
git_sha

'fb7d5ec632172615cb88ee17ceef29ee57702a73'

In [116]:
# Coerce every configured numeric feature to a numeric dtype; values that cannot be
# parsed become NaN.
num_features = config.num_features
df = df.assign(
    **{name: pd.to_numeric(df[name], errors='coerce') for name in num_features}
)

In [117]:
# Store every configured categorical feature with pandas' 'category' dtype.
cat_features = config.cat_features
df = df.astype({name: 'category' for name in cat_features})

In [None]:
import matplotlib.pyplot as plt

# Histogram of the average room price, 50 bins, drawn on an explicit Axes.
fig, ax = plt.subplots()
ax.hist(df['avg_price_per_room'], bins=50)
ax.set_title('Distribution of Average Price per Room')
ax.set_xlabel('Average Price per Room')
ax.set_ylabel('Frequency')
plt.show()

In [None]:
# Summary statistics of the room price, ignoring rows where the price is exactly zero.
room_prices = df['avg_price_per_room']
non_zero_prices = room_prices[room_prices != 0]

mean_price = non_zero_prices.mean()
median_price = non_zero_prices.median()

print(f"Mean Price: {mean_price}")
print(f"Median Price: {median_price}")

# Replace missing categorical values with an explicit 'Unknown' level.
for col in cat_features:
    column = df[col]

    # The column must be categorical before its category list can be extended.
    if not isinstance(column.dtype, CategoricalDtype):
        column = column.astype('category')

    # 'Unknown' has to be a declared category before it can be used as a fill value.
    if 'Unknown' not in column.cat.categories:
        column = column.cat.add_categories(['Unknown'])

    df[col] = column.fillna('Unknown')

# Encode the target as 0 (not canceled) / 1 (canceled).
# Gotcha: values outside this mapping become NaN, so running the cell a second time
# on the already-encoded column turns the whole target into NaN.
status_codes = {'Not_Canceled': 0, 'Canceled': 1}
df[config.target] = df[config.target].map(status_codes)

In [None]:
# TODO: apply median imputation to null or zero values in avg_price_per_room to reduce the right skew of the data
# TODO: check what kind of preprocessing we could perform on the categorical data
# TODO: remember to move scaling and encoding into the modelling module

In [None]:
# Number of rows per booking status.
booking_status = df['booking_status']
class_counts = booking_status.value_counts()
print(class_counts)

In [None]:
# Class balance of the booking status: absolute counts first, then percentage shares.
booking_status = df['booking_status']

class_counts = booking_status.value_counts()
print(class_counts)

class_shares = booking_status.value_counts(normalize=True)
class_percentages = class_shares * 100
print(class_percentages)

In [94]:
from sklearn.preprocessing import LabelEncoder

# Fit a label encoder on the booking status and keep the integer codes in a new column.
le = LabelEncoder()
encoded_status = le.fit_transform(df['booking_status'])
df['booking_status_encoded'] = encoded_status


In [None]:
# Display the model parameters held by the project config.
config.parameters

In [120]:
import mlflow
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from mlflow.models import infer_signature
from packages.config import ProjectConfig
import json
from mlflow import MlflowClient

In [None]:
# `logging` is not imported by any earlier visible cell, so bring it in here.
import logging

# Timestamped INFO-level logging for the notebook.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


ALLPATHS = AllPaths()

# Reload the project configuration from its YAML file and echo it.
config = ProjectConfig.from_yaml(config_path=ALLPATHS.filename_config)
logger.info("Configuration loaded:")
# NOTE(review): `yaml` is not imported in the visible cells — confirm it is in scope
# before this line runs.
print(yaml.dump(config, default_flow_style=False))

In [121]:
# Create an MLflow client.
# NOTE(review): it is created before the following cell sets the tracking/registry
# URIs — confirm it ends up pointing at the intended workspace.
client = MlflowClient()

In [122]:

# Track runs against the Databricks workspace of the "marvelmlops" profile and use
# the "databricks-uc" registry URI for registered models.
mlflow.set_tracking_uri("databricks://marvelmlops")
mlflow.set_registry_uri("databricks-uc")

In [None]:
# Display the tracking URI the client is bound to.
client.tracking_uri

In [None]:
# Find the runs tagged branch='week2' in the shared experiment.
week2_runs = mlflow.search_runs(
    experiment_names=["/Shared/cremerf-cancellation-prediction"],
    filter_string="tags.branch='week2'",
)

# Fail with a readable message instead of an opaque KeyError when nothing matches
# (LookupError is KeyError's base class, so existing handlers still apply).
if week2_runs.empty:
    raise LookupError(
        "No MLflow run tagged branch='week2' found in experiment "
        "'/Shared/cremerf-cancellation-prediction'"
    )

# Take the first returned row by position rather than by index label.
run_id = week2_runs.run_id.iloc[0]

In [None]:
# Display the label-encoded booking status column.
df['booking_status_encoded']

In [None]:
# Count the rows whose encoded target is missing.
encoded_target = df['booking_status_encoded']
missing_target = encoded_target.isna().sum()
print(f"Missing values in target variable: {missing_target}")

In [None]:
# Restrict the audit to the numeric columns.
numeric_frame = df.select_dtypes(include='number')
numeric_cols = numeric_frame.columns

# Per-column tallies: exact zeros and missing values.
zero_counts = numeric_frame.eq(0).sum()
nan_counts = numeric_frame.isna().sum()

# One row per numeric column, one column per tally.
summary_columns = {
    'Zero Count': zero_counts,
    'NaN/Null Count': nan_counts,
}
summary_df = pd.DataFrame(summary_columns)

# Display the summary
print(summary_df)

In [58]:
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split

In [59]:
# Columns treated as numeric.
numeric_features = [
    'no_of_adults',
    'no_of_children',
    'no_of_weekend_nights',
    'no_of_week_nights',
    'lead_time',
    'no_of_previous_cancellations',
    'no_of_previous_bookings_not_canceled',
    'avg_price_per_room',
    'no_of_special_requests',
]

# Columns treated as categorical.
categorical_features = [
    'type_of_meal_plan',
    'required_car_parking_space',
    'room_type_reserved',
    'arrival_year',
    'arrival_month',
    'arrival_date',
    'market_segment_type',
    'repeated_guest',
]

# Numeric branch: fill gaps with the column median, then standardise.
numeric_steps = [
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
]
numeric_transformer = Pipeline(numeric_steps)

# Categorical branch: fill gaps with the literal 'missing', then one-hot encode
# (categories unseen at fit time are ignored).
categorical_steps = [
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore')),
]
categorical_transformer = Pipeline(categorical_steps)

# Route each column group through its branch; columns in neither list are dropped.
preprocessor = ColumnTransformer(
    [
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features),
    ],
    remainder='drop',
)

In [60]:
# Pipeline containing only the preprocessing step (no estimator attached).
pipeline = Pipeline([('preprocessor', preprocessor)])

In [61]:
# Split data into features and target.
# `booking_status_encoded` is a label-encoded copy of the target created in an earlier
# cell; drop it together with the target so the feature frame cannot leak the label.
# errors='ignore' keeps the cell working when that encoding cell has not been run.
X = df.drop(columns=['booking_status', 'booking_status_encoded'], errors='ignore')
y = df['booking_status']

# Split data into train and test sets (80/20, fixed seed for reproducibility)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# Preview the raw reservations CSV (first line is the header) straight from the volume.
reservations_csv_path = '/Volumes/mlops_students/cremerfederico29/data/hotel_reservations.csv'
raw_reservations = spark.read.option("header", True).csv(reservations_csv_path)
raw_reservations.show()