# Customer Complaint **Severity Classification** (TensorFlow)

**Objective:** Train a deep learning text classifier to predict complaint **severity** (`low`, `medium`, `high`) from narratives using **weak supervision**: pseudo-labels are derived from TextBlob sentiment polarity combined with escalation-keyword triggers.

**Pipeline:** Load → Clean → Pseudo-label (TextBlob + escalation keywords) → Split → Tokenize/Pad → Train BiLSTM → Evaluate → Save.

**Inputs required:** path to a CSV file (set through the `CSV_PATH` environment variable or the Configuration cell) containing at least one text column named `narrative`.

> Note: This notebook does not build a dashboard.

In [None]:
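# Install dependencies if needed. Uncomment when running locally.
# (seaborn and wordcloud are only needed for the "Additional Visualizations" cells.)
# %pip install -q tensorflow==2.15.0.post1 pandas scikit-learn textblob matplotlib seaborn wordcloud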

import os, re, json
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report, confusion_matrix

import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau

from textblob import TextBlob

# Show which TensorFlow version is active in this kernel.
print('TensorFlow:', tf.__version__)

## Configuration

In [None]:
# ==== User-configurable paths and hyperparameters ====
# Every later cell reads these module-level names, so edit values here only.

# Input CSV location; the CSV_PATH environment variable overrides the default.
CSV_PATH = os.getenv('CSV_PATH', 'complaints_processed.csv')  # change if needed
TEXT_COLUMN = 'narrative'  # column that holds the raw complaint text
MIN_CHARS = 20            # drop very short narratives (length after stripping whitespace)
MAX_SAMPLES = None        # set int to subsample for quicker runs

# Tokenization
MAX_WORDS = 30000         # vocab size (passed to the tokenizer as num_words)
MAX_LEN = 160             # sequence length every narrative is padded/truncated to

# Model
EMBED_DIM = 128           # embedding vector size
LSTM_UNITS = 96           # units per LSTM direction
DROPOUT = 0.3             # rate used by both Dropout layers
BATCH_SIZE = 128
EPOCHS = 8                # increase if you have GPU/time

# Training
VAL_SIZE = 0.15           # fraction of all rows held out for validation
TEST_SIZE = 0.15          # fraction of all rows held out for testing
RANDOM_STATE = 42         # seed shared by subsampling and both splits

# Output
MODEL_DIR = 'artifacts'   # created below if it does not exist yet
MODEL_BASENAME = 'severity_lstm_tf'  # filename stem for the saved model/tokenizer/labels
os.makedirs(MODEL_DIR, exist_ok=True)
print('CSV_PATH =', CSV_PATH)

## Load data

In [None]:
# Load the CSV and fail fast when the narrative column is absent.
df = pd.read_csv(CSV_PATH)
expected_cols = {TEXT_COLUMN}
missing = expected_cols - set(df.columns)
if missing:
    raise ValueError(f'Missing required columns: {missing}. Found: {list(df.columns)}')

# Basic cleaning: drop NA and very short entries
df = df.dropna(subset=[TEXT_COLUMN]).copy()
df[TEXT_COLUMN] = df[TEXT_COLUMN].astype(str).str.strip()
df = df[df[TEXT_COLUMN].str.len() >= MIN_CHARS]

# Optional subsample for speed
if MAX_SAMPLES is not None and len(df) > MAX_SAMPLES:
    df = df.sample(MAX_SAMPLES, random_state=RANDOM_STATE)

# Later cells add columns to `df`. The frame produced by the boolean-mask filter
# above is a derived selection, and assigning into it can trigger pandas'
# SettingWithCopyWarning (when copy-on-write is not enabled), so take an
# explicit, independent copy once all row filtering is done.
df = df.copy()

# Stop here with a clear message instead of failing later inside the split.
if df.empty:
    raise ValueError(
        f'No narratives with at least {MIN_CHARS} characters found in {CSV_PATH!r}.'
    )

df.head()

## Normalize text

In [None]:
import re

# Precompiled "noise" patterns; each match is replaced by a single space.
_url_pat = re.compile(r'https?://\S+|www\.\S+')
_email_pat = re.compile(r'\S+@\S+')
_nonprint_pat = re.compile(r'[^\x00-\x7F]+')  # drop non-ascii to avoid tokenizer oddities


def clean_text(s: str) -> str:
    """Return a lower-cased, noise-free version of *s*.

    Steps, in order: lower-case; blank out URLs, e-mail-like tokens and
    non-ASCII runs; blank out every character other than ``a-z``, ``0-9``,
    whitespace and ``. , ! ? $``; collapse whitespace runs to one space and
    trim both ends.
    """
    normalized = s.lower()
    for noise in (_url_pat, _email_pat, _nonprint_pat):
        normalized = noise.sub(' ', normalized)
    normalized = re.sub(r'[^a-z0-9\s\.\,\!\?\$]', ' ', normalized)
    collapsed = re.sub(r'\s+', ' ', normalized)
    return collapsed.strip()

# Keep the raw narrative untouched; the normalized version goes into 'text'.
df['text'] = [clean_text(raw) for raw in df[TEXT_COLUMN]]
df[['text']].head()

## Weak supervision: map TextBlob polarity to severity

In [None]:
def polarity_to_severity(p: float, text: str) -> str:
    """Map a sentiment polarity plus the complaint text to a severity label.

    Returns ``'high'`` when any escalation phrase occurs as a substring of the
    lower-cased text or when ``p < -0.35``; ``'medium'`` when
    ``-0.35 <= p < 0.05``; ``'low'`` otherwise. Thresholds were chosen by quick
    inspection and are meant to be tuned.
    """
    # Hard triggers: any of these phrases forces 'high' regardless of polarity.
    escalation_phrases = (
        'fraud', 'scam', 'lawsuit', 'legal', 'attorney', 'regulator',
        'chargeback', 'bbb', 'complaint filed', 'not resolved', 'escalate',
        'cancel my account', 'close my account', 'lost my money', 'stolen',
    )
    lowered = text.lower()

    keyword_hit = False
    for phrase in escalation_phrases:
        if phrase in lowered:
            keyword_hit = True
            break

    if keyword_hit or p < -0.35:
        return 'high'
    return 'medium' if -0.35 <= p < 0.05 else 'low'

def map_severity_series(texts: pd.Series) -> pd.Series:
    """Label every text in *texts* with a severity string.

    Computes the TextBlob sentiment polarity of each text, feeds polarity and
    text to ``polarity_to_severity``, and returns the labels as a Series that
    shares the index of *texts*.
    """
    polarities = texts.apply(lambda doc: TextBlob(doc).sentiment.polarity)
    labels = list(map(polarity_to_severity, polarities, texts))
    return pd.Series(labels, index=texts.index)

# Attach the weak labels, then display each class's share of rows in percent.
df['severity'] = map_severity_series(df['text'])
class_share = df['severity'].value_counts(normalize=True)
(class_share * 100).round(2)

## Encode labels

In [None]:
# Integer ids follow severity order: low=0, medium=1, high=2.
label2id = {name: idx for idx, name in enumerate(('low', 'medium', 'high'))}
id2label = dict((idx, name) for name, idx in label2id.items())
df['label'] = df['severity'].map(label2id)
# Every severity string must have been mapped to an id.
assert not df['label'].isna().any()
df[['text', 'severity', 'label']].head()

## Train/validation/test split (stratified)

In [None]:
# Stage 1: carve off the combined validation+test share, stratified by label.
holdout_fraction = VAL_SIZE + TEST_SIZE
X_train, X_temp, y_train, y_temp = train_test_split(
    df['text'],
    df['label'],
    test_size=holdout_fraction,
    stratify=df['label'],
    random_state=RANDOM_STATE,
)

# Stage 2: divide the held-out part into validation and test; rel_test is the
# test share expressed relative to the held-out part.
rel_test = TEST_SIZE / holdout_fraction
X_val, X_test, y_val, y_test = train_test_split(
    X_temp,
    y_temp,
    test_size=rel_test,
    stratify=y_temp,
    random_state=RANDOM_STATE,
)
print(len(X_train), len(X_val), len(X_test))

## Tokenize and pad sequences

In [None]:
# The vocabulary is learned from the training split only.
train_corpus = X_train.tolist()
tokenizer = Tokenizer(oov_token='<OOV>', num_words=MAX_WORDS)
tokenizer.fit_on_texts(train_corpus)

def to_seq(texts):
    """Turn an iterable of texts into a fixed-width array of token ids.

    Uses the fitted module-level ``tokenizer`` and pads each sequence at the
    end up to ``MAX_LEN``; the truncation side is left at the Keras default.
    """
    token_ids = tokenizer.texts_to_sequences(texts)
    return pad_sequences(token_ids, maxlen=MAX_LEN, padding='post')

# Vectorize the three text splits and convert the label Series to arrays.
Xtr, Xv, Xte = [to_seq(split) for split in (X_train, X_val, X_test)]
ytr, yv, yte = [np.array(split) for split in (y_train, y_val, y_test)]

# Embedding rows needed: capped at MAX_WORDS, else every known word plus one.
known_words = len(tokenizer.word_index)
vocab_size = min(MAX_WORDS, known_words + 1)
print('Vocab size:', vocab_size)

## Handle class imbalance with class weights

In [None]:
import numpy as np

# 'balanced' weights computed from the training labels, one per class id.
classes = np.array(sorted(label2id.values()))
class_weights_arr = compute_class_weight(class_weight='balanced', classes=classes, y=ytr)
# Keras expects a plain {int: float} mapping, so cast away the numpy scalars.
class_weights = dict(zip(map(int, classes), map(float, class_weights_arr)))
class_weights

## Build LSTM model

In [None]:
# Bidirectional LSTM classifier over the padded token-id sequences.
model = Sequential([
    # mask_zero=True: sequences are padded with id 0 at the end, and without a
    # mask the LSTM would process those padding positions as real timesteps.
    # The mask makes the recurrent layer skip them. Id 0 is reserved for
    # padding by the Keras Tokenizer, so no real word is masked.
    Embedding(input_dim=vocab_size, output_dim=EMBED_DIM, input_length=MAX_LEN,
              mask_zero=True),
    # return_sequences=False: only the final state of each direction is kept.
    Bidirectional(LSTM(LSTM_UNITS, return_sequences=False)),
    Dropout(DROPOUT),
    Dense(64, activation='relu'),
    Dropout(DROPOUT),
    # One softmax output per severity class (low / medium / high).
    Dense(3, activation='softmax')
])

# Labels are integer class ids, hence the sparse categorical loss.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=3e-4),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
model.summary()

## Train

In [None]:
# The best model seen so far (by val_accuracy) is checkpointed to this path.
ckpt_path = os.path.join(MODEL_DIR, f'{MODEL_BASENAME}.keras')
callbacks = [
    EarlyStopping(monitor='val_accuracy', patience=2, restore_best_weights=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=1, verbose=1),
    ModelCheckpoint(ckpt_path, monitor='val_accuracy', save_best_only=True)
]

history = model.fit(
    Xtr, ytr,
    validation_data=(Xv, yv),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    class_weight=class_weights,
    verbose=1,
    callbacks=callbacks
)

# NOTE(review): in Keras 2.x, EarlyStopping(restore_best_weights=True) rolls
# the weights back only when it actually stops training early. If all EPOCHS
# run, `model` keeps the last-epoch weights, which may be worse than the
# checkpointed best. Reload the checkpoint so evaluation, inference and the
# final save all use the best validation model in either case.
if os.path.exists(ckpt_path):
    model = tf.keras.models.load_model(ckpt_path)

# Plot accuracy, then loss: training curve against validation curve.
for metric, tag, title in (('accuracy', 'acc', 'Accuracy'), ('loss', 'loss', 'Loss')):
    plt.figure()
    plt.plot(history.history[metric], label=f'train_{tag}')
    plt.plot(history.history[f'val_{metric}'], label=f'val_{tag}')
    plt.xlabel('epoch'); plt.ylabel(metric); plt.legend(); plt.title(title); plt.show()

## Evaluate on test set

In [None]:
# Class probabilities for the held-out test split; the arg-max is the prediction.
probs = model.predict(Xte, batch_size=512)
pred = np.argmax(probs, axis=1)

# Per-class precision/recall/F1, with rows named in id order.
class_names = [id2label[idx] for idx in range(3)]
report = classification_report(yte, pred, target_names=class_names)
print(report)

cm = confusion_matrix(yte, pred)
cm

## Save model, tokenizer, and label maps

In [None]:
# Save Keras model
model_out = os.path.join(MODEL_DIR, f'{MODEL_BASENAME}.keras')
model.save(model_out)

# Save tokenizer
import pickle
tok_out = os.path.join(MODEL_DIR, f'{MODEL_BASENAME}_tokenizer.pkl')
with open(tok_out, 'wb') as f:
    pickle.dump(tokenizer, f)

# Save label maps
with open(os.path.join(MODEL_DIR, f'{MODEL_BASENAME}_labels.json'), 'w') as f:
    json.dump({'label2id': label2id, 'id2label': id2label}, f)

print('Saved:', model_out)
print('Saved:', tok_out)

## Inference helper function

In [None]:
def predict_severity(texts):
    """Predict a severity label and its confidence for one or more texts.

    Args:
        texts: a single string, or any iterable of strings (list, tuple,
            generator, pandas Series, ...).

    Returns:
        A list of ``(label, probability)`` tuples, one per input text in input
        order, where ``probability`` is the highest class probability rounded
        to 4 decimals. An empty input yields an empty list.
    """
    if isinstance(texts, str):
        texts = [texts]
    # Same normalization as used for training.
    cleaned = [clean_text(t) for t in texts]
    if not cleaned:
        # Nothing to score; Keras `predict` errors on a zero-row batch.
        return []
    seq = to_seq(cleaned)
    probs = model.predict(seq)
    preds = probs.argmax(axis=1)
    labels = [id2label[i] for i in preds]
    return list(zip(labels, probs.max(axis=1).round(4)))

# Example
# Three sample complaints run through the inference helper.
sample_complaints = [
    'They stole my money and refuse to refund. I will file with the regulator.',
    'App login failed once but works now.',
    'Customer support is slow and I am disappointed.',
]
predict_severity(sample_complaints)

## Additional Visualizations

In [None]:
# 1. Class distribution visualization
import seaborn as sns

# Bar chart of how many narratives received each severity pseudo-label.
severity_order = ['low', 'medium', 'high']
plt.figure(figsize=(5, 4))
sns.countplot(data=df, x='severity', order=severity_order, palette='Blues')
plt.title('Severity Class Distribution')
plt.xlabel('Severity Level'); plt.ylabel('Count')
plt.show()

In [None]:
# 2. Confusion Matrix Heatmap
from sklearn.metrics import confusion_matrix

# Recompute the test-set confusion matrix and draw it with class names on both axes.
cm = confusion_matrix(yte, pred)
tick_names = [id2label[idx] for idx in range(3)]
plt.figure(figsize=(5, 4))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=tick_names,
    yticklabels=tick_names,
)
plt.xlabel('Predicted'); plt.ylabel('Actual')
plt.title('Confusion Matrix Heatmap')
plt.show()

In [None]:
# 3. Word Clouds per severity level
from wordcloud import WordCloud

# One word cloud per severity level, built from all cleaned texts of that level.
for label in ['low', 'medium', 'high']:
    class_texts = df.loc[df['severity'] == label, 'text']
    text_blob = ' '.join(class_texts.tolist())
    cloud_maker = WordCloud(width=800, height=400, background_color='white')
    wc = cloud_maker.generate(text_blob)

    plt.figure(figsize=(8, 4))
    plt.imshow(wc, interpolation='bilinear')
    plt.axis('off')
    plt.title(f'WordCloud - {label.upper()} severity')
    plt.show()