In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display

# Render matplotlib figures inline in the notebook output.
%matplotlib inline
# Default figure size (width, height) used by every plot that does not override it.
plt.rcParams["figure.figsize"] = (10, 4)

# Load the raw dataset; the path is relative to the notebook's working directory.
df = pd.read_csv("books_bookvoed.csv")
print(df.shape)
# Last expression of the cell: preview of the first rows.
df.head()

In [None]:
# Schema overview: column dtypes and non-null counts.
df.info()

# Number of missing values per column, most incomplete columns first.
na_per_column = df.isna().sum()
missing = na_per_column.sort_values(ascending=False)
display(missing.rename("missing_count").to_frame())

# Descriptive statistics for every column, transposed to one row per column.
display(df.describe(include="all").transpose())

# Compact per-column summary: dtype, number of present and missing values.
summary = pd.DataFrame(
    {
        "dtype": df.dtypes.astype(str),
        "non_null": df.notna().sum(),
        "missing": na_per_column,
    }
)
display(summary)

При проверке обнаружилось, что пропусков нет, а target статистически совпадает с price,
значит, при использовании price в признаках возникает утечка.
В дальнейшем price исключаем из X при обучении модели на target.

In [None]:
# Frequency bar charts for the categorical columns.
# `topn` limits the chart to the most frequent values; None keeps all of them.
for col, topn in [("publisher", 20), ("category", None), ("availability", None)]:
    if col not in df.columns:
        continue

    s = df[col].astype("string")
    vc = s.value_counts()

    if topn is not None:
        vc = vc.head(topn)

    # Ascending order so that the horizontal bars end with the most frequent value on top.
    vc = vc.sort_values()

    fig, ax = plt.subplots(figsize=(10, 5))
    vc.plot(kind="barh", ax=ax)
    title = f"{col} (top {topn})" if topn else col
    ax.set_title(title)
    ax.set_xlabel("count")
    fig.tight_layout()
    plt.show()

# Histograms of the monetary columns on a log1p scale.
for col in ["price", "old_price", "target"]:
    if col not in df.columns:
        continue

    # Unparseable values are coerced to NaN and dropped explicitly instead of
    # being handed to the plotting backend.
    s = pd.to_numeric(df[col], errors="coerce").dropna()
    if s.empty:
        continue

    fig, ax = plt.subplots(figsize=(10, 4))
    ax.hist(np.log1p(s), bins=60)
    # The plotted quantity is log1p(value), so the labels must say so;
    # labelling the axis with the raw column name misstates the scale.
    ax.set_title(f"log1p({col})")
    ax.set_xlabel(f"log1p({col})")
    ax.set_ylabel("count")
    fig.tight_layout()
    plt.show()

# Histogram of the discount with one bin per integer percent.
if "discount_percent" in df.columns:
    # Coerce + dropna first: a bare astype(int) raises on missing or
    # non-numeric values, unlike the price columns above.
    disc = pd.to_numeric(df["discount_percent"], errors="coerce").dropna().astype(int)

    if not disc.empty:
        fig, ax = plt.subplots(figsize=(10, 4))
        # +2 so that the largest observed value gets its own bin.
        ax.hist(disc, bins=range(disc.min(), disc.max() + 2))
        ax.set_title("discount_percent")
        ax.set_xlabel("discount_percent")
        ax.set_ylabel("count")
        fig.tight_layout()
        plt.show()

# Price distribution per category (outliers hidden to keep the boxes readable).
if {"category", "price"}.issubset(df.columns):
    tmp = df[["category", "price"]].copy()
    tmp["price"] = pd.to_numeric(tmp["price"], errors="coerce")

    fig, ax = plt.subplots(figsize=(10, 4))
    tmp.boxplot(column="price", by="category", showfliers=False, rot=25, ax=ax);
    # Clear the figure-level title so only the axes title set below is shown.
    plt.suptitle("")
    ax.set_title("price for category")
    ax.set_ylabel("price")
    plt.tight_layout()
    plt.show()

