<h1 align=center style="line-height:200%;color:#0099cc">
<font color="#0099cc">
How Much Per Night?
</font>
</h1>


In [None]:
import os
import ast
import warnings
from typing import Dict, List
import subprocess, sys

import numpy as np
import pandas as pd
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
import zipfile

# Silence every warning so the notebook output stays readable.
# NOTE(review): this also hides pandas/CatBoost deprecation warnings.
warnings.filterwarnings("ignore")

RANDOM_STATE = 42
# os.cpu_count() may return None; fall back to 8 threads in that case.
MAX_THREADS = os.cpu_count() or 8
# Toggles the lighter training configuration used further down in this file.
FAST_MODE = True

# Resolve directories robustly: __file__ is undefined inside a notebook
# kernel, so fall back to the current working directory there.
try:
    THIS_DIR = os.path.dirname(os.path.abspath(__file__))  # type: ignore[name-defined]
except NameError:
    THIS_DIR = os.getcwd()
# The data directory is a sibling of the directory holding this file.
DATA_DIR = os.path.join(os.path.dirname(THIS_DIR), "data")
NOTEBOOK_NAME = "Jajiga.ipynb"
SUBMISSION_NAME = "submission.csv"
RESULT_ZIP = "result.zip"

# Import CatBoost, installing it on demand when it is missing.
try:
    from catboost import CatBoostRegressor, Pool  # type: ignore
except ImportError:
    # Only a missing package is worth an install attempt; any other failure
    # should surface as-is instead of being masked by a reinstall.
    try:
        subprocess.check_call([sys.executable, "-m", "uv", "pip", "install", "-q", "catboost"])  # uses uv per preference
    except (subprocess.CalledProcessError, OSError):
        # uv is unavailable or failed: fall back to plain pip.
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "catboost"])
    from catboost import CatBoostRegressor, Pool  # type: ignore

train_path = os.path.join(DATA_DIR, "train.csv")
test_path = os.path.join(DATA_DIR, "test.csv")


def parse_tags(value: object) -> str:
    """Flatten a tag-like cell into a space-separated string of unique tags.

    Args:
        value: Either a list of tags, or a string holding a Python literal
            (typically a list). Tags may be plain values or dicts carrying a
            ``"name"`` key. Anything else (e.g. NaN) is treated as "no tags".

    Returns:
        The distinct, stripped tags sorted alphabetically and joined with
        single spaces; an empty string when there are none.
    """
    if isinstance(value, list):
        items = value
    elif isinstance(value, str):
        try:
            parsed = ast.literal_eval(value)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            # Not a Python literal: treat the raw text as a single tag.
            parsed = value
        if isinstance(parsed, (list, tuple, set, frozenset, dict)):
            items = list(parsed)
        else:
            # Scalar literal (number, quoted string, None): keep it as one
            # tag instead of iterating over it, which would either raise
            # TypeError or split a string into characters.
            items = [parsed]
    else:
        items = []

    tags = set()
    for item in items:
        if isinstance(item, dict):
            item = item.get("name")
            if item is None:
                continue
        # Strip before filtering so whitespace-only tags are dropped too.
        tag = str(item).strip()
        if tag and tag != "None":
            tags.add(tag)
    return " ".join(sorted(tags))


def _bed_count(raw: object) -> int:
    """Convert one bed-count field to an int, treating bad or missing values as 0."""
    try:
        return int(raw or 0)  # type: ignore[call-overload]
    except (TypeError, ValueError, OverflowError):
        # Non-numeric text, NaN or infinity: count nothing rather than crash.
        return 0


def parse_sleep_arrange(value: object) -> Dict[str, int]:
    """Sum the bed counts described by a sleep-arrangement cell.

    Args:
        value: A list of dicts (or a single dict) with optional ``"single"``,
            ``"double"`` and ``"mattress"`` counts, or a string holding such a
            Python literal. Anything else (e.g. NaN, unparseable text, a
            scalar literal) yields all-zero counts.

    Returns:
        A dict with the keys ``sleep_single``, ``sleep_double``,
        ``sleep_mattress`` and their sum ``sleep_total``.
    """
    parsed: object = value
    if isinstance(value, str):
        try:
            parsed = ast.literal_eval(value)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            parsed = None

    if isinstance(parsed, dict):
        # A bare dict is a single arrangement entry.
        entries: List[object] = [parsed]
    elif isinstance(parsed, (list, tuple)):
        entries = list(parsed)
    else:
        entries = []

    counts: Dict[str, int] = {"sleep_single": 0, "sleep_double": 0, "sleep_mattress": 0}
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        counts["sleep_single"] += _bed_count(entry.get("single"))
        counts["sleep_double"] += _bed_count(entry.get("double"))
        counts["sleep_mattress"] += _bed_count(entry.get("mattress"))
    counts["sleep_total"] = counts["sleep_single"] + counts["sleep_double"] + counts["sleep_mattress"]
    return counts


