In [None]:
# | default_exp seo_site_analysis

In [None]:
# | export
import os

from sqlmodel import Session, select
from seo_rat.article import Article
from seo_rat.content_parser import remove_metadata, normalize_text, calculate_similarity
from seo_rat.seo_content_analysis import calculate_keyword_density

In [None]:
# | export
def detect_duplicate_content(
    session: Session, file_path: str, similarity_threshold: float = 0.8
) -> dict:
    """Find articles in the DB with content similar to the given file.

    The given file and every stored article file are read as UTF-8, stripped
    with `remove_metadata`, normalized with `normalize_text` and then compared
    with `calculate_similarity`.

    Args:
        session: Open database session used to load every `Article` row.
        file_path: Path of the file to compare against the stored articles.
        similarity_threshold: Minimum value (inclusive) that
            `calculate_similarity` must return for an article to be reported.

    Returns:
        A dict with two keys:
        - "has_duplicates": True when at least one article matched.
        - "similar_articles": list of {"file_path", "similarity"} dicts, one
          per matching article, in the order the articles were returned by
          the query.

    Raises:
        OSError: If `file_path` or any stored article file cannot be opened.
        UnicodeDecodeError: If one of the files is not valid UTF-8.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        current_content = normalize_text(remove_metadata(f.read()))

    # Compare canonical paths so the same file spelled differently
    # (relative vs. absolute, symlink, "./" prefix) is still recognised as
    # the file under test and not reported as a duplicate of itself.
    current_path = os.path.normcase(os.path.realpath(file_path))

    articles = session.exec(select(Article)).all()
    similar = []

    for article in articles:
        if os.path.normcase(os.path.realpath(article.file_path)) == current_path:
            continue
        with open(article.file_path, "r", encoding="utf-8") as f:
            other_content = normalize_text(remove_metadata(f.read()))

        similarity = calculate_similarity(current_content, other_content)
        if similarity >= similarity_threshold:
            similar.append({"file_path": article.file_path, "similarity": similarity})

    return {"has_duplicates": len(similar) > 0, "similar_articles": similar}

In [None]:
# | export
def analyze_keyword_cannibalization(session: Session, keyword: str) -> dict:
    """Find articles competing for the same focus keyword.

    Articles are selected by exact equality between `Article.focus_keyword`
    and `keyword`. When more than one article matches, each article file is
    read as UTF-8, stripped with `remove_metadata`, and measured with
    `calculate_keyword_density`.

    Args:
        session: Open database session used to query `Article` rows.
        keyword: Focus keyword to look up.

    Returns:
        A dict that always contains:
        - "has_cannibalization": True when two or more articles share the
          keyword.
        - "keyword": the keyword that was queried.
        - "count": number of articles whose focus keyword matches.
        Only when "has_cannibalization" is True does it also contain:
        - "articles": list of {"file_path", "density", "count"} dicts, where
          "density" and "count" are copied from the result of
          `calculate_keyword_density` for that article (so this per-article
          "count" is not the same quantity as the top-level "count").

    Raises:
        OSError: If a matching article file cannot be opened.
        UnicodeDecodeError: If a matching article file is not valid UTF-8.
    """
    articles = session.exec(
        select(Article).where(Article.focus_keyword == keyword)
    ).all()

    # Zero or one article cannot compete with itself; skip the file reads.
    if len(articles) <= 1:
        return {
            "has_cannibalization": False,
            "keyword": keyword,
            "count": len(articles),
        }

    results = []
    for article in articles:
        with open(article.file_path, "r", encoding="utf-8") as f:
            content = remove_metadata(f.read())
        density = calculate_keyword_density(content, keyword)
        results.append(
            {
                "file_path": article.file_path,
                "density": density["density"],
                "count": density["count"],
            }
        )

    return {
        "has_cannibalization": True,
        "keyword": keyword,
        "count": len(articles),
        "articles": results,
    }

In [None]:
# | export
def analyze_content_groups(session: Session, similarity_threshold: float = 0.8) -> dict:
    """Group similar articles together across the whole site.

    Every article file is read once (as UTF-8), stripped with
    `remove_metadata` and normalized with `normalize_text`; the normalized
    texts are kept in memory and compared pairwise with
    `calculate_similarity`.

    Grouping is greedy and not transitive: articles are visited in query
    order, each unassigned article becomes a candidate "main" article, and
    every other unassigned article whose similarity to that main article is
    at least `similarity_threshold` joins its group. Members are compared to
    the main article only, never to each other, and an article belongs to at
    most one group.

    Args:
        session: Open database session used to load every `Article` row.
        similarity_threshold: Minimum value (inclusive) that
            `calculate_similarity` must return for two articles to be grouped.

    Returns:
        A dict with:
        - "total_articles": number of articles loaded from the database.
        - "groups": list of {"main_article", "similar_articles"} dicts, where
          "similar_articles" is a list of {"file_path", "similarity"} dicts.
          Articles with no similar counterpart produce no group.
        - "duplicate_groups": number of entries in "groups".

    Raises:
        OSError: If any article file cannot be opened.
        UnicodeDecodeError: If any article file is not valid UTF-8.
    """
    articles = session.exec(select(Article)).all()

    # Read and normalize each file exactly once instead of once per pair;
    # the list is index-aligned with `articles`.
    contents = []
    for article in articles:
        with open(article.file_path, "r", encoding="utf-8") as f:
            contents.append(normalize_text(remove_metadata(f.read())))

    groups = []
    processed: set[int] = set()

    for article, main_content in zip(articles, contents):
        if article.id in processed:
            continue

        group: dict = {"main_article": article.file_path, "similar_articles": []}

        for other, other_content in zip(articles, contents):
            if other.id == article.id or other.id in processed:
                continue

            similarity = calculate_similarity(main_content, other_content)
            if similarity >= similarity_threshold:
                group["similar_articles"].append(
                    {"file_path": other.file_path, "similarity": similarity}
                )
                processed.add(other.id)

        # Only articles that attracted at least one match form a group; a
        # lone article stays unprocessed but can no longer match anything
        # later, because similarity to every remaining article was checked.
        if group["similar_articles"]:
            groups.append(group)
            processed.add(article.id)

    return {
        "total_articles": len(articles),
        "groups": groups,
        "duplicate_groups": len(groups),
    }

In [None]:
# | test
from fastcore.test import test_eq
from pprint import pprint
from sqlmodel import create_engine, Session, SQLModel
from seo_rat.models import Website
from seo_rat.article import Article, insert_article
import tempfile, os


In [None]:
# | test
from pathlib import Path

# Look for the sample fixture in the current directory first, then in its parent.
sample_dir = Path("sample")
if not sample_dir.exists():
    sample_dir = Path("../sample")

with open(sample_dir / "example.md", "r", encoding="utf-8") as f:
    content = f.read()

engine = create_engine("sqlite:///:memory:")
SQLModel.metadata.create_all(engine)

# Temp files are created with delete=False so they can be reopened by path;
# track them and remove them in `finally` so a failing assertion does not
# leave them behind.
temp_paths = []
try:
    with Session(engine) as session:
        website = Website(url="https://test.com", name="Test", lang="en")
        session.add(website)
        session.commit()
        session.refresh(website)

        # Two files with identical content: guaranteed duplicates of each other.
        for _ in range(2):
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".md", delete=False, encoding="utf-8"
            ) as tmp:
                tmp.write(content)
                temp_paths.append(tmp.name)
        path1, path2 = temp_paths

        article1 = insert_article(session, website.id, path1, focus_keyword="Kareem")
        article2 = insert_article(session, website.id, path2, focus_keyword="Kareem")

        # detect_duplicate_content
        dup_result = detect_duplicate_content(session, path1)
        test_eq(dup_result["has_duplicates"], True)
        pprint(dup_result)

        # analyze_keyword_cannibalization
        cannibal_result = analyze_keyword_cannibalization(session, "Kareem")
        test_eq(cannibal_result["has_cannibalization"], True)
        test_eq(cannibal_result["count"], 2)
        pprint(cannibal_result)

        # analyze_content_groups
        groups_result = analyze_content_groups(session, 0.8)
        test_eq(groups_result["duplicate_groups"] >= 1, True)
        pprint(groups_result)
finally:
    for temp_path in temp_paths:
        os.unlink(temp_path)
