task 2.1

In [85]:
import numpy as np

import plotly.graph_objects as go
import plotly.subplots as sp
import plotly.express as px

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

In [None]:
def train_val_split(
      X: np.ndarray,
      y: np.ndarray,
      train_fraction: float = 0.1,
      train_size: int = None,
      random_state: int = 42,
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    
    # Валидация
    if not isinstance(X, np.ndarray) or not isinstance(y, np.ndarray):
        raise TypeError("X и y должны быть numpy.ndarray")
    if X.ndim != 2 or X.shape[1] != 1:
        raise ValueError("X должен иметь форму (n_samples, 1)")
    if y.ndim != 1 or y.shape[0] != X.shape[0]:
        raise ValueError("y должен иметь форму (n_samples,) и соответствовать X")
    if not np.issubdtype(X.dtype, np.number) or not np.issubdtype(y.dtype, np.number):
        raise TypeError("X и y должны быть числовыми")

    n = X.shape[0]
    if train_size is not None:
        if not isinstance(train_size, (int, np.integer)):
            raise TypeError("train_size должен быть целым")
        if not (1 <= train_size < n):
            raise ValueError("train_size должен быть в [1, n-1]")
        n_train = int(train_size)
    else:
        if not (isinstance(train_fraction, float) or isinstance(train_fraction, np.floating)):
            raise TypeError("train_fraction должен быть float")
        if not (0 < train_fraction <= 1):
            raise ValueError("train_fraction должен быть в (0,1]")
        n_train = max(1, min(n - 1, int(round(n * float(train_fraction)))))

    rng = np.random.default_rng(random_state)
    perm = rng.permutation(n)
    train_idx = perm[:n_train]
    val_idx = perm[n_train:]

    X_train, y_train = X[train_idx], y[train_idx]
    X_val, y_val = X[val_idx], y[val_idx]

    mask = (X_train[:, 0] >= 0.0) & (X_train[:, 0] <= 1.0)
    X_train, y_train = X_train[mask], y_train[mask]

    if X_train.shape[0] == 0:
        raise ValueError("После фильтрации train опустел. Увеличьте train_fraction/size или измените random_state.")

    y_train = y_train + rng.normal(0.0, 0.1, size=y_train.shape)

    return X_train, y_train, X_val, y_val



In [87]:
rng = np.random.default_rng(0)
X = rng.uniform(low=-1.0, high=2.0, size=(1000, 1))  # (1000,1) равномерно в [-1, 2)
x1d = X[:, 0]
y = 3*x1d - 8*(x1d**2) + 6*(x1d**3)                

X_train, y_train, X_val, y_val = train_val_split(
    X, y, train_fraction=0.1, train_size=None, random_state=42
)

fig = go.Figure()

fig.add_trace(go.Scatter(
    x=X_val[:, 0], y=y_val,
    mode="markers",
    name="Валидация (без шума)",
    marker=dict(size=6, opacity=0.7)
))

fig.add_trace(go.Scatter(
    x=X_train[:, 0], y=y_train,
    mode="markers",
    name="Тренировка (с шумом σ=0.1)",
    marker=dict(size=7, opacity=0.9)
))

# разумные диапазоны осей
x_min = float(np.min(X[:, 0])) - 0.1
x_max = float(np.max(X[:, 0])) + 0.1
y_all = np.concatenate([y_val, y_train])
y_min = float(np.min(y_all)) - 0.1
y_max = float(np.max(y_all)) + 0.1

fig.update_layout(
    title="y = 3x − 8x^2 + 6x^3\n"
          "Тренировка x, y с шумом",
    xaxis_title="x",
    yaxis_title="y",
    xaxis=dict(range=[x_min, x_max]),
    yaxis=dict(range=[y_min, y_max]),
    legend=dict(bgcolor="rgba(255,255,255,0.8)")
)

fig.show()

task 2.2

In [88]:
class numpyPolyLinearRegression:
    def __init__(self, d):
        self.coef_ = None       # shape (d,) — коэффициенты при [x, x^2, ..., x^d]
        self.intercept_ = None  # w0
        self.d = d

    @staticmethod
    def _validate_Xy(X, y):
        if not isinstance(X, np.ndarray) or not isinstance(y, np.ndarray):
            raise TypeError("X и y должны быть numpy.ndarray")
        if X.ndim != 2 or X.shape[1] != 1:
            raise ValueError("X должен иметь форму (n_samples, 1)")
        if y.ndim != 1 or y.shape[0] != X.shape[0]:
            raise ValueError("y должен иметь форму (n_samples,) и соответствовать X")
        if not np.issubdtype(X.dtype, np.number) or not np.issubdtype(y.dtype, np.number):
            raise TypeError("X и y должны быть числовыми")

    def _design(self, X):
        # Вандермонд без переворота степеней: столбцы [1, x, x^2, ..., x^d]
        x = X[:, 0]
        # np.vander по умолчанию даёт убывающие степени, используем increasing=True
        Phi = np.vander(x, N=self.d+1, increasing=True)  # shape (n, d+1)
        return Phi

    def fit(self, X, y):
        """
        X: np.ndarray (n_samples, 1)
        y: np.ndarray (n_samples,)
        """
        self._validate_Xy(X, y)
        if not isinstance(self.d, (int, np.integer)) or self.d < 0:
            raise ValueError("d должен быть целым неотрицательным")

        Phi = self._design(X)  # (n, d+1)
        # Решим МНК через lstsq (устойчивее, чем обратная/нормальные уравнения напрямую)
        theta, *_ = np.linalg.lstsq(Phi, y, rcond=None)  # (d+1,)
        self.intercept_ = float(theta[0])
        self.coef_ = theta[1:].astype(float) if self.d >= 1 else np.array([], dtype=float)
        return self

    def predict(self, X):
        """
        X: np.ndarray (n_samples, 1)
        """
        if self.intercept_ is None:
            raise RuntimeError("Сначала вызовите fit")
        # тот же дизайн
        Phi = self._design(X)  # (n, d+1)
        theta = np.concatenate(([self.intercept_], self.coef_))
        return Phi @ theta

    def score(self, X, y):
        """
        Коэффициент детерминации R^2
        """
        y_pred = self.predict(X)
        y_mean = np.mean(y)
        ss_res = np.sum((y - y_pred)**2)
        ss_tot = np.sum((y - y_mean)**2)
        if ss_tot == 0:
            # соглашение, близкое к sklearn: 1.0 при идеальном совпадении, иначе 0.0
            return 1.0 if np.allclose(y, y_pred) else 0.0
        return 1.0 - ss_res/ss_tot

In [89]:
results = []
models_np = {}
d_grid = list(range(0, 8))

# для плавных линий предсказания на графиках
xx = np.linspace(np.min(X[:,0]) - 0.1, np.max(X[:,0]) + 0.1, 400).reshape(-1, 1)

for d in d_grid:
    m = numpyPolyLinearRegression(d=d).fit(X_train, y_train)
    models_np[d] = m
    r2_tr = m.score(X_train, y_train)
    r2_vl = m.score(X_val, y_val)
    results.append((d, r2_tr, r2_vl))

results

[(0, np.float64(0.0), np.float64(-0.01878931284339158)),
 (1, np.float64(0.3171690201024311), np.float64(0.07871995924665454)),
 (2, np.float64(0.5327787340151627), np.float64(0.11137702361290658)),
 (3, np.float64(0.832424106741302), np.float64(0.9625403623393061)),
 (4, np.float64(0.8353245390601574), np.float64(0.6698346880692705)),
 (5, np.float64(0.842072284841491), np.float64(-22.455027050510257)),
 (6, np.float64(0.8421008301593637), np.float64(-24.422857620059766)),
 (7, np.float64(0.84212781242862), np.float64(-177.38783553354995))]

In [90]:
def sklearn_poly_model(d):
    return Pipeline([
        ("poly", PolynomialFeatures(degree=d, include_bias=True)),  # bias внутри
        ("lin", LinearRegression(fit_intercept=False))              # bias уже в poly
    ])

models_skl = {}
for d in d_grid:
    pipe = sklearn_poly_model(d)
    pipe.fit(X_train, y_train)
    models_skl[d] = pipe
    
d_star = 3
theta_np = np.concatenate(([models_np[d_star].intercept_], models_np[d_star].coef_))
theta_skl = models_skl[d_star].named_steps["lin"].coef_  # порядок такой же: [1, x, x^2, x^3]

print("Оптимальные параметры (d=3) — numpyPolyLinearRegression:")
print("w0, w1, w2, w3 =", theta_np)

print("\nОптимальные параметры (d=3) — sklearn Pipeline:")
print("w0, w1, w2, w3 =", theta_skl)

Оптимальные параметры (d=3) — numpyPolyLinearRegression:
w0, w1, w2, w3 = [-8.77623318e-03  3.45189780e+00 -9.63994878e+00  7.24481230e+00]

Оптимальные параметры (d=3) — sklearn Pipeline:
w0, w1, w2, w3 = [-8.77623318e-03  3.45189780e+00 -9.63994878e+00  7.24481230e+00]


In [91]:
for d in d_grid:
    fig = go.Figure()
    # точки валидации (без шума)
    fig.add_trace(go.Scatter(
        x=X_val[:,0], y=y_val, mode="markers", name="Валидация (y без шума)",
        marker=dict(size=6, opacity=0.7)
    ))
    # точки тренировки (с шумом)
    fig.add_trace(go.Scatter(
        x=X_train[:,0], y=y_train, mode="markers", name="Тренировка (y с шумом)",
        marker=dict(size=7, opacity=0.9)
    ))
    # предсказание линии
    y_hat_line = models_np[d].predict(xx)
    fig.add_trace(go.Scatter(
        x=xx[:,0], y=y_hat_line, mode="lines", name=f"Предсказание numpy-модели (d={d})"
    ))
    # оси — разумные диапазоны
    y_all = np.concatenate([y_train, y_val, y_hat_line])
    fig.update_layout(
        title=f"Полиномиальная регрессия (numpy) при d={d}",
        xaxis_title="x", yaxis_title="y",
        xaxis=dict(range=[float(np.min(xx)), float(np.max(xx))]),
        yaxis=dict(range=[float(np.min(y_all))-0.1, float(np.max(y_all))+0.1]),
        legend=dict(bgcolor="rgba(255,255,255,0.85)")
    )
    fig.show()

In [None]:
results = np.array(results, dtype=float)  # shape (len(d_grid), 3)
d_arr = results[:,0]
r2_tr_arr = results[:,1]
r2_vl_arr = results[:,2]

fig = go.Figure()
fig.add_trace(go.Scatter(x=d_arr, y=r2_tr_arr, mode="lines+markers", name="Train R²"))
fig.add_trace(go.Scatter(x=d_arr, y=r2_vl_arr, mode="lines+markers", name="Val R²"))

fig.add_shape(type="rect", x0=-0.5, x1=1.5, y0=min(r2_tr_arr.min(), r2_vl_arr.min())-0.05,
              y1=max(r2_tr_arr.max(), r2_vl_arr.max())+0.05, opacity=0.15, fillcolor="lightblue",
              line=dict(width=0))
fig.add_annotation(x=1, y=np.nanmax(r2_vl_arr), text="Underfitting-зона (низкая сложность)",
                   showarrow=False, yshift=20)

fig.add_shape(type="rect", x0=5.5, x1=7.5, y0=min(r2_tr_arr.min(), r2_vl_arr.min())-0.05,
              y1=max(r2_tr_arr.max(), r2_vl_arr.max())+0.05, opacity=0.15, fillcolor="salmon",
              line=dict(width=0))
fig.add_annotation(x=6.5, y=np.nanmin(r2_vl_arr), text="Overfitting-зона (слишком высокая сложность)",
                   showarrow=False, yshift=-20)

fig.update_layout(
    title="Коэффициент детерминации R^2 в зависимости от d",
    xaxis_title="Степень полинома d",
    yaxis_title="R^2",
    xaxis=dict(dtick=1),
    legend=dict(bgcolor="rgba(255,255,255,0.85)")
)
fig.show()

task 2.3

In [93]:
d_fix = 7

# логарифмическая сетка из >= 100 точек (возьмём 150)
alphas = np.logspace(-6, 3, 150)  
r2_val_list = []
r2_tr_list = []

for a in alphas:
    model = NumpyPolyRidge(degree=d_fix, alpha=float(a)).fit(X_train, y_train)
    r2_tr = model.score(X_train, y_train)
    r2_vl = model.score(X_val, y_val)
    r2_tr_list.append(r2_tr)
    r2_val_list.append(r2_vl)

r2_tr_arr = np.array(r2_tr_list)
r2_val_arr = np.array(r2_val_list)
alpha_opt = float(alphas[np.argmax(r2_val_arr)])

print(f"Оптимальный alpha при d={d_fix}: {alpha_opt:.6g}, валид. R^2={r2_val_arr.max():.6f}")

Оптимальный alpha при d=7: 4.61774e-06, валид. R^2=0.981070


In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=alphas, y=r2_val_arr, mode="lines+markers", name="Val R^2"))
fig.add_trace(go.Scatter(x=alphas, y=r2_tr_arr, mode="lines+markers", name="Train R^2", opacity=0.5))

