In [1]:
# Import libraries
import pandas as pd
import numpy as np
import re
import pickle
import spacy
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import nltk
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (Dense, Dropout, LSTM, Embedding, 
                                    Bidirectional, Conv1D, MaxPooling1D, 
                                    Input, concatenate, GlobalMaxPooling1D)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from tensorflow.keras.utils import to_categorical
import keras_tuner as kt
import gradio as gr
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD
from textblob import TextBlob
import contractions

# Download NLTK data
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')

# Load spaCy for advanced NLP features
nlp = spacy.load('en_core_web_sm', disable=['parser', 'ner'])





[nltk_data] Downloading package punkt to
[nltk_data]     /Users/lokeshbudda/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/lokeshbudda/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/lokeshbudda/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     /Users/lokeshbudda/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


In [4]:
# Enhanced text preprocessing
def enhanced_clean_text(text):
    """Comprehensive text cleaning with multiple pattern removal and normalization"""
    # Remove URLs
    text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
    # Remove HTML tags
    text = re.sub(r'<.*?>', '', text)
    # Remove MBTI type patterns
    text = re.sub(r'\b[A-Z]{4}\b', '', text)
    # Remove special characters except basic punctuation
    text = re.sub(r'[^a-zA-Z\s.,!?]', '', text)
    # Expand contractions
    text = contractions.fix(text)
    # Convert to lowercase
    text = text.lower()
    # Remove extra whitespace
    text = ' '.join(text.split())
    # Correct spelling (basic correction)
    # text = str(TextBlob(text).correct())
    return text

def advanced_tokenization(text):
    # Process with spaCy for better lemmatization
    doc = nlp(text)
    # Keep only nouns, adjectives, verbs, and adverbs
    allowed_pos = {'NOUN', 'ADJ', 'VERB', 'ADV'}
    tokens = [token.lemma_ for token in doc if token.pos_ in allowed_pos and not token.is_stop]
    
    return tokens

def extract_text_features(df):
    # Sentiment analysis
    df['sentiment'] = df['cleaned'].apply(lambda x: TextBlob(x).sentiment.polarity)
    
    # Readability scores
    def flesch_reading_ease(text):
        sentences = text.count('.') + text.count('!') + text.count('?')
        words = len(text.split())
        syllables = sum([len(re.findall(r'[aeiouy]+', word.lower())) for word in text.split()])
        if sentences == 0 or words == 0:
            return 0
        return 206.835 - 1.015*(words/sentences) - 84.6*(syllables/words)
    
    df['readability'] = df['cleaned'].apply(flesch_reading_ease)
    
    # Word and character counts
    df['word_count'] = df['cleaned'].apply(lambda x: len(x.split()))
    df['char_count'] = df['cleaned'].apply(len)
    
    return df

def load_data(max_seq_len=200):
    df = pd.read_csv('../mbti_1.csv')
    
    # Enhanced cleaning
    df['cleaned'] = df['posts'].apply(enhanced_clean_text)
    
    # Extract additional text features
    df = extract_text_features(df)
    
    # Advanced tokenization
    df['tokens'] = df['cleaned'].apply(advanced_tokenization)
    
    # Limit sequence length
    df['tokens'] = df['tokens'].apply(lambda x: x[:max_seq_len])
    
    return df

def load_glove_embeddings(file_path, vocab, embedding_dim=100):
    embeddings = {}
    with open(file_path, encoding='utf-8') as f:
        for line in f:
            values = line.split()
            word = values[0]
            vector = np.array(values[1:], dtype='float32')
            embeddings[word] = vector
    
    # Build embedding matrix with +1 for padding token
    embedding_matrix = np.zeros((len(vocab) + 1, embedding_dim))
    found = 0
    for word, i in vocab.items():
        if word in embeddings:
            embedding_matrix[i] = embeddings[word]
            found += 1
        else:
            # Initialize unknown words with random values
            embedding_matrix[i] = np.random.normal(scale=0.6, size=(embedding_dim,))
    
    print(f"Found embeddings for {found}/{len(vocab)} words ({100*found/len(vocab):.2f}%)")
    return embedding_matrix

