# CNN Classification With News

***


# Getting Started

In [4]:
import time
import json

import numpy as np
import tensorflow as tf
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
from src.repository import BarsRepo, DbLocation, DbSample

In [5]:
# image_dir = Path('../../data/charts/5min')

# Create File DataFrame

In [14]:
# Experiment configuration for the label horizon and the data pull.
N_AFTER_BARS = 5  # 5 / 0
PRICE_DIFF_FROM = 0  # 5 / 0

# 12 is counted from the gap; subtracting PRICE_DIFF_FROM re-expresses it
# relative to the last bar in the picture.
CHOSEN_N = 12
CHOSEN_N -= PRICE_DIFF_FROM


# Repository configured with DbLocation.LOCAL / DbSample.ALL; the query is
# parameterised by the number of bars after the chart.
bars_repo = BarsRepo(DbLocation.LOCAL, DbSample.ALL)
charts_df = bars_repo.get_charts_news_nth_price(n_after_bars=N_AFTER_BARS)

In [15]:
# Shuffle the rows once with a fixed seed (deterministic order) and renumber them.
charts_df = charts_df.sample(frac=1.0, random_state=1).reset_index(drop=True)
# charts_df = charts_df[charts_df['news_count'] > 0]  # filter only gaps with news


def _direction_label(raw_json):
    """Return '1' if the CHOSEN_N entry of the JSON payload is >= 0, otherwise '0'."""
    return str(int(json.loads(raw_json)[f'{CHOSEN_N}'] >= 0))


# Column-wise `map` instead of row-wise `apply(axis=1)`: each new column depends on a
# single source column, so there is no need to materialise every row as a Series, and
# `map` always yields a Series (row-wise `apply` returns a DataFrame on an empty frame).
charts_df[f'n_{CHOSEN_N}'] = charts_df[f'n_after_{PRICE_DIFF_FROM}'].map(_direction_label)
# Paths in the table are prefixed with '../../' so they resolve from this notebook.
charts_df['filepath_jupyter'] = charts_df['filepath'].map(lambda path: f'../../{path}')

In [16]:
# Two-stage split with a shared seed: 70% / 30% into (train + validation) / test,
# then 80% / 20% of the first part into train / validation.
split_options = {'shuffle': True, 'random_state': 1}
train_val_df, test_df = train_test_split(charts_df, train_size=0.7, **split_options)
train_df, val_df = train_test_split(train_val_df, train_size=0.8, **split_options)

# Loading Images and News Sentiment