fig.add_vline(x=alpha_opt, line_dash="dot")
fig.add_annotation(x=alpha_opt, y=r2_val_arr.max(),
                   text=f"α* ≈ {alpha_opt:.3g}", showarrow=True, yshift=20)

fig.update_layout(
    title=f"Зависимость R^2 от α при d={d_fix}",
    xaxis_title="α (логарифмическая шкала)",
    yaxis_title="R^2",
    xaxis_type="log",
    legend=dict(bgcolor="rgba(255,255,255,0.85)")
)
fig.show()


In [95]:
d_grid = list(range(0, 8))
models = {}
results = []

xx = np.linspace(np.min(X[:,0]) - 0.1, np.max(X[:,0]) + 0.1, 400).reshape(-1, 1)

for d in d_grid:
    m = NumpyPolyRidge(degree=d, alpha=alpha_opt).fit(X_train, y_train)
    models[d] = m
    r2_tr = m.score(X_train, y_train)
    r2_vl = m.score(X_val, y_val)
    results.append((d, r2_tr, r2_vl))

    # график по d: точки + предсказание
    y_line = m.predict(xx)
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=X_val[:,0], y=y_val, mode="markers",
                             name="Валидация (y без шума)", marker=dict(size=6, opacity=0.7)))
    fig.add_trace(go.Scatter(x=X_train[:,0], y=y_train, mode="markers",
                             name="Тренировка (y с шумом)", marker=dict(size=7, opacity=0.9)))
    fig.add_trace(go.Scatter(x=xx[:,0], y=y_line, mode="lines",
                             name=f"Предсказание Ridge (d={d}, α={alpha_opt:.3g})"))

    y_all = np.concatenate([y_train, y_val, y_line])
    fig.update_layout(
        title=f"Полиномиальная Ridge-регрессия при d={d}, α={alpha_opt:.3g}",
        xaxis_title="x", yaxis_title="y",
        xaxis=dict(range=[float(np.min(xx)), float(np.max(xx))]),
        yaxis=dict(range=[float(np.min(y_all))-0.1, float(np.max(y_all))+0.1]),
        legend=dict(bgcolor="rgba(255,255,255,0.85)")
    )
    fig.show()

