<a href="https://colab.research.google.com/github/fyas101/Reddit_Group_Project/blob/main/RedditClasses.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# This class uses the Group #3 functions from module 1: categorizing by tone, detecting misinformation, and extracting category keywords.

import re
from typing import Optional, Dict, List

# Categorization by tone function from Project 1
def categorize_by_tone(post_text):
    """Classify the tone of a post with simple keyword heuristics.

    Rules are tried in priority order: sarcastic, angry (requires the text to
    be upper-case), humorous, uncertain, neutral (contains a question mark),
    and finally informative.

    Word cues only match at the start of a word, so inflected or stretched
    forms such as "hated" or "hahaha" still count, while embedded hits such
    as "sure" inside "pressure" or "hate" inside "whatever" do not.

    Args:
        post_text: Raw post text.

    Returns:
        One of 'sarcastic', 'angry', 'humorous', 'uncertain', 'neutral' or
        'informative'.

    Raises:
        TypeError: If post_text is not a string.
        ValueError: If post_text is empty or whitespace only.
    """
    if not isinstance(post_text, str):
        raise TypeError("post_text must be a string")

    clean_text = post_text.strip()
    if not clean_text:
        raise ValueError("post_text cannot be empty")

    lower_text = clean_text.lower()

    # Basic heuristic keywords and cues ('!' is handled separately for anger)
    anger_cues = ['angry', 'hate', 'worst', 'terrible']
    sarcasm_cues = ['yeah right', 'sure', 'totally']
    humor_cues = ['lol', 'funny', 'haha']
    uncertainty_cues = ['maybe', 'not sure', 'idk', 'perhaps']

    def has_cue(cues, text):
        # A cue counts only when it is not glued to a preceding word character.
        return any(
            re.search(r"(?<!\w)" + re.escape(cue), text) for cue in cues
        )

    # "not sure" expresses doubt; blank it out so the sarcasm cue 'sure'
    # does not claim it before the uncertainty rule gets a chance.
    sarcasm_text = re.sub(r"(?<!\w)not sure", " ", lower_text)

    # Rule-based detection
    if has_cue(sarcasm_cues, sarcasm_text):
        return 'sarcastic'
    if clean_text.isupper() and ('!' in lower_text or has_cue(anger_cues, lower_text)):
        return 'angry'
    if has_cue(humor_cues, lower_text):
        return 'humorous'
    if has_cue(uncertainty_cues, lower_text):
        return 'uncertain'
    if '?' in clean_text:
        return 'neutral'
    return 'informative'

# Detection of misinformation function from Project 1
def detect_misinformation(post_text, keyword_list=None):
    """Flag a post that contains keywords often associated with misinformation.

    Matching is case-insensitive. Each keyword is looked up both in the
    URL-stripped text with punctuation kept (so keywords such as
    "can't believe" or "confirmed??" can match) and in the text with all
    non-letter characters removed.

    Args:
        post_text: Raw post text.
        keyword_list: Optional list of keyword strings. ``None`` or an empty
            list falls back to the built-in default keywords.

    Returns:
        A dict with ``"is_misinformation"`` (bool) and ``"matched_keywords"``
        (the matching keywords, in keyword order, spelled as supplied).

    Raises:
        TypeError: If post_text is not a string, or keyword_list is neither
            None nor a list of strings.
    """
    if not isinstance(post_text, str):
        raise TypeError("post_text must be a string.")
    if keyword_list is not None and not (
        isinstance(keyword_list, list) and all(isinstance(k, str) for k in keyword_list)
    ):
        raise TypeError("keyword_list must be a list of strings or None.")

    # Default keywords often associated with misinformation
    default_keywords = [
        "rumor", "unconfirmed", "heard", "confirmed??", "sources say", "reportedly", "breaking", "shocking",
        "can't believe", "conspiracy", "fake news", "scam", "hoax", "allegedly"
    ]

    keywords = keyword_list if keyword_list else default_keywords

    # Lowercase and drop URLs first so link text never triggers a match.
    text_with_punct = re.sub(r"http\S+|www\S+|https\S+", "", post_text.lower())
    # Treat the typographic apostrophe like the ASCII one ("can’t" == "can't").
    text_with_punct = text_with_punct.replace("\u2019", "'")
    # Letters-and-whitespace-only view of the same text.
    letters_only = re.sub(r"[^a-zA-Z\s]", "", text_with_punct)

    matched = []
    for kw in keywords:
        needle = kw.lower()
        if needle in text_with_punct or needle in letters_only:
            matched.append(kw)

    return {
        "is_misinformation": bool(matched),
        "matched_keywords": matched
    }

