In [2]:
import os, json, re, getpass
from dotenv import load_dotenv

# Load key/value pairs from a .env file into the process environment.
# override=True lets values from the .env file replace variables that are
# already set, so editing .env and re-running this cell takes effect.
load_dotenv(override=True)

True

In [3]:
from typing import Type
from pydantic import BaseModel, Field
from linkup import LinkupClient
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import BaseTool


def get_llm_client(model: str = "openai/gpt-4o-mini"):
    """Initialize and return the LLM client.

    Args:
        model: Model identifier passed straight to ``LLM(model=...)``.
            Defaults to "openai/gpt-4o-mini", so calling the function with
            no arguments yields the same client as before.

    Returns:
        The ``LLM`` instance configured for ``model``.
    """
    return LLM(model=model)


In [4]:
# Define Linkup Search Tool
class LinkupSearchInput(BaseModel):
    """Input schema for Linkup Search Tool."""

    # Required free-text query.
    query: str = Field(description="The search query to perform")

    # Optional; falls back to "standard" when the caller omits it.
    depth: str = Field(
        description="Depth of search: 'standard' or 'deep'",
        default="standard",
    )

    # Optional; falls back to "searchResults" when the caller omits it.
    output_type: str = Field(
        description="Output type: 'searchResults', 'sourcedAnswer', or 'structured'",
        default="searchResults",
    )

class LinkupSearchTool(BaseTool):
    """Agent tool that runs a web search through the Linkup client.

    Every failure is reported as a returned "Error occurred while searching: ..."
    string rather than a raised exception, so the calling agent can read the
    message and retry with different arguments.
    """
    name: str = "Linkup Search"
    description: str = "Search the web for information using Linkup and return comprehensive results"
    args_schema: Type[BaseModel] = LinkupSearchInput

    # The constructor is inherited from BaseTool unchanged; LinkupSearchTool()
    # with no arguments keeps working.

    def _run(self, query: str, depth: str = "standard", output_type: str = "searchResults") -> str:
        """Execute a Linkup search and return the response as text.

        Args:
            query: Search query; must contain non-whitespace text.
            depth: "standard" or "deep".
            output_type: "searchResults" or "sourcedAnswer". "structured" is
                rejected here because that mode needs a structured output
                schema, which this tool never sends.

        Returns:
            ``str()`` of the Linkup response, or an error message beginning
            with "Error occurred while searching:".
        """
        valid_depths = ("standard", "deep")
        valid_output_types = ("searchResults", "sourcedAnswer")

        # Validate locally so an obviously bad argument does not cost a
        # remote call and the agent gets an actionable message back.
        if not query or not query.strip():
            return "Error occurred while searching: query must not be empty"
        if depth not in valid_depths:
            return (
                f"Error occurred while searching: invalid depth '{depth}'; "
                f"use one of {', '.join(valid_depths)}"
            )
        if output_type == "structured":
            return (
                "Error occurred while searching: output_type 'structured' is not "
                "supported by this tool because no output schema is provided; "
                f"use one of {', '.join(valid_output_types)}"
            )
        if output_type not in valid_output_types:
            return (
                f"Error occurred while searching: invalid output_type '{output_type}'; "
                f"use one of {', '.join(valid_output_types)}"
            )

        try:
            # Initialize Linkup client with API key from environment variables
            linkup_client = LinkupClient(api_key=os.getenv("LINKUP_API_KEY"))

            # Perform search
            search_response = linkup_client.search(
                query=query,
                depth=depth,
                output_type=output_type
            )

            return str(search_response)
        except Exception as e:
            # Deliberate best-effort: surface the failure to the agent as text.
            return f"Error occurred while searching: {str(e)}"

In [5]:

from youtube_search import YoutubeSearch
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_comment_downloader import YoutubeCommentDownloader, SORT_BY_POPULAR
from itertools import islice