In [96]:
results = np.array(results, dtype=float)
d_arr = results[:,0]
r2_tr_arr = results[:,1]
r2_vl_arr = results[:,2]

fig = go.Figure()
fig.add_trace(go.Scatter(x=d_arr, y=r2_tr_arr, mode="lines+markers", name="Train R²"))
fig.add_trace(go.Scatter(x=d_arr, y=r2_vl_arr, mode="lines+markers", name="Val R²"))

ymin = min(r2_tr_arr.min(), r2_vl_arr.min()) - 0.05
ymax = max(r2_tr_arr.max(), r2_vl_arr.max()) + 0.05
fig.add_shape(type="rect", x0=-0.5, x1=1.5, y0=ymin, y1=ymax,
              opacity=0.15, fillcolor="lightblue", line=dict(width=0))
fig.add_annotation(x=1, y=ymax, text="Underfitting (малая сложность)", showarrow=False, yshift=-20)

fig.add_shape(type="rect", x0=5.5, x1=7.5, y0=ymin, y1=ymax,
              opacity=0.15, fillcolor="salmon", line=dict(width=0))
fig.add_annotation(x=6.5, y=ymin, text="Overfitting (слишком высокая сложность)", showarrow=False, yshift=20)

fig.update_layout(
    title=f"R² на train/val в зависимости от степени d (ridge, α={alpha_opt:.3g})",
    xaxis_title="Степень полинома d",
    yaxis_title="R²",
    xaxis=dict(dtick=1),
    legend=dict(bgcolor="rgba(255,255,255,0.85)")
)
fig.show()

