In [None]:
# SEO Content Quality & Duplicate Detector - Complete Pipeline
# Data Science Assignment

# ============================================================================
# SECTION 1: IMPORTS AND SETUP
# ============================================================================
%pip install textstat sentence-transformers beautifulsoup4 requests scikit-learn matplotlib seaborn
%pip install --upgrade pandas numpy
import pandas as pd
import numpy as np
import re
import json
import warnings
from bs4 import BeautifulSoup
import requests
from time import sleep
from urllib.parse import urlparse
import os

# NLP and Feature Engineering
import textstat
from sklearn.feature_extraction.text import TfidfVectorizer
from sentence_transformers import SentenceTransformer

# Machine Learning
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score
from sklearn.metrics.pairwise import cosine_similarity

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Suppress every warning category for the rest of the session so library
# deprecation notices do not clutter cell output. Note that this also hides
# warnings that may point at real problems.
warnings.filterwarnings(action='ignore')

# Confirm the environment is usable and record the pandas version in the output.
print("All libraries imported successfully!")
print(f"Pandas version: {pd.__version__}")


In [None]:
# ============================================================================
# SECTION 2: DATA LOADING AND HTML PARSING
# ============================================================================

def parse_html_content(html_content):
    """
    Parse HTML content and extract the page title and visible body text.

    Boilerplate elements (script, style, nav, footer, header) are removed
    first. Body text is then taken from the outermost article/main/div/p
    elements only, so text inside nested containers is counted once instead
    of once per enclosing tag. If the page has none of those tags, all
    remaining document text is used.

    Args:
        html_content (str): Raw HTML content

    Returns:
        dict: Dictionary with title and body_text. Both are empty strings
            when parsing raises an exception.
    """
    content_tag_names = ['article', 'main', 'div', 'p']

    try:
        soup = BeautifulSoup(html_content, 'html.parser')

        # Remove elements that carry code or site chrome rather than content
        for element in soup(["script", "style", "nav", "footer", "header"]):
            element.decompose()

        # get_text() also handles a <title> with nested markup, where
        # .string would be None and the title would be lost.
        title = soup.title.get_text().strip() if soup.title else ""

        # find_all() returns nested matches as well (a <p> inside a <div>
        # inside an <article> yields three tags). Joining get_text() of all of
        # them repeats the same text at every nesting level and inflates the
        # word count, so keep only tags that have no content-tag ancestor.
        content_tags = [
            tag for tag in soup.find_all(content_tag_names)
            if tag.find_parent(content_tag_names) is None
        ]

        # separator=' ' keeps words from adjacent elements apart
        # ("<p>one</p><p>two</p>" would otherwise become "onetwo").
        if content_tags:
            body_text = ' '.join(tag.get_text(separator=' ') for tag in content_tags)
        else:
            body_text = soup.get_text(separator=' ')

        # Collapse whitespace runs left over from the markup
        body_text = re.sub(r'\s+', ' ', body_text).strip()

        return {
            'title': title,
            'body_text': body_text
        }
    except Exception as e:
        # Best effort: one malformed page must not abort the whole batch
        print(f"Error parsing HTML: {str(e)}")
        return {
            'title': "",
            'body_text': ""
        }

def scrape_url(url, timeout=10):
    """
    Download a page over HTTP and hand back its HTML.

    The request is sent with a desktop-browser User-Agent header. Any
    exception raised while requesting the page (including the error raised
    by raise_for_status() for 4xx/5xx responses) is reported on stdout and
    turned into an empty result.

    Args:
        url (str): URL to scrape
        timeout (int): Request timeout in seconds

    Returns:
        str: HTML content or empty string on failure
    """
    browser_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }

    try:
        page = requests.get(url, headers=browser_headers, timeout=timeout)
        # Treat HTTP error statuses as failures rather than returning an error page
        page.raise_for_status()
        return page.text
    except Exception as e:
        print(f"Error scraping {url}: {str(e)}")
        return ""

