In [None]:
import os
import pandas as pd
import numpy as np
import re
import random
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, losses, metrics
import tensorflow.keras.backend as K

# -------------------------- Reproducibility --------------------------
# Fix the seed of every global RNG used below so repeated runs draw the same
# random numbers.
random.seed(42)         # Python's stdlib RNG (used when sampling pairs)
np.random.seed(42)      # NumPy's global RNG (used when shuffling batches)
tf.random.set_seed(42)  # TensorFlow's global seed

# -------------------------- 1. DATA LOADING --------------------------
def load_legal_clause_dataset(root_path: str):
    """Load every ``*.csv`` file in *root_path* as one clause category.

    The category name is the file name without its extension. The text
    column is the first column whose lower-cased name is one of ``clause``,
    ``text``, ``clauses`` or ``content``; if none matches, the first column
    of the file is used.

    Args:
        root_path: Directory containing one CSV file per category.

    Returns:
        A list of ``(clause_text, category)`` tuples, ordered by file name
        and then by row order within each file.
    """
    # os.listdir() returns entries in arbitrary order; sorting keeps the sample
    # indices (and therefore the seeded pair sampling downstream) reproducible.
    csv_files = sorted(f for f in os.listdir(root_path) if f.endswith(".csv"))

    data = []
    for filename in csv_files:
        category = os.path.splitext(filename)[0]
        df = pd.read_csv(os.path.join(root_path, filename))
        text_col = next(
            (c for c in df.columns if c.lower() in ["clause", "text", "clauses", "content"]),
            df.columns[0],
        )
        # Drop missing cells first: astype(str) would otherwise turn NaN into
        # the literal clause "nan".
        texts = df[text_col].dropna().astype(str).tolist()
        data.extend((text, category) for text in texts)

    print(f"Loaded {len(data)} clauses from {len(csv_files)} categories.")
    return data

def clean_text(text: str) -> str:
    """Normalise a clause for tokenization.

    The text is lower-cased, every character outside the allowed set
    (``a-z``, digits, whitespace and a small set of punctuation marks) is
    replaced by a space, and runs of whitespace are collapsed to a single
    space with leading/trailing whitespace removed.
    """
    lowered = text.lower()
    # Replace (rather than delete) disallowed characters so that the words on
    # either side of them stay separated.
    filtered = re.sub(r'[^a-z0-9\s\.,;:\(\)\[\]\{\}/\\\'"-]', ' ', lowered)
    collapsed = re.sub(r'\s+', ' ', filtered)
    return collapsed.strip()

# Location of the downloaded dataset. The default is the original author's
# local cache directory; set the LEGAL_CLAUSE_DATASET_PATH environment
# variable to point at the dataset on any other machine.
DATASET_PATH = os.environ.get(
    "LEGAL_CLAUSE_DATASET_PATH",
    r"C:\Users\dilaw\.cache\kagglehub\datasets\bahushruth\legalclausedataset\versions\1",
)
raw_data = load_legal_clause_dataset(DATASET_PATH)
cleaned_data = [(clean_text(text), label) for text, label in raw_data]

# Parallel lists: texts_only[i] is the cleaned clause, labels_only[i] its category.
texts_only = [text for text, _ in cleaned_data]
labels_only = [label for _, label in cleaned_data]

# -------------------------- 2. TOKENIZATION --------------------------
# VOCAB_SIZE is passed to the tokenizer as num_words; MAX_LEN is the fixed
# length every encoded clause is padded or truncated to.
VOCAB_SIZE = 30000
MAX_LEN = 256

tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=VOCAB_SIZE, oov_token="<OOV>", lower=True)
tokenizer.fit_on_texts(texts_only)

# Encode each clause as a list of token ids, then pad/truncate at the end
# ('post') so all rows have exactly MAX_LEN entries.
X = tf.keras.preprocessing.sequence.pad_sequences(
    tokenizer.texts_to_sequences(texts_only),
    maxlen=MAX_LEN,
    padding='post',
    truncating='post',
)

# Category names -> integer ids -> one-hot rows.
label_encoder = LabelEncoder()
y_cat = label_encoder.fit_transform(labels_only)
y = tf.keras.utils.to_categorical(y_cat)
num_classes = len(label_encoder.classes_)

