# In [None]:
import pandas as pd
import numpy as np
import spacy
import re
from typing import List, Dict, Tuple, Set, Union
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import Model
import json

# Load the small English spaCy pipeline once at import time; tokenize() below
# runs every text through this shared `nlp` object.
nlp = spacy.load("en_core_web_sm")
def tokenize(text: str) -> List[str]:
    """
    Tokenize text using spacy.

    The text is lower-cased, every character that is not an ASCII letter or
    digit is replaced by a space, digits are then deleted, and the result is
    split into tokens by the module-level spaCy pipeline ``nlp``.

    Args:
        text: Raw input string.

    Returns:
        The token strings in document order, without whitespace-only tokens.
    """
    text = text.lower()
    # Punctuation becomes a space so that neighbouring words stay separated.
    text = re.sub(r"[^a-zA-Z0-9]", " ", text)
    # Digits are removed outright (no space), so "abc123def" becomes "abcdef".
    text = re.sub(r"[0-9]", "", text)
    doc = nlp(text)
    # The substitutions above leave runs of spaces behind, which spaCy emits
    # as separate whitespace tokens; drop them so they never enter the
    # vocabulary.
    return [token.text for token in doc if not token.is_space]

def create_tokenmap(tokens: List[str]) -> Dict[str, int]:
    """
    Count how often each token occurs in the given tokens.

    Args:
        tokens: Tokens to count.

    Returns:
        Mapping from token to its number of occurrences, keyed in order of
        first appearance.
    """
    counts = {}
    for tok in tokens:
        # Missing tokens start from zero, so first sight yields a count of 1.
        counts[tok] = counts.get(tok, 0) + 1
    return counts

def assemble_token_maps(
    token_maps: List[Dict[str, int]]
) -> Dict[str, Dict[Union[str, int], Union[str, int]]]:
    """
    Assemble all token maps into one.

    The per-text counts are summed into a single vocabulary, and every token
    is given an integer index. Index 0 is reserved for "<PAD>" and index 1
    for "<UNK>"; real tokens are numbered from 2 upwards in order of first
    appearance.

    Args:
        token_maps: Token -> count mappings, one per text.

    Returns:
        A dict with three sub-mappings:
        "tok_to_idx" (token -> index), "idx_to_tok" (index -> token) and
        "idx_to_cnt" (index -> summed count; 0 for the reserved entries).
    """
    merged = {}
    for token_map in token_maps:
        for token, count in token_map.items():
            merged[token] = merged.get(token, 0) + count

    tok_to_idx = {"<PAD>": 0, "<UNK>": 1}
    idx_to_tok = {0: "<PAD>", 1: "<UNK>"}
    idx_to_cnt = {0: 0, 1: 0}
    # Start numbering after the reserved entries; starting at 0 would hand
    # the first two real tokens the <PAD>/<UNK> indices and overwrite them
    # in idx_to_tok.
    first_free = len(tok_to_idx)
    for idx, (token, count) in enumerate(merged.items(), start=first_free):
        tok_to_idx[token] = idx
        idx_to_tok[idx] = token
        idx_to_cnt[idx] = count

    return {
        "tok_to_idx": tok_to_idx,
        "idx_to_tok": idx_to_tok,
        "idx_to_cnt": idx_to_cnt
    }
    
def preprocess_text(
    text: Union[str, List[str]],
    tokenmap: Dict[str, Dict[Union[str, int], Union[str, int]]]
) -> List[int]:
    """
    tokenize text and replace words with indices.

    Args:
        text: Either a raw string, which is split with ``tokenize``, or a
            list of tokens that is used as-is.
        tokenmap: Vocabulary dict; only its "tok_to_idx" mapping is read.

    Returns:
        One index per token. Tokens missing from "tok_to_idx" map to the
        index stored for "<UNK>" (1 if that entry is absent).
    """
    # Iterating a raw string would yield single characters, not words, so a
    # string has to be tokenized before the lookup.
    tokens = tokenize(text) if isinstance(text, str) else text
    tok_to_idx = tokenmap["tok_to_idx"]
    unk_idx = tok_to_idx.get("<UNK>", 1)
    return [tok_to_idx.get(token, unk_idx) for token in tokens]