Распределения категориальных признаков неравномерны, есть доминирующие значения (особенно по category)

In [None]:
# Short aliases for the three columns whose mutual consistency is checked.
price_col = df["price"]
old_price_col = df["old_price"]
discount_col = df["discount_percent"]

# Rows where the "old" price is lower than the current one.
bad_old_lt_price = old_price_col.lt(price_col).sum()
# Rows with zero discount although the old price is higher than the current one.
bad_disc_zero_but_diff = (discount_col.eq(0) & old_price_col.gt(price_col)).sum()
# Rows with a positive discount although both prices are equal.
bad_disc_pos_but_equal = (discount_col.gt(0) & old_price_col.eq(price_col)).sum()

print("old_price < price:", bad_old_lt_price)
print("discount=0, но old_price>price:", bad_disc_zero_but_diff)
print("discount>0, но old_price==price:", bad_disc_pos_but_equal)

# Upper-tail quantiles of both price columns.
q = [0.5, 0.9, 0.95, 0.99, 0.995, 0.999]
for name in ("price", "old_price"):
    print(f"\n{name} quantiles:\n", df[name].quantile(q))

Найдены неконсистентные строки (скидка 0 при old_price > price), требуется исправление

In [None]:
# Work on a copy so the raw frame `df` stays untouched.
df_clean = df.copy()
# Total number of NaN cells across all columns, before and after copying.
print("Всего пропусков в исходном df:", int(df.isna().sum().sum()))
print("Всего пропусков в df_clean:", int(df_clean.isna().sum().sum()))

# Inconsistent rows: discount recorded as 0 although old_price > price.
# The extra `old_price > 0` condition protects the division below.
mask = (df_clean["discount_percent"] == 0) & (df_clean["old_price"] > df_clean["price"]) & (df_clean["old_price"] > 0)

print("Строк для исправления:", int(mask.sum()))

# Recompute the discount (in percent) implied by the two prices, rounded up.
recalc = (1 - df_clean.loc[mask, "price"] / df_clean.loc[mask, "old_price"]) * 100
df_clean.loc[mask, "discount_percent"] = np.ceil(recalc).astype(int)

# Bound the discount to the [0, 90] range.
df_clean["discount_percent"] = df_clean["discount_percent"].clip(0, 90)

# Re-run the inconsistency check to confirm how many rows are still affected.
check = ((df_clean["discount_percent"] == 0) & (df_clean["old_price"] > df_clean["price"])).sum()
print("После исправления осталось", int(check))

# Clip each monetary column to its own 0.1% / 99.9% quantiles.
# NOTE(review): the quantiles are computed on the full dataset (before any
# train/test split) and the target itself is clipped — confirm this is intended.
for col in ["price", "old_price", "target"]:
    lo, hi = df_clean[col].quantile([0.001, 0.999])
    df_clean[col] = df_clean[col].clip(lo, hi)

# Numeric columns used for the correlation matrix.
num = df_clean[["price", "old_price", "discount_percent", "target"]].copy()

corr = num.corr(numeric_only=True)
display(corr)

# Heat map of the correlation matrix with column names on both axes.
plt.figure(figsize=(6, 4))
plt.imshow(corr, aspect="auto")
plt.xticks(range(len(corr.columns)), corr.columns, rotation=30, ha="right")
plt.yticks(range(len(corr.index)), corr.index)
plt.colorbar()
plt.title("Correlation (numeric features)")
plt.tight_layout()
plt.show()

# Last expression of the cell: summary statistics after cleaning.
df_clean[["price", "old_price", "discount_percent", "target"]].describe()


Ещё одно подтверждение того, что target статистически совпадает с price. Кроме того, данные приведены к консистентному виду.

In [None]:
# Title as a string column; missing titles become empty strings.
t = df_clean["title"].astype("string").fillna("")

