In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as pl
import openpyxl
#import plotly.express as px

In [None]:
# Read data
data = pd.concat((pd.read_excel('./Datos_Metagenetica.xlsx', sheet_name='El_cielo', engine='openpyxl'), pd.read_excel('./Datos_Metagenetica.xlsx', sheet_name='Chamela',engine='openpyxl')))
# replace 0 with NaN
data = data.replace(0, np.nan)
#drop columns que no se van a usar 
data = data.drop(['Database','.id', 'similarity', 'phylum_final', 
                  'class_final', 'subfamily_final', 'tribe_final',
                   'subspecies_final', 'BASE', 'OTU'], axis=1)
data

# Balance data

In [None]:
def balance_one_tax_data(df, col_tax_to_balance, tax_to_balance, max_samples) -> pd.DataFrame:
    '''
    df: dataframe with all data to balance
    col_tax_to_balance: column name of the tax to balance
    tax_to_balance: tax to balance
    max_samples: maximum number to save of each tax
    return: balanced dataframe
    '''
    col_index = df.columns.get_loc(col_tax_to_balance)
    if col_index + 1 < len(df.columns):
        next_tax_col = df.columns[col_index + 1]
        vc = df[df[col_tax_to_balance] == tax_to_balance][next_tax_col].value_counts()
        q25 = vc.quantile(0.25)
        selected_values = vc[vc > 4 * q25]
        balanced = pd.DataFrame(columns = df.columns)
        for i in selected_values.index:
            if vc[i] > max_samples:
                balanced = pd.concat((balanced, df[df[next_tax_col] == i].sample(max_samples)))
            else:
                balanced = pd.concat((balanced, df[df[next_tax_col] == i]))
    return balanced

print('Original data')
print(data.value_counts('family_final'))
print('Balanced data')
balance_one_tax_data(data, 'order_final', 'Diptera', 50).value_counts('family_final')

---
# Encoding & Concatenation

In [None]:
data['Sequence'] = data['Sequence'].apply(lambda x: x.upper())

In [None]:
def sequence_encoding(sequence):
    mapping = {"A": 0, "C": 1, "G": 2, "T": 3}
    encoded_sequence = [mapping[i] for i in sequence]
    return np.eye(4)[encoded_sequence]

In [None]:
elem0 = data['Sequence'].iloc[0]
elem1 = data['Sequence'].iloc[1]
elem0

In [None]:
enc0 = sequence_encoding(elem0)
enc1 = sequence_encoding(elem1)
enc0

In [None]:
enc1.shape

In [None]:
def side_by_side_sequence(seq1, seq2):
    return np.concatenate((seq1, seq2), axis=1)

In [None]:
side_by_side_seq = side_by_side_sequence(enc0,enc1)
side_by_side_seq.shape
#type(syde_by_side_seq)

In [None]:
side_by_side_seq

In [None]:
def deep_sequence(seq1, seq2):
    #s1 = seq1[np.newaxis, :, :]
    #s2 = seq2[np.newaxis, :, :]
    #sequence = np.concatenate((s1, s2), axis=0)
    sequence = np.dstack((seq1, seq2))
    return sequence

In [None]:
deep_seq = deep_sequence(enc0,enc1)
deep_seq.shape

In [None]:
deep_seq

---
# Combinaciones

In [None]:
datos = {'Col1': ['C', 'A', 'R','M', 'E', 'N'],
        'Col2': ['S','S','M','S','M','M']}

df = pd.DataFrame(datos)
#df

In [None]:
import itertools

def combination_list(dataframe,column):
    sequences = dataframe[column].tolist()
    combinations = list(itertools.combinations(sequences, 2))
    return combinations

In [None]:
combinations = combination_list(df,'Col1')
#combinations

In [None]:
def combination_matrix(df,sequence_column,tax_column):
    combinaciones = []
    for sequence1, sequence2 in combination_list(df,sequence_column):
        z = zip(df[df[sequence_column] == sequence1][tax_column], df[df[sequence_column] == sequence2][tax_column])
        for clase1, clase2 in z:
            combinaciones.append([sequence1, sequence2, clase1, clase2])

    df_combinaciones = pd.DataFrame(combinaciones, columns=['Sequence1', 'Sequence2', 'Tax1', 'Tax2'])
    return df_combinaciones

