In [None]:
import sys, os
# Make the parent of the notebook's working directory importable so the
# project packages imported by the test cells (`core`, `context`, `datasets`)
# resolve.  `__file__` is not defined inside a notebook kernel, so the root is
# derived from the current working directory instead of a quoted placeholder.
_project_root = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
# Re-running this cell must not stack duplicate entries on sys.path, yet the
# root has to stay first so it takes precedence over any same-named package
# found later on the path.
if _project_root in sys.path:
    sys.path.remove(_project_root)
sys.path.insert(0, _project_root)

# Running tallies shared by every test cell; the summary cell prints them.
passed = 0
failed = 0


def check(name, condition):
    """Record one test outcome and print a one-line ✓/✗ report for it.

    Args:
        name: Label printed next to the pass/fail mark.
        condition: Any value; truthy counts as a pass, falsy as a failure.

    Side effects:
        Increments the module-level ``passed`` or ``failed`` counter.
    """
    global passed, failed
    if not condition:
        failed += 1
        print(f"  ✗ {name}")
        return
    passed += 1
    print(f"  ✓ {name}")

In [None]:
# ── Test 1: Data Models (type.py) ──
print("Test 1: Data Models")

from core.type import (
    Attachment, Reactions, Comment, Analytics, Post, Profile,
    Follows, SocialGraph, EngagementRecord, Filters, UserData,
    ScoredPost, get_current_user, get_current_posts,
)
from datetime import datetime, timezone

# Reactions: explicit counts, the derived net score, then the empty default.
# Probes are lambdas so each one is only evaluated right before its check.
reactions = Reactions(data={"like": 10, "dislike": 3})
for label, probe in (
    ("Reactions.likes", lambda: reactions.likes == 10),
    ("Reactions.dislikes", lambda: reactions.dislikes == 3),
    ("Reactions.net_score", lambda: reactions.net_score == 7),
):
    check(label, probe())

empty_reactions = Reactions()
starts_at_zero = empty_reactions.likes == 0 and empty_reactions.dislikes == 0
check("Reactions default zeros", starts_at_zero)

# Comment: constructor fields, plus the reactions it carries when none are given
comment = Comment(id="c1", owner="alice", content="test")
check("Comment fields", comment.id == "c1" and comment.owner == "alice")
check("Comment default reactions", comment.reactions.likes == 0)

# Analytics engagement score for a post with two reacted-to comments
post_reactions = Reactions(data={"like": 10, "dislike": 2})
post_comments = [
    Comment(id="c1", owner="a", content="x", reactions=Reactions(data={"like": 3, "dislike": 0})),
    Comment(id="c2", owner="b", content="y", reactions=Reactions(data={"like": 1, "dislike": 1})),
]
analytics = Analytics(reactions=post_reactions, comments=post_comments)
# Expected value per the formula this test pins:
#   likes - dislikes*1.5 + comments*2.0 + comment_reactions_net*0.5
#   = 10 - 3.0 + 4.0 + (3 + 0)*0.5 = 12.5
dislike_penalty = 2 * 1.5
comment_bonus = 2 * 2.0
comment_reaction_bonus = (3 + 0) * 0.5
expected_eng = 10 - dislike_penalty + comment_bonus + comment_reaction_bonus
score_error = abs(analytics.engagement_score - expected_eng)
check("Analytics.engagement_score", score_error < 0.01)

# Post: defaults filled in on construction and the hydrate() contract
post = Post(id="p1", title="Test", owner="bob", content="hello", tags=["a", "b"])
check("Post created_at is datetime", isinstance(post.created_at, datetime))
check("Post tags", post.tags == ["a", "b"])
check("Post.hydrate() returns self", post.hydrate() is post)
check("Post default network is 'out'", post.network == "out")

# get_current_posts given a dict that wraps the list under "posts"
wrapped_payload = {"posts": [
    {"id": "p1", "title": "T", "owner": "o", "content": "c"},
    {"id": "p2", "title": "T2", "owner": "o2", "content": "c2"},
]}
posts_from_dict = get_current_posts(wrapped_payload)
check("get_current_posts from dict", len(posts_from_dict) == 2 and posts_from_dict[0].id == "p1")