def prepare_data(df, max_seq_len=200):
    all_tokens = [token for tokens in df['tokens'] for token in tokens]
    vocab = {word: i+1 for i, word in enumerate(set(all_tokens))}
    
    # Convert tokens to sequences of indices
    X_sequences = [[vocab.get(token, 0) for token in tokens] for tokens in df['tokens']]
    X_padded = np.array([seq + [0]*(max_seq_len - len(seq)) if len(seq) < max_seq_len else seq[:max_seq_len] 
                         for seq in X_sequences])
    
    # Prepare MLP features (TF-IDF + SVD + additional features)
    tfidf = TfidfVectorizer(max_features=5000, ngram_range=(1, 2))
    tfidf_features = tfidf.fit_transform(df['cleaned'])
    svd = TruncatedSVD(n_components=100)
    tfidf_svd = svd.fit_transform(tfidf_features)
    
    # Combine with additional features
    additional_features = df[['sentiment', 'readability', 'word_count', 'char_count']].values
    mlp_features = np.concatenate([tfidf_svd, additional_features], axis=1)
    
    # Normalize features
    from sklearn.preprocessing import StandardScaler
    scaler = StandardScaler()
    mlp_features = scaler.fit_transform(mlp_features)
    
    y = df['type']
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)
    y_categorical = to_categorical(y_encoded)
    
    # Split data
    X_train, X_test, X_mlp_train, X_mlp_test, y_train, y_test = train_test_split(
        X_padded, mlp_features, y_categorical, test_size=0.2, stratify=y_encoded, random_state=42
    )
    
    return X_train, X_test, X_mlp_train, X_mlp_test, y_train, y_test, vocab, le, tfidf, svd, scaler

def build_hybrid_model(hp, vocab_size, embedding_matrix, embedding_dim=100, max_seq_len=200):
    """Build an enhanced hybrid CNN-LSTM-MLP model with hyperparameter tuning"""
    # Sequence input branch (CNN-LSTM)
    seq_input = Input(shape=(max_seq_len,))
    
    embedding_layer = Embedding(
        input_dim=vocab_size + 1,
        output_dim=embedding_dim,
        input_length=max_seq_len,
        weights=[embedding_matrix],
        trainable=hp.Boolean('trainable_embedding', default=True)  # Now trainable
    )(seq_input)
    
    # CNN layers with more options
    conv1 = Conv1D(
        filters=hp.Int('conv_filters1', 64, 256, step=64),
        kernel_size=hp.Int('kernel_size1', 3, 7),
        activation='relu',
        padding='same',
        kernel_regularizer=l2(hp.Float('conv_l2', 1e-5, 1e-3, sampling='log'))
    )(embedding_layer)
    pool1 = MaxPooling1D(pool_size=2)(conv1)
    
    conv2 = Conv1D(
        filters=hp.Int('conv_filters2', 32, 256, step=32),
        kernel_size=hp.Int('kernel_size2', 2, 5),
        activation='relu',
        padding='same'
    )(pool1)
    pool2 = MaxPooling1D(pool_size=2)(conv2)
    
    # Bidirectional LSTM with more options
    lstm = Bidirectional(LSTM(
        units=hp.Int('lstm_units', 64, 512, step=64),
        return_sequences=False,
        dropout=hp.Float('lstm_dropout', 0.1, 0.5),
        recurrent_dropout=hp.Float('recurrent_dropout', 0.1, 0.3),
        kernel_regularizer=l2(hp.Float('lstm_l2', 1e-5, 1e-3, sampling='log'))
    ))(pool2)
    
    # MLP input branch
    mlp_input = Input(shape=(104,))  # 100 (SVD) + 4 (additional features)
    
    # Enhanced MLP layers
    mlp_layer = Dense(
        units=hp.Int('mlp_units1', 128, 1024, step=128),
        activation='relu',
        kernel_regularizer=l2(hp.Float('mlp_l2', 1e-5, 1e-3, sampling='log'))
    )(mlp_input)
    mlp_layer = Dropout(hp.Float('mlp_dropout1', 0.2, 0.5))(mlp_layer)
    
    mlp_layer = Dense(
        units=hp.Int('mlp_units2', 64, 512, step=64),
        activation='relu'
    )(mlp_layer)
    mlp_layer = Dropout(hp.Float('mlp_dropout2', 0.1, 0.3))(mlp_layer)
    
    # Combine features with attention
    combined = concatenate([lstm, mlp_layer])
    
    # Additional dense layers with skip connections
    dense = Dense(
        units=hp.Int('dense_units1', 256, 1024, step=128),
        activation='relu'
    )(combined)
    dense = Dropout(hp.Float('dense_dropout1', 0.2, 0.5))(dense)
    
    dense = Dense(
        units=hp.Int('dense_units2', 128, 512, step=64),
        activation='relu'
    )(dense)
    dense = Dropout(hp.Float('dense_dropout2', 0.1, 0.3))(dense)
    
    # Output layer
    output = Dense(16, activation='softmax')(dense)
    
    # Create model
    model = Model(inputs=[seq_input, mlp_input], outputs=output)
    
    # Enhanced optimizer with more options
    optimizer = Adam(
        learning_rate=hp.Choice('learning_rate', [1e-4, 3e-4, 1e-3, 3e-3]),
        clipvalue=hp.Float('clipvalue', 0.1, 1.0),
        beta_1=hp.Float('beta_1', 0.8, 0.99),
        beta_2=hp.Float('beta_2', 0.9, 0.999)
    )
    
    model.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    
    return model