# word_index lists every distinct token seen while fitting, so the number
# printed here can be larger than VOCAB_SIZE.
print(f"Vocabulary size: {len(tokenizer.word_index)}")
print(f"Classes: {num_classes}")

# -------------------------- 3. CREATE PAIRS --------------------------
def create_pairs(X, y, num_pairs=200_000):
    """Sample same-class (label 1) and different-class (label 0) sample pairs.

    Each pair is positive or negative with probability 0.5. A positive pair
    is two distinct samples of one randomly chosen class that has at least
    two members; a negative pair is one sample from each of two distinct
    randomly chosen classes. If only one kind of pair can be formed from the
    data, every pair is of that kind.

    Args:
        X: Array-like of shape ``(num_samples, ...)``; indexed along axis 0.
        y: One-hot (or score) matrix of shape ``(num_samples, num_classes)``;
            the class of a sample is the ``argmax`` of its row.
        num_pairs: Exact number of pairs to return.

    Returns:
        ``(X1, X2, labels)`` where ``X1[k]`` and ``X2[k]`` form pair ``k`` and
        ``labels[k]`` is 1 for a same-class pair and 0 otherwise.

    Raises:
        ValueError: If ``num_pairs > 0`` but the data allows neither positive
            nor negative pairs (fewer than two classes and no class with at
            least two samples).
    """
    print("Creating positive/negative pairs...")
    X = np.asarray(X)
    y_arg = np.argmax(y, axis=1)

    # Group sample indices by class.
    label_dict = {}
    for idx, lbl in enumerate(y_arg):
        label_dict.setdefault(int(lbl), []).append(idx)

    # Built once, outside the sampling loop.
    all_labels = list(label_dict)
    # Only classes with at least two members can supply a positive pair;
    # sampling from these alone guarantees that no draw has to be discarded.
    positive_labels = [lbl for lbl, members in label_dict.items() if len(members) >= 2]
    can_positive = bool(positive_labels)
    can_negative = len(all_labels) >= 2
    if num_pairs > 0 and not (can_positive or can_negative):
        raise ValueError(
            "Cannot form pairs: need at least two classes or one class with two samples."
        )

    first_idx, second_idx, labels = [], [], []
    for _ in range(num_pairs):
        want_positive = random.random() < 0.5
        if (want_positive and can_positive) or not can_negative:
            lbl = random.choice(positive_labels)
            i, j = random.sample(label_dict[lbl], 2)
            labels.append(1)
        else:
            lbl1, lbl2 = random.sample(all_labels, 2)
            i = random.choice(label_dict[lbl1])
            j = random.choice(label_dict[lbl2])
            labels.append(0)
        first_idx.append(i)
        second_idx.append(j)

    # Gather rows by index instead of stacking per-pair tuples; this also
    # yields correctly shaped empty arrays when num_pairs is 0.
    first_idx = np.array(first_idx, dtype=np.intp)
    second_idx = np.array(second_idx, dtype=np.intp)
    labels = np.array(labels, dtype=int)
    print(f"Created {len(labels)} pairs.")
    return X[first_idx], X[second_idx], labels

# Sample the pair set once, then hold out 20% of the pairs for validation,
# stratified on the pair label.
# NOTE(review): the split is over pairs rather than clauses, so one clause can
# appear in both training and validation pairs - confirm this is intended.
X1, X2, pair_labels = create_pairs(X, y, num_pairs=200_000)
(
    X1_train, X1_val,
    X2_train, X2_val,
    y_train, y_val,
) = train_test_split(
    X1,
    X2,
    pair_labels,
    random_state=42,
    stratify=pair_labels,
    test_size=0.2,
)