# get_current_posts given the bare list
bare_payload = [{"id": "p3", "title": "T", "owner": "o", "content": "c"}]
posts_from_list = get_current_posts(bare_payload)
check("get_current_posts from list", len(posts_from_list) == 1)

# UserData: a raw dict goes in, nested fields are read back off the model
raw_user = {
    "id": "u1",
    "profile": {"name": "Test", "country": "US", "preferences": "eng", "verified": False},
    "socialgraph": {"followers": {"len": 5, "chain": []}, "following": {"len": 3, "chain": []}, "friends": []},
    "anti_filters": {"disliked_posts": ["p1"], "blocked_owners": ["spam_bot"], "blocked_tags": ["nsfw"], "engagement_history": []},
}
loaded_user = get_current_user(raw_user)
check("UserData validates", loaded_user.id == "u1" and loaded_user.profile.name == "Test")
check("Filters loaded", loaded_user.anti_filters.disliked_posts == ["p1"])
check("Blocked owners loaded", loaded_user.anti_filters.blocked_owners == ["spam_bot"])

print()

In [None]:
# ── Test 2: Database Layer (db.py) ──
print("Test 2: Database Layer")

from core.db import setup_db, get_user, Database

# A database from setup_db() is expected to expose the posts of both network files
db = setup_db()
all_posts = db.retrieve_all()
check("retrieve_all loads posts", len(all_posts) == 15)
ids_present = all(post.id for post in all_posts)
check("All posts have IDs", ids_present)
timestamps_typed = all(isinstance(post.created_at, datetime) for post in all_posts)
check("All posts have created_at", timestamps_typed)

# A second, independent database instance is used for the in/out split
db2 = setup_db()
in_posts, out_posts = db2.retrieve_by_network()
check("In-network count", len(in_posts) == 5)
check("Out-network count", len(out_posts) == 10)
check("In-network tagged 'in'", all(post.network == "in" for post in in_posts))
check("Out-network tagged 'out'", all(post.network == "out" for post in out_posts))

# Regression guard: in_network.json used to be loaded twice, duplicating IDs
in_ids = [post.id for post in in_posts]
unique_in_ids = set(in_ids)
check("No duplicate in-network posts", len(in_ids) == len(unique_in_ids))

# get_user() should hand back a populated UserData
user_data = get_user()
check("get_user returns UserData", isinstance(user_data, UserData))
check("User has profile", user_data.profile.name != "")
check("User has social graph", user_data.socialgraph.following.len > 0)
history = user_data.anti_filters.engagement_history
check("User has engagement history", len(history) > 0)

# A path that does not exist should yield no posts rather than an error
bad_db = Database(["/nonexistent/file.json"])
bad_result = bad_db.retrieve_all()
check("Missing file returns empty", len(bad_result) == 0)

print()

In [None]:
# ── Test 3: Engine — Embeddings & Similarity (models.py) ──
print("Test 3: Engine — Embeddings & Similarity")

# Imported here as well so the time-decay fixtures below do not rely on an
# earlier cell for the datetime names.
from datetime import datetime, timedelta, timezone

from core.models import Engine, _cosine_similarity, _time_decay, _tokenize, _tf

# tokenize
check("Tokenize lowercase", _tokenize("Hello World") == ["hello", "world"])

# TF: "a" is 3 of 4 tokens, "b" is 1 of 4
tf_result = _tf(["a", "b", "a", "a"])
check("TF correct", abs(tf_result["a"] - 0.75) < 0.01 and abs(tf_result["b"] - 0.25) < 0.01)

# cosine similarity
check("Identical vectors sim=1", abs(_cosine_similarity([1, 0, 1], [1, 0, 1]) - 1.0) < 0.001)
check("Orthogonal vectors sim=0", abs(_cosine_similarity([1, 0, 0], [0, 1, 0])) < 0.001)
check("Zero vector sim=0", _cosine_similarity([0, 0], [1, 1]) == 0.0)