task 2.4

In [97]:
# загрузка датасета
# !wget --no-check-certificate 'https://docs.google.com/uc?export=download&id=1q49VyL3htV04F64izkn-eywysCqtzlAa' -O dataset.npz

In [98]:
dataset = np.load('dataset.npz', allow_pickle=True)
Xtrain, ytrain, Xtest, ytest = dataset['Xtrain'], dataset['ytrain'], dataset['Xtest'], dataset['ytest']

print("Xtrain:", Xtrain.shape)
print("ytrain:", ytrain.shape)
print("Xtest :", Xtest.shape)
print("ytest :", ytest.shape)

Xtrain: (10000, 100)
ytrain: (10000,)
Xtest : (10000, 100)
ytest : (10000,)


In [99]:
print("NaN в Xtrain:", np.isnan(Xtrain).sum())
print("NaN в ytrain:", np.isnan(ytrain).sum())

# стандартизация
Xmean = Xtrain.mean(axis=0)
Xstd  = Xtrain.std(axis=0)
Xstd[Xstd == 0] = 1 

Xtrain_norm = (Xtrain - Xmean) / Xstd
Xtest_norm  = (Xtest  - Xmean) / Xstd

n = Xtrain_norm.shape[0]
val_fraction = 0.2
idx = np.arange(n)
rng = np.random.default_rng(42)
rng.shuffle(idx)
split = int(n * (1 - val_fraction))
train_idx, val_idx = idx[:split], idx[split:]