In [5]:
def main():
    # Parameters
    max_seq_len = 200  # Increased sequence length
    embedding_dim = 100
    
    # Load and prepare data
    print("Loading and preprocessing data...")
    df = load_data(max_seq_len)
    (X_train, X_test, X_mlp_train, X_mlp_test, 
     y_train, y_test, vocab, le, tfidf, svd, scaler) = prepare_data(df, max_seq_len)
    
    # Load GloVe embeddings
    print("Loading GloVe embeddings...")
    glove_path = '../glove.6B/glove.6B.100d.txt'  # Update with your path
    glove = load_glove_embeddings(glove_path, vocab, embedding_dim)
    
    # Hyperparameter tuning
    print("Starting hyperparameter tuning...")
    tuner = kt.RandomSearch(
        lambda hp: build_hybrid_model(hp, len(vocab), glove, embedding_dim, max_seq_len),
        objective='val_accuracy',
        max_trials=10,
        executions_per_trial=1,
        directory='tuner_dir',
        project_name='mbti_hybrid_enhanced'
    )
    
    tuner.search(
        [X_train, X_mlp_train],
        y_train,
        epochs=50,
        batch_size=128,
        validation_split=0.2,
        callbacks=[
            EarlyStopping(patience=5, restore_best_weights=True),
            ReduceLROnPlateau(factor=0.1, patience=3)
        ],
        verbose=2
    )
    
    # Get best model
    best_model = tuner.get_best_models(num_models=1)[0]
    
    # Evaluate
    test_loss, test_acc = best_model.evaluate([X_test, X_mlp_test], y_test)
    print(f"Test Accuracy: {test_acc:.4f}")
    
    # Save model and assets
    best_model.save('mbti_enhanced_model.h5')
    with open('label_encoder.pkl', 'wb') as f:
        pickle.dump(le, f)
    with open('vocab.pkl', 'wb') as f:
        pickle.dump(vocab, f)
    with open('tfidf.pkl', 'wb') as f:
        pickle.dump(tfidf, f)
    with open('svd.pkl', 'wb') as f:
        pickle.dump(svd, f)
    with open('scaler.pkl', 'wb') as f:
        pickle.dump(scaler, f)
    
    # Gradio interface
    def predict(text):
        # Clean and tokenize text
        cleaned = enhanced_clean_text(text)
        tokens = advanced_tokenization(cleaned)[:max_seq_len]
        sequence = [vocab.get(token, 0) for token in tokens]
        padded_seq = np.array([sequence + [0]*(max_seq_len - len(sequence)) or sequence])
        
        # Prepare MLP features
        tfidf_features = tfidf.transform([cleaned])
        svd_features = svd.transform(tfidf_features)
        
        # Additional features
        sentiment = TextBlob(cleaned).sentiment.polarity
        readability = flesch_reading_ease(cleaned)
        word_count = len(cleaned.split())
        char_count = len(cleaned)
        additional_features = np.array([[sentiment, readability, word_count, char_count]])
        
        # Combine and scale features
        mlp_features = np.concatenate([svd_features, additional_features], axis=1)
        mlp_features = scaler.transform(mlp_features)
        
        # Predict
        proba = best_model.predict([padded_seq, mlp_features], verbose=0)[0]
        return {le.classes_[i]: float(proba[i]) for i in range(16)}
    
    # Gradio interface
    iface = gr.Interface(
        fn=predict,
        inputs=gr.Textbox(label="Enter your text"),
        outputs=gr.Label(num_top_classes=4),
        title="Enhanced MBTI Personality Predictor",
        description="This model uses advanced NLP techniques to predict MBTI personality types from text."
    )
    iface.launch()

if __name__ == "__main__":
    main()

Trial 10 Complete [01h 55m 45s]
val_accuracy: 0.39769452810287476

Best val_accuracy So Far: 0.4135446548461914
Total elapsed time: 09h 03m 29s


  saveable.load_own_variables(weights_store.get(inner_path))


[1m55/55[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 85ms/step - accuracy: 0.4237 - loss: 1.8257




Test Accuracy: 0.4081


TypeError: No method for generating JsonSchema for core_schema.type='invalid' (expected: GenerateJsonSchema.invalid_schema)