# Simple title-derived features: character count, word count and a flag for a
# four-digit year starting with 19 or 20.
df_clean["title_len"] = t.str.len()
df_clean["title_words"] = t.str.split().str.len()
df_clean["title_has_year"] = t.str.contains(r"\b(?:19|20)\d{2}\b", regex=True, na=False).astype(int)

# Keep the 30 most frequent publishers and collapse the rest into "Other".
# The same string-cast series is used for counting and for the membership
# test, so both sides of `isin` have the same type.
pub = df_clean["publisher"].astype("string")
top_pub = pub.value_counts().head(30).index
df_clean["publisher_top"] = pub.where(pub.isin(top_pub), "Other")

# One-hot encode the categorical columns (all levels kept).
df_model = pd.get_dummies(
    df_clean,
    columns=["category", "availability", "publisher_top"],
    drop_first=False
)

# `price` is dropped together with the target: the notebook established that
# target coincides with price, so keeping it among the features leaks the answer.
X = df_model.drop(columns=["target", "price", "title", "publisher"])
y = df_model["target"]

# Convert boolean dummy columns to 0/1 integers.
bool_cols = X.select_dtypes(include="bool").columns
X[bool_cols] = X[bool_cols].astype(int)

print("X:", X.shape, "y:", y.shape)
X.head()

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Hold-out split used only for the rule-based baselines in this cell.
base_train, base_test = train_test_split(df_clean, test_size=0.2, random_state=42)
y_train, y_test = base_train["target"].to_numpy(), base_test["target"].to_numpy()

# Baseline 1: predict the training median for every test row.
pred1 = np.full_like(y_test, np.median(y_train), dtype=float)
mae1, rmse1, r2_1 = (
    mean_absolute_error(y_test, pred1),
    np.sqrt(mean_squared_error(y_test, pred1)),
    r2_score(y_test, pred1),
)

print(f"Median: MAE={mae1:.3f}, RMSE={rmse1:.3f}, R2={r2_1:.3f}")

# Baseline 2: predict the training median of the row's category; categories
# not present in the training part fall back to the global training median.
global_median = base_train["target"].median()
cat_median = base_train.groupby("category")["target"].median()

pred2 = base_test["category"].map(cat_median).fillna(global_median).to_numpy()
mae2, rmse2, r2_2 = (
    mean_absolute_error(y_test, pred2),
    np.sqrt(mean_squared_error(y_test, pred2)),
    r2_score(y_test, pred2),
)

print(f"By category: MAE={mae2:.3f}, RMSE={rmse2:.3f}, R2={r2_2:.3f}")

Baseline даёт MAE ≈ 515 и R2 < 0, значит, простые правила (if-else) работают плохо, и модель должна улучшить результат.

In [None]:
# Feature lists used by every model below; `price` is intentionally absent.
num_cols = ["old_price", "discount_percent", "title_len", "title_words", "title_has_year"]
cat_cols = ["category", "availability", "publisher_top"]

X = df_clean[num_cols + cat_cols].copy()
y = df_clean["target"].copy()

# Decile bins of the target, used only as stratification labels.
y_bins = pd.qcut(y, q=10, duplicates="drop")

# Both splits share the same fraction and seed.
split_kwargs = {"test_size": 0.2, "random_state": 42}

# First split: hold out 20% of all rows as the test set.
X_tmp, X_test, y_tmp, y_test, bins_tmp, _ = train_test_split(
    X, y, y_bins, stratify=y_bins, **split_kwargs
)

# Second split: 20% of the remaining rows become the validation set.
X_train, X_val, y_train, y_val = train_test_split(
    X_tmp, y_tmp, stratify=bins_tmp, **split_kwargs
)

print("train:", X_train.shape, "val:", X_val.shape, "test:", X_test.shape)