# Extraction of category keywords function from project 1
def extract_category_keywords(categories):
    """Map each requested category to its list of example keywords.

    All keywords are lowercase so they can be matched against lowercased
    post text. Categories that are not known map to an empty list.

    Args:
        categories: Iterable of category names.

    Returns:
        A dict ``{category: [keyword, ...]}`` in the order the categories
        were given.
    """
    category_keywords = {
        "humor": ["lol", "lmao", "haha", "joke"],
        "random": ["idk", "random"],
        "news": ["alert", "news", "update", "announcement", "diamondback", "report"],
        "academics": ["class", "classes", "exam", "professor", "grade", "gpa", "study", "midterm", "final", "project"],
        # "recommend" was misspelled and "should i" was mixed-case, so neither
        # could ever match lowercased post text.
        "advice": ["recommend", "tips", "help", "should i", "question"],
        "social": ["party", "hangout", "movie", "homecoming", "game"],
    }
    # Unknown categories get an empty keyword list rather than an error.
    return {cat: category_keywords.get(cat, []) for cat in categories}


def normalize_word(word):
    """Return *word* with surrounding whitespace removed and lowercased."""
    trimmed = word.strip()
    return trimmed.lower()

# Created class for Group #3 Organization and Categorizing
class ContentCategorizer:
    """Categorizes posts by tone, topic category, and misinformation cues.

    Delegates tone and misinformation checks to the module-level functions
    ``categorize_by_tone`` and ``detect_misinformation`` and records each of
    those analyses in an internal history list.
    """

    def __init__(self, custom_keywords: Optional[Dict[str, List[str]]] = None):
        """Set up default category keywords, optionally overridden or extended.

        Args:
            custom_keywords: Optional mapping of category name to a list of
                keyword strings. Each entry replaces (or adds) that category;
                keywords are stored lowercased and stripped.

        Raises:
            TypeError: If custom_keywords is not a dict, a category name is
                not a string, a value is not a list, or a keyword is not a
                string.
        """
        if custom_keywords is not None:
            if not isinstance(custom_keywords, dict):
                raise TypeError("custom_keywords must be a dictionary")
            for key, value in custom_keywords.items():
                if not isinstance(key, str):
                    raise TypeError("All category names must be strings")
                if not isinstance(value, list):
                    raise TypeError(f"Keywords for category '{key}' must be a list")
                # Reject non-string keywords here instead of failing later
                # with an AttributeError on .lower().
                if not all(isinstance(kw, str) for kw in value):
                    raise TypeError(f"Keywords for category '{key}' must all be strings")

        self._category_keywords = {
            "humor": ["lol", "lmao", "haha", "joke", "funny"],
            "random": ["idk", "random", "whatever"],
            "news": ["alert", "news", "update", "announcement", "diamondback", "report"],
            "academics": ["class", "classes", "exam", "professor", "grade",
                          "gpa", "study", "midterm", "final", "project", "homework"],
            "advice": ["recommend", "tips", "help", "should i", "question", "advice"],
            "social": ["party", "hangout", "movie", "homecoming", "game", "event"]
        }

        if custom_keywords:
            for category, keywords in custom_keywords.items():
                self._category_keywords[category] = [kw.lower().strip() for kw in keywords]

        self._misinformation_keywords = [
            "rumor", "unconfirmed", "heard", "confirmed??", "sources say",
            "reportedly", "breaking", "shocking", "can't believe",
            "conspiracy", "fake news", "scam", "hoax", "allegedly"
        ]
        self._analysis_history = []

    @property
    def category_keywords(self) -> Dict[str, List[str]]:
        """Copy of the category -> keywords mapping (lists are copied too)."""
        return {cat: list(kws) for cat, kws in self._category_keywords.items()}

    def analysis_count(self) -> int:
        """Return how many analyses have been recorded in the history."""
        return len(self._analysis_history)

    # First method
    def analyze_tone(self, post_text: str) -> str:
        """Return the tone of the post and record the analysis in the history."""
        # Call Project 1 Function
        tone = categorize_by_tone(post_text)
        self._analysis_history.append({
            'type': 'tone',
            'text': post_text[:50],
            'result': tone
        })
        return tone

    # Second method
    def check_for_misinformation(self, post_text: str,
                                 custom_keywords: Optional[List[str]] = None) -> Dict[str, object]:
        """Run the misinformation check and record the outcome in the history.

        Args:
            post_text: Raw post text.
            custom_keywords: Optional keyword list; ``None`` or an empty list
                uses this instance's default misinformation keywords.

        Returns:
            The result dict produced by ``detect_misinformation``.
        """
        keywords = custom_keywords if custom_keywords else self._misinformation_keywords
        # Call project 1 function
        result = detect_misinformation(post_text, keywords)
        self._analysis_history.append({
            'type': 'misinformation',
            'text': post_text[:50],
            'result': result['is_misinformation']
        })

        return result

    # Third method
    def get_category_keywords(self, categories: Optional[List[str]] = None) -> Dict[str, List[str]]:
        """Return keywords for the given categories (all categories if None).

        The returned lists are copies, so callers cannot alter the instance's
        keyword configuration through them.

        Raises:
            ValueError: If a requested category does not exist.
        """
        if categories is None:
            categories = list(self._category_keywords.keys())
        for cat in categories:
            if cat not in self._category_keywords:
                raise ValueError(f"Category '{cat}' does not exist")
        return {cat: list(self._category_keywords[cat]) for cat in categories}

    # Fourth method
    def categorize_content(self, post_text: str) -> str:
        """Return the category whose keywords appear most often in the post.

        Each category scores one point per keyword found as a substring of
        the lowercased text. Ties go to the category defined first; when no
        keyword matches at all the result is ``"other"``.

        Raises:
            TypeError: If post_text is not a string.
        """
        if not isinstance(post_text, str):
            raise TypeError("post_text must be a string")

        cleaned_text = post_text.lower()

        best_category = "other"
        best_hits = 0
        for category, keywords in self._category_keywords.items():
            # Skip empty keywords: "" is a substring of every text.
            hits = sum(1 for kw in keywords if kw and kw in cleaned_text)
            if hits > best_hits:
                best_category, best_hits = category, hits
        return best_category

    # Fifth method
    def analyze_post(self, post_text: str, check_misinfo: bool = True) -> Dict[str, object]:
        """Run tone and category analysis, plus the misinformation check if asked.

        Returns:
            A dict with ``'tone'`` and ``'category'``, and ``'misinformation'``
            when check_misinfo is true.
        """
        analysis = {
            'tone': self.analyze_tone(post_text),
            'category': self.categorize_content(post_text)
        }

        if check_misinfo:
            analysis['misinformation'] = self.check_for_misinformation(post_text)

        return analysis

  # Group 3 functions class creation