# Load the dataset
print("Loading dataset...")
df = pd.read_csv('data/data.csv')
print(f"Dataset loaded: {len(df)} rows")
print(f"Columns: {df.columns.tolist()}")

# Parse HTML content or scrape if needed
print("\nParsing HTML content...")
parsed_data = []

# Whether the CSV ships pre-scraped HTML is a property of the frame, not of a
# row, so check it once instead of on every iteration.
has_html_column = 'html_content' in df.columns

# enumerate() supplies the 1-based progress position; the index label from
# iterrows() is only a valid counter when the frame has a default RangeIndex.
for position, (_, row) in enumerate(df.iterrows(), start=1):
    print(f"Processing row {position}/{len(df)}", end='\r')

    url = row['url']

    if has_html_column and pd.notna(row['html_content']):
        # Use pre-scraped HTML content
        parsed = parse_html_content(row['html_content'])
    else:
        # No stored HTML for this row: fetch the page
        html_content = scrape_url(url)
        if html_content:
            parsed = parse_html_content(html_content)
        else:
            parsed = {'title': '', 'body_text': ''}
        # Rate limiting applies to every request that was sent, including
        # failed ones, so an erroring host is not hit in a tight loop.
        sleep(1.5)

    # Calculate word count (whitespace-separated tokens)
    word_count = len(parsed['body_text'].split())

    parsed_data.append({
        'url': url,
        'title': parsed['title'],
        'body_text': parsed['body_text'],
        'word_count': word_count
    })

# Create DataFrame with extracted content
extracted_df = pd.DataFrame(parsed_data)
print(f"\n\nExtracted content from {len(extracted_df)} pages")
print(f"Average word count: {extracted_df['word_count'].mean():.0f}")

# Create data directory if it doesn't exist (exist_ok avoids a check-then-create race)
os.makedirs('data', exist_ok=True)

# Save extracted content (without html_content to reduce file size)
extracted_df.to_csv('data/extracted_content.csv', index=False)
print("Saved to: data/extracted_content.csv")

# Display sample
print("\nSample extracted content:")
print(extracted_df[['url', 'title', 'word_count']].head())


In [None]:
# ============================================================================
# SECTION 3: TEXT PREPROCESSING & FEATURE ENGINEERING
# ============================================================================

def clean_text(text):
    """Return the text lowercased, with each whitespace run collapsed to one space and the ends trimmed."""
    lowered = text.lower()
    return re.sub(r'\s+', ' ', lowered).strip()

def count_sentences(text):
    """Count the non-blank fragments left after splitting the text on runs of '.', '!' or '?'."""
    fragments = re.split(r'[.!?]+', text)
    return sum(1 for fragment in fragments if fragment.strip())

def extract_top_keywords(text, n=5):
    """Extract top N keywords using TF-IDF.

    The vectorizer is fitted on this single document with English stop words
    removed and the vocabulary capped at ``n`` terms. The retained terms are
    returned ordered by descending TF-IDF weight; ties keep the vectorizer's
    own term order.

    Args:
        text (str): Text to analyse.
        n (int): Maximum number of keywords to return.

    Returns:
        str: Keywords joined with "|", or "" when the input is not a string,
            is blank, or yields no usable vocabulary.
    """
    # Non-string input (e.g. NaN read back from a CSV) and blank text have no keywords
    if not isinstance(text, str) or not text.strip():
        return ""

    try:
        vectorizer = TfidfVectorizer(max_features=n, stop_words='english')
        tfidf_matrix = vectorizer.fit_transform([text])
    except ValueError:
        # Raised for an empty vocabulary (e.g. only stop words) or an invalid n.
        # A narrow catch keeps KeyboardInterrupt and real bugs visible.
        return ""

    feature_names = vectorizer.get_feature_names_out()
    scores = tfidf_matrix.toarray()[0]

    # sorted() is stable, so equally weighted terms keep their original order
    ranked = sorted(zip(feature_names, scores), key=lambda pair: -pair[1])
    return "|".join(name for name, _ in ranked)