Я использую разбиение таргета на квантили и стратифицирую по этим бинам, чтобы распределение target было сопоставимым во всех частях.
В данных нет временного признака, поэтому time-based split применить нельзя, значит, используем случайный split с фиксированным random_state.

In [None]:
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LinearRegression

# Force every numeric feature to a numeric dtype in all three splits;
# values that cannot be parsed become NaN.
for frame in (X_train, X_val, X_test):
    for c in num_cols:
        frame[c] = pd.to_numeric(frame[c], errors="coerce")

def metrics(y_true, y_pred):
    """Return the regression metrics ``(MAE, RMSE, R2)`` for the given predictions.

    RMSE is computed as the square root of the mean squared error.
    """
    return (
        mean_absolute_error(y_true, y_pred),
        np.sqrt(mean_squared_error(y_true, y_pred)),
        r2_score(y_true, y_pred),
    )

# Preprocessing shared by the pipelines below: one-hot encode the categorical
# columns (unknown categories are ignored at predict time) and pass the
# numeric columns through unchanged.
prep = ColumnTransformer(
    [
        ("cat", OneHotEncoder(handle_unknown="ignore"), cat_cols),
        ("num", "passthrough", num_cols),
    ]
)

# Manual search over the only hyper-parameter tried: fit_intercept.
variants = [True, False]
rows = []
best_fit_intercept, best_mae = None, float("inf")

for fit_intercept in variants:
    pipe = Pipeline(
        steps=[
            ("prep", prep),
            ("lr", LinearRegression(fit_intercept=fit_intercept)),
        ]
    )
    pred_val = pipe.fit(X_train, y_train).predict(X_val)

    mae, rmse, r2 = metrics(y_val, pred_val)
    rows.append(dict(fit_intercept=fit_intercept, MAE_val=mae, RMSE_val=rmse, R2_val=r2))

    # Strict "<" keeps the earlier variant when two variants tie on MAE.
    if mae < best_mae:
        best_fit_intercept, best_mae = fit_intercept, mae

results = pd.DataFrame(rows).sort_values("MAE_val")
display(results)
best_params = {"fit_intercept": best_fit_intercept}
print(best_params)

# Refit the selected variant on train + validation and score it once on test.
X_train_full = pd.concat([X_train, X_val], axis=0)
y_train_full = pd.concat([y_train, y_val], axis=0)

final_pipe = Pipeline(
    steps=[
        ("prep", prep),
        ("lr", LinearRegression(fit_intercept=best_fit_intercept)),
    ]
)
final_pipe.fit(X_train_full, y_train_full)
pred_test = final_pipe.predict(X_test)

mae_t, rmse_t, r2_t = metrics(y_test, pred_test)
print(f"MAE={mae_t:.3f}, RMSE={rmse_t:.3f}, R2={r2_t:.3f}")

Категориальные признаки кодируются через OneHotEncoder (не ломается на новых категориях благодаря handle_unknown="ignore"), числовые передаются как есть. Гиперпараметр подбираю вручную перебором fit_intercept {True, False} по MAE на валидации, затем обучаю финальную модель на объединённых train+val и оцениваю качество на test.

Метрики на test близки к val, соответственно, явного переобучения не наблюдается.
Качество существенно лучше baseline, значит, модель действительно учится на признаках.

In [None]:
# Fresh preprocessing + linear model with the hyper-parameters selected earlier,
# fitted on the training part only so that all three splits can be compared.
prep = ColumnTransformer(
    [
        ("cat", OneHotEncoder(handle_unknown="ignore"), cat_cols),
        ("num", "passthrough", num_cols),
    ]
)
lin_pipe = Pipeline([("prep", prep), ("lr", LinearRegression(**best_params))])
lin_pipe.fit(X_train, y_train)

pred_train, pred_val, pred_test = (
    lin_pipe.predict(part) for part in (X_train, X_val, X_test)
)