# -------------------------- 4. DATASET --------------------------
def create_pair_dataset(X1, X2, y, batch_size=64):
    """Wrap pair arrays in an endlessly repeating, reshuffled ``tf.data`` dataset.

    The dataset never ends: after every full pass the sample order is
    reshuffled with NumPy's global RNG and iteration starts again, so
    consumers must bound it themselves (e.g. ``steps_per_epoch``). The last
    batch of a pass may hold fewer than ``batch_size`` samples.

    Args:
        X1: 2-D NumPy array of token ids for the first element of each pair.
        X2: 2-D NumPy array of token ids for the second element of each pair.
        y: 1-D NumPy array of pair labels.
        batch_size: Maximum number of pairs per yielded batch.

    Returns:
        A prefetched ``tf.data.Dataset`` yielding
        ``({'anchor': batch_of_X1, 'other': batch_of_X2}, batch_of_y)``.

    Raises:
        ValueError: If the inputs differ in length, are empty, or
            ``batch_size`` is smaller than 1.
    """
    n = len(X1)
    if not (n == len(X2) == len(y)):
        raise ValueError(
            f"X1, X2 and y must have the same length, got {len(X1)}, {len(X2)}, {len(y)}."
        )
    # With no samples (or a non-positive step) the endless generator below
    # would spin forever without yielding anything, so fail fast instead.
    if n == 0:
        raise ValueError("Cannot build a dataset from empty inputs.")
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}.")

    def pair_generator():
        idx = np.arange(n)
        while True:
            np.random.shuffle(idx)
            for i in range(0, n, batch_size):
                j = idx[i:i+batch_size]
                yield {'anchor': X1[j], 'other': X2[j]}, y[j]

    # Take the sequence width from the data itself so the declared signature
    # always matches what the generator yields.
    output_signature = (
        {'anchor': tf.TensorSpec(shape=(None, X1.shape[1]), dtype=tf.int32),
         'other': tf.TensorSpec(shape=(None, X2.shape[1]), dtype=tf.int32)},
        tf.TensorSpec(shape=(None,), dtype=tf.int32)
    )

    ds = tf.data.Dataset.from_generator(pair_generator, output_signature=output_signature)
    ds = ds.prefetch(tf.data.AUTOTUNE)
    return ds

train_ds = create_pair_dataset(X1_train, X2_train, y_train)
val_ds = create_pair_dataset(X1_val, X2_val, y_val)

# -------------------------- 5. MODEL BUILDERS --------------------------
# Width of the token embeddings and the dropout rate shared by both encoders.
EMBED_DIM = 128
DROPOUT = 0.3


def get_embedding():
    """Return a new token-embedding layer with index 0 treated as padding.

    Every call creates a separate layer instance: VOCAB_SIZE rows of
    EMBED_DIM-dimensional vectors, with ``mask_zero=True``.
    """
    return layers.Embedding(
        input_dim=VOCAB_SIZE,
        output_dim=EMBED_DIM,
        mask_zero=True,
    )

class BiLSTMEncoder(tf.keras.Model):
    """Clause encoder: embedding -> bidirectional LSTM -> Dense(64, relu) -> dropout."""

    def __init__(self):
        super().__init__()
        self.emb = get_embedding()
        # return_sequences=False: only the final state of each direction is kept.
        recurrent_core = layers.LSTM(128, dropout=DROPOUT, return_sequences=False)
        self.bilstm = layers.Bidirectional(recurrent_core)
        self.dense = layers.Dense(64, activation='relu')
        self.drop = layers.Dropout(DROPOUT)

    def call(self, x):
        """Run padded token ids through the encoder stages in order."""
        hidden = x
        for stage in (self.emb, self.bilstm, self.dense, self.drop):
            hidden = stage(hidden)
        return hidden