# partial similarity
sim = _cosine_similarity([1, 1, 0], [1, 0, 1])
check("Partial similarity in (0,1)", 0 < sim < 1)

# time decay
# The "old" post is dated relative to now rather than with a fixed calendar
# date, so it is always in the past no matter when the notebook is run.
recent_post = Post(id="t1", title="T", owner="o", content="c",
                   created_at=datetime.now(timezone.utc))
old_post = Post(id="t2", title="T", owner="o", content="c",
                created_at=datetime.now(timezone.utc) - timedelta(days=365))
check("Recent post decay ~1.0", _time_decay(recent_post) > 0.95)
check("Old post decay < recent", _time_decay(old_post) < _time_decay(recent_post))

# compute_embeddings: one ML-themed post, one cooking post, one ML-leaning user
test_posts = [
    Post(id="p1", title="machine learning", owner="a", content="deep neural networks", tags=["ml", "ai"]),
    Post(id="p2", title="cooking recipe", owner="b", content="pasta tomato sauce", tags=["food"]),
]
test_user = get_current_user({
    "id": "u1",
    "profile": {"name": "ML Fan", "country": "US", "preferences": "machine learning ai", "verified": True, "interest_tags": ["ml", "ai"]},
    "socialgraph": {"followers": {"len": 0}, "following": {"len": 0}, "friends": []},
})
vocab, idf_map, vectors = Engine.compute_embeddings(test_posts, test_user)
check("Vocab built", len(vocab) > 0)
check("IDF map built", len(idf_map) > 0)
check("Vectors match post count", len(vectors) == 2)
check("Vector dimensions match vocab", len(vectors[0]) == len(vocab))

# user vector should be more similar to ML post than cooking post
user_vec = Engine.compute_user_vector(test_user, vocab, idf_map)
sim_ml = Engine.score_relevance(vectors[0], user_vec)
sim_food = Engine.score_relevance(vectors[1], user_vec)
check("ML post more relevant to ML user", sim_ml > sim_food)

print()

In [None]:
# ── Test 4: Engine — Classification & Scoring ──
print("Test 4: Engine — Classification & Scoring")

from core.db import setup_db, get_user


def _scores_descending(scored):
    """Return True when no item in ``scored`` outscores the one before it."""
    return all(scored[i - 1].score >= scored[i].score for i in range(1, len(scored)))


# Fresh inputs for this cell: posts split by network, plus the stored user
db = setup_db()
in_posts, out_posts = db.retrieve_by_network()
user_data = get_user()

scored_in, scored_out = Engine.classify_posts(in_posts, out_posts, user_data)

check("Scored in-network count matches", len(scored_in) == len(in_posts))
check("Scored out-network count matches", len(scored_out) == len(out_posts))
check("All scores > 0", all(item.score > 0 for item in scored_in + scored_out))
check("In-network sorted descending", _scores_descending(scored_in))
check("Out-network sorted descending", _scores_descending(scored_out))
check("Source labels correct (in)", all(item.source == "in_network" for item in scored_in))
check("Source labels correct (out)", all(item.source == "out_network" for item in scored_out))

# The per-component breakdown on the first in-network result must be in range
top_in = scored_in[0]
check("ScoredPost has relevance", top_in.relevance >= 0)
check("ScoredPost has engagement", top_in.engagement >= 0)
check("ScoredPost has recency", 0 <= top_in.recency <= 1)

# ── Diversity re-ranking ──
combined = scored_in + scored_out
diverse = Engine.rank_with_diversity(combined)
check("Diversity keeps all posts", len(diverse) == len(combined))

# Compare the owner sequence of a plain score sort with the diversified one.
# The check also passes when every owner is already unique, because then
# there are no same-owner posts for the re-ranker to spread out.
by_score = sorted(combined, key=lambda s: s.score, reverse=True)
owners_before = [item.post.owner for item in by_score]
owners_after = [item.post.owner for item in diverse]
order_changed = owners_before != owners_after
check("Diversity re-ranks (order changed)", order_changed or len(set(owners_before)) == len(owners_before))