# One row of MAE / RMSE / R2 per split.
rows = []
for split, yt, yp in zip(
    ("train", "val", "test"),
    (y_train, y_val, y_test),
    (pred_train, pred_val, pred_test),
):
    rows.append(
        dict(
            split=split,
            MAE=mean_absolute_error(yt, yp),
            RMSE=np.sqrt(mean_squared_error(yt, yp)),
            R2=r2_score(yt, yp),
        )
    )

report = pd.DataFrame(rows).set_index("split").round(4)
display(report)

Метрики на train/val/test очень близки, поэтому переобучения нет

In [None]:
from sklearn.linear_model import SGDRegressor
from sklearn.preprocessing import StandardScaler

# Preprocessing for SGD: one-hot encode the categorical columns and scale the
# numeric ones; with_mean=False scales without centering.
prep_sgd = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown="ignore"), cat_cols),
        ("num", StandardScaler(with_mean=False), num_cols),
    ],
    remainder="drop",
)

# Un-regularised squared-error SGD configured for epoch-by-epoch training:
# max_iter=1 limits each fit() call to one pass over the data, warm_start=True
# makes the next call start from the current coefficients, and tol=None
# switches off the tolerance-based stopping rule.
# NOTE(review): each fit() call presumably restarts the learning-rate schedule
# and reuses the same shuffling seed; partial_fit() may be the intended API for
# epoch-wise training — confirm against the scikit-learn documentation.
sgd = SGDRegressor(
    loss="squared_error",
    penalty=None,
    max_iter=1,
    warm_start=True,
    tol=None,
    random_state=42,
)

sgd_pipe = Pipeline([("prep", prep_sgd), ("model", sgd)])

# Upper bound on the number of fit() calls.
epochs = 15
# Per-epoch records: (epoch, train MAE, val MAE, train RMSE, val RMSE).
hist = []

# Early-stopping state: best validation MAE so far and the number of
# consecutive epochs without a sufficient improvement.
best_val = float("inf")
bad_steps = 0

for ep in range(1, epochs + 1):
    # The whole pipeline is fitted on every call (the preprocessing step too).
    sgd_pipe.fit(X_train, y_train)

    tr_pred = sgd_pipe.predict(X_train)
    va_pred = sgd_pipe.predict(X_val)

    tr_mae = mean_absolute_error(y_train, tr_pred)
    va_mae = mean_absolute_error(y_val, va_pred)

    tr_rmse = np.sqrt(mean_squared_error(y_train, tr_pred))
    va_rmse = np.sqrt(mean_squared_error(y_val, va_pred))

    hist.append((ep, tr_mae, va_mae, tr_rmse, va_rmse))

    # Count the epoch as an improvement only if validation MAE drops by more
    # than 1e-4; stop after 5 consecutive epochs without improvement.
    if va_mae < best_val - 1e-4:
        best_val = va_mae
        bad_steps = 0
    else:
        bad_steps += 1
        if bad_steps >= 5:
            break

# Convert the collected tuples to a frame for plotting.
hist = pd.DataFrame(hist, columns=["epoch", "mae_train", "mae_val", "rmse_train", "rmse_val"])

# Learning curve: MAE on train and validation per epoch.
plt.figure(figsize=(10, 4))
plt.plot(hist["epoch"], hist["mae_train"], label="train")
plt.plot(hist["epoch"], hist["mae_val"], label="val")
plt.title("MAE по эпохам")
plt.xlabel("epoch")
plt.ylabel("MAE")
plt.legend()
plt.tight_layout()
plt.show()

# Learning curve: RMSE on train and validation per epoch.
plt.figure(figsize=(10, 4))
plt.plot(hist["epoch"], hist["rmse_train"], label="train")
plt.plot(hist["epoch"], hist["rmse_val"], label="val")
plt.title("RMSE по эпохам")
plt.xlabel("epoch")
plt.ylabel("RMSE")
plt.legend()
plt.tight_layout()
plt.show()