In [None]:
from typing import List, Dict, Any

#track users function from project 1
def track_users(posts_data):
    """Aggregate per-user totals from a sequence of post dicts.

    Each post must provide "username", "upvotes" and "comments"; a truthy
    "is_disinformation" value counts the post as disinformation.

    Returns:
        A dict keyed by username with "total_posts", "total_upvotes",
        "total_comments" and "disinformation_posts".
    """
    summary = {}
    for entry in posts_data:
        name = entry["username"]
        # Create the user's record on first sight, then accumulate into it.
        record = summary.setdefault(name, {
            "total_posts": 0,
            "total_upvotes": 0,
            "total_comments": 0,
            "disinformation_posts": 0,
        })
        record["total_posts"] += 1
        record["total_upvotes"] += entry["upvotes"]
        record["total_comments"] += entry["comments"]
        record["disinformation_posts"] += 1 if entry.get("is_disinformation") else 0
    return summary

#top posters function from project 1
def top_posters_list(posts_data, top_n=10):
    """Rank users by total engagement (upvotes plus comments).

    Returns:
        Up to top_n dicts with "username", "total_posts" and
        "total_engagement", highest engagement first.
    """
    totals = {}
    for entry in posts_data:
        name = entry["username"]
        bucket = totals.setdefault(name, {"total_posts": 0, "total_engagement": 0})
        bucket["total_posts"] += 1
        bucket["total_engagement"] += entry["upvotes"] + entry["comments"]

    leaderboard = sorted(
        ({"username": name, **bucket} for name, bucket in totals.items()),
        key=lambda row: row["total_engagement"],
        reverse=True,
    )
    return leaderboard[:top_n]