print("Extracting features from text...")

# Initialize sentence transformer for embeddings
print("Loading sentence transformer model...")
model = SentenceTransformer('all-MiniLM-L6-v2')

# Length of the zero-vector fallback; 384 is the size the original code used
# as the default embedding size for this model.
EMBEDDING_DIM = 384

features_list = []

for idx, row in extracted_df.iterrows():
    print(f"Processing features {idx + 1}/{len(extracted_df)}", end='\r')

    text = row['body_text']
    cleaned_text = clean_text(text)
    has_text = bool(text.strip())

    # Basic metrics
    word_count = row['word_count']
    sentence_count = count_sentences(text)

    # Readability score (Flesch Reading Ease); 0 for empty pages or on failure
    try:
        flesch_score = textstat.flesch_reading_ease(text) if has_text else 0
    except Exception as exc:
        # Report the failure instead of swallowing it silently
        print(f"\nReadability scoring failed for {row['url']}: {exc}")
        flesch_score = 0

    # Top keywords
    keywords = extract_top_keywords(cleaned_text)

    # Generate embedding. Pages without text keep the zero vector: encoding
    # the empty string would give every empty page the same embedding, and
    # they would then all be reported as duplicates of one another.
    embedding = np.zeros(EMBEDDING_DIM)
    if has_text:
        try:
            embedding = model.encode(text[:512])  # Limit text length for efficiency
        except Exception as exc:
            print(f"\nEmbedding failed for {row['url']}: {exc}")

    features_list.append({
        'url': row['url'],
        'word_count': word_count,
        'sentence_count': sentence_count,
        'flesch_reading_ease': flesch_score,
        'top_keywords': keywords,
        'embedding': embedding.tolist()
    })

# Create features DataFrame
features_df = pd.DataFrame(features_list)
print(f"\n\nFeatures extracted for {len(features_df)} pages")

# Save features to CSV (the embedding column is written as a stringified list)
features_df.to_csv('data/features.csv', index=False)
print("Saved to: data/features.csv")

# Display feature statistics
print("\nFeature Statistics:")
print(f"Word count - Mean: {features_df['word_count'].mean():.0f}, "
      f"Min: {features_df['word_count'].min()}, Max: {features_df['word_count'].max()}")
print(f"Sentence count - Mean: {features_df['sentence_count'].mean():.1f}")
print(f"Flesch Reading Ease - Mean: {features_df['flesch_reading_ease'].mean():.1f}")


In [None]:
# ============================================================================
# SECTION 4: DUPLICATE DETECTION
# ============================================================================

print("\n" + "="*80)
print("DUPLICATE DETECTION")
print("="*80)

# Convert embeddings to numpy array (one row per page)
embeddings_array = np.array(features_df['embedding'].tolist())

# Compute cosine similarity matrix
print("Computing cosine similarity matrix...")
similarity_matrix = cosine_similarity(embeddings_array)

# Set similarity threshold
SIMILARITY_THRESHOLD = 0.80

# Find duplicate pairs.
# np.triu(..., k=1) blanks the diagonal and the lower triangle, so each
# unordered pair (i < j) is considered exactly once. np.nonzero walks the mask
# in row-major order, which is the order a nested i/j loop would produce.
n = len(similarity_matrix)
pair_mask = np.triu(similarity_matrix > SIMILARITY_THRESHOLD, k=1)
first_idx, second_idx = np.nonzero(pair_mask)

# Look the URLs up once rather than building a row Series per matching pair
page_urls = features_df['url'].to_numpy()

# Building from columns keeps the url1/url2/similarity schema even with no pairs
duplicates_df = pd.DataFrame({
    'url1': page_urls[first_idx],
    'url2': page_urls[second_idx],
    'similarity': similarity_matrix[first_idx, second_idx],
})
# Kept as a list of records for any later code that reads `duplicates`
duplicates = duplicates_df.to_dict('records')