SGDRegressor — это линейная регрессия, обучаемая градиентным спуском, то есть у неё есть эпохи, в отличие от обычной линейной регрессии.
В задании требуется работа с эпохами, поэтому я использую именно этот вид линейной регрессии.

При max_iter=1 и warm_start=True каждый вызов fit() продолжает обучение с текущих весов и выполняет ещё один проход по данным, поэтому можно наблюдать снижение MAE/RMSE на train и val.
По графикам видно быстрое улучшение на первых шагах; разрыв между train и val небольшой и не растёт, переобучения нет.

In [None]:
from scipy import sparse
# Preprocessing for the hand-written gradient descent: one-hot encoded
# categoricals plus numeric columns scaled without centering.
prep_gd = ColumnTransformer(
    [
        ("cat", OneHotEncoder(handle_unknown="ignore"), cat_cols),
        ("num", StandardScaler(with_mean=False), num_cols),
    ],
    remainder="drop",
)

# Fit the transformer on the training part only, then apply it to val and test.
Xtr = prep_gd.fit_transform(X_train)
Xva, Xte = (prep_gd.transform(part) for part in (X_val, X_test))

# Targets as float arrays.
ytr, yva, yte = (
    target.to_numpy(dtype=float) for target in (y_train, y_val, y_test)
)


def calc_metrics(y_true, y_pred):
    """Compute ``(MAE, RMSE, R2)`` for a set of predictions.

    RMSE is the square root of the mean squared error.
    """
    mse = mean_squared_error(y_true, y_pred)
    return mean_absolute_error(y_true, y_pred), np.sqrt(mse), r2_score(y_true, y_pred)


class LinearRegressionGD:
    """Linear regression trained with full-batch gradient descent.

    Each epoch performs one update with the gradient of
    ``(1 / 2n) * ||X w + b - y||^2 + (l2 / 2) * ||w||^2``; the intercept ``b``
    is not regularised.

    Parameters
    ----------
    lr : float
        Learning rate (step size).
    epochs : int
        Number of gradient steps.
    l2 : float
        L2 penalty strength applied to the weights; 0 disables it.
    fit_intercept : bool
        Whether to learn the intercept ``b_``.
    random_state : int
        Seed for the small random initialisation of the weights.

    Attributes
    ----------
    w_ : numpy.ndarray of shape (n_features,) or None before ``fit``
    b_ : float
    """

    def __init__(self, lr=0.01, epochs=50, l2=0.0, fit_intercept=True, random_state=42):
        self.lr = float(lr)
        self.epochs = int(epochs)
        self.l2 = float(l2)
        self.fit_intercept = bool(fit_intercept)
        self.random_state = int(random_state)

        self.w_ = None
        self.b_ = 0.0

    @staticmethod
    def _mae(y_true, y_pred):
        """Mean absolute error of two 1-D arrays (keeps the class self-contained)."""
        return float(np.mean(np.abs(y_true - y_pred)))

    def fit(self, X, y, X_val=None, y_val=None, track=False):
        """Run ``epochs`` gradient steps on ``(X, y)``.

        Parameters
        ----------
        X : 2-D array-like or sparse matrix supporting ``.shape``, ``.dot`` and ``.T``.
        y : array-like with ``n_samples`` values; flattened to a 1-D float array.
        X_val, y_val : optional validation data, used only for tracking.
        track : bool
            When true, a history dict is returned; MAE values are recorded
            after every epoch only if both ``X_val`` and ``y_val`` are given.

        Returns
        -------
        dict or None
            ``{"epoch": [...], "mae_train": [...], "mae_val": [...]}`` when
            ``track`` is true, otherwise ``None``.

        Raises
        ------
        ValueError
            If ``X`` has no rows or ``y`` does not have one value per row.
        """
        n, d = X.shape
        if n == 0:
            raise ValueError("X must contain at least one sample")

        # Flatten the target: a column vector would otherwise broadcast the
        # residual to (n, n), and a pandas Series would align on its index.
        y = np.asarray(y, dtype=float).ravel()
        if y.shape[0] != n:
            raise ValueError(f"X has {n} rows but y has {y.shape[0]} values")
        if y_val is not None:
            y_val = np.asarray(y_val, dtype=float).ravel()

        rng = np.random.RandomState(self.random_state)
        self.w_ = rng.normal(scale=0.01, size=d)
        self.b_ = 0.0

        history = {"epoch": [], "mae_train": [], "mae_val": []} if track else None
        record = track and X_val is not None and y_val is not None

        for ep in range(1, self.epochs + 1):
            # Residual of the current model on the training data.
            err = self.predict(X) - y

            grad_w = (X.T.dot(err)) / n
            if self.l2:
                grad_w = grad_w + self.l2 * self.w_

            self.w_ -= self.lr * grad_w

            if self.fit_intercept:
                # The intercept uses the residual computed before the weight update.
                self.b_ -= self.lr * err.mean()

            if record:
                history["epoch"].append(ep)
                history["mae_train"].append(self._mae(y, self.predict(X)))
                history["mae_val"].append(self._mae(y_val, self.predict(X_val)))

        return history

    def predict(self, X):
        """Return ``X @ w_`` plus ``b_`` when the intercept is enabled."""
        pred = X.dot(self.w_)
        if self.fit_intercept:
            pred = pred + self.b_
        return pred