#interaction rate function from project 1
def interaction_rate(post_data):
    """Return (upvotes + comments) as a percentage of views, rounded to 2 places.

    Raises:
        ValueError: If the post's "views" value is zero or negative.
    """
    view_count = post_data["views"]
    if view_count <= 0:
        raise ValueError("views must be greater than zero")
    engagement = post_data["upvotes"] + post_data["comments"]
    percentage = engagement / view_count * 100
    return round(percentage, 2)

class UserTracker:
    """Tracks user activity and statistics over a list of post dicts."""

    def __init__(self, posts_data: List[Dict] = None):
        """Store the posts to analyse.

        Args:
            posts_data: List of post dicts, or None for an empty tracker.
                A non-empty list is kept by reference, so ``add_post``
                appends to the caller's list.

        Raises:
            TypeError: If posts_data is given and is not a list.
        """
        # Test against None rather than truthiness so falsy non-lists
        # ({}, "", 0, ()) are rejected instead of silently becoming [].
        if posts_data is not None and not isinstance(posts_data, list):
            raise TypeError("posts_data must be a list")
        self._posts_data = posts_data or []

    @property
    def posts_data(self):
        """The stored list of posts (the internal list, not a copy)."""
        return self._posts_data

    @property
    def user_count(self):
        """Number of distinct usernames among posts that have a "username"."""
        return len({p["username"] for p in self._posts_data if "username" in p})

    #method 1: get all user statistics
    def get_user_stats(self):
        """Return per-user totals from ``track_users`` ({} when there are no posts)."""
        if not self._posts_data:
            return {}
        return track_users(self._posts_data)

    #method 2: get top posters
    def get_top_posters(self, top_n=10):
        """Return the top_n users by engagement ([] when there are no posts)."""
        if not self._posts_data:
            return []
        return top_posters_list(self._posts_data, top_n)

    #method 3: calculate user interaction rate
    def user_interaction_rate(self, username):
        """Return the user's mean interaction rate, rounded to 2 places.

        Only posts by that user that contain a "views" key are considered.

        Raises:
            ValueError: If the user has no posts with a "views" key.
        """
        user_posts = [p for p in self._posts_data if p.get("username") == username and "views" in p]
        if not user_posts:
            raise ValueError(f"no posts with views for {username}")
        rates = [interaction_rate(p) for p in user_posts]
        return round(sum(rates) / len(rates), 2)

    #method 4: get user reliability score
    def user_reliability(self, username):
        """Return the percentage of the user's posts not marked as disinformation.

        Raises:
            ValueError: If the user has no posts.
        """
        stats = self.get_user_stats()
        if username not in stats:
            raise ValueError(f"user {username} not found")
        total = stats[username]["total_posts"]
        misinfo = stats[username]["disinformation_posts"]
        return round(((total - misinfo) / total) * 100, 2) if total > 0 else 100.0

    #method 5: add new post
    def add_post(self, post_data):
        """Append a post dict to the tracked posts.

        Raises:
            TypeError: If post_data is not a dict.
        """
        if not isinstance(post_data, dict):
            raise TypeError("post_data must be a dictionary")
        self._posts_data.append(post_data)

    def __str__(self):
        return f"usertracker: {self.user_count} users, {len(self._posts_data)} posts"

    def __repr__(self):
        return f"usertracker(posts_data=[{len(self._posts_data)} posts])"

In [None]:
from datetime import datetime, timedelta
from typing import List, Dict

