In [None]:
import numpy as np
import pandas as pd
import os

В этом задании необходимо попытаться предсказать, сгенерировано ли изображение при помощи ИИ или создано человеком.

In [None]:
# загружаем нужные библиотеки
import keras
from keras.preprocessing import image
import sklearn
import matplotlib
import matplotlib.pyplot as plt
import cv2
from skimage import io
import imageio

# Предобработка изображений

In [None]:
# функция, поочередно открывающая картинки из папки, преобразующая их в вектора и складывающая в массив
# функция открывает картинку не в RGB, а в сером цвете (IMREAD_GRAYSCALE)
def image2array(filelist):
    image_array = []
    d = 'path'
    for image in os.listdir(filelist):
        img = cv2.imread(os.path.join(d,image), cv2.IMREAD_GRAYSCALE)
        img = cv2.resize(img, (600,600)).flatten()
        img = np.asarray(img)
        image_array.append(img)
    return np.array(image_array)

In [None]:
filelist = 'path'

train_data = image2array(filelist)

In [None]:
# проверяем размер, убеждаемся, что в папке у нас 1518 картинок, каждая из которых выражена вектором длины 360000
print(train_data.shape)

In [None]:
# проверяю первый элемент
print(train_data[0])

In [None]:
# тип элемента
type(train_data[0])

Ниже экспериментальный блок, для проверки что происходит с одной картинкой при вышеуказанном преобразовании.

In [None]:
img = cv2.imread('path', cv2.IMREAD_GRAYSCALE)
print(img.size, img.shape)
plt.imshow(img)

In [None]:
img = cv2.resize(img, (1024,1024))
print(img.size,img.shape)
print(img)

In [None]:
plt.imshow(img)

Продолжаем предобработку изображений

In [None]:
# создаем функцию для списка имен изображений из папки
def create_namelist(filelist):
    names_array = []
    d = 'path'
    for image in os.listdir(filelist):
        names_array.append(image)
    return names_array

In [None]:
a = create_namelist('path')
a = np.array(a)
print(a)
print(a.size)

In [None]:
# размер тот же - 1518, преобразуем в DataFrame для дальнейшей работы
a = pd.DataFrame(data = a)

In [None]:
# называем колонку с именем 'id'
a.columns=['id']

In [None]:
# так как в scv файле с таргетом не у всех изображений указано расширение, пишем функцию,
# которая добавит нам колонку без расширения
def png_name(column):
    t = column[0]
    result = t.split(".")[0]
    return result

In [None]:
a['id_corr'] = a[['id']].apply(png_name, axis = 1)

In [None]:
a.head(10)

In [None]:
# загружаем таргет
train_target = pd.read_csv('path')
train_target = pd.DataFrame(train_target)
train_target.head(10)

In [None]:
# аналогчно избавляемся от расширений
train_target['id_corr'] = train_target[['id']].apply(png_name, axis = 1)

In [None]:
train_target.head(5)

In [None]:
# соединяем списки по id_corr
new_merged = pd.merge(a, train_target, on = 'id_corr', how = 'left')
new_merged.head(10)
# там, где нет таргета - это тестовая выборк, где целевой класс проставлен - тренировочная(разделим её позже на train, test, validation)

In [None]:
# это действие добавляет дополнительной колонкой вектор, соответствующий картинке в наш DataFrame с таргетами и айди
new_merged['matrix'] = train_data.tolist()

In [None]:
new_merged.head(5)

In [None]:
# из всего числа изображений должно быть 506 без таргета, так как для этого набора будет необходимо предсказать
# вероятность попадания в целевой класс (сгенерировано ИИ)
count_nan = new_merged['target'].isnull().sum()
count_nan

In [None]:
# выделяем тренировочный датафрейм с указанными таргетами
train_set = new_merged.loc[((new_merged['target'] == 1)) | ((new_merged['target'] == 0))]
train_set.head(5)

In [None]:
# формируем тестовый датафрейм
test_set = new_merged.loc[((new_merged['target'].isnull()))]
test_set.head(5)

In [None]:
# следующим шагом формируем матрицу X для тренировочной выборки
def mtx(df):
    array=[]
    for i in df:
        i = np.asarray(i)
        array.append(i)
    return array


In [None]:
X_train = mtx(train_set['matrix'])

In [None]:
X_train = np.asarray(X_train)
X_train

