# Detección de anomalías con redes transformer

In [14]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from ta import add_all_ta_features

# Load the minute-level data, indexed by timestamp
df = pd.read_csv('bitcoin_minute_data.csv', parse_dates=['timestamp'], index_col='timestamp')

# Compute technical indicators
# NOTE(review): `open` is mapped to a column named "price" — confirm this is the CSV's open-price column
df = add_all_ta_features(df, open="price", high="high", low="low", close="close", volume="volume")

# Compute log returns of the close price
close_prices = df['close'].astype(float)
df['log_return'] = np.log(close_prices / close_prices.shift(1))

# Features fed to the model
features = ['log_return', 'volume', 'volatility_bbm', 'volatility_bbh', 'volatility_bbl', 'volatility_kcc',
            'volatility_kch', 'volatility_kcl', 'volatility_atr', 'trend_macd', 'trend_macd_signal', 'trend_macd_diff']

# Keep only the model features and drop unusable rows in one non-inplace chain:
# - a zero close price makes the log return +/-inf, which dropna() alone would keep
#   and which would corrupt the scaler statistics, so inf is mapped to NaN first;
# - NaN rows come from the indicator warm-up windows and the first log return.
# Avoiding dropna(inplace=True) on a column subset also avoids SettingWithCopyWarning.
df = df[features].replace([np.inf, -np.inf], np.nan).dropna()

# Chronological split into train / validation / test (70% / 15% / 15%)
train_size = int(len(df) * 0.7)
valid_size = int(len(df) * 0.15)
test_size = len(df) - train_size - valid_size

train_data = df[:train_size]
valid_data = df[train_size:train_size + valid_size]
test_data = df[train_size + valid_size:]

  self._psar[i] = high2


In [15]:
# Standardise every split using statistics learned from the training split only.
# Each name is rebound to the scaled NumPy array returned by the scaler.
scaler = StandardScaler().fit(train_data)
train_data, valid_data, test_data = (
    scaler.transform(split) for split in (train_data, valid_data, test_data)
)

# Build supervised (window, next-step) samples from a time-ordered array
def create_dataset(data, seq_length):
    """Slice a time-ordered array into sliding windows and their next-step targets.

    Sample ``i`` is ``X[i] = data[i : i + seq_length]`` with target
    ``y[i] = data[i + seq_length]``, so ``y[i]`` belongs to row ``i + seq_length``
    of ``data``.

    Args:
        data: array-like whose first axis is time (converted with ``np.asarray``).
        seq_length: number of consecutive rows in each input window.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays with ``len(data) - seq_length`` samples.
        ``X`` has shape ``(n_samples, seq_length, *data.shape[1:])`` and ``y`` has
        shape ``(n_samples, *data.shape[1:])``. When ``data`` is too short to form
        a single sample, correctly shaped empty arrays are returned.
    """
    data = np.asarray(data)
    # Every start index i with i + seq_length <= len(data) - 1 has a target row,
    # which gives len(data) - seq_length samples (the last one was previously dropped).
    n_samples = len(data) - seq_length
    if n_samples <= 0:
        trailing = data.shape[1:]
        return (np.empty((0, seq_length) + trailing, dtype=data.dtype),
                np.empty((0,) + trailing, dtype=data.dtype))

    X = np.stack([data[start:start + seq_length] for start in range(n_samples)])
    # Copy so the targets do not alias the caller's array
    y = data[seq_length:seq_length + n_samples].copy()
    return X, y


# Window length (in time steps) for every sample. It is defined here because this
# cell runs before the model-definition cell that also assigns seq_length; without
# it a fresh "Restart & Run All" fails with NameError. Keep both values in sync.
seq_length = 60

X_train, y_train = create_dataset(train_data, seq_length)
X_valid, y_valid = create_dataset(valid_data, seq_length)
X_test, y_test = create_dataset(test_data, seq_length)

## Definición del modelo

In [16]:
import tensorflow as tf
from tensorflow.keras import layers, Model

# Number of consecutive time steps in each input window fed to the model
seq_length = 60