# ── Cold-start fallback ──
cold_posts = in_posts + out_posts
cold_result = Engine.cold_start_fallback(cold_posts, k=5)
check("Cold-start returns k posts", len(cold_result) == 5)
check("Cold-start sorted descending", _scores_descending(cold_result))
check("Cold-start scores are positive", all(item.score > 0 for item in cold_result))

print()

In [None]:
# ── Test 5: User Context (user.py) ──
print("Test 5: User Context")

from context.user.user import User

user = User()

# Read-only accessors checked against the values this test expects for the
# stored user.  Probes are lambdas so each runs right before its own check.
for label, probe in (
    ("User loads tree", lambda: user.tree.id == "user_001"),
    ("User not cold start", lambda: not user.is_cold_start),
    ("User.embedding returns UserData", lambda: user.embedding().id == "user_001"),
    ("get_profile_embedding", lambda: user.get_profile_embedding().name == "Alex Dev"),
    ("get_real_graph", lambda: user.get_real_graph().following.len == 80),
    ("get_anti_signals", lambda: "post_108" in user.get_anti_signals().disliked_posts),
    ("get_following_owners", lambda: "alice" in user.get_following_owners()),
    ("get_friends", lambda: "bob" in user.get_friends()),
):
    check(label, probe())

# Pre-filtering fixture: one disliked post, one clean post, one from an owner
# that is blocked only for the duration of this test
disliked_candidate = ScoredPost(post=Post(id="post_108", title="Incident", owner="ops", content="outage"), score=0.5)
clean_candidate = ScoredPost(post=Post(id="post_001", title="Getting Started", owner="alice", content="intro"), score=0.8)
blocked_candidate = ScoredPost(post=Post(id="post_999", title="Spam", owner="blocked_guy", content="spam"), score=0.3)
test_scored = [disliked_candidate, clean_candidate, blocked_candidate]

# temporarily block the owner
user.tree.anti_filters.blocked_owners.append("blocked_guy")

filtered = user.filter(test_scored)
filtered_ids = [item.post.id for item in filtered]
check("Pre-filter removes disliked post", "post_108" not in filtered_ids)
check("Pre-filter removes blocked owner", "post_999" not in filtered_ids)
check("Pre-filter keeps clean post", "post_001" in filtered_ids)

# undo the temporary block
user.tree.anti_filters.blocked_owners.remove("blocked_guy")

# Recording a single engagement should grow the history by exactly one
initial_history_len = len(user.tree.anti_filters.engagement_history)
user.record_engagement("post_999", "like", dwell_seconds=12.0)
history_len_after_like = len(user.tree.anti_filters.engagement_history)
check("record_engagement appends", history_len_after_like == initial_history_len + 1)

# A dislike is expected to land in disliked_posts as well
user.record_engagement("post_998", "dislike", dwell_seconds=1.0)
check("Dislike auto-adds to disliked_posts", "post_998" in user.tree.anti_filters.disliked_posts)

# update_filters with a batch: one event, one owner block, one tag block
batch_update = {
    "events": [{"post_id": "post_997", "action": "click", "dwell_seconds": 5.0}],
    "block_owners": ["spammer_x"],
    "block_tags": ["crypto_scam"],
}
user.update_filters(batch_update)
event_recorded = any(event.post_id == "post_997" for event in user.tree.anti_filters.engagement_history)
check("update_filters adds event", event_recorded)
check("update_filters blocks owner", "spammer_x" in user.tree.anti_filters.blocked_owners)
check("update_filters blocks tag", "crypto_scam" in user.tree.anti_filters.blocked_tags)

# Passing None must leave the history untouched
prev_len = len(user.tree.anti_filters.engagement_history)
user.update_filters(None)
check("update_filters(None) is no-op", len(user.tree.anti_filters.engagement_history) == prev_len)

# A brand-new user's data carries no engagement history
from core.type import get_current_user
fresh_payload = {
    "id": "new_user",
    "profile": {"name": "New", "country": "US", "preferences": "", "verified": False},
    "socialgraph": {"followers": {"len": 0}, "following": {"len": 0}, "friends": []},
    "anti_filters": {"engagement_history": []},
}
fresh_user_data = get_current_user(fresh_payload)
check("Empty history = cold start data", len(fresh_user_data.anti_filters.engagement_history) == 0)