#weekly interactions function from project 1
def total_interactions_this_week(posts_data):
    """Sum upvotes and comments of posts created within the last 7 days.

    Each post's "created_utc" may be an ISO 8601 string (with or without a
    UTC offset, or with a trailing "Z") or a numeric Unix timestamp.
    Timestamps without an offset are interpreted as UTC.

    Args:
        posts_data: Iterable of post dicts with "created_utc", "upvotes" and
            "comments".

    Returns:
        A dict with "total_posts", "total_upvotes", "total_comments" and
        "total_interactions" for the posts inside the window.
    """
    # Local import: this file's top-level import only brings in datetime/timedelta.
    from datetime import timezone

    def _as_utc(raw):
        # Normalise every supported representation to an aware UTC datetime
        # so comparisons never mix naive and aware values.
        if isinstance(raw, (int, float)):
            return datetime.fromtimestamp(raw, tz=timezone.utc)
        if isinstance(raw, str) and raw.endswith(("Z", "z")):
            # fromisoformat only accepts the "Z" suffix on newer Pythons.
            raw = raw[:-1] + "+00:00"
        parsed = datetime.fromisoformat(raw)
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    week_start = datetime.now(timezone.utc) - timedelta(days=7)
    total_upvotes = 0
    total_comments = 0
    post_count = 0

    for post in posts_data:
        if _as_utc(post["created_utc"]) >= week_start:
            post_count += 1
            total_upvotes += post["upvotes"]
            total_comments += post["comments"]

    return {
        "total_posts": post_count,
        "total_upvotes": total_upvotes,
        "total_comments": total_comments,
        "total_interactions": total_upvotes + total_comments
    }

#compare engagement function from project 1
def compare_engagement(posts_data, group_by="category"):
    """Return the average engagement (upvotes + comments) per group.

    Posts are grouped by the value stored under the group_by key; missing
    "upvotes" or "comments" count as 0. Averages are rounded to 2 places.
    """
    tallies = {}  # group -> [engagement sum, post count]
    for entry in posts_data:
        key = entry[group_by]
        score = entry.get("upvotes", 0) + entry.get("comments", 0)
        tally = tallies.setdefault(key, [0, 0])
        tally[0] += score
        tally[1] += 1

    return {key: round(total / count, 2) for key, (total, count) in tallies.items()}

#top posts function from project 1
def track_top_posts(posts_data, top_n=5):
    """Return the top_n posts ranked by upvotes plus comments.

    Each returned post is a shallow copy of the input dict with an added
    "total_interactions" key; the caller's dicts are left untouched.

    Args:
        posts_data: Iterable of post dicts with "upvotes" and "comments".
        top_n: Maximum number of posts to return.

    Returns:
        A list of post dicts, highest "total_interactions" first; posts with
        equal totals keep their input order.
    """
    # Copy instead of writing the score into the caller's data.
    scored = [
        {**post, "total_interactions": post["upvotes"] + post["comments"]}
        for post in posts_data
    ]
    scored.sort(key=lambda item: item["total_interactions"], reverse=True)
    return scored[:top_n]

class EngagementAnalyzer:
    """Analyzes post engagement and trends over a list of post dicts."""

    def __init__(self, posts_data: List[Dict] = None):
        """Store the posts to analyse.

        Args:
            posts_data: List of post dicts, or None for an empty analyzer.
                A non-empty list is kept by reference, so ``add_post``
                appends to the caller's list.

        Raises:
            TypeError: If posts_data is given and is not a list.
        """
        # Test against None rather than truthiness so falsy non-lists
        # ({}, "", 0, ()) are rejected instead of silently becoming [].
        if posts_data is not None and not isinstance(posts_data, list):
            raise TypeError("posts_data must be a list")
        self._posts_data = posts_data or []

    @property
    def posts_data(self):
        """The stored list of posts (the internal list, not a copy)."""
        return self._posts_data

    @property
    def total_engagement(self):
        """Sum of upvotes and comments over all posts (missing values count as 0)."""
        return sum(p.get("upvotes", 0) + p.get("comments", 0) for p in self._posts_data)

    #method 1: get weekly summary
    def weekly_summary(self):
        """Return last-7-days totals; all zeros when there are no posts."""
        if not self._posts_data:
            return {"total_posts": 0, "total_upvotes": 0, "total_comments": 0, "total_interactions": 0}
        return total_interactions_this_week(self._posts_data)

    #method 2: compare by category
    def compare_categories(self, group_by="category"):
        """Return average engagement per group ({} when there are no posts)."""
        if not self._posts_data:
            return {}
        return compare_engagement(self._posts_data, group_by)

    #method 3: get top posts
    def top_posts(self, top_n=5):
        """Return the top_n posts by engagement ([] when there are no posts)."""
        if not self._posts_data:
            return []
        return track_top_posts(self._posts_data, top_n)

    #method 4: calculate average engagement
    def avg_engagement(self):
        """Return mean engagement per post, rounded to 2 places (0.0 if empty)."""
        if not self._posts_data:
            return 0.0
        return round(self.total_engagement / len(self._posts_data), 2)

    #method 5: add new post
    def add_post(self, post_data):
        """Append a post dict to the analysed posts.

        Raises:
            TypeError: If post_data is not a dict.
        """
        if not isinstance(post_data, dict):
            raise TypeError("post_data must be a dictionary")
        self._posts_data.append(post_data)

    def __str__(self):
        return f"engagementanalyzer: {len(self._posts_data)} posts, {self.total_engagement} engagement"

    def __repr__(self):
        return f"engagementanalyzer(posts_data=[{len(self._posts_data)} posts])"