In [17]:
class CustomDataGen(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that yields ``((attr_batch, image_batch), label_batch)``.

    Every row of the wrapped DataFrame supplies an image path (``x_col_img``),
    an attribute value (``x_col_attr``) and a label (``y_col``). Only full
    batches are produced: the trailing ``len(df) % batch_size`` rows are never
    yielded (see ``__len__``).
    """

    def __init__(self, df, x_col_img, x_col_attr, y_col, batch_size, target_size, shuffle=True):
        """Store a private copy of ``df`` together with the batching settings.

        Args:
            df: DataFrame holding the image-path, attribute and label columns.
            x_col_img: Name of the column with image file paths.
            x_col_attr: Name of the column with the attribute fed to the model.
            y_col: Name of the label column; values must be convertible with ``int``.
            batch_size: Number of rows per batch.
            target_size: ``(height, width)`` passed to ``tf.image.resize``.
            shuffle: When true, rows are reshuffled at the end of every epoch.
        """
        # Newer Keras versions implement Sequence on top of PyDataset, whose
        # constructor must run (it sets up worker/queue defaults and otherwise
        # emits a warning). On older versions this is a harmless no-op.
        super().__init__()
        self.df = df.copy()
        self.x_col_img = x_col_img
        self.x_col_attr = x_col_attr
        self.y_col = y_col
        self.batch_size = batch_size
        self.target_size = target_size
        self.shuffle = shuffle

    def __len__(self):
        """Return the number of full batches per epoch (a partial last batch is dropped)."""
        return len(self.df) // self.batch_size

    def __get_input_img(self, path, target_size):
        """Load the image at ``path``, resize it to ``target_size`` and divide by 255."""
        image = tf.keras.preprocessing.image.load_img(path)
        image_arr = tf.keras.preprocessing.image.img_to_array(image)
        image_arr = tf.image.resize(image_arr, (target_size[0], target_size[1])).numpy()
        # Presumably 8-bit pixel data, which this scales into [0, 1].
        return image_arr / 255.

    def __get_input_attr(self, value):
        """Return the attribute value unchanged (hook for future preprocessing)."""
        return value

    def __get_output(self, label):
        """Convert a stored label to ``int``."""
        return int(label)

    def __get_data(self, batches):
        """Build the model inputs and targets for one slice of the DataFrame.

        Args:
            batches: DataFrame slice containing the rows of a single batch.

        Returns:
            ``((attr_array, image_array), label_array)`` as NumPy arrays; the
            attribute array comes first, then the image array.
        """
        x_batch_img = np.asarray(
            [self.__get_input_img(x, self.target_size) for x in batches[self.x_col_img]]
        )
        x_batch_attr = np.asarray(
            [self.__get_input_attr(x) for x in batches[self.x_col_attr]]
        )

        y_batch = np.asarray(
            [self.__get_output(y) for y in batches[self.y_col]]
        )

        return (x_batch_attr, x_batch_img), y_batch

    def __getitem__(self, index):
        """Return batch number ``index`` as ``(inputs, targets)``."""
        first_row = index * self.batch_size
        # Explicit positional slicing, independent of the DataFrame's index labels.
        batches = self.df.iloc[first_row:first_row + self.batch_size]
        x, y = self.__get_data(batches)
        return x, y

    def on_epoch_end(self):
        """Reshuffle the rows after each epoch when ``shuffle`` is enabled.

        The seed is fixed, so the same permutation is applied to the current
        row order every time, keeping runs reproducible.
        """
        if self.shuffle:
            self.df = self.df.sample(frac=1.0, random_state=1).reset_index(drop=True)

In [18]:
# Column names and batching settings shared by all three generators.
x_col_img = 'filepath_jupyter'
x_col_attr = 'news_sentiment'
y_col = f'n_{CHOSEN_N}'
batch_size = 32
target_size = (119, 86)  # (714, 516), (119, 86)

generator_settings = dict(
    x_col_img=x_col_img,
    x_col_attr=x_col_attr,
    y_col=y_col,
    batch_size=batch_size,
    target_size=target_size,
)

# Train and validation generators keep the default shuffle=True.
train_generator = CustomDataGen(train_df, **generator_settings)
val_generator = CustomDataGen(val_df, **generator_settings)

# The test generator is never reshuffled, so its row order stays fixed.
test_generator = CustomDataGen(test_df, shuffle=False, **generator_settings)

# Training

In [19]:
# Attribute branch: one value per sample -> 32-unit dense layer.
# `shape` must be a tuple; `(1)` is just the integer 1.
inputs_attr = tf.keras.Input(shape=(1,))
x_attr = tf.keras.layers.Dense(32, activation='relu')(inputs_attr)
model_attr = tf.keras.Model(inputs=inputs_attr, outputs=x_attr)


# Image branch: three Conv2D + MaxPool stages, then flatten and dropout.
# The input size follows `target_size` (the size the data generators resize
# every image to) instead of a hard-coded (714, 516), so the model always
# matches the batches it is fed; 3 is the channel count of the loaded images.
inputs_img = tf.keras.Input(shape=(*target_size, 3))
x_img = tf.keras.layers.Conv2D(filters=32, kernel_size=3, activation='relu')(inputs_img)
x_img = tf.keras.layers.MaxPool2D(pool_size=2)(x_img)
x_img = tf.keras.layers.Conv2D(filters=64, kernel_size=3, activation='relu')(x_img)
x_img = tf.keras.layers.MaxPool2D(pool_size=2)(x_img)
x_img = tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation='relu')(x_img)
x_img = tf.keras.layers.MaxPool2D(pool_size=2)(x_img)
x_img = tf.keras.layers.Flatten()(x_img)
x_img = tf.keras.layers.Dropout(0.5)(x_img)
model_img = tf.keras.Model(inputs=inputs_img, outputs=x_img)

# Head: concatenate both branches, one hidden layer, sigmoid output for the binary label.
combined_input = tf.keras.layers.concatenate([model_attr.output, model_img.output])
x = tf.keras.layers.Dense(128, activation="relu")(combined_input)
outputs = tf.keras.layers.Dense(1, activation='sigmoid')(x)

# Input order (attribute, image) matches the tuple order produced by the generators.
model = tf.keras.Model(inputs=[model_attr.input, model_img.input], outputs=outputs)

In [None]:
# Wall-clock start time; the elapsed time is printed once model.fit() has returned.
start = time.time()

def get_f1(y_true, y_pred):
    """Batch-wise F1 score usable as a Keras metric.

    Counts are obtained by clipping a tensor to [0, 1], rounding it and
    summing it: applied to ``y_true * y_pred`` for true positives, to
    ``y_true`` for actual positives and to ``y_pred`` for predicted positives.
    The backend epsilon is added to every denominator to avoid division by zero.

    Args:
        y_true: Tensor of ground-truth labels.
        y_pred: Tensor of predicted scores.

    Returns:
        Scalar tensor ``2 * precision * recall / (precision + recall + eps)``.
    """
    import tensorflow.keras.backend as K

    def count_ones(tensor):
        # Clip to [0, 1], round to {0, 1}, then count the ones.
        return K.sum(K.round(K.clip(tensor, 0, 1)))

    hits = count_ones(y_true * y_pred)
    actual_positives = count_ones(y_true)
    flagged_positives = count_ones(y_pred)

    precision = hits / (flagged_positives + K.epsilon())
    recall = hits / (actual_positives + K.epsilon())
    return 2*(precision*recall)/(precision+recall+K.epsilon())


# Adam with a small step size. `learning_rate` is the supported keyword; the
# old `lr` alias is deprecated and rejected by newer Keras releases.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.000_1)
model.compile(
    optimizer=optimizer,
    loss='binary_crossentropy',
    metrics=['accuracy', 'AUC', get_f1]
)

# Train for at most 50 epochs; stop once val_loss has not improved for 5
# consecutive epochs and roll back to the best weights seen.
history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=50,
    callbacks=[
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True,
            verbose=1
        )
    ]
)

# Elapsed wall-clock seconds since `start` was taken before training.
end = time.time()
print(f'training TIME: {end - start}')

# Results

In [None]:
# Predicted probabilities for every sample the test generator yields.
predicted = np.squeeze(model.predict(test_generator))
actual = test_generator.df[test_generator.y_col]

# The four values follow the compile() order: loss, accuracy, AUC, F1.
test_loss, test_acc, test_auc, test_f1 = model.evaluate(test_generator)
print(f"Test loss: {test_loss:.3f}")
print(f"Test accuracy: {test_acc:.3f}")
print(f"Test AUC: {test_auc:.3f}")
print(f"Test F1-score: {test_f1:.3f}")

# The generator only yields full batches, so the trailing rows of the test
# frame have no prediction. Align on the number of predictions instead of a
# hard-coded offset, which is only right for one dataset size / batch size.
# Labels are stored as '0'/'1' strings, hence the explicit int cast.
aligned_actual = actual.iloc[:len(predicted)].astype(int)
r2 = r2_score(aligned_actual, predicted)
print("Test R^2 Score: {:.5f}".format(r2))

print(f' - loss: **{test_loss:.3f}**; acc: **{test_acc:.3f}**; AUC: **{test_auc:.3f}**; F1: **{test_f1:.3f}**; R2: **{r2:.5f}**')

In [None]:
def m2tex(model, modelName):
    r"""Print ``model.summary()`` re-formatted as a LaTeX ``table``/``tabular`` block.

    Args:
        model: Object exposing a Keras-style ``summary(line_length=..., print_fn=...)``
            that calls ``print_fn`` once per summary line.
        modelName: Name written into the first table row and into the caption.

    Returns:
        None. The LaTeX source is printed to stdout.
    """
    stringlist = []
    model.summary(line_length=70, print_fn=stringlist.append)
    # Drop every second line from index 1 up to the last four lines (presumably
    # the horizontal rules of the text table), then the closing rule.
    del stringlist[1:-4:2]
    del stringlist[-1]
    # Split each remaining table row into three cells at fixed character offsets;
    # 31 and 59 are assumed to be the column boundaries at line_length=70.
    for ix in range(1, len(stringlist) - 3):
        tmp = stringlist[ix]
        stringlist[ix] = tmp[0:31] + "& " + tmp[31:59] + "& " + tmp[59:] + r"\\ \hline"
    # Raw strings keep the LaTeX backslashes literal without relying on
    # invalid escape sequences such as "\h" or "\e".
    stringlist[0] = r"Model: {} \\ \hline".format(modelName)
    stringlist[1] = stringlist[1] + r" \hline"
    stringlist[-4] += r" \hline"
    stringlist[-3] += r" \\"
    stringlist[-2] += r" \\"
    stringlist[-1] += r" \\ \hline"
    prefix = [r"\begin{table}[]", r"\begin{tabular}{lll}"]
    suffix = [
        r"\end{tabular}",
        r"\caption{{Model summary for {}.}}".format(modelName),
        r"\label{tab:model-summary}",
        r"\end{table}",
    ]
    stringlist = prefix + stringlist + suffix
    out_str = " \n".join(stringlist)
    # Escape the characters LaTeX treats specially in running text.
    out_str = out_str.replace("_", r"\_")
    out_str = out_str.replace("#", r"\#")
    print(out_str)


# Print the LaTeX summary table for the combined model; 'OLA' is the name
# that appears in the table's first row and in its caption.
m2tex(model, 'OLA')

In [None]:
# Print Keras' plain-text summary of the combined model.
model.summary()

In [None]:
import matplotlib.pyplot as plt

# Per-epoch curves recorded by model.fit().
accuracy, val_accuracy, loss, val_loss = (
    history.history[key] for key in ("accuracy", "val_accuracy", "loss", "val_loss")
)
epochs = range(1, len(accuracy) + 1)

# One figure per metric: training values as dots, validation values as a line.
curves = (
    ("accuracy", accuracy, val_accuracy),
    ("loss", loss, val_loss),
)
for figure_index, (metric_name, train_values, val_values) in enumerate(curves):
    if figure_index > 0:
        # The first metric uses the implicit current figure; later ones open a new one.
        plt.figure()
    plt.plot(epochs, train_values, "bo", label=f"Training {metric_name}")
    plt.plot(epochs, val_values, "b", label=f"Validation {metric_name}")
    plt.title(f"Training and validation {metric_name}")
    plt.legend()
plt.show()