In [None]:
# получили матрицу, где строки - картинки, столбцы - вектора, их описывающие. всего в тренировочной выборке
# 1012 картинок, каждая это вектор размера 360000
X_train.shape

In [None]:
#аналогичным образом достаем таргет для тренировочной выборки
Y_train = mtx(train_set['target'])

In [None]:
Y_train = np.asarray(Y_train)
Y_train

In [None]:
# таргет - это вектор из 0 и 1(обозначающих классы), где класс определен для 1012 изображений
Y_train.shape

In [None]:
# делаем тоже самое для тестовой выборки
X_test = mtx(test_set['matrix'])

In [None]:
X_test = np.asarray(X_test)
X_test

In [None]:
X_test.shape

Тестовая выборка в нашем случае - это выборка, результат классификации для которой нам необходимо отправить на проверку. Работаем и обучаем модель мы только на размеченных данных. Разделим train выборку на тренировочную и тестовую.

In [None]:
import sklearn
from sklearn.model_selection import train_test_split

In [None]:
# делим выборку на тренировочную и тестовую случайным образом
X_train, X_test, y_train, y_test = train_test_split(X_train, Y_train, random_state=42)

In [None]:
print('Train:', X_train.shape, y_train.shape)
print('Test:', X_test.shape, y_test.shape)

Проведем стандартизацию

In [None]:
#делаем преобразование только на основе тренировочной подвыборки, чтобы избежать подглядывания
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()

X_train = pd.DataFrame(scaler.fit_transform(X_train))
X_test = pd.DataFrame(scaler.transform(X_test))

# Моделирование

In [None]:
# задаем функцию сигмоиды, возвращающую вероятность принадлежности к целевому классу
def sigmoid(x):
    return 1/(1+ np.exp(-x))

# Автоматическое обучение

используем модель логистической регрессии из sklearn

In [None]:
import sklearn
from sklearn.linear_model import LogisticRegression

In [None]:
model = LogisticRegression(random_state=42, solver ='liblinear')
model.fit(X_train, y_train)

In [None]:
model_C = LogisticRegression(random_state=42, solver ='liblinear',C = 1e-2 )
model_C.fit(X_train, y_train)

In [None]:
import seaborn as sns

In [None]:
from sklearn import metrics

y_pred = model.predict(X_test)
y_pred_C = model_C.predict(X_test)
confusion_matrix1 = metrics.confusion_matrix(y_test, y_pred)
confusion_matrix2 = metrics.confusion_matrix(y_test, y_pred_C)
# Визуализируем матрицы ошибок
fig, axes = plt.subplots(1, 2, figsize=(12, 5)) #фигура + 2 координатные плоскости
# Строим тепловую карту для матрицы ошибок
sns.heatmap(confusion_matrix1, annot=True, fmt='', ax=axes[0], cmap='Blues')
# Добавляем название графику и подписи осей абсцисс и ординат
axes[0].set_title('Матрица ошибок для классификации')
axes[0].set_xlabel('y prediction')
axes[0].set_ylabel('y true')
# Строим тепловую карту для второй матрицы ошибок
sns.heatmap(confusion_matrix2, annot=True, fmt='', ax=axes[1], cmap='Blues')
# Добавляем название графику и подписи осей абсцисс и ординат
axes[1].set_title('Матрица ошибок для классификации, C=1e-2')
axes[1].set_xlabel('y prediction')
axes[1].set_ylabel('y true')

In [None]:
print(metrics.classification_report(y_test, y_pred))
print(metrics.classification_report(y_test, y_pred_C))

Поиграемся с порогом вероятности

In [None]:
# Нас интересует только вероятность класса 1(второй столбец)
y_proba_pred = model.predict_proba(X_test)[:, 1]

# Для удобства завернём numpy-массив в Pandas Series

y_proba_pred = pd.Series(y_proba_pred)
# Создадим списки, в которых будем хранить значения метрик
recall_scores = []
precision_scores = []
f1_scores = []
accuracy_scores =[]

# Сгенерируем набор вероятностных порогов в диапазоне от 0.1 до 1
thresholds = np.arange(0.1, 1, 0.05)