In [None]:
import re
from typing import List, Dict, Any

#Group 1 original text-cleaning function
def clean_post_text(text: str) -> str:
    """Normalise post text to lowercase ASCII letters separated by single spaces.

    Removes, in order: URLs, @mentions and #hashtags, then every character
    that is not an ASCII letter or whitespace.

    Raises:
        TypeError: If text is not a string.
    """
    if not isinstance(text, str):
        raise TypeError("Input must be a string.")

    stripped = text
    # Order matters: links and tags must go before punctuation is removed.
    for pattern in (r"http\S+|www\S+|https\S+", r"[@#]\w+", r"[^a-zA-Z\s]"):
        stripped = re.sub(pattern, "", stripped)
    return re.sub(r"\s+", " ", stripped.lower()).strip()

#Group 1 original duplicate-removal function
def check_duplicate(new_post: str, existing_posts: List[str], similarity_threshold: float = 0.9) -> bool:
    """Report whether new_post is a near-duplicate of any existing post.

    Posts are reduced to sets of lowercase words (URLs and non-letter
    characters removed). Similarity is the shared-word count divided by the
    combined-word count; existing posts with no words are ignored.

    Raises:
        TypeError: If new_post is not a string or existing_posts is not a
            list of strings.
        ValueError: If similarity_threshold is outside [0, 1].
    """
    if not isinstance(new_post, str):
        raise TypeError("new_post must be a string.")
    if not (isinstance(existing_posts, list) and all(isinstance(p, str) for p in existing_posts)):
        raise TypeError("existing_posts must be a list of strings.")
    if not (0 <= similarity_threshold <= 1):
        raise ValueError("similarity_threshold must be between 0 and 1.")

    def _word_set(raw):
        # Strip links, keep letters only, lowercase, then split into words.
        no_links = re.sub(r"http\S+|www\S+", "", raw)
        letters_only = re.sub(r"[^a-zA-Z\s]", "", no_links).lower()
        return set(re.sub(r"\s+", " ", letters_only).strip().split())

    candidate_words = _word_set(new_post)

    for existing in existing_posts:
        other_words = _word_set(existing)
        if other_words:
            shared = len(candidate_words & other_words)
            combined = len(candidate_words | other_words)
            if shared / combined >= similarity_threshold:
                return True
    return False