# Hyper-parameter grid: every combination of learning rate, L2 strength and
# intercept flag, each trained for a fixed 60 epochs.
grid = [
    {"lr": lr, "epochs": 60, "l2": l2, "fit_intercept": fi}
    for lr in [0.005, 0.01, 0.03]
    for l2 in [0.0, 1e-4, 1e-3]
    for fi in [True, False]
]

rows = []
# NOTE: this rebinds `best_params`, which until now held the LinearRegression
# parameters selected in an earlier cell.
best_params = None
best_mae = float("inf")

for params in grid:
    m = LinearRegressionGD(**params)
    m.fit(Xtr, ytr)

    val_pred = m.predict(Xva)
    # Skip configurations whose predictions are not finite (diverged training).
    if not np.isfinite(val_pred).all():
        continue

    mae, rmse, r2 = calc_metrics(yva, val_pred)
    rows.append({**params, "MAE_val": mae, "RMSE_val": rmse, "R2_val": r2})

    if mae < best_mae:
        best_mae = mae
        best_params = params

# Fail with a clear message instead of an opaque KeyError / TypeError further
# down when no configuration produced finite predictions.
if best_params is None:
    raise RuntimeError(
        "Gradient descent diverged for every configuration in the grid; "
        "try smaller learning rates."
    )

results = pd.DataFrame(rows).sort_values("MAE_val").reset_index(drop=True)
display(results.head(10))
print(best_params)

# Retrain the best configuration with tracking enabled to get the learning curve.
curve_model = LinearRegressionGD(**best_params)
hist = curve_model.fit(Xtr, ytr, X_val=Xva, y_val=yva, track=True)

fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(hist["epoch"], hist["mae_train"], label="train")
ax.plot(hist["epoch"], hist["mae_val"], label="val")
ax.set_title("MAE по эпохам")
ax.set_xlabel("epoch")
ax.set_ylabel("MAE")
ax.legend()
fig.tight_layout()
plt.show()

# Final model: train on train + validation (already transformed) and score on test.
Xfull = sparse.vstack([Xtr, Xva], format="csr")
yfull = np.concatenate([ytr, yva])

final_model = LinearRegressionGD(**best_params)
final_model.fit(Xfull, yfull)

test_pred = final_model.predict(Xte)
mae_te, rmse_te, r2_te = calc_metrics(yte, test_pred)
print(f"MAE={mae_te:.3f}, RMSE={rmse_te:.3f}, R2={r2_te:.3f}")