# В цикле будем перебирать сгенерированные пороги
for threshold in thresholds:
    y_pred = y_proba_pred.apply(lambda x: 1 if x>threshold else 0)
    #Считаем метрики и добавляем их в списки
    recall_scores.append(metrics.recall_score(y_test, y_pred))
    precision_scores.append(metrics.precision_score(y_test, y_pred))
    f1_scores.append(metrics.f1_score(y_test, y_pred))
    accuracy_scores.append(metrics.accuracy_score(y_test, y_pred))

In [None]:
# Строим линейный график зависимости метрик от threshold
plt.figure(figsize=(10, 6))
plt.plot(thresholds, recall_scores, label='Recall')
# Строим линейный график зависимости precision от threshold
plt.plot(thresholds, precision_scores, label='Precision')
# Строим линейный график зависимости F1 от threshold
plt.plot(thresholds, f1_scores, label='F1-score')
# Строим линейный график зависимости F1 от threshold
plt.plot(thresholds, accuracy_scores, label='Accuracy')

plt.title('Зависимость метрик от порога вероятности')
plt.xlabel('Порог вероятности')
plt.ylabel('Метрика')
plt.xticks(thresholds)
plt.legend()

Есть смысл повысить порог вероятности, при котором картинка уже будет считаться первым классом. Так как у нас нет приоритетного класса, нет бизнес-контекста, чтобы приоретизировать метрики precision или recall, нам скорее важна метрика accuracy, и соблюдение баланса между остальными. Замечаем, что accuracy растет вместе с повышением порога вероятности причисление объекта к 1 классу. Однако, имеется дисбаланс классов в сторону 0. Будем иметь этот момент в виду

In [None]:
thresholds = list(thresholds)

In [None]:
equal_score_index = thresholds.index(0.9000000000000002)
accuracy = accuracy_scores[equal_score_index]
recall = recall_scores[equal_score_index]
precision = precision_scores[equal_score_index]
f1 = f1_scores[equal_score_index]
print(f"Accuracy для тренировочной выборки: {accuracy} при threshold: {thresholds[equal_score_index]}")
print(f"F1-score для тренировочной выборки: {f1} при threshold: {thresholds[equal_score_index]}")
print(f"Precision для тренировочной выборки: {precision} при threshold: {thresholds[equal_score_index]}")
print(f"Recall для тренировочной выборки: {recall} при threshold: {thresholds[equal_score_index]}")

In [None]:
#Задаём оптимальный порог вероятностей
threshold_opt = 0.9000000000000002
#Изображения, для которых вероятность быть сгенерированными > 0.9000000000000002, относим к классу 1
#В противном случае — к классу 0
y_pred_opt = y_proba_pred.apply(lambda x: 1 if x > threshold_opt else 0)
#Считаем метрики
print(metrics.classification_report(y_test, y_pred_opt))

In [None]:
# проверяем целемую метрику - logloss
from sklearn.metrics import log_loss

logloss_train = log_loss(y_train, model.predict(X_train))
logloss_test = log_loss(y_test, model.predict(X_test))
print(f'Logloss для тренировочной выборки: {logloss_train}')
print(f'Logloss для тестовой выборки: {logloss_test}')

**В качестве финального решения выбираем логистическую регрессию, с порогом вероятности принадлежности к первому классу равному 0.9000000000000002. Еще раз выведем полученные метрики:**

In [None]:
print(metrics.classification_report(y_test, y_pred_opt))

**Далее следует экспериментальный блок.**

# Ручное обучение модели

Для ручного обучения модели нам необходим максимизировать функцию правдоподобия по весам модели логистической регрессии (функция правдоподобия описывает вероятность совместного появления наблюдений). Эта задача эквивалентна минимизации метрики logloss. В этих целях будем использовать метод градиентного спуска.

In [None]:
# стандартизируем (не использую в итоговом варианте)
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_train.mean(axis=0), X_train.std(axis=0)

In [None]:
X_train

In [None]:
#добавляем единичный признак для свободных весов
X_train = np.c_[np.ones(len(X_train)), X_train]
X_train

In [None]:
# делаю тоже самое для тестовой подвыборки
X_test = np.c_[np.ones(len(X_test)), X_test]

In [None]:
# задаем функцию сигмоиды, возвращающую вероятность принадлежности к целевому классу
def sigmoid(x):
    return 1/(1+ np.exp(-x))