class TransformerBlock(layers.Layer):
    """Transformer encoder block: self-attention followed by a feed-forward network.

    Each of the two sub-layers is followed by dropout, a residual addition and a
    layer normalisation applied after the addition.

    Args:
        embed_dim: size of the last axis of the input; also used as ``key_dim`` of
            the attention layer and as the output width of the feed-forward network.
        num_heads: number of attention heads.
        ff_dim: hidden width of the feed-forward network.
        rate: dropout rate applied after attention and after the feed-forward network.
        **kwargs: standard Keras layer arguments (e.g. ``name``), forwarded to the base class.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Constructor arguments are kept so get_config() can reproduce the layer
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=False):
        """Apply the block to ``inputs``; ``training`` toggles the dropout layers."""
        # Self-attention: the same tensor is used as query and value
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be saved and re-created."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        })
        return config

class Time2Vector(layers.Layer):
    """Learned time embedding with one linear and one sinusoidal component per time step.

    The layer averages the first ``n_avg_features`` features of every time step into
    a single value, then applies a per-time-step affine map (linear component) and
    the sine of a second per-time-step affine map (periodic component).

    Args:
        seq_len: number of time steps; each of the four weight vectors has this length.
        n_avg_features: how many leading features are averaged per time step.
            Defaults to 4, the previously hard-coded value; ``None`` averages all features.
        **kwargs: standard Keras layer arguments (e.g. ``name``), forwarded to the base class.

    Input is indexed as a rank-3 tensor ``(batch, seq_len, features)``; the output
    has shape ``(batch, seq_len, 2)``.
    """

    def __init__(self, seq_len, n_avg_features=4, **kwargs):
        super().__init__(**kwargs)
        self.seq_len = seq_len
        self.n_avg_features = n_avg_features

    def build(self, input_shape):
        """Create the four trainable vectors, each of length ``seq_len``."""
        self.weights_linear = self.add_weight(name='weight_linear',
                                              shape=(int(self.seq_len),),
                                              initializer='uniform',
                                              trainable=True)
        self.bias_linear = self.add_weight(name='bias_linear',
                                           shape=(int(self.seq_len),),
                                           initializer='uniform',
                                           trainable=True)
        self.weights_periodic = self.add_weight(name='weight_periodic',
                                                shape=(int(self.seq_len),),
                                                initializer='uniform',
                                                trainable=True)
        self.bias_periodic = self.add_weight(name='bias_periodic',
                                             shape=(int(self.seq_len),),
                                             initializer='uniform',
                                             trainable=True)

    def call(self, x):
        """Return the concatenated linear and periodic time features."""
        # Collapse the selected leading features into one value per time step
        x = tf.math.reduce_mean(x[:, :, :self.n_avg_features], axis=-1)
        time_linear = self.weights_linear * x + self.bias_linear
        time_linear = tf.expand_dims(time_linear, axis=-1)
        time_periodic = tf.math.sin(tf.multiply(x, self.weights_periodic) + self.bias_periodic)
        time_periodic = tf.expand_dims(time_periodic, axis=-1)
        return tf.concat([time_linear, time_periodic], axis=-1)

    def get_config(self):
        """Return the constructor arguments so the layer can be saved and re-created."""
        config = super().get_config()
        config.update({
            "seq_len": self.seq_len,
            "n_avg_features": self.n_avg_features,
        })
        return config

def build_model(input_shape, head_size, num_heads, ff_dim, num_transformer_blocks, mlp_units, dropout, mlp_dropout):
    """Build the Transformer forecaster that predicts the next feature vector.

    Args:
        input_shape: ``(sequence_length, n_features)`` of one input window.
        head_size: width the inputs are projected to; also the embedding size
            passed to every ``TransformerBlock``.
        num_heads: number of attention heads per block.
        ff_dim: hidden width of each block's feed-forward network.
        num_transformer_blocks: how many blocks are stacked.
        mlp_units: iterable of hidden widths for the dense head.
        dropout: dropout rate inside the transformer blocks.
        mlp_dropout: dropout rate after each dense layer of the head.

    Returns:
        An uncompiled Keras ``Model`` mapping a window to ``input_shape[-1]`` outputs.
    """
    inputs = layers.Input(shape=input_shape)
    # Size the time embedding from the declared input shape, not a notebook global
    time_embedding = Time2Vector(input_shape[0])(inputs)
    x = layers.Concatenate(axis=-1)([inputs, time_embedding])
    x = layers.Dense(head_size)(x)  # Embedding to the same dimension as head_size
    for _ in range(num_transformer_blocks):
        # Do not force training=True here: Keras passes the correct flag from
        # fit/evaluate/predict. Hard-wiring it kept dropout active at inference,
        # which made predictions (and the anomaly scores built on them) random.
        x = TransformerBlock(head_size, num_heads, ff_dim, dropout)(x)

    x = layers.GlobalAveragePooling1D()(x)
    for dim in mlp_units:
        x = layers.Dense(dim, activation="relu")(x)
        x = layers.Dropout(mlp_dropout)(x)
    # One linear output per input feature
    outputs = layers.Dense(input_shape[-1])(x)
    return Model(inputs, outputs)


# One sample is a window of seq_length time steps with one column per feature
input_shape = (seq_length, len(features))

# Instantiate the network
model = build_model(
    input_shape,
    head_size=256,
    num_heads=4,
    ff_dim=4,
    num_transformer_blocks=4,
    mlp_units=[128],
    dropout=0.1,
    mlp_dropout=0.1,
)

# Adam with a small learning rate, mean-squared-error regression loss
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss="mse",
)
model.summary()


## Entrenamiento del modelo

In [17]:
# Stop as soon as the validation loss stops improving and roll back to the best
# weights, instead of always paying for all 20 (very long) epochs.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=3,
    restore_best_weights=True,
)

history = model.fit(
    X_train, y_train,
    validation_data=(X_valid, y_valid),
    epochs=20,
    batch_size=32,
    callbacks=[early_stopping],
    verbose=1
)


Epoch 1/20
[1m12967/79181[0m [32m━━━[0m[37m━━━━━━━━━━━━━━━━━[0m [1m5:03:49[0m 275ms/step - loss: 0.2600

KeyboardInterrupt: 

## Detección de Anomalías

In [4]:
def detect_anomalies(model, X_test, y_test, scaler, features, threshold_factor=2):
    """Flag samples whose prediction error is unusually large for any feature.

    Args:
        model: object exposing ``predict(X)``.
        X_test: input windows passed to ``model.predict``.
        y_test: scaled targets aligned with ``X_test``.
        scaler: object exposing ``inverse_transform``; used to bring targets and
            predictions back to the original scale.
        features: feature names; accepted for interface compatibility, not used here.
        threshold_factor: number of standard deviations above the mean absolute
            error at which a feature's error counts as anomalous.

    Returns:
        Tuple ``(anomalies, y_test_inv, predictions_inv, errors)`` where
        ``anomalies`` is a boolean vector with one entry per sample and ``errors``
        holds the absolute errors per sample and feature.
    """
    predictions = model.predict(X_test)

    # Inverse transform to original scale
    y_test_inv = scaler.inverse_transform(y_test)
    predictions_inv = scaler.inverse_transform(predictions)

    errors = np.abs(y_test_inv - predictions_inv)
    # Absolute errors are non-negative with a non-zero mean, so the cut-off has to
    # sit above that mean: k * std alone flags every row whenever the model has a
    # roughly constant error (std close to 0).
    threshold = np.mean(errors, axis=0) + threshold_factor * np.std(errors, axis=0)

    # A sample is anomalous if at least one feature exceeds its own threshold
    anomalies = (errors > threshold).any(axis=1)
    return anomalies, y_test_inv, predictions_inv, errors

anomalies, true_values, predictions, errors = detect_anomalies(model, X_test, y_test, scaler, features)

# `test_data` was rebound to a plain NumPy array when the splits were scaled, so it
# has no columns or timestamps. Rebuild the labelled frame from the same test slice
# of `df` (unscaled features, timestamp index).
test_results = df.iloc[train_size + valid_size:].copy()
test_results['anomaly_transformer'] = False

# Target i of create_dataset is row i + seq_length of the split, so labels start at
# row `seq_length`. Bounding the slice by len(anomalies) keeps lengths equal.
anomaly_col = test_results.columns.get_loc('anomaly_transformer')
test_results.iloc[seq_length:seq_length + len(anomalies), anomaly_col] = anomalies

# Save results
test_results.to_csv('anomalies_transformer.csv')
print("Anomalías detectadas con Transformer guardadas en 'anomalies_transformer.csv'")


NameError: name 'model' is not defined

In [5]:
import matplotlib.pyplot as plt

# Load the saved detection results
df_transformer = pd.read_csv('anomalies_transformer.csv', parse_dates=['timestamp'], index_col='timestamp')

# The results are saved from the feature frame, which does not contain 'close'.
# Recover the close price from the raw data by timestamp when it is missing.
if 'close' not in df_transformer.columns:
    raw_close = pd.read_csv('bitcoin_minute_data.csv', usecols=['timestamp', 'close'],
                            parse_dates=['timestamp'], index_col='timestamp')['close'].astype(float)
    # Guard against repeated timestamps, which would duplicate rows in the join
    raw_close = raw_close[~raw_close.index.duplicated(keep='first')]
    df_transformer = df_transformer.join(raw_close)

plt.figure(figsize=(15, 10))

# Plot prices
plt.plot(df_transformer.index, df_transformer['close'], label='Price')

# Transformer anomalies
anomalies_transformer = df_transformer[df_transformer['anomaly_transformer']]
plt.scatter(anomalies_transformer.index, anomalies_transformer['close'], color='purple', label='Transformer Anomalies')

plt.xlabel('Time')
plt.ylabel('Price')
plt.title('Bitcoin Price Anomalies Detection')
plt.legend()
plt.show()


FileNotFoundError: [Errno 2] No such file or directory: 'anomalies_transformer.csv'