class PostCleaner:
    """Cleans one post and checks it against existing posts for duplicates.

    Every call to ``clean_text`` or ``is_duplicate`` is logged in an
    internal history list.
    """

    def __init__(self, post_text: str, existing_posts: List[str] = None, similarity_threshold: float = 0.9):
        """Validate and store the post, the comparison posts and the threshold.

        Raises:
            TypeError: If post_text is not a string, or existing_posts is
                given and is not a list of strings.
            ValueError: If similarity_threshold is outside [0, 1].
        """
        if not isinstance(post_text, str):
            raise TypeError("post_text must be a string.")
        if existing_posts is not None and not (
            isinstance(existing_posts, list) and all(isinstance(p, str) for p in existing_posts)
        ):
            raise TypeError("existing_posts must be a list of strings.")
        if not (0 <= similarity_threshold <= 1):
            raise ValueError("similarity_threshold must be between 0 and 1.")

        self._post_text = post_text
        self._existing_posts = existing_posts or []
        self._similarity_threshold = similarity_threshold
        self._clean_history: List[Dict[str, Any]] = []

    @property
    def post_text(self) -> str:
        """The original, uncleaned post text."""
        return self._post_text

    @property
    def existing_posts(self) -> List[str]:
        """A copy of the posts used for duplicate comparison."""
        return list(self._existing_posts)

    @property
    def similarity_threshold(self) -> float:
        """The similarity threshold used for duplicate detection."""
        return self._similarity_threshold

    # Method 1
    def clean_text(self) -> str:
        """Return the cleaned post text and log the operation."""
        result = clean_post_text(self._post_text)
        entry = {"type": "clean", "original": self._post_text[:50], "result": result}
        self._clean_history.append(entry)
        return result

    # Method 2
    def is_duplicate(self) -> bool:
        """Return whether the post duplicates an existing one and log the check."""
        verdict = check_duplicate(self._post_text, self._existing_posts, self._similarity_threshold)
        entry = {"type": "duplicate", "text": self._post_text[:50], "result": verdict}
        self._clean_history.append(entry)
        return verdict

    # Method 3
    def summarize_post(self) -> Dict[str, Any]:
        """Return both the cleaned text and the duplicate verdict."""
        cleaned = self.clean_text()
        duplicate = self.is_duplicate()
        return {"cleaned_text": cleaned, "is_duplicate": duplicate}

    # Method 4
    def history_count(self) -> int:
        """Return the number of logged operations."""
        return len(self._clean_history)


In [None]:
from typing import List, Dict, Any

#Group 2 original function
def total_metadata_type(posts_metadata: List[Dict[str, Any]]) -> Dict[str, Dict[str, int]]:
    """Count how often each value occurs for every metadata key.

    Values are converted with ``str()`` before counting.

    Returns:
        ``{key: {str(value): occurrences}}`` across all posts.

    Raises:
        TypeError: If posts_metadata is not a list or contains a non-dict.
        ValueError: If posts_metadata is empty.
    """
    if not isinstance(posts_metadata, list):
        raise TypeError("posts_metadata must be a list of dictionaries.")
    if not posts_metadata:
        raise ValueError("posts_metadata cannot be empty.")
    if any(not isinstance(record, dict) for record in posts_metadata):
        raise TypeError("Each element must be a dictionary.")

    tallies = {}
    for record in posts_metadata:
        for field, raw in record.items():
            bucket = tallies.setdefault(field, {})
            label = str(raw)
            bucket[label] = bucket.get(label, 0) + 1
    return tallies