# Thin content detection (word count < 500)
features_df['is_thin'] = features_df['word_count'] < 500

# Save duplicates
if len(duplicates_df) > 0:
    duplicates_df.to_csv('data/duplicates.csv', index=False)
    print(f"Saved {len(duplicates_df)} duplicate pairs to: data/duplicates.csv")
else:
    print("No duplicate pairs found above threshold")

# Summary statistics
print("\n" + "-"*80)
print("DUPLICATE DETECTION SUMMARY")
print("-"*80)
print(f"Total pages analyzed: {len(features_df)}")
print(f"Duplicate pairs found: {len(duplicates_df)}")
print(f"Thin content pages: {features_df['is_thin'].sum()} "
      f"({features_df['is_thin'].sum() / len(features_df) * 100:.1f}%)")

if len(duplicates_df) > 0:
    print("\nTop duplicate pairs:")
    print(duplicates_df.sort_values('similarity', ascending=False).head())


In [None]:
# ============================================================================
# SECTION 5: CONTENT QUALITY SCORING MODEL
# ============================================================================

print("\n" + "=" * 80)
print("CONTENT QUALITY SCORING")
print("=" * 80)

# Synthetic labels derived from fixed word-count and readability rules
def assign_quality_label(row):
    """Map a row's word count and Flesch Reading Ease to 'High', 'Low' or 'Medium'.

    'High' needs more than 1500 words and a readability between 50 and 70
    inclusive. Otherwise the row is 'Low' when it has fewer than 500 words or
    a readability below 30. Everything else is 'Medium'.
    """
    words = row['word_count']
    readability = row['flesch_reading_ease']

    long_and_readable = words > 1500 and 50 <= readability <= 70
    if long_and_readable:
        return 'High'

    if words < 500 or readability < 30:
        return 'Low'

    return 'Medium'

# Assign labels
features_df['quality_label'] = features_df.apply(assign_quality_label, axis=1)

print("Quality label distribution:")
print(features_df['quality_label'].value_counts())

# Prepare features for modeling
X = features_df[['word_count', 'sentence_count', 'flesch_reading_ease']].values
y = features_df['quality_label'].values

# Encode labels (string classes -> integer codes)
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
y_encoded = le.fit_transform(y)

# Train/test split.
# A stratified split raises ValueError when some class has fewer than two
# samples or when the test set is smaller than the number of classes. Both are
# likely on a small corpus where 'High' pages are rare, so fall back to a
# plain random split instead of aborting the notebook.
split_options = dict(test_size=0.3, random_state=42)
try:
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_encoded, stratify=y_encoded, **split_options
    )
except ValueError as exc:
    print(f"Stratified split not possible ({exc}); using a non-stratified split instead")
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_encoded, **split_options
    )

print(f"\nTraining set: {len(X_train)} samples")
print(f"Test set: {len(X_test)} samples")

# Train Random Forest model
print("\nTraining Random Forest model...")
rf_model = RandomForestClassifier(n_estimators=100, random_state=42, max_depth=5)
rf_model.fit(X_train, y_train)

# Predictions
y_pred = rf_model.predict(X_test)

# Baseline model (rule-based using only word count)
def baseline_predict(word_counts):
    """Simple baseline using word count only.

    More than 1500 words predicts 'High', fewer than 500 predicts 'Low', and
    anything else predicts 'Medium'. Labels are converted to the integer
    codes of the fitted label encoder ``le``.

    Args:
        word_counts: Iterable of word counts, one per sample.

    Returns:
        np.ndarray: Encoded class predictions. A predicted label that the
            encoder never saw is encoded as -1, which matches no true class
            and is therefore scored as a miss.
    """
    # Build the label -> code table once instead of calling le.transform()
    # per sample. LabelEncoder codes are positions in le.classes_.
    label_codes = {label: code for code, label in enumerate(le.classes_)}

    # le.transform(['High']) raises ValueError when no page in the data was
    # labelled 'High'; the -1 sentinel keeps the baseline usable in that case.
    unseen_code = -1

    def rule_label(wc):
        if wc > 1500:
            return 'High'
        if wc < 500:
            return 'Low'
        return 'Medium'

    return np.array([label_codes.get(rule_label(wc), unseen_code) for wc in word_counts])