def test_train_split(
    df: pd.DataFrame, test_size: float = 0.2, random_state: int = 42
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Split data into train and test set.

    The rows are shuffled with ``random_state`` and the last
    ``int(len(df) * test_size)`` shuffled rows become the test set.

    Args:
        df: Data to split.
        test_size: Fraction of rows to put in the test set. The resulting
            row count is clamped to the range [0, len(df)].
        random_state: Seed for the shuffle, so the split is reproducible.

    Returns:
        ``(df_train, df_test)``.
    """
    shuffled = df.sample(frac=1, random_state=random_state)
    n_rows = len(shuffled)
    n_test = min(max(int(n_rows * test_size), 0), n_rows)
    # Split on a positive position: slicing with -n_test breaks for
    # n_test == 0, where [:-0] is empty and [-0:] is the whole frame.
    split = n_rows - n_test
    return shuffled.iloc[:split], shuffled.iloc[split:]


# The name matches pytest's default ``test_*`` discovery pattern; flag it as
# a non-test so test modules importing this helper do not try to collect it.
test_train_split.__test__ = False

def create_dataset(
    df: pd.DataFrame,
    tokenmap: Dict[str, Dict[Union[str, int], Union[str, int]]]
) -> Tuple[List[List[int]], List[int]]:
    """
    Create dataset from dataframe.

    Args:
        df: Frame with a "text" column and a "label" column.
        tokenmap: Vocabulary dict handed to ``preprocess_text`` for each text.

    Returns:
        ``(X, y)`` where ``X[i]`` is the index sequence produced by
        ``preprocess_text`` for row ``i`` and ``y[i]`` is that row's label.
    """
    # Read the two columns directly rather than materialising a Series per
    # row with iterrows().
    X = [preprocess_text(text, tokenmap) for text in df["text"]]
    # tolist() yields plain Python scalars.
    y = df["label"].tolist()
    return X, y

def pad_sequences(
    sequences: List[List[int]], maxlen: int, padding: str = "post",
    truncating: str = "post"
) -> List[List[int]]:
    """
    Pad sequences to the same length.

    Args:
        sequences: Index sequences of arbitrary length.
        maxlen: Length every returned sequence will have; must be >= 0.
        padding: "post" appends zeros to short sequences, "pre" prepends them.
        truncating: "post" keeps the first ``maxlen`` items of long
            sequences, "pre" keeps the last ``maxlen`` items.

    Returns:
        A new list of new lists, each exactly ``maxlen`` long. The input
        sequences are not modified.

    Raises:
        ValueError: If ``maxlen`` is negative, or ``padding`` / ``truncating``
            is neither "pre" nor "post".
    """
    if maxlen < 0:
        raise ValueError(f"maxlen must be non-negative, got {maxlen}")
    # Reject unknown modes up front; silently skipping them would return
    # sequences of differing lengths.
    if padding not in ("pre", "post"):
        raise ValueError(f"padding must be 'pre' or 'post', got {padding!r}")
    if truncating not in ("pre", "post"):
        raise ValueError(
            f"truncating must be 'pre' or 'post', got {truncating!r}"
        )

    padded = []
    for sequence in sequences:
        overflow = len(sequence) - maxlen
        if overflow > 0:
            if truncating == "post":
                sequence = sequence[:maxlen]
            else:
                # Slice from an explicit start: sequence[-maxlen:] would
                # return the whole sequence when maxlen == 0.
                sequence = sequence[overflow:]
        elif overflow < 0:
            fill = [0] * (-overflow)
            sequence = sequence + fill if padding == "post" else fill + sequence
        else:
            # Copy so the result never shares a list object with the input.
            sequence = list(sequence)
        padded.append(sequence)
    return padded

def create_embedding_matrix(
    tokenmap: Dict[str, Dict[Union[str, int], Union[str, int]]],
    embedding_dim: int,
    glove_path: str = "glove.6B.100d.txt"
) -> np.ndarray:
    """
    Create embedding matrix from glove embeddings.

    Args:
        tokenmap: Vocabulary dict; only its "tok_to_idx" mapping is read.
        embedding_dim: Number of values per vector in the embeddings file.
        glove_path: Path of the embeddings text file. Each line holds a word
            followed by its whitespace-separated vector values.

    Returns:
        Array of shape ``(len(tok_to_idx), embedding_dim)``. Row ``i`` holds
        the vector of the token with index ``i``; tokens that do not occur in
        the file keep an all-zero row.

    Raises:
        ValueError: If a vector for a vocabulary word does not have
            ``embedding_dim`` values.
    """
    tok_to_idx = tokenmap["tok_to_idx"]
    embedding_matrix = np.zeros((len(tok_to_idx), embedding_dim))
    # Explicit encoding so the read does not depend on the platform default.
    with open(glove_path, "r", encoding="utf-8") as f:
        for line in f:
            values = line.split()
            if not values:
                # Blank line (e.g. a trailing newline at the end of the file).
                continue
            idx = tok_to_idx.get(values[0])
            if idx is None:
                continue
            vector = np.asarray(values[1:], dtype="float32")
            if vector.shape[0] != embedding_dim:
                raise ValueError(
                    f"vector for {values[0]!r} has {vector.shape[0]} values, "
                    f"expected embedding_dim={embedding_dim}"
                )
            embedding_matrix[idx] = vector
    return embedding_matrix

def init_rnn(
    embedding_dim: int, vocab_size: int, embedding_matrix: np.ndarray,
    rnn_units: int, batch_size: int
) -> Tuple[tf.keras.Model, tf.keras.Model]:
    """
    Initialize RNN model.

    Args:
        embedding_dim: Size of each embedding vector.
        vocab_size: Number of rows in the embedding table.
        embedding_matrix: Initial values for the embedding table; the
            embedding layer is frozen (``trainable=False``).
        rnn_units: Units of each LSTM direction.
        batch_size: Not used by this function; kept so existing callers keep
            working.

    Returns:
        ``(encoder, decoder)``: the encoder is an embedding layer followed by
        three stacked bidirectional LSTMs (only the last one collapses the
        sequence dimension); the decoder is a single sigmoid unit.
    """
    #encoder
    encoder = tf.keras.Sequential([
        tf.keras.layers.Embedding(
            vocab_size, embedding_dim,
            # Seed the table through the initializer: the legacy `weights=`
            # constructor argument is rejected by some Keras releases.
            embeddings_initializer=tf.keras.initializers.Constant(
                embedding_matrix
            ),
            trainable=False
        ),
        tf.keras.layers.Bidirectional(
            tf.keras.layers.LSTM(rnn_units, return_sequences=True)
        ),
        tf.keras.layers.Bidirectional(
            tf.keras.layers.LSTM(rnn_units, return_sequences=True)
        ),
        tf.keras.layers.Bidirectional(
            tf.keras.layers.LSTM(rnn_units)
        )
    ])
    #decoder
    decoder = tf.keras.Sequential([
        tf.keras.layers.Dense(1, activation="sigmoid")
    ])
    return encoder, decoder

def loss_function(
    y_true: tf.Tensor, y_pred: tf.Tensor
) -> tf.Tensor:
    """
    Binary cross-entropy between targets and predicted probabilities.

    ``y_pred`` is treated as probabilities (``from_logits=False``) and no
    reduction is applied (``Reduction.NONE``).
    """
    bce = tf.keras.losses.BinaryCrossentropy(
        reduction=tf.keras.losses.Reduction.NONE, from_logits=False
    )
    return bce(y_true, y_pred)

def train_step(
    encoder: tf.keras.Model, decoder: tf.keras.Model,
    optimizer: tf.keras.optimizers.Optimizer,
    x: tf.Tensor, y_true: tf.Tensor
) -> Tuple[tf.Tensor, tf.Tensor]:
    """
    Run one optimisation step on a single batch.

    The batch is passed through ``encoder`` then ``decoder``, the result of
    ``loss_function`` is differentiated with respect to the trainable
    variables of both models, and ``optimizer`` applies the gradients.

    Returns:
        ``(loss, y_pred)``: the value returned by ``loss_function`` and the
        decoder output for this batch.
    """
    with tf.GradientTape() as tape:
        encoded = encoder(x)
        y_pred = decoder(encoded)
        loss = loss_function(y_true, y_pred)
    # Collected after the forward pass (layers may create their variables on
    # first call); one list serves both the gradient and the update.
    variables = encoder.trainable_variables + decoder.trainable_variables
    grads = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(grads, variables))
    return loss, y_pred


def main():
    """
    Run the preprocessing pipeline end to end.

    Reads "data.csv", splits it into train and test sets, builds the
    vocabulary from the training texts, converts both splits into padded
    index sequences, builds the embedding matrix, and writes every
    intermediate artefact to the working directory ("train.csv", "test.csv",
    "token map.json", "embedding_matrix.npy", "train.json", "test.json").
    """
    df = pd.read_csv("data.csv")
    df_train, df_test = test_train_split(df)
    df_train.to_csv("train.csv", index=False)
    df_test.to_csv("test.csv", index=False)

    # Only the training texts contribute to the vocabulary. Each text is
    # counted on its own and the per-text counts are then merged.
    token_maps = [
        create_tokenmap(tokenize(text)) for text in df_train["text"]
    ]
    tokenmap = assemble_token_maps(token_maps)
    with open("token map.json", "w") as f:
        json.dump(tokenmap, f)

    X_train, y_train = create_dataset(df_train, tokenmap)
    X_test, y_test = create_dataset(df_test, tokenmap)
    X_train = pad_sequences(X_train, maxlen=100)
    X_test = pad_sequences(X_test, maxlen=100)

    embedding_matrix = create_embedding_matrix(tokenmap, embedding_dim=100)
    np.save("embedding_matrix.npy", embedding_matrix)

    with open("train.json", "w") as f:
        json.dump({"X": X_train, "y": y_train}, f)
    with open("test.json", "w") as f:
        json.dump({"X": X_test, "y": y_test}, f)

# Run the pipeline only when this file is executed as a script, not when it
# is imported.
if __name__ == "__main__":
    main()