График MAE по эпохам показывает стабильное уменьшение ошибки и близость train/val, то есть переобучения нет
Финальная модель обучена на объединении train+val и оценена на test

In [None]:
from sklearn.base import clone

def reg_metrics(y_true, y_pred):
    """Return ``(MAE, RMSE, R2)`` after flattening both inputs to 1-D float arrays."""
    actual, predicted = (
        np.asarray(values, dtype=float).ravel() for values in (y_true, y_pred)
    )
    return (
        mean_absolute_error(actual, predicted),
        np.sqrt(mean_squared_error(actual, predicted)),
        r2_score(actual, predicted),
    )

# Unfitted preprocessing template; each model gets its own clone of it.
prep_tpl = ColumnTransformer(
    [
        ("cat", OneHotEncoder(handle_unknown="ignore"), cat_cols),
        ("num", StandardScaler(with_mean=False), num_cols),
    ],
    remainder="drop",
)

# Reference model: scikit-learn LinearRegression on the training part.
prep_lr = clone(prep_tpl)
sk = Pipeline([("prep", prep_lr), ("model", LinearRegression(fit_intercept=True))])
sk.fit(X_train, y_train)

sk_val = reg_metrics(y_val, sk.predict(X_val))
sk_test = reg_metrics(y_test, sk.predict(X_test))

# Hand-written gradient descent on identically preprocessed matrices.
prep_gd = clone(prep_tpl)
Xtr = prep_gd.fit_transform(X_train)
Xva = prep_gd.transform(X_val)
Xte = prep_gd.transform(X_test)

ytr, yva, yte = (
    np.asarray(target, dtype=float).ravel() for target in (y_train, y_val, y_test)
)

gd = LinearRegressionGD(lr=0.003, epochs=4000, l2=0.0, fit_intercept=True, random_state=42)
gd.fit(Xtr, ytr)

gd_val = reg_metrics(yva, gd.predict(Xva))
gd_test = reg_metrics(yte, gd.predict(Xte))

# Side-by-side table: one row per (model, split).
cmp = pd.DataFrame(
    [
        {"model": model_name, "split": split_name, "MAE": mae, "RMSE": rmse, "R2": r2}
        for model_name, split_name, (mae, rmse, r2) in [
            ("sklearn LinearRegression", "val", sk_val),
            ("sklearn LinearRegression", "test", sk_test),
            ("my LinearRegressionGD", "val", gd_val),
            ("my LinearRegressionGD", "test", gd_test),
        ]
    ]
).round(4)

display(cmp)

Разница метрик ожидаема, так как sklearn решает задачу в закрытой форме, а GD — итерационно и чувствителен к lr/epochs.

In [None]:
# (learning rate, number of epochs) pairs to probe.
trials = [
    (0.004, 1500),
    (0.003, 2000),
    (0.003, 4000),
    (0.002, 6000),
    (0.0015, 8000),
]

rows = []
for lr, epochs in trials:
    m = LinearRegressionGD(lr=lr, epochs=epochs, l2=0.0, fit_intercept=True, random_state=42)
    m.fit(Xtr, ytr)
    pred = m.predict(Xva)

    # A run with non-finite predictions is still listed, with NaN metrics.
    if np.isfinite(pred).all():
        mae, rmse, r2 = reg_metrics(yva, pred)
    else:
        mae = rmse = r2 = np.nan

    rows.append({"lr": lr, "epochs": epochs, "MAE_val": mae, "RMSE_val": rmse, "R2_val": r2})

probe = pd.DataFrame(rows).round(4).sort_values("MAE_val")
display(probe)

# Validation metrics of the scikit-learn reference model for comparison.
print(
    "sklearn val:",
    {key: round(float(value), 4) for key, value in zip(("MAE", "RMSE", "R2"), sk_val)},
)

Видно, что при подборе lr/epochs качество приближается к sklearn, а не «улетает» в inf/NaN.