In [None]:
nuevo_df = combination_matrix(df,'Col1','Col2')
#nuevo_df

In [None]:
def tax_comparison(dataframe, tax1, tax2):
    dataframe['Same'] = dataframe[tax1] == dataframe[tax2]
    return dataframe

In [None]:
input_matrix = tax_comparison(nuevo_df,'Tax1','Tax2')
input_matrix

---
# Final Matrix

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

def final_matrix(data,concat_type):
    
    #Pasamos a letras mayúsculas las cadenas
    data['Sequence'] = data['Sequence'].apply(lambda x: x.upper())
    #Llamamos a la función para filtrar los datos
    balanced_data = balance_one_tax_data(data, 'order_final', 'Diptera', 50)
    #Llamamos a la función para realizar todas las combinaciones
    combinations = combination_matrix(balanced_data,'Sequence',"family_final")
    #Llamamos a la función para determinar si los Taxones son iguales
    DNA_matrix = tax_comparison(combinations,'Tax1','Tax2')
    #Pasamos los valores boolean a integer
    DNA_matrix['Same'] = DNA_matrix['Same'].astype(int)
    
    DNA_matrix['Paired_seq'] = ''  # Creamos una columna vacía para almacenar los resultados
    for index, row in DNA_matrix.iterrows():
        sequence1 = row['Sequence1']
        sequence2 = row['Sequence2']
        encoding1 = sequence_encoding(sequence1)
        encoding2 = sequence_encoding(sequence2)
        vectors_padded = pad_sequences([encoding1, encoding2], padding='post')
        paired_sequences = concat_type(vectors_padded[0], vectors_padded[0])
        DNA_matrix.at[index, 'Paired_seq'] = paired_sequences
    return DNA_matrix

In [None]:
final_matrix = final_matrix(data,side_by_side_sequence)
final_matrix

In [None]:
final_matrix.dtypes

---
# Data preparation for CNN

In [None]:
from sklearn.model_selection import train_test_split

# Obtenemos las secuencias de ADN en una variable 'X' y las etiquetas en una variable 'y'
DNA = np.array(list(final_matrix.loc[:, 'Paired_seq'])) # dim = (31626,418,8)
labels = np.array(list(final_matrix.loc[:, 'Same'])) # dim = (31626,1)
#X

In [None]:
# Divide los datos en conjuntos de entrenamiento y prueba (80% para entrenamiento, 20% para prueba)
DNA_train, DNA_test, labels_train, labels_test = train_test_split(
    DNA, labels, test_size=0.20, random_state=42)

print("Forma de X_train:", DNA_train.shape)
print("Forma de y_train:", labels_train.shape)

print("Forma de X_test:", DNA_test.shape)
print("Forma de y_test:", labels_test.shape)

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense
from tensorflow.keras.activations import relu, sigmoid
from tensorflow.keras.optimizers import SGD

"""
CNN = Sequential([
    #Input(shape=(418, 4, 2)),

    Conv1D(filters=32, kernel_size=3, activation=relu, padding='same', input_shape=(418,4,2)),
    MaxPooling1D(pool_size=2),

    Conv1D(filters=64, kernel_size=3, activation=relu),
    MaxPooling1D(pool_size=2),

    Flatten(),
    Dense(128, activation='relu'),
    Dense(1, activation='sigmoid')
])

CNN.summary()
"""
model = Sequential()
model.add(Conv1D(filters=32, kernel_size=3, activation=relu, input_shape=(418, 8)))
model.add(MaxPooling1D(pool_size=2))
model.add(Conv1D(filters=64, kernel_size=3, activation=relu))
model.add(MaxPooling1D(pool_size=2))
model.add(Flatten())
model.add(Dense(128, activation=relu))
model.add(Dense(1, activation=sigmoid))

model.summary()

In [None]:
# Compile: Define training parameters

epochs = 50
lrate = 0.001
decay = lrate / epochs
optim = SGD(learning_rate = lrate, momentum = 0.90, decay = decay, nesterov = True)
model.compile(loss='binary_crossentropy', optimizer=optim, metrics=['binary_accuracy'])

In [None]:
BATCHES = final_matrix.shape[0] // 64

model.fit(DNA_train, labels_train, batch_size=BATCHES, epochs=epochs, verbose=2, validation_split=0.30)