Xtr, Xval = Xtrain_norm[train_idx], Xtrain_norm[val_idx]
ytr, yval = ytrain[train_idx], ytrain[val_idx]

print("train size:", Xtr.shape, "val size:", Xval.shape)


NaN в Xtrain: 0
NaN в ytrain: 0
train size: (8000, 100) val size: (2000, 100)


In [100]:
corr_features = np.corrcoef(Xtr.T)

corr_y = np.array([np.corrcoef(Xtr[:,i], ytr)[0,1] for i in range(Xtr.shape[1])])

fig = px.imshow(corr_features, color_continuous_scale="RdBu", zmin=-1, zmax=1,
                title="Корреляции между признаками (train)")
fig.show()

fig2 = go.Figure(data=[go.Bar(x=np.arange(len(corr_y)), y=corr_y)])
fig2.update_layout(title="Корреляция каждого признака с y", xaxis_title="Номер признака", yaxis_title="corr(x_i, y)")
fig2.show()


In [101]:
top_n = 3
top_idx = np.argsort(np.abs(corr_y))[-top_n:]
print("Выбраны признаки:", top_idx)

# (1, x, x^2, попарные произведения)
Xtr_sel = Xtr[:, top_idx]
Xval_sel = Xval[:, top_idx]
Xtest_sel = Xtest_norm[:, top_idx]

def poly_features(X):
    n, k = X.shape
    feats = [np.ones((n,1))]      
    feats.append(X)
    feats.append(X**2)
    
    for i in range(k):
        for j in range(i+1, k):
            feats.append((X[:,i]*X[:,j]).reshape(-1,1))
    return np.hstack(feats)

Phi_tr = poly_features(Xtr_sel)
Phi_val = poly_features(Xval_sel)
Phi_test = poly_features(Xtest_sel)
print("Форма Phi:", Phi_tr.shape)


Выбраны признаки: [51 48 13]
Форма Phi: (8000, 10)


In [102]:
def fit_linear(Phi, y):
    theta, *_ = np.linalg.lstsq(Phi, y, rcond=None)
    return theta

def predict(Phi, theta):
    return Phi @ theta

def rmse(y_true, y_pred):
    return np.sqrt(np.mean((y_true - y_pred)**2))

theta = fit_linear(Phi_tr, ytr)
y_pred_val = predict(Phi_val, theta)
y_pred_test = predict(Phi_test, theta)

rmse_val = rmse(yval, y_pred_val)
rmse_test = rmse(ytest, y_pred_test)

print(f"RMSE на валидации: {rmse_val:.4f}")
print(f"RMSE на тесте:     {rmse_test:.4f}")


RMSE на валидации: 13.0585
RMSE на тесте:     12.8048


In [103]:
# График сравнения
fig = go.Figure()
fig.add_trace(go.Scatter(y=yval, mode="markers", name="Истинные y"))
fig.add_trace(go.Scatter(y=y_pred_val, mode="markers", name="Предсказанные y"))
fig.update_layout(title="Сравнение истинных и предсказанных y (валидация)",
                  xaxis_title="№ наблюдения", yaxis_title="y")
fig.show()

# График остатков
resid = yval - y_pred_val
fig2 = go.Figure(go.Scatter(y=resid, mode="markers"))
fig2.update_layout(title="Остатки на валидации", xaxis_title="№ наблюдения", yaxis_title="y_true - y_pred")
fig2.show()