print()

In [None]:
# ── Test 6: Utils — rank() and filter() ──
print("Test 6: Utils — rank & filter")

# core.utils.filter is imported under an alias so it does not replace the
# builtin `filter` in the notebook's global namespace for the cells that follow.
from core.utils import rank, filter as post_filter

# rank is expected to sort descending and rescale so the top score is 1.0
test_ranked = [
    ScoredPost(post=Post(id="a", title="A", owner="x", content="c"), score=0.8),
    ScoredPost(post=Post(id="b", title="B", owner="y", content="c"), score=0.4),
    ScoredPost(post=Post(id="c", title="C", owner="z", content="c"), score=0.2),
]
ranked = rank(test_ranked)
check("rank sorts descending", ranked[0].post.id == "a")
check("rank normalizes top to 1.0", abs(ranked[0].score - 1.0) < 0.001)
check("rank normalizes proportionally", ranked[1].score < ranked[0].score)
check("rank empty input", rank([]) == [])

# filter — deduplication: two entries share the id "dup", one should survive
dup_posts = [
    ScoredPost(post=Post(id="dup", title="T", owner="o", content="c"), score=0.5),
    ScoredPost(post=Post(id="dup", title="T", owner="o", content="c"), score=0.3),
]
deduped = post_filter(dup_posts)
check("filter removes duplicates", len(deduped) == 1)

# filter — blocked keywords: the title containing "SCAM" should be dropped
spam_post = ScoredPost(post=Post(id="s1", title="Get rich SCAM", owner="o", content="c"), score=0.5)
clean_post = ScoredPost(post=Post(id="s2", title="Good Post", owner="o", content="hello"), score=0.5)
filtered = post_filter([spam_post, clean_post])
check("filter removes blocked keyword", len(filtered) == 1 and filtered[0].post.id == "s2")

# filter — high dislike ratio suppression (3 likes against 5 dislikes)
toxic = Post(id="toxic", title="T", owner="o", content="c",
             analytics=Analytics(reactions=Reactions(data={"like": 3, "dislike": 5})))
toxic_sp = ScoredPost(post=toxic, score=0.5)
ok_post = ScoredPost(post=Post(id="ok", title="T", owner="o2", content="c"), score=0.5)
result = post_filter([toxic_sp, ok_post])
check("filter suppresses high-dislike ratio", len(result) == 1 and result[0].post.id == "ok")

# filter — with user context (blocked owners/tags, disliked posts)
test_user_data = get_current_user({
    "id": "u1",
    "profile": {"name": "T", "country": "US", "preferences": "", "verified": False},
    "socialgraph": {"followers": {"len": 0}, "following": {"len": 0}, "friends": []},
    "anti_filters": {"blocked_owners": ["bad_actor"], "blocked_tags": ["nsfw"], "disliked_posts": ["hated_post"]},
})
posts_to_filter = [
    ScoredPost(post=Post(id="hated_post", title="T", owner="o", content="c"), score=0.5),
    ScoredPost(post=Post(id="bad_owner", title="T", owner="bad_actor", content="c"), score=0.5),
    ScoredPost(post=Post(id="nsfw_post", title="T", owner="o", content="c", tags=["nsfw"]), score=0.5),
    ScoredPost(post=Post(id="good_post", title="T", owner="o", content="clean"), score=0.5),
]
result = post_filter(posts_to_filter, test_user_data)
result_ids = [sp.post.id for sp in result]
check("filter removes user-disliked", "hated_post" not in result_ids)
check("filter removes blocked owner", "bad_owner" not in result_ids)
check("filter removes blocked tag", "nsfw_post" not in result_ids)
check("filter keeps clean post", "good_post" in result_ids)

print()

In [None]:
# ── Test 7: Embedding Retrieval (retrieve.py) ──
print("Test 7: Embedding Retrieval")

from datasets.retrieve import Embedding