# задаем функцию logloss, которую нам необходимо минимизировать для максимизации функции правдоподобия
def logloss(y, y_proba):
    logloss_1 = np.sum(np.log(y_proba[y == 1] + 1e-30))
    logloss_0 = np.sum(np.log(1 - y_proba[ y == 0] + 1e-30))
    logloss_total = -(logloss_0 + logloss_1)/ len(y)
    return logloss_total

# задаем функцию градиента logloss, так как будем использовать градиентный спуск для минимизации logloss
def gr_logloss(X, W, y):
    y_proba = sigmoid(X @ W)
    grad = X.T @ (y_proba - y)
    return grad


In [None]:
# для визуализации, не использую
x_min, x_max = X_train[:,0].min() - .5, X_train[:,0].max() + .5
y_min, y_max = X_train[:,1].min() - .5, X_train[:,1].max() + .5
h = .02
xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))

In [None]:
# для визуализации, не использую
def visualize(W):
    # не уверена, что этот шаг нужен, мы же уже прибавили единичный признак ранее
    #x_t_ones = np.c_[np.ones(x_t.shape[0]), x_t]
    x_t_ones = X_train

    y_proba = sigmoid(x_t_ones @ W)
    Z = np.where(y_proba >= 0.5, 1, 0)

    Z = Z.reshape(xx.shape)

    plt.figure(1, figsize(8,8))
    plt.pcolormesh(xx, yy, Z, cmap=plt.cm.Paired)

    plt.scatter(X[y == 0][:,1], X[y == 0][:, 2], edgecolors ='k')
    plt.scatter(X[y == 1][:,1], X[y == 1][:, 2], edgecolors ='k')

    plt.xlim(xx.min(), xx.max())
    plt.ylim(yy.min(), yy.max())
    plt.xticks()
    plt.show()

In [None]:
# установка минимума сдвига для весов
eps = 0.0001

# рандомные первоначальные веса, от которых начинаем двигаться
np.random.seed(8)
W = np.random.randn(X_train.shape[1])

# размер шага
learning_rate = 0.001

# первоначальные веса
next_W = W

index = []
logloss_list = []
# количество итераций
n = 3000
for i in range(n):
    cur_W = next_W
    next_W = cur_W - learning_rate*gr_logloss(X_train, cur_W, Y_train)

    #если топчемся на одном месте, останавливаемся и выходим из цикла
    if np.linalg.norm(cur_W - next_W) <= eps:
        break

    if i % 50 == 0:
        print(f'Итерация {i}')
        index.append(i)
        ll = logloss(Y_train, y_proba)
        logloss_list.append(ll)
        y_proba = sigmoid (X_train @ next_W)
        y_class = np.where(y_proba >=0.5, 1, 0)
        print(f'Logloss {logloss(Y_train, y_proba)}')
        print('----------------------------------------')

plt.plot(index,logloss_list)
plt.xticks(index,rotation='vertical')
plt.xlabel("Number of Iterarion")
plt.ylabel("Logloss")
plt.show()


In [None]:
# кажую 50-ю итерацию мы выводили размер logloss, на графике видно, как функция приближается к минимуму
# однако logloss, полученный вручную, далек от полученного встроенной моделью sklearn

In [None]:
# выведем вектор весов. веса кажутся огромными, так как мы не применяли регуляризацию
print(cur_W)

In [None]:
# расчитаем вероятности принадлежности к целевому классу
test_set['Y_proba_manual'] = sigmoid(X_test.dot(cur_W.T))

In [None]:
test_set.head(10)

In [None]:
check_gen_man = test_set.loc[(test_set['Y_proba_manual'] == 1)]
check_gen_man

In [None]:
# проверим, в чем наши модели расходятся
check_diff = test_set.loc[(test_set['pred'] != test_set['Y_proba_manual'])]
check_diff.head(20)

In [None]:
check_common = test_set.loc[(test_set['pred'] == test_set['Y_proba_manual'])]
check_common.head(20)

По совокупности факторов - размер logloss и ошибкам в предсказании, останавливаем выбор на модели логистической регрессии из sklearn

# Сверточная нейросеть

Сверточные нейросети классически используются для категоризации изображений на основе нарисованных на них объектов. В данном случае это не кажется уместным, так как сгенерированные изображения не схожи по присутствующим на них объектах. Однако, всё же проверим результат работы алгоритма и на этой выборке.

In [None]:
import shutil