def prepare_df(df: pd.DataFrame) -> pd.DataFrame:
    """Build the model feature frame from a raw listings frame.

    Works on a copy of ``df`` and adds, for whichever source columns exist:
    flattened ``*_text`` tag columns, ``sleep_*`` bed counts, numeric
    coercion of number-like columns, ratio/difference features, and
    length / word-count features for the free-text columns.

    Args:
        df: Raw input frame; it is not modified.

    Returns:
        A new DataFrame holding the original columns plus derived features.
    """
    out = df.copy()

    # Convert tag-like fields to space-separated tokens
    for col, new_col in [
        ("types", "types_text"),
        ("regions", "regions_text"),
        ("features", "features_text"),
        ("rules", "rules_text"),
    ]:
        out[new_col] = out[col].apply(parse_tags) if col in out.columns else ""

    # Sleep arrangement derived counts
    if "sleep_arrange" in out.columns:
        sleep_cols = ["sleep_single", "sleep_double", "sleep_mattress", "sleep_total"]
        # Build the frame in one go: expanding dicts with .apply(pd.Series)
        # creates a Series per row and is far slower. Passing the column
        # list explicitly keeps the columns present for an empty frame.
        sleep_counts = pd.DataFrame(
            [parse_sleep_arrange(cell) for cell in out["sleep_arrange"]],
            index=out.index,
            columns=sleep_cols,
        )
        # Replace any stale sleep_* columns instead of duplicating them;
        # duplicated labels would break the numeric coercion below.
        out = pd.concat([out.drop(columns=sleep_cols, errors="ignore"), sleep_counts], axis=1)

    # Coerce numeric-likes (unparseable values become NaN)
    numeric_like = [
        "floor_area",
        "land_area",
        "floors_count",
        "bedrooms",
        "guest_number",
        "max_guest_number",
        "stays_min",
        "stays_max",
        "entrance_time_min",
        "entrance_time_max",
        "leaving_time",
        "ratings.count",
        "ratings.total",
        "ratings.cleanliness",
        "ratings.location",
        "ratings.checkin",
        "ratings.value",
        "units_count",
        "success_books",
        "geo.lat",
        "geo.lng",
        "sleep_single",
        "sleep_double",
        "sleep_mattress",
        "sleep_total",
    ]
    for c in numeric_like:
        if c in out.columns:
            out[c] = pd.to_numeric(out[c], errors="coerce")

    # Derived features; zero denominators are mapped to NaN so the ratios
    # come out as NaN instead of +/-inf.
    if {"floor_area", "bedrooms"}.issubset(out.columns):
        out["area_per_bedroom"] = out["floor_area"] / out["bedrooms"].replace(0, np.nan)
    if {"guest_number", "bedrooms"}.issubset(out.columns):
        out["guests_per_bedroom"] = out["guest_number"] / out["bedrooms"].replace(0, np.nan)
    if {"max_guest_number", "guest_number"}.issubset(out.columns):
        out["extra_guest_capacity"] = out["max_guest_number"] - out["guest_number"]
    if {"ratings.total", "ratings.count"}.issubset(out.columns):
        out["avg_rating"] = out["ratings.total"] / out["ratings.count"].replace(0, np.nan)

    # Text lengths and word counts (missing text counts as empty)
    for tcol in ["title", "description", "sleep_description"]:
        if tcol in out.columns:
            s = out[tcol].fillna("").astype(str)
            out[f"len_{tcol}"] = s.str.len()
            out[f"wc_{tcol}"] = s.str.split().map(len)

    # Ensure text/string columns exist and are strings
    for c in [
        "title",
        "description",
        "sleep_description",
        "types_text",
        "regions_text",
        "features_text",
        "rules_text",
    ]:
        if c in out.columns:
            out[c] = out[c].fillna("").astype(str)
        else:
            out[c] = ""

    return out


# Load
train_df = pd.read_csv(train_path)
test_df = pd.read_csv(test_path)

train_feat = prepare_df(train_df)
test_feat = prepare_df(test_df)

TARGET = "min_price"
y = train_feat[TARGET].to_numpy()
X = train_feat.drop(columns=[TARGET])

# Column groups (each filtered down to the columns actually present)
base_numeric = [
    "floor_area","land_area","floors_count","bedrooms","guest_number","max_guest_number",
    "stays_min","stays_max","entrance_time_min","entrance_time_max","leaving_time",
    "ratings.count","ratings.total","ratings.cleanliness","ratings.location","ratings.checkin","ratings.value",
    "units_count","success_books","geo.lat","geo.lng","sleep_single","sleep_double","sleep_mattress","sleep_total",
    "area_per_bedroom","guests_per_bedroom","extra_guest_capacity","avg_rating",
    "len_title","wc_title","len_description","wc_description","len_sleep_description","wc_sleep_description",
]
numeric_cols = [c for c in base_numeric if c in X.columns]