y_pred_baseline = baseline_predict(X_test[:, 0])

# Evaluate models
print("\n" + "-"*80)
print("MODEL PERFORMANCE")
print("-"*80)

# Random Forest performance
rf_accuracy = accuracy_score(y_test, y_pred)
rf_f1 = f1_score(y_test, y_pred, average='weighted')

print("\nRandom Forest Classifier:")
print(f"Overall Accuracy: {rf_accuracy:.2f}")
print(f"Weighted F1-Score: {rf_f1:.2f}")
print("\nClassification Report:")
# Pass every encoded class explicitly: without labels=, a class missing from
# both y_test and y_pred makes the report shorter than target_names and
# classification_report raises. zero_division=0 reports such classes as 0.
report_labels = np.arange(len(le.classes_))
print(classification_report(
    y_test, y_pred,
    labels=report_labels,
    target_names=le.classes_,
    zero_division=0,
))

# Baseline performance
baseline_accuracy = accuracy_score(y_test, y_pred_baseline)
print(f"Baseline Accuracy (word count only): {baseline_accuracy:.2f}")
print(f"Improvement over baseline: {(rf_accuracy - baseline_accuracy):.2f}")

# Feature importance
feature_names = ['word_count', 'sentence_count', 'flesch_reading_ease']
importances = rf_model.feature_importances_
feature_importance_df = pd.DataFrame({
    'feature': feature_names,
    'importance': importances
}).sort_values('importance', ascending=False)

print("\nTop Features:")
# Number by position in the sorted frame. The index label still holds the
# pre-sort row number, so using it would print the ranks out of order.
for rank, (_, row) in enumerate(feature_importance_df.iterrows(), start=1):
    print(f"{rank}. {row['feature']} (importance: {row['importance']:.2f})")

# Confusion Matrix
print("\nConfusion Matrix:")
# Fix the row/column order to the encoder's classes. Without labels=, a class
# absent from both y_test and y_pred is dropped from the matrix, and the
# le.classes_ tick labels below would no longer line up with its cells.
cm_labels = np.arange(len(le.classes_))
cm = confusion_matrix(y_test, y_pred, labels=cm_labels)
print(cm)

# Visualize confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=le.classes_, yticklabels=le.classes_)
plt.title('Confusion Matrix - Quality Classification')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.tight_layout()
plt.savefig('data/confusion_matrix.png', dpi=300, bbox_inches='tight')
print("Confusion matrix saved to: data/confusion_matrix.png")

# Make sure the output directory for model artifacts is present
models_dir = 'models'
if not os.path.exists(models_dir):
    os.makedirs(models_dir)

# Persist the classifier together with its label encoder so predictions can be
# decoded back to class names after loading
import pickle
model_bundle = {'model': rf_model, 'label_encoder': le}
with open('models/quality_model.pkl', 'wb') as f:
    pickle.dump(model_bundle, f)
print("Model saved to: models/quality_model.pkl")


In [None]:
# ============================================================================
# SECTION 6: REAL-TIME ANALYSIS FUNCTION
# ============================================================================

print("\n" + "="*80)
print("REAL-TIME ANALYSIS FUNCTION")
print("="*80)