In [None]:
# для начала нам необходимо разложить по папкам изображения в зависимости от их класса. также у нас отдельно у нас
# отдельно будет лежать тестовая и тренировочная выборки. копируем нужный DataFrame
origin = new_merged
origin.head(5)

In [None]:
# очистка в случае повторного рана
shutil.rmtree('/kaggle/working/gen')
shutil.rmtree('/kaggle/working/ngen')
shutil.rmtree('/kaggle/working/test')

In [None]:
# создаем директории
os.mkdir('/kaggle/working/gen')
os.mkdir('/kaggle/working/ngen')
os.mkdir('/kaggle/working/test')
os.mkdir('/kaggle/working/train')

In [None]:
# функция, сортирующая изображения по папкам
def sort_folders(columns):
    nm = columns[0]
    tg = columns[1]

    folder_origin = '/kaggle/input/images/generated-or-not/images'
    folder_train_gen = '/kaggle/working/gen'
    folder_train_notgen = '/kaggle/working/ngen'
    folder_test = '/kaggle/working/test'
    #for img in os.listdir(folder_origin):
    if tg == 1:
        shutil.copy(os.path.join(folder_origin, nm), os.path.join(folder_train_gen, nm))
    if tg == 0:
        shutil.copy(os.path.join(folder_origin, nm), os.path.join(folder_train_notgen, nm))
    else:
        shutil.copy(os.path.join(folder_origin, nm), os.path.join(folder_test, nm))
    return '+'

In [None]:
origin[['id_x', 'target']].apply(sort_folders, axis = 1)

In [None]:
shutil.move('/kaggle/working/gen', '/kaggle/working/train')
shutil.move('/kaggle/working/ngen', '/kaggle/working/train')

In [None]:
II_dir = os.path.join('/kaggle/working/train/gen')
human_dir = os.path.join('/kaggle/working/train/ngen')

In [None]:
print('total training II images:', len(os.listdir(II_dir)))
print('total training human images:', len(os.listdir(human_dir)))

In [None]:
II_files = os.listdir(II_dir)
print(II_files[:10])

human_files = os.listdir(human_dir)
print(human_files[:10])

In [None]:
# предобработка завершена. устанавливаем нужные библиотеки.
import tensorflow as tf
import keras
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [None]:
TRAINING_DIR = "/kaggle/working/train"
training_datagen = ImageDataGenerator(
      rescale = 1./255,
      rotation_range=40,
      width_shift_range=0.2,
      height_shift_range=0.2,
      shear_range=0.2,
      zoom_range=0.2,
      horizontal_flip=True,
      fill_mode='nearest')

train_generator = training_datagen.flow_from_directory(
	TRAINING_DIR,
	target_size=(224,224),
	class_mode='categorical',
  batch_size=126
)

In [None]:
model = tf.keras.models.Sequential([
    # This is the first convolution
    tf.keras.layers.Conv2D(64, (3,3), activation='relu', input_shape=(224, 224, 3)),
    tf.keras.layers.MaxPooling2D(2, 2),
    # The second convolution
    tf.keras.layers.Conv2D(64, (3,3), activation='relu'),
    tf.keras.layers.MaxPooling2D(2,2),
    # The third convolution
    tf.keras.layers.Conv2D(128, (3,3), activation='relu'),
    tf.keras.layers.MaxPooling2D(2,2),
    # The fourth convolution
    tf.keras.layers.Conv2D(128, (3,3), activation='relu'),
    tf.keras.layers.MaxPooling2D(2,2),
    # Flatten the results to feed into a DNN
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dropout(0.5),
    # 512 neuron hidden layer
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dense(2, activation='softmax')
])

In [None]:
model.summary()

model.compile(loss = 'categorical_crossentropy', optimizer='rmsprop', metrics=['accuracy'])

history = model.fit(train_generator, epochs=25, steps_per_epoch=20)

model.save("rps.h5")

In [None]:
# predicting images
for image in os.listdir('/kaggle/working/test'):

    path = '/kaggle/working/test/' + image
    #print(path)
    img = keras.utils.load_img(path, target_size=(224, 224))
    #print(type(img))
    x = keras.utils.img_to_array(img)
    x = np.expand_dims(x, axis=0)

    images = np.vstack([x])
    classes = model.predict(images, batch_size=10)
    print(image)
    print(classes)

#результаты получились менее удовлетворительными в сравнении с логистической регрессией