embed = Embedding()
in_network_loaded = embed.get_innetwork_posts()
check("Embedding loads in-network", len(in_network_loaded) == 5)
out_network_loaded = embed.get_outnetwork_posts()
check("Embedding loads out-network", len(out_network_loaded) == 10)
everything_loaded = embed.get()
check("get() returns all", len(everything_loaded) == 15)

# classify should yield two lists of ScoredPost, sized like the raw inputs
user_data = get_user()
scored_in, scored_out = embed.classify(user_data)
check("classify returns ScoredPost in", all(isinstance(item, ScoredPost) for item in scored_in))
check("classify returns ScoredPost out", all(isinstance(item, ScoredPost) for item in scored_out))
check("classify in count", len(scored_in) == 5)
check("classify out count", len(scored_out) == 10)

# hydrate: same number of posts back, each with non-empty content
hydrated = embed.hydrate(embed.get_innetwork_posts())
check("hydrate returns same posts", len(hydrated) == 5)
check("hydrated posts have content", all(post.content for post in hydrated))

print()

In [None]:
# ── Test 8: Full Pipeline Integration ──
print("Test 8: Full Pipeline Integration")

from datasets.retrieve import Embedding as E2
from context.user.user import User as U2
from core.utils import filter as post_filter, rank as final_rank
from core.models import Engine as E

# step 1: load
emb = E2()
total_posts = len(emb.get())
check("Pipeline: posts loaded", total_posts == 15)

# step 2: user
u = U2()
ud = u.embedding()
check("Pipeline: user loaded", ud.id == "user_001")

# step 3: classify
s_in, s_out = emb.classify(ud)
check("Pipeline: classification done", len(s_in) + len(s_out) == total_posts)

# step 4: combine
combined = s_in + s_out
check("Pipeline: combined", len(combined) == total_posts)

# step 5: pre-filter
prefiltered = u.filter(combined)
check("Pipeline: pre-filter reduces", len(prefiltered) <= len(combined))
check("Pipeline: disliked post_108 removed", all(sp.post.id != "post_108" for sp in prefiltered))

# step 6: rank
# Compared with a tolerance (as in the rank() unit test) instead of exact
# float equality; an empty result is recorded as a failure, not an IndexError.
ranked = final_rank(prefiltered)
check("Pipeline: ranked scores normalized", bool(ranked) and abs(ranked[0].score - 1.0) < 0.001)

# step 7: diversity
diverse = E.rank_with_diversity(ranked)
check("Pipeline: diversity preserves count", len(diverse) == len(ranked))

# step 8: post-filter
final = post_filter(diverse, ud)
check("Pipeline: post-filter produces results", len(final) > 0)
check("Pipeline: no duplicates in final", len(set(sp.post.id for sp in final)) == len(final))

# Loose ordering sanity check: only the first and last entries are compared,
# since the feed went through diversity re-ranking after scoring.
check("Pipeline: final is sorted-ish", bool(final) and final[0].score >= final[-1].score)

# verify the feed has posts from multiple owners (diversity working)
unique_owners = set(sp.post.owner for sp in final[:5])
check("Pipeline: top 5 has diverse owners", len(unique_owners) >= 3)

# step 9: feedback loop — like the top post and confirm the event is stored.
# Guarded so an empty feed is reported as a failed check rather than raising.
if final:
    liked_id = final[0].post.id
    u.update_filters({"events": [{"post_id": liked_id, "action": "like", "dwell_seconds": 20.0}]})
    feedback_recorded = any(
        e.post_id == liked_id and e.action == "like"
        for e in u.tree.anti_filters.engagement_history
    )
else:
    feedback_recorded = False
check("Pipeline: feedback recorded", feedback_recorded)

print()

In [None]:
# ── Summary ──
# Print the pass/fail tallies accumulated by check() between two rule lines.
banner = "=" * 50
print(banner)
print(f"Results: {passed} passed, {failed} failed, {passed + failed} total")
verdict = "All tests passed." if failed == 0 else f"FAILURES: {failed} test(s) failed."
print(verdict)
print(banner)