In [1]:
from functions_py import convert_to_polar, polar_conversion, func_def, fourier_coef_calc, get_label, img_plot, polar_conversion_fast, get_label_fast
from functions_py import pipeline_train, batch_get_labels, optimized_test_class_alg
from functions_py import optimized_func_def, optimized_fourier_coef_calc
from functions_py import qwen_pipeline_train, qwen_optimized_test_class_alg, qwen_optimized_func_def, qwen_optimized_fourier_coef_calc, qwen_polar_conversion_fast
from functions_py import run_hyperparameter_optimization
import optuna
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import os
from numba import njit
from scipy.spatial.distance import cdist


import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.model_selection import train_test_split
import os

In [60]:
# CNN определение
import numpy as np
import pandas as pd
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer

def cnn_test(
    df: pd.DataFrame,
    preprocess_func: callable,
    random_state: int = 42,
    epochs: int = 10,
    batch_size: int = 128
) -> tuple[float, float]:
    """
    Тестирует CNN на предобработанных данных.
    
    Параметры:
    - df: DataFrame с колонками 'label' и пикселями
    - preprocess_func: функция предобработки, возвращает DataFrame с фичами
    - random_state: для воспроизводимости
    - epochs: количество эпох
    - batch_size: размер батча
    
    Возвращает:
    - (train_accuracy, test_accuracy)
    """
    # Применяем предобработку
    processed_df = preprocess_func(df)
    
    # Разделяем на признаки и метки
    X = processed_df.drop('label', axis=1).values
    y = processed_df['label'].values
    
    # Разбиваем на train/test
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=random_state
    )
    
    # Нормализация (если нужно)
    if X_train.max() > 1:
        X_train = X_train.astype('float32') / 255.0
        X_test = X_test.astype('float32') / 255.0
    
    # One-hot кодирование
    lb = LabelBinarizer()
    y_train = lb.fit_transform(y_train)
    y_test = lb.transform(y_test)

    # Определяем input_shape
    input_shape = (X_train.shape[1], 1)  # (n_features, 1)
    
    # Архитектура сети
    model = keras.Sequential([
        layers.Reshape(input_shape, input_shape=(X_train.shape[1],)),
        
        # Conv1D вместо Conv2D, так как у нас 1D признаки
        layers.Conv1D(32, 3, activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling1D(2),
        
        layers.Conv1D(64, 3, activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling1D(2),
        
        layers.Conv1D(128, 3, activation='relu'),
        layers.BatchNormalization(),
        
        layers.Flatten(),
        layers.Dense(256, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(10, activation='softmax')
    ])
    
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    
    # Обучение
    model.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_test, y_test),
        verbose=0
    )
    
    # Оценка
    train_acc = model.evaluate(X_train, y_train, verbose=0)[1]
    test_acc = model.evaluate(X_test, y_test, verbose=0)[1]
    
    return train_acc, test_acc

In [67]:
# Тест CNN
base = ''
input_data = pd.read_csv(base + 'train.csv')
state = 148

train_acc, test_acc = cnn_test(
    df=input_data,
    preprocess_func=extract_autoencoder_features,  # Ваша функция
    random_state=state,
    epochs=5
)

print(f"Train Accuracy: {train_acc:.5f}")  # ~0.995
print(f"Test Accuracy: {test_acc:.5f}")    # ~0.990

[1m1313/1313[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 3ms/step


  super().__init__(**kwargs)


Train Accuracy: 0.96539
Test Accuracy: 0.94821


In [37]:
# Функции предобработки
def transform_pixels_to_sums_vectorized(df):
    # Создаем копию исходного DataFrame
    result_df = df[['label']].copy()
    
    # Получаем все пиксельные колонки
    pixel_cols = [f'pixel{i}' for i in range(784)]
    pixels = df[pixel_cols].values
    
    # Преобразуем в 3D массив (n_images, 28, 28)
    images = pixels.reshape(-1, 28, 28)
    
    # Суммы по строкам (axis=2 - сумма по столбцам в каждом ряду)
    row_sums = images.sum(axis=2)
    
    # Суммы по столбцам (axis=1 - сумма по рядам в каждом столбце)
    col_sums = images.sum(axis=1)
    
    # Добавляем суммы в DataFrame
    for i in range(28):
        result_df[f'sum_row{i}'] = row_sums[:, i] / 28
    for i in range(28):
        result_df[f'sum_column{i}'] = col_sums[:, i] / 28
    
    return result_df

from skimage.feature import hog

def extract_hog_features(df, orientations=8, pixels_per_cell=(8, 8), cells_per_block=(1, 1)):
    result_df = df[['label']].copy()
    pixel_cols = [f'pixel{i}' for i in range(784)]
    images = df[pixel_cols].values.reshape(-1, 28, 28)
    
    hog_features = []
    for image in images:
        features = hog(image, orientations=orientations, pixels_per_cell=pixels_per_cell,
                       cells_per_block=cells_per_block, feature_vector=True)
        hog_features.append(features)
    
    hog_array = np.array(hog_features)
    for i in range(hog_array.shape[1]):
        result_df[f'hog_{i}'] = hog_array[:, i]
    
    return result_df

from scipy.stats import skew, kurtosis

def extract_statistical_features(df):
    """Извлечение статистических характеристик изображения"""
    result_df = df[['label']].copy()
    pixels = df[[f'pixel{i}' for i in range(784)]].values
    
    # Вычисление статистик
    means = pixels.mean(axis=1)
    stds = pixels.std(axis=1)
    skews = skew(pixels, axis=1)
    kurtoses = kurtosis(pixels, axis=1)
    
    result_df['mean'] = means
    result_df['std'] = stds
    result_df['skew'] = skews
    result_df['kurtosis'] = kurtoses
    
    return result_df

from skimage.util import view_as_blocks

def extract_pooling_features(df, pool_size=7, mode='max'):
    """Агрегация признаков по регионам изображения"""
    result_df = df[['label']].copy()
    pixels = df[[f'pixel{i}' for i in range(784)]].values
    images = pixels.reshape(-1, 28, 28)
    
    assert 28 % pool_size == 0, "Размер изображения должен делиться на размер окна"
    n_blocks = 28 // pool_size
    
    features = []
    for img in images:
        blocks = view_as_blocks(img, block_shape=(pool_size, pool_size))
        if mode == 'max':
            pooled = blocks.max(axis=(2,3)).flatten()
        elif mode == 'mean':
            pooled = blocks.mean(axis=(2,3)).flatten()
        features.append(pooled)
    
    pooled_arr = np.array(features)
    for i in range(pooled_arr.shape[1]):
        result_df[f'pool_{mode}_{i}'] = pooled_arr[:, i]
    
    return result_df

def extract_combined_features(df):
    """Комбинация HOG и статистических признаков"""
    # Извлекаем HOG-признаки
    hog_df = extract_hog_features(df)
    
    # Извлекаем статистические признаки
    stat_df = extract_statistical_features(df).drop(columns=['label'])
    
    # Объединяем результаты
    result_df = pd.concat([hog_df, stat_df], axis=1)
    return result_df

from skimage.feature import local_binary_pattern

def extract_lbp_features(df, radius=3, n_points=24):
    result_df = df[['label']].copy()
    pixels = df[[f'pixel{i}' for i in range(784)]].values
    images = pixels.reshape(-1, 28, 28)
    
    lbp_features = []
    for img in images:
        lbp = local_binary_pattern(img, P=n_points, R=radius, method='uniform')
        hist, _ = np.histogram(lbp, bins=n_points+2, range=(0, n_points+2))
        lbp_features.append(hist)
    
    lbp_arr = np.array(lbp_features)
    for i in range(lbp_arr.shape[1]):
        result_df[f'lbp_{i}'] = lbp_arr[:, i]
    
    return result_df

from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model

def extract_autoencoder_features(df, encoding_dim=32):
    result_df = df[['label']].copy()
    pixels = df[[f'pixel{i}' for i in range(784)]].values / 255.0
    
    # Архитектура автоэнкодера
    input_img = Input(shape=(784,))
    encoded = Dense(encoding_dim, activation='relu')(input_img)
    decoded = Dense(784, activation='sigmoid')(encoded)
    autoencoder = Model(input_img, decoded)
    encoder = Model(input_img, encoded)
    
    autoencoder.compile(optimizer='adam', loss='mse')
    autoencoder.fit(pixels, pixels, epochs=10, batch_size=256, verbose=0)
    
    encoded_features = encoder.predict(pixels)
    for i in range(encoding_dim):
        result_df[f'ae_{i}'] = encoded_features[:, i]
    
    return result_df

In [47]:
# вычисление аккураси
def test_train_class(predobr_func, D, M, T)"
    base = ''
    input_data = pd.read_csv(base + 'train.csv')
    conversion_state = True
    write_state = False
    state = 148
    target = 'label'
    data_with_labels = predobr_func(input_data)
    
    t_coef = T
    file_temp_name = prefix + 'D'+ str(D) + 'M' + str(M)
    
    
    data_train, data_test, data_train_means, fourier_coef = pipeline_train(data_with_labels, target, state, conversion_state, write_state, D, M)
    print('train acc')
    accuracy = optimized_test_class_alg(prefix, file_temp_name, target,data_train , data_train, data_train_means, fourier_coef, t_coef, rewrite = True)
    print(accuracy)
    print('test acc')
    accuracy = optimized_test_class_alg(prefix, file_temp_name, target,data_test , data_train, data_train_means, fourier_coef, t_coef, rewrite = True)
    print(accuracy)

In [57]:
test_train_class(extract_autoencoder_features, 3400, 11, 1.4308)

[1m1313/1313[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 2ms/step
train acc
0.674271499644634
test acc
0.6604617604617604


In [None]:
# Обучение
base = ''
input_data = pd.read_csv(base + 'train.csv')
conversion_state = True
write_state = False
state = 148
target = 'label'

prefix = 'COEF_TEST_1.05_column_row_'
D_left = 10
D_right = 15000
M_left = 2
M_right = 21
T_left = 0.1
T_right = 2.0

max_iter = 1000

best_param, best_val = run_hyperparameter_optimization(extract_autoencoder_features, input_data, target, 
                                                       conversion_state, write_state, state, base, prefix, D_left, D_right, 
                                                       M_left, M_right, T_left, T_right, max_iter)
print('param', best_param)
print('acc = ', best_val)