categorical_cols = [
    c for c in [
        "status","allocation","province.id","province.name","city.id","city.name",
        "province.url","city.url","is_clean","is_new","is_instant","is_plus",
    ] if c in X.columns
]

text_cols_all = [
    c for c in [
        "title","description","sleep_description","types_text","regions_text","features_text","rules_text",
    ] if c in X.columns
]
# Text features are skipped entirely in fast mode.
text_cols = ([] if FAST_MODE else text_cols_all)

selected_cols = numeric_cols + categorical_cols + text_cols


def _to_catboost_frame(frame: pd.DataFrame) -> pd.DataFrame:
    """Select the model columns and coerce them to the dtypes CatBoost expects."""
    prepared = frame[selected_cols].copy()
    for col in categorical_cols + text_cols:
        # Fill missing values *before* casting: astype(str) turns NaN into
        # the literal "nan", after which fillna("") has nothing to fill.
        # NOTE(review): an id column read as float in one file and int in the
        # other would stringify as "12.0" vs "12" - confirm dtypes match.
        prepared[col] = prepared[col].fillna("").astype(str)
    for col in numeric_cols:
        prepared[col] = pd.to_numeric(prepared[col], errors="coerce")
    return prepared


# Prepare data types for CatBoost
X_cb = _to_catboost_frame(X)

X_tr, X_va, y_tr, y_va = train_test_split(X_cb, y, test_size=0.2, random_state=RANDOM_STATE, shuffle=True)

cat_idx = [X_cb.columns.get_loc(c) for c in categorical_cols]
text_idx = [X_cb.columns.get_loc(c) for c in text_cols]
# Pass None rather than an empty list when there are no text features.
pool_kwargs = {"cat_features": cat_idx, "text_features": text_idx or None}

# Train with log-target for stability
y_tr_log = np.log1p(y_tr)
y_va_log = np.log1p(y_va)

train_pool = Pool(X_tr, y_tr_log, **pool_kwargs)
valid_pool = Pool(X_va, y_va_log, **pool_kwargs)

# Hyperparameters shared by the validation model and the final model. They
# must match, otherwise the early-stopped tree count found on the validation
# split is meaningless for the final fit. Depth stays within CatBoost's
# supported maximum of 16 (the previous 30/50 is rejected by the library).
base_params = {
    "learning_rate": (0.06 if FAST_MODE else 0.035),
    "depth": (6 if FAST_MODE else 8),
    "l2_leaf_reg": (4.0 if FAST_MODE else 6.0),
    "loss_function": "RMSE",
    "subsample": 0.85,
    "rsm": (0.95 if FAST_MODE else 0.9),
    "random_seed": RANDOM_STATE,
    "allow_writing_files": False,
    "thread_count": MAX_THREADS,
}

model = CatBoostRegressor(
    iterations=(20000 if FAST_MODE else 50000),
    eval_metric="R2",
    od_type="Iter",
    od_wait=(700 if FAST_MODE else 900),
    verbose=200,
    **base_params,
)

model.fit(train_pool, eval_set=valid_pool, use_best_model=True)

# Predictions are in log1p space; map them back before scoring.
pred_va = np.expm1(model.predict(valid_pool))
r2 = r2_score(y_va, pred_va)
print(f"Validation R2: {r2:.4f}  -> approx score {round(r2,3)*250:.1f}")

# best_iteration_ is a 0-based index, so the matching tree count is one more.
best_it = getattr(model, "best_iteration_", None)
if best_it is None:
    final_iterations = int(model.tree_count_)
else:
    final_iterations = int(best_it) + 1
final_iterations = max(1, final_iterations)

# Fit final on full data with best iterations
X_full = X_cb
full_pool = Pool(X_full, np.log1p(y), **pool_kwargs)
final_model = CatBoostRegressor(
    iterations=final_iterations,
    verbose=False,
    **base_params,
)
final_model.fit(full_pool)

# Predict test
X_test = _to_catboost_frame(test_feat)
test_pool = Pool(X_test, **pool_kwargs)
pred_test = np.expm1(final_model.predict(test_pool))
# Predictions are clipped so they are never negative.
pred_test = np.maximum(pred_test, 0)

# Save outputs
submission_path = os.path.join(THIS_DIR, SUBMISSION_NAME)
submission = pd.DataFrame({"min_price": pred_test})
submission.to_csv(submission_path, index=False)
print(f"Saved submission to {submission_path}")

# Zip the submission, plus the notebook when it sits next to this file
result_zip_path = os.path.join(THIS_DIR, RESULT_ZIP)
with zipfile.ZipFile(result_zip_path, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
    zf.write(submission_path, SUBMISSION_NAME)
    nb_path = os.path.join(THIS_DIR, NOTEBOOK_NAME)
    if os.path.exists(nb_path):
        zf.write(nb_path, NOTEBOOK_NAME)
print(f"Wrote {result_zip_path}")