class AttentionEncoder(tf.keras.Model):
    """Clause encoder built from one self-attention block and masked mean pooling.

    Pipeline: embedding -> multi-head self-attention (+ residual, LayerNorm)
    -> feed-forward (+ residual, LayerNorm) -> mean over non-padding tokens
    -> Dense(64, relu) -> dropout.

    Args:
        heads: Number of attention heads; each head uses
            ``EMBED_DIM // heads`` key dimensions.
    """

    def __init__(self, heads=8):
        super().__init__()
        self.emb = get_embedding()
        self.mha = layers.MultiHeadAttention(num_heads=heads, key_dim=EMBED_DIM//heads)
        self.norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.ffn = models.Sequential([layers.Dense(256, activation='relu'), layers.Dense(EMBED_DIM)])
        self.norm2 = layers.LayerNormalization(epsilon=1e-6)
        # Pooling is done by hand in call() so that padding can be excluded;
        # a GlobalAveragePooling1D layer is therefore no longer needed.
        self.out = layers.Dense(64, activation='relu')
        self.drop = layers.Dropout(DROPOUT)

    def call(self, x):
        """Encode a batch of padded token-id sequences of shape (batch, seq)."""
        # The mask attached to the embedding output is lost at the residual
        # additions below (plain tensor ops), so derive it explicitly from
        # the token ids and hand it to every step that needs it.
        pad_mask = self.emb.compute_mask(x)  # True where the token id != 0
        e = self.emb(x)

        # (batch, query, key) mask: a position may attend only between real tokens.
        attn_mask = tf.logical_and(
            tf.expand_dims(pad_mask, axis=2),
            tf.expand_dims(pad_mask, axis=1),
        )
        a = self.mha(e, e, attention_mask=attn_mask)
        x = self.norm1(e + a)
        f = self.ffn(x)
        x = self.norm2(x + f)

        # Mean over real tokens only. The count is floored at 1 so that an
        # all-padding sequence yields zeros instead of 0/0 = NaN.
        weights = tf.expand_dims(tf.cast(pad_mask, x.dtype), axis=-1)
        token_count = tf.maximum(tf.reduce_sum(weights, axis=1), 1.0)
        x = tf.reduce_sum(x * weights, axis=1) / token_count

        x = self.out(x)
        return self.drop(x)

def build_siamese(encoder_class):
    """Assemble a Siamese pair classifier around one shared encoder.

    A single ``encoder_class()`` instance encodes both inputs, so the two
    branches share weights. The element-wise absolute difference of the two
    encodings is fed to a one-unit sigmoid layer.

    Args:
        encoder_class: Zero-argument callable returning the encoder model.

    Returns:
        A Keras model taking a dict with keys ``'anchor'`` and ``'other'``
        (each of shape ``(MAX_LEN,)`` per sample) and producing one score
        in [0, 1] per pair.
    """
    branch_names = ('anchor', 'other')
    inputs = {tag: layers.Input(shape=(MAX_LEN,), name=tag) for tag in branch_names}

    encoder = encoder_class()
    encodings = [encoder(inputs[tag]) for tag in branch_names]

    abs_diff = layers.Lambda(lambda t: K.abs(t[0] - t[1]))(encodings)
    score = layers.Dense(1, activation='sigmoid')(abs_diff)
    return models.Model(inputs=inputs, outputs=score)

# Build one Siamese network per encoder type. Each build_siamese() call
# creates its own encoder instance, so the two models share no weights.
bilstm_model = build_siamese(BiLSTMEncoder)
attn_model = build_siamese(AttentionEncoder)

# -------------------------- 6. TRAIN FUNCTION --------------------------
def train_model(model, name, epochs=3, steps_train=200, steps_val=50):
    """Compile *model*, fit it on the module-level pair datasets and save its weights.

    Training reads the module-level ``train_ds`` and ``val_ds``. Both are
    endless generators, so each epoch is bounded by ``steps_train`` training
    batches and ``steps_val`` validation batches.

    Args:
        model: Keras model producing one sigmoid score per pair.
        name: Label used in the log line and as the stem of the weights file.
        epochs: Number of training epochs.
        steps_train: Training batches per epoch.
        steps_val: Validation batches per epoch.

    Returns:
        The ``History`` object returned by ``model.fit``.
    """
    model.compile(
        optimizer=optimizers.Adam(1e-3),
        loss=losses.BinaryCrossentropy(),
        metrics=['accuracy']
    )
    print(f"Training {name}...")
    history = model.fit(
        train_ds,
        validation_data=val_ds,
        steps_per_epoch=steps_train,
        validation_steps=steps_val,
        epochs=epochs
    )
    # Keras 3 only accepts weight files whose name ends in ".weights.h5"; any
    # other suffix raises ValueError here, after training has already finished.
    # Older Keras versions also write HDF5 for this name because it ends in ".h5".
    model.save_weights(f"{name}.weights.h5")
    return history

# -------------------------- 7. TRAIN BOTH --------------------------
# Train the two variants one after the other with the same settings. Each call
# returns the Keras History of that run and saves the model's weights to disk.
bilstm_hist = train_model(bilstm_model, "BiLSTM", epochs=3)
attn_hist = train_model(attn_model, "Attention", epochs=3)


Loaded 150881 clauses from 395 categories.
Vocabulary size: 45462
Classes: 395
Creating positive/negative pairs...
Created 200000 pairs.




Training BiLSTM...
Epoch 1/3
[1m 76/200[0m [32m━━━━━━━[0m[37m━━━━━━━━━━━━━[0m [1m13:18[0m 6s/step - accuracy: 0.5047 - loss: 0.6926