#Group 2 original function
def post_type(post_data: Dict[str, Any]) -> str:
    """Classify a post as "Pictures", "Words" or "Other".

    A post is "Pictures" when its "post_hint" is "image" or its "url" ends
    with a known image extension (also checked with any query string or
    fragment removed). Otherwise it is "Words" when "selftext" is a
    non-blank string, and "Other" in every remaining case.

    Args:
        post_data: Post dict; "post_hint", "url" and "selftext" are optional.

    Raises:
        TypeError: If post_data is not a dict.
    """
    if not isinstance(post_data, dict):
        raise TypeError("post_data must be a dictionary.")

    if post_data.get("post_hint") == "image":
        return "Pictures"

    # A None or other non-string url is treated as "no url" instead of
    # crashing on .lower(), mirroring the guard used for selftext below.
    raw_url = post_data.get("url", "")
    url = raw_url.lower() if isinstance(raw_url, str) else ""
    # Also test the URL without "?query" / "#fragment" so links such as
    # ".../photo.png?width=640" are still recognised as images.
    path_only = url.split("?", 1)[0].split("#", 1)[0]
    image_extensions = (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp")
    if url.endswith(image_extensions) or path_only.endswith(image_extensions):
        return "Pictures"

    selftext = post_data.get("selftext", "")
    if isinstance(selftext, str) and selftext.strip():
        return "Words"

    return "Other"

#Group 2 original function
def analyze_post_lengths_by_category(posts: List[str], category_keywords: Dict[str, List[str]]) -> Dict[str, Any]:
    """Group posts by the first category whose keyword they contain and average their lengths.

    Matching is case-insensitive substring matching. Categories are tried in
    the order of category_keywords; posts matching none go to "other".
    Non-string and blank posts are skipped.

    Args:
        posts: List of post texts.
        category_keywords: Mapping of category name to keyword list.

    Returns:
        ``{category: {"posts": [(post, word_count), ...],
        "average_length": float}}`` for every category with at least one post.

    Raises:
        TypeError: If posts is not a list or category_keywords is not a dict.
    """
    if not isinstance(posts, list):
        raise TypeError("posts must be a list of strings.")
    if not isinstance(category_keywords, dict):
        raise TypeError("category_keywords must be a dictionary.")

    # Post text is lowercased below, so lowercase the keywords once up front;
    # otherwise a keyword such as "should I" could never match.
    lowered_keywords = {
        cat: [kw.lower() if isinstance(kw, str) else kw for kw in keywords]
        for cat, keywords in category_keywords.items()
    }

    categorized = {}
    for post in posts:
        if not isinstance(post, str) or not post.strip():
            continue
        text = post.lower()
        category = "other"
        for cat, keywords in lowered_keywords.items():
            if any(kw in text for kw in keywords):
                category = cat
                break
        word_count = len(post.split())
        categorized.setdefault(category, []).append((post, word_count))

    results = {}
    for cat, items in categorized.items():
        lengths = [length for _, length in items]
        avg_length = round(sum(lengths) / len(lengths), 2)
        results[cat] = {"posts": items, "average_length": avg_length}
    return results


class MetadataAnalyzer:
    """Summarises post metadata, classifies posts and analyses post lengths.

    Each analysis performed through this object is logged in an internal
    history list.
    """

    def __init__(self, posts_metadata: List[Dict[str, Any]] = None):
        """Validate and store the metadata records.

        Raises:
            TypeError: If posts_metadata is given and is not a list of dicts.
        """
        if posts_metadata is not None and not (
            isinstance(posts_metadata, list) and all(isinstance(p, dict) for p in posts_metadata)
        ):
            raise TypeError("posts_metadata must be a list of dictionaries.")
        self._posts_metadata = posts_metadata or []
        self._analysis_history: List[Dict[str, Any]] = []

    @property
    def posts_metadata(self) -> List[Dict[str, Any]]:
        """A shallow copy of the stored metadata list."""
        return list(self._posts_metadata)

    @property
    def analysis_count(self) -> int:
        """Number of analyses logged so far."""
        return len(self._analysis_history)

    # Method 1
    def summarize_metadata(self, posts_metadata: List[Dict[str, Any]] = None) -> Dict[str, Dict[str, int]]:
        """Summarise the given records, or the stored ones when none (or an empty list) is given."""
        records = posts_metadata if posts_metadata else self._posts_metadata
        result = total_metadata_type(records)
        self._analysis_history.append({"type": "summary", "records": len(records)})
        return result

    # Method 2
    def classify_post(self, post_data: Dict[str, Any]) -> str:
        """Return the post's type as reported by ``post_type`` and log it."""
        label = post_type(post_data)
        self._analysis_history.append({"type": "classification", "result": label})
        return label

    # Method 3
    def analyze_lengths(self, posts: List[str], category_keywords: Dict[str, List[str]]) -> Dict[str, Any]:
        """Return per-category length statistics and log the analysis."""
        breakdown = analyze_post_lengths_by_category(posts, category_keywords)
        self._analysis_history.append({"type": "length_analysis", "categories": len(breakdown)})
        return breakdown

    # Method 4
    def metadata_report(self, posts: List[str], category_keywords: Dict[str, List[str]]) -> Dict[str, Any]:
        """Combine the stored-metadata summary with the length analysis."""
        combined = {
            "summary": self.summarize_metadata(),
            "lengths": self.analyze_lengths(posts, category_keywords),
        }
        self._analysis_history.append({"type": "report", "result": "combined"})
        return combined