def analyze_url(url, similarity_threshold=0.70, top_k=5, thin_threshold=500):
    """
    Analyze a URL for content quality and duplicate detection.

    The page is scraped and parsed, the same features used for training are
    computed, the quality class is predicted with the trained random forest,
    and the page embedding is compared against the embeddings of the
    already-analysed pages.

    Args:
        url (str): URL to analyze
        similarity_threshold (float): Pages count as similar when their
            cosine similarity is strictly above this value.
        top_k (int): Maximum number of similar pages to report.
        thin_threshold (int): Pages with fewer words than this are flagged
            as thin content.

    Returns:
        dict: Analysis results with keys url, title, word_count,
            sentence_count, readability, quality_label, is_thin and
            similar_to (most similar first). On failure the dict has a
            single "error" key instead.
    """
    try:
        # Scrape the URL
        print(f"Scraping: {url}")
        html_content = scrape_url(url)

        if not html_content:
            return {"error": "Failed to scrape URL"}

        # Parse HTML
        parsed = parse_html_content(html_content)
        text = parsed['body_text']

        if not text.strip():
            return {"error": "No content extracted from URL"}

        # Extract the same three features the classifier was trained on
        word_count = len(text.split())
        sentence_count = count_sentences(text)

        try:
            flesch_score = textstat.flesch_reading_ease(text)
        except Exception:
            # Readability is best effort; score 0 when it cannot be computed
            flesch_score = 0

        # Generate embedding (same 512-character prefix as the batch pipeline)
        embedding = model.encode(text[:512])

        # Predict quality
        features = np.array([[word_count, sentence_count, flesch_score]])
        quality_encoded = rf_model.predict(features)[0]
        # Plain str keeps the result free of numpy scalar types
        quality_label = str(le.inverse_transform([quality_encoded])[0])

        # Check if thin content
        is_thin = word_count < thin_threshold

        # Find similar content. Rank every page by similarity BEFORE
        # truncating: slicing the unsorted matches would keep the first
        # top_k by row position rather than the top_k most similar.
        similarities = cosine_similarity([embedding], embeddings_array)[0]
        ranked_indices = np.argsort(-similarities, kind='stable')

        similar_content = []
        for idx in ranked_indices:
            # Sorted descending, so nothing after the first miss can qualify
            if len(similar_content) >= top_k or similarities[idx] <= similarity_threshold:
                break
            candidate_url = features_df.iloc[idx]['url']
            # A page from the corpus trivially matches itself; skip that entry
            if candidate_url == url:
                continue
            similar_content.append({
                'url': candidate_url,
                'similarity': float(similarities[idx])
            })

        # Prepare result
        result = {
            'url': url,
            'title': parsed['title'],
            'word_count': word_count,
            'sentence_count': sentence_count,
            'readability': round(flesch_score, 2),
            'quality_label': quality_label,
            'is_thin': is_thin,
            'similar_to': similar_content
        }

        return result

    except Exception as e:
        return {"error": f"Analysis failed: {str(e)}"}

# Test the function with a sample URL from the dataset
print("\nTesting analyze_url() function...")
test_url = features_df.iloc[0]['url']
result = analyze_url(test_url)

print("\nSample Analysis Result:")
print(json.dumps(result, indent=2))

print("\n" + "="*80)
# analyze_url() reports failures through an "error" key rather than raising,
# so check for it before claiming success.
if "error" in result:
    print("PIPELINE COMPLETED, BUT THE SAMPLE ANALYSIS FAILED (see result above)")
else:
    print("PIPELINE COMPLETED SUCCESSFULLY!")
print("="*80)

print("\nGenerated files:")
# List only outputs that are actually on disk: data/duplicates.csv, for
# example, is written only when duplicate pairs were found.
expected_outputs = [
    'data/extracted_content.csv',
    'data/features.csv',
    'data/duplicates.csv',
    'models/quality_model.pkl',
    'data/confusion_matrix.png',
]
for output_path in expected_outputs:
    if os.path.exists(output_path):
        print(f"  - {output_path}")
print("\nYou can now use analyze_url(url) to analyze any URL in real-time!")