def extract_video_id(url: str) -> "str | None":
    """Extract the video ID from a YouTube URL.

    Recognized forms (the scheme and a leading "www." are optional):
        * ``.../watch?v=<id>`` with the ``v`` parameter in any position
        * ``youtu.be/<id>``
        * ``.../shorts/<id>``, ``.../embed/<id>``, ``.../live/<id>``

    The URL is parsed rather than substring-matched, so other query
    parameters whose names merely end in "v=" and any ``#fragment`` never
    end up in the returned ID.

    Args:
        url: The URL (or scheme-less URL) to inspect.

    Returns:
        The video ID, or ``None`` when no ID can be found.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import parse_qs, urlparse

    if not url:
        return None

    candidate = url.strip()
    if "://" not in candidate:
        # urlparse only fills in the host when a scheme separator is present.
        candidate = "https://" + candidate.lstrip("/")

    try:
        parsed = urlparse(candidate)
    except ValueError:
        # Malformed input (e.g. an unbalanced IPv6 bracket) is simply "no ID".
        return None

    host = (parsed.hostname or "").lower()
    if host.startswith("www."):
        host = host[len("www."):]
    segments = [part for part in parsed.path.split("/") if part]

    # Short links carry the ID as the first path segment.
    if host == "youtu.be":
        return segments[0] if segments else None

    # Standard watch URLs carry it in the "v" query parameter.
    video_ids = parse_qs(parsed.query).get("v")
    if video_ids and video_ids[0]:
        return video_ids[0]

    # Path-style URLs: /shorts/<id>, /embed/<id>, /live/<id>.
    if len(segments) >= 2 and segments[0] in ("shorts", "embed", "live"):
        return segments[1]

    return None


def get_transcript_text(video_url: str) -> str:
    """Return a video's transcript flattened into one space-separated string.

    The transcript is fetched with the transcript API's default settings.
    Nothing is raised to the caller: an unrecognized URL yields
    "Invalid YouTube URL." and any exception yields "Transcript error: ...".
    """
    try:
        vid = extract_video_id(video_url)
        if vid:
            # fetch() is called on an API instance
            snippets = YouTubeTranscriptApi().fetch(vid)
            # Concatenate the text of each transcript snippet
            return " ".join(piece.text for piece in snippets)
        return "Invalid YouTube URL."
    except Exception as e:
        return f"Transcript error: {e}"


def get_top_comments(video_url: str, max_comments: int = 50) -> str:
    """Return popular comments for a video, separated by blank lines.

    At most `max_comments` items are read from the downloader; entries
    without text are dropped from that slice. Nothing is raised to the
    caller: an unrecognized URL yields "Invalid YouTube URL.", an empty
    result yields "No comments found." and any exception yields
    "Comments error: ...".
    """
    try:
        vid = extract_video_id(video_url)
        if not vid:
            return "Invalid YouTube URL."

        fetcher = YoutubeCommentDownloader()
        stream = fetcher.get_comments_from_url(
            f"https://www.youtube.com/watch?v={vid}",
            sort_by=SORT_BY_POPULAR
        )

        collected = []
        for comment in islice(stream, max_comments):
            if comment.get("text"):
                collected.append(comment.get("text", "").strip())

        if collected:
            return "\n\n".join(collected)
        return "No comments found."
    except Exception as e:
        return f"Comments error: {e}"



# Argument schema for the YouTube search tool: one required free-text query.
class YouTubeSearchInput(BaseModel):
    query: str = Field(
        description="The search query for YouTube",
    )


class YouTube_Search_Tool(BaseTool):
    """Agent tool that searches YouTube and enriches each hit.

    For every non-Shorts result the tool returns the video metadata plus its
    transcript and top comments. Failures are reported as a returned string
    instead of a raised exception so the calling agent can read them.
    """
    name: str = "YouTube Search"
    description: str = "Searches YouTube and returns video metadata and engagement info"
    args_schema: Type[BaseModel] = YouTubeSearchInput

    # The constructor is inherited from BaseTool unchanged; YouTube_Search_Tool()
    # with no arguments keeps working.

    def _run(self, query: str) -> str:
        """Search YouTube for `query` and return one text block per video.

        Args:
            query: The YouTube search query.

        Returns:
            Video summaries separated by blank lines, a "No YouTube videos
            found ..." message when nothing usable came back, or
            "Error during YouTube search: ..." if the search itself failed.
        """
        try:
            results = json.loads(YoutubeSearch(query, max_results=10).to_json())
            video_summaries = []

            for video in results.get("videos", []):
                url_suffix = video.get("url_suffix")
                # Without a URL suffix there is nothing to fetch a transcript
                # or comments for, so skip the entry rather than build a
                # bogus "https://www.youtube.comNone" link.
                if not url_suffix:
                    continue

                full_url = f"https://www.youtube.com{url_suffix}"

                # Shorts are excluded from the analysis.
                if "/shorts/" in full_url:
                    continue

                title = video.get("title")
                channel = video.get("channel")
                duration = video.get("duration")
                views = video.get("views")
                publish_time = video.get("publish_time")

                # Assemble the summary line by line so the text handed to the
                # LLM carries no source-code indentation.
                summary = "\n".join([
                    f"Title       : {title}",
                    f"Channel     : {channel}",
                    f"Duration    : {duration}",
                    f"Views       : {views}",
                    f"Published   : {publish_time}",
                    f"URL         : {full_url}",
                    "",
                    f"Transcript  : {get_transcript_text(full_url)}",
                    "",
                    "Top Comments:",
                    get_top_comments(full_url),
                    "-" * 80,
                ])

                video_summaries.append(summary)

            if not video_summaries:
                # An empty string gives the agent nothing to act on.
                return f"No YouTube videos found for query: {query}"

            return "\n\n".join(video_summaries)

        except Exception as e:
            # Deliberate best-effort: surface the failure to the agent as text.
            return f"Error during YouTube search: {str(e)}"


In [6]:
def create_atomberg_sov_crew(query: str, top_n: int = 10):
    """Create CrewAI agents and tasks for Share of Voice analysis across Web and YouTube.

    The crew runs five tasks sequentially: web search, YouTube search,
    SoV analysis, insight synthesis and report writing.

    Args:
        query: Search query interpolated into both search task descriptions.
        top_n: Number of results each search task is asked for. It is only
            interpolated into the task descriptions; this function does not
            pass it to the tools themselves.

    Returns:
        The assembled ``Crew``; the caller starts it with ``kickoff()``.
    """

    # Tools
    linkup_search_tool = LinkupSearchTool()
    youtube_search_tool = YouTube_Search_Tool()
    llm = get_llm_client()

    # Agents
    search_agent = Agent(
        role="Web Searcher",
        goal="Collect top-N search results from web sources for a given query.",
        backstory="Expert in search engines like Google, X, Instagram, and Reddit.",
        tools=[linkup_search_tool],
        llm=llm,
        verbose=False,
    )

    youtube_agent = Agent(
        role="YouTube Researcher",
        goal="Find top YouTube videos, transcripts, and comments for a given query.",
        backstory="Specialist in sourcing relevant YouTube videos, transcripts, and audience sentiment.",
        tools=[youtube_search_tool],
        llm=llm,
        verbose=False,
    )

    sov_analyst = Agent(
        role="SoV Analyst",
        goal="Quantify Atomberg's Share of Voice (mentions, sentiment, engagement) across web and YouTube.",
        backstory="Skilled at entity recognition, sentiment analysis, and competitive benchmarking.",
        llm=llm,
        verbose=False,
    )

    insight_agent = Agent(
        role="Insight Synthesizer",
        goal="Generate strategic insights from Share of Voice data across all platforms.",
        backstory="Combines analytical insight with strategic vision to guide marketing direction.",
        llm=llm,
        verbose=False,
    )

    writer_agent = Agent(
        role="Marketing Report Writer",
        goal="Write a comprehensive markdown report with insights, SoV tables, and recommendations.",
        backstory="Crafts engaging and data-driven briefs for internal teams.",
        llm=llm,
        verbose=False,
    )

    # Tasks
    # The descriptions refer to each tool by the name it is registered under
    # ("Linkup Search" / "YouTube Search") so the agent is not pointed at a
    # tool name that does not exist.
    search_task = Task(
        description=f"Use the Linkup Search tool to fetch top {top_n} Google/Reddit/Instagram results for query: '{query}'. Return titles, snippets, URLs, and any engagement metrics.",
        agent=search_agent,
        tools=[linkup_search_tool],
        expected_output="Top web results with metadata and engagement signals.",
    )

    youtube_search_task = Task(
        description=f"Use the YouTube Search tool to fetch top {top_n} YouTube videos for query: '{query}'. Return title, channel, URL, transcript, and top comments.",
        agent=youtube_agent,
        tools=[youtube_search_tool],
        expected_output="Top YouTube video summaries with transcript and audience comments.",
    )

    analysis_task = Task(
        description="Analyze both web and YouTube results. Identify Atomberg and competitor mentions, perform sentiment and engagement analysis. Compute Atomberg's Share of Voice (SoV).",
        agent=sov_analyst,
        context=[search_task, youtube_search_task],
        expected_output="Table comparing SoV across platforms, with sentiment and engagement metrics.",
    )

    insight_task = Task(
        description="Synthesize key findings: where Atomberg leads or lags, content opportunities, competitor strengths, and platform-specific patterns.",
        agent=insight_agent,
        context=[analysis_task],
        expected_output="Strategic recommendations with insights across platforms and keywords.",
    )

    report_task = Task(
        description="Write a markdown report including SoV table, insights, platform trends, and action recommendations.",
        agent=writer_agent,
        # The report must contain the SoV table, which is produced by
        # analysis_task, so the writer needs that output as well as the
        # synthesized insights.
        context=[analysis_task, insight_task],
        expected_output="Final markdown marketing brief with citations and takeaways.",
    )

    crew = Crew(
        agents=[search_agent, youtube_agent, sov_analyst, insight_agent, writer_agent],
        tasks=[search_task, youtube_search_task, analysis_task, insight_task, report_task],
        process=Process.sequential,
        verbose=True,
    )

    return crew


In [None]:
# Build the crew for a single search query and run every task in order.
query = "smart fan"
crew = create_atomberg_sov_crew(query, top_n=10)
result = crew.kickoff()
# print(result.raw)  # uncomment to preview the report inline
# Persist the raw text of the crew's final output as UTF-8 markdown; the
# PDF conversion cell reads this same file.
with open('markdown.md', 'w', encoding='utf-8') as f:
    f.write(result.raw)



In [11]:
import pypandoc

# Convert the saved markdown report (markdown.md) into output.pdf.
pypandoc.convert_file(
    'markdown.md',
    'pdf',
    outputfile='output.pdf',
    extra_args=[
        '--standalone',
        '--pdf-engine=xelatex',  # or lualatex
        '--variable', 'geometry:margin=1in',  # page margin; lower the value for narrower margins
    ]
)


xelatex: major issue: So far, no MiKTeX administrator has checked for updates.
miktex-dvipdfmx: major issue: So far, no MiKTeX administrator has checked for updates.
xelatex: major issue: So far, no MiKTeX administrator has checked for updates.



''