# German Startup Newsletter Research Pipeline

This notebook implements an automated research pipeline for generating a German startup newsletter. The system uses AI agents to:

1. **Research** - Find recent German startup news articles using Google Search
2. **Analyze** - Summarize and evaluate article relevance using AI
3. **Report** - Generate a structured newsletter with insights

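The pipeline combines an LLM (`gpt-4.1-nano`), a sentence-embedding model (`all-MiniLM-L6-v2`), and two MCP tool servers (Serper for search, `mcp-server-fetch` for retrieving article content) into an automated news gathering and reporting system.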


## Setup and Configuration

This section initializes the research pipeline with:

- **Environment Setup**: Loading environment variables and configuring Langfuse for observability
- **MCP Server Configuration**: Setting up the Serper MCP server for Google Search API access

The system uses Langfuse for tracing and monitoring the AI agent interactions, providing visibility into the research process.
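
Before running the setup cell, it can help to confirm that the credentials it relies on are present. The following is a minimal sketch, not part of the notebook: `missing_env_vars` is a hypothetical helper, `SERPER_API_KEY` is the only name the notebook reads explicitly, and the remaining variable names are assumptions based on the usual OpenAI and Langfuse conventions.

```python
import os

# SERPER_API_KEY is read by the notebook; the other names are assumptions
# based on the usual OpenAI and Langfuse environment variables.
REQUIRED_ENV_VARS = (
    "SERPER_API_KEY",
    "OPENAI_API_KEY",
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_SECRET_KEY",
)


def missing_env_vars(required=REQUIRED_ENV_VARS, env=None):
    """Return the names in `required` that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


# Demonstration with an explicit mapping instead of the real environment.
example_env = {"SERPER_API_KEY": "abc", "OPENAI_API_KEY": ""}
print(missing_env_vars(env=example_env))
```

Calling `missing_env_vars()` without arguments after `load_dotenv(override=True)` would check the real environment.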


In [None]:
from dotenv import load_dotenv
from agents.mcp import MCPServerStdio
from datetime import datetime
from agents import Agent, Runner, trace
from IPython.display import display, Markdown
import os
import nest_asyncio
from openinference.instrumentation.openai_agents import OpenAIAgentsInstrumentor
from langfuse import get_client
from pydantic import BaseModel
import asyncio
 

nest_asyncio.apply()

# Load .env file
load_dotenv(override=True)

# Setup Langfuse
OpenAIAgentsInstrumentor().instrument()
langfuse = get_client()
 
# Verify connection
if langfuse.auth_check():
    print("Langfuse client is authenticated and ready!")
else:
    print("Authentication failed. Please check your credentials and host.")


In [None]:
# Launch the Serper MCP server via uvx; it reads the API key from its environment
serper_mcp_server_params = {
    "command": "uvx",
    "args": ["serper-mcp-server"],
    "env": {"SERPER_API_KEY": os.getenv("SERPER_API_KEY")},
}

## Data Models and Structures

This section defines the Pydantic models that structure the data flow through the pipeline:

- **Article**: Basic article information (title, date, URL)
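- **ResearchResults**: Container for multiple articles from research
- **ResearchResultsEvaluation**: Acceptance verdict for a set of research results (accepted articles plus a reason); defined here but not used by the pipeline below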
- **ArticleSummary**: Extended article with AI-generated summary
- **ArticleEvaluation**: Relevance scoring and topic matching
- **ArticleReport**: Final comprehensive article report

These models ensure type safety and consistent data structure throughout the research pipeline.
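
The final report model is the union of the summary fields and the evaluation fields. The sketch below illustrates that composition with standard-library dataclasses as stand-ins for the Pydantic models, so it runs without dependencies. The class names `SummaryFields` and `EvaluationFields` and the helper `build_report` are hypothetical.

```python
from dataclasses import asdict, dataclass


@dataclass
class SummaryFields:
    """Stand-in for ArticleSummary."""
    title: str
    date: str
    url: str
    summary: str


@dataclass
class EvaluationFields:
    """Stand-in for ArticleEvaluation."""
    relevance_score: float
    best_matching_topic: str


def build_report(summary: SummaryFields, evaluation: EvaluationFields) -> dict:
    """Merge both records into one flat dict shaped like ArticleReport."""
    return {**asdict(summary), **asdict(evaluation)}


report = build_report(
    SummaryFields("Example title", "2025-01-01", "https://example.com", "Short summary."),
    EvaluationFields(0.5, "fintech startups"),
)
print(sorted(report))
```

With Pydantic v2 models, the same merge could presumably be written as `ArticleReport(**summary.model_dump(), **evaluation.model_dump())`, which avoids copying each field by hand.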


In [None]:
class Article(BaseModel):
    title: str
    date: str
    url: str

class ResearchResults(BaseModel):
    articles: list[Article]

class ResearchResultsEvaluation(BaseModel):
    is_acceptable: bool
    acceptable_articles: list[Article]
    reason: str

In [None]:
class ArticleSummary(BaseModel):
    title: str
    date: str
    url: str
    summary: str

class ArticleEvaluation(BaseModel):
    relevance_score: float
    best_matching_topic: str

class ArticleReport(BaseModel):
    title: str
    date: str
    url: str
    summary: str
    relevance_score: float
    best_matching_topic: str

## Research Agent Configuration

This section configures the research agent with:

- **Search Parameters**: Google Search settings (language, number of results, time filter)
- **Search Queries**: Multiple German startup-related search terms
- **Agent Instructions**: Detailed prompts for the research agent to find relevant articles

The research agent uses the Serper MCP server to execute Google News searches and compile a list of relevant German startup articles.
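
The prompt asks the model for unique articles, but uniqueness is not enforced in code. A deterministic post-processing step could back this up. The sketch below is an assumption about how such a step might look: `normalize_url` and `dedupe_articles` are hypothetical helpers, articles are plain dicts with a `url` key, and two URLs are treated as the same article when host and path match (query strings and fragments are ignored).

```python
from urllib.parse import urlsplit


def normalize_url(url: str) -> str:
    """Reduce a URL to host + path so trivial variants compare equal."""
    parts = urlsplit(url.strip())
    host = parts.netloc.lower().removeprefix("www.")
    path = parts.path.rstrip("/")
    return f"{host}{path}"


def dedupe_articles(articles: list[dict], limit: int = 10) -> list[dict]:
    """Keep the first article per normalized URL, up to `limit` items."""
    seen = set()
    unique = []
    for article in articles:
        if len(unique) >= limit:
            break
        key = normalize_url(article["url"])
        if key in seen:
            continue
        seen.add(key)
        unique.append(article)
    return unique


found = [
    {"title": "A", "url": "https://www.example.com/a/"},
    {"title": "A again", "url": "https://example.com/a?utm_source=x"},
    {"title": "B", "url": "https://example.com/b"},
]
print([a["title"] for a in dedupe_articles(found)])
```

Ignoring the query string removes tracking parameters, at the cost of merging articles on sites that identify content only by query parameter.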


In [None]:
instructions = "You are a highly efficient 'Search & Discovery' agent for a startup news team. Your sole purpose is to find the most recent and relevant news articles about German startups."
# Adjacent string literals are concatenated, which avoids the stray indentation
# that backslash continuations would embed in the prompt
request = (
    "Use the `google_search_news` tool with the following parameters: "
    "`hl = en, de`, `num=20` and `tbs=qdr:w`. "
    "**Your Plan:** "
    "1. Execute the following queries: `Startups germany`, `Startups Deutschland`, `Finanzierungsrunden Startups Deutschland`. "
    "2. Compile a list of the 10 most relevant and unique articles. "
    "3. Return the list of articles."
)

model = "gpt-4.1-nano"

In [None]:
# Research function
from langfuse import observe

@observe(name="research-agent - researching articles")
async def research_article_urls(instructions: str, request: str) -> list[Article]:
    async with MCPServerStdio(params=serper_mcp_server_params, client_session_timeout_seconds=30) as mcp:
        agent = Agent(name="research-agent", model=model, instructions=instructions, mcp_servers=[mcp], output_type=ResearchResults)
        
        result = await Runner.run(agent, request)

    articles = result.final_output.articles

    return articles

## Article Evaluation System

This section implements an AI-powered article evaluation system using:

- **Sentence Transformers**: Used to calculate embeddings
- **Cosine Similarity**: Calculates similarity between articles and target topics
- **Relevance Scoring**: Assigns relevance scores and best matching topics
- **Topic Classification**: Automatically categorizes articles by topic

The evaluation assigns each article a relevance score and its best matching topic. The pipeline as written reports these values in the newsletter but does not filter articles by score.
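
The scoring rule itself is small: compute the cosine similarity between the article embedding and every topic embedding, then keep the maximum and its topic. The sketch below shows that rule in pure Python with toy 2-dimensional vectors standing in for the 384-dimensional embeddings that `all-MiniLM-L6-v2` produces. The helper names `cosine` and `best_topic` are hypothetical.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity: dot(a, b) / (|a| * |b|), or 0.0 for a zero vector."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def best_topic(article_vec: list[float], topic_vecs: dict[str, list[float]]) -> tuple[str, float]:
    """Return the topic with the highest cosine similarity and that score."""
    scores = {topic: cosine(article_vec, vec) for topic, vec in topic_vecs.items()}
    topic = max(scores, key=scores.get)
    return topic, scores[topic]


# Toy vectors, not real embeddings.
toy_topics = {"fintech startups": [1.0, 0.0], "startup hiring": [0.0, 1.0]}
topic, score = best_topic([3.0, 4.0], toy_topics)
print(topic, round(score, 2))
```

Here the article vector `[3.0, 4.0]` scores 0.6 against the first topic and 0.8 against the second, so the second topic is selected.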


### Topic Classification System

This section defines a comprehensive set of target topics for article classification:

- **Funding & Investment**: Covers funding rounds, investments, IPOs, exits
- **Industry-Specific**: Fintech, healthtech, edtech, AI, cybersecurity, etc.
- **Geographic & Ecosystem**: Berlin, Munich, Hamburg startup scenes
- **Technology & Innovation**: AI, blockchain, IoT, robotics, deep tech
- **Business & Market**: Hiring, scaling, partnerships, growth
- **Policy & Regulation**: Government support, visas, EU policy

These topics are used for semantic similarity matching to evaluate article relevance and categorize content.
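
The category names above exist only as comments in the topic list, so a matched topic cannot be traced back to its category at runtime. One possible arrangement, sketched here as an assumption rather than the notebook's design, keeps the categories in a dict and derives the flat list from it. `TOPIC_CATEGORIES` holds only a shortened sample of the topics, and `category_for` is a hypothetical helper.

```python
# Shortened sample of the topic list, grouped by category.
TOPIC_CATEGORIES = {
    "Funding & Investment": ["startups funding rounds", "venture capital news"],
    "Industry-Specific": ["fintech startups", "AI startups"],
    "Policy & Regulation": ["startup policy", "startup visa"],
}

# Flat list in the shape the embedding step expects.
flat_topics = [topic for topics in TOPIC_CATEGORIES.values() for topic in topics]

# Reverse lookup from a topic string to its category.
CATEGORY_BY_TOPIC = {
    topic: category
    for category, topics in TOPIC_CATEGORIES.items()
    for topic in topics
}


def category_for(topic: str) -> str:
    """Return the category of a topic, tolerating surrounding whitespace."""
    return CATEGORY_BY_TOPIC.get(topic.strip(), "Uncategorized")


print(category_for("startup visa"))
```

The newsletter could then group articles by `category_for(report.best_matching_topic)` instead of by the fine-grained topic string.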


In [None]:
target_topics = [
    # Funding & Investment
    "startups funding rounds",
    "startup investments", 
    "venture capital news",
    "startup Series A funding",
    "startup seed funding",
    "startup IPO news",
    "startup exit deals",
    "startup M&A activity",
    "startup unicorn news",
    
    # Industry-Specific
    "fintech startups",
    "healthtech startups",
    "edtech startups", 
    "proptech startups",
    "cleantech startups",
    "AI startups",
    "cybersecurity startups",
    "SaaS startups",
    "biotech startups",
    "mobility startups",
    
    # Geographic & Ecosystem
    "Berlin startup ecosystem",
    "Munich startup scene", 
    "Hamburg startups",
    "startup accelerators",
    "startup incubators",
    "startup hubs",
    "startup events",
    "startup conferences",
    
    # Technology & Innovation
    "AI innovation",
    "machine learning startups",
    "blockchain startups",
    "IoT startups",
    "robotics startups",
    "deep tech startups",
    "B2B startups",
    "platform startups",
    
    # Business & Market
    "startup hiring",
    "startup talent",
    "startup founders",
    "startup scaling",
    "startup growth",
    "startup expansion",
    "startup international",
    "startup partnerships",
    
    # Policy & Regulation
    "startup policy",
    "startup regulation",
    "startup visa",
    "startup government support",
    "startup EU policy",
    "startup innovation policy"
]

In [None]:
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity


embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

target_topic_embeddings = embedding_model.encode(target_topics)

@observe(name="evaluate article")
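def evaluate_article(article: ArticleSummary) -> ArticleEvaluation:
    # Embed title and summary together; this needs an ArticleSummary because Article has no `summary` field
    article_embedding = embedding_model.encode(article.title + " " + article.summary)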

    article_embedding_2d = article_embedding.reshape(1, -1)

    # Calculate similarity with all target topics
    similarities = cosine_similarity(article_embedding_2d, target_topic_embeddings)

    # Get the highest similarity score
    max_similarity = float(np.max(similarities))
    best_topic_idx = int(np.argmax(similarities))
    best_topic = target_topics[best_topic_idx]

    return ArticleEvaluation(
        relevance_score=max_similarity,
        best_matching_topic=best_topic
    )
    

## Article Summarization Pipeline

This section implements the article analysis and summarization process:

- **Content Fetching**: Uses MCP fetch server to retrieve full article content
- **AI Summarization**: Analyzes articles and creates structured summaries
- **Quality Control**: Combines summarization with relevance evaluation
- **Report Generation**: Creates comprehensive article reports with scores and topics

The analyst agent processes each article to extract key insights and create engaging, structured summaries for the newsletter.
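
Each report carries a `relevance_score`, which makes a score-based selection step straightforward to add. The following sketch is hypothetical: `select_reports` is not part of the notebook, and the threshold of 0.3 is an arbitrary placeholder that would need tuning against real scores.

```python
from types import SimpleNamespace


def select_reports(reports, min_score: float = 0.3, top_n=None):
    """Keep reports scoring at least `min_score`, best first, optionally capped at `top_n`."""
    kept = [r for r in reports if r.relevance_score >= min_score]
    kept.sort(key=lambda r: r.relevance_score, reverse=True)
    return kept if top_n is None else kept[:top_n]


# SimpleNamespace objects stand in for ArticleReport instances.
sample = [
    SimpleNamespace(title="Low", relevance_score=0.1),
    SimpleNamespace(title="High", relevance_score=0.7),
    SimpleNamespace(title="Mid", relevance_score=0.4),
]
print([r.title for r in select_reports(sample)])
```

Because the helper only reads the `relevance_score` attribute, it should work unchanged on a list of `ArticleReport` objects.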


In [None]:
fetch_mcp_server_params = {"command": "uvx", "args": ["mcp-server-fetch"]}

instructions_summarize_article = "You are a highly efficient analyst with a background in the startup domain. You have a talent for presenting news and articles in a structured, easy-to-read, engaging and fun way. \
    Your sole purpose is to summarize news articles about German startups."

@observe(name="analyst-agent - summarizing article")
async def summarize_article(article: Article) -> ArticleReport:
    request_summarize_article = f"Use your tool `fetch` to fetch the article by its `url`. \
        Analyze the article and create an insightful report, including all key takeaways. The report should be easy to read and up to 150 words. \
        **Article**: {article}"
    
    async with MCPServerStdio(params=fetch_mcp_server_params, client_session_timeout_seconds=30) as mcp:
        agent = Agent(name="analyst-agent", model=model, instructions=instructions_summarize_article, mcp_servers=[mcp], output_type=ArticleSummary)
        
        result = await Runner.run(agent, request_summarize_article)

    article_summary = result.final_output
    article_evaluation = evaluate_article(article_summary)

    return ArticleReport(
        title=article_summary.title,
        date=article_summary.date,
        url=article_summary.url,
        summary=article_summary.summary,
        relevance_score=article_evaluation.relevance_score,
        best_matching_topic=article_evaluation.best_matching_topic
    )
    

## Newsletter Report Generation

This section handles the final newsletter creation:

- **Content Assembly**: Combines all analyzed articles into a structured newsletter
- **Template Formatting**: Uses a predefined template with Introduction, TLDR, Articles, and Conclusion
- **Markdown Generation**: Creates clean, formatted markdown output
- **File Management**: Automatically saves reports with timestamps

The reporter agent creates a professional newsletter format that's ready for publication or further editing.
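
The per-article part of the template (headline, date, summary, link, relevance score, best matching topic) is regular enough to render without a model. The sketch below shows a deterministic alternative for that section only. It is an illustration, not the notebook's approach, and `render_article` and `render_articles_section` are hypothetical helpers.

```python
from types import SimpleNamespace


def render_article(report) -> str:
    """Render one report as a markdown block following the newsletter template."""
    return "\n".join([
        f"## {report.title}",
        f"*{report.date}*",
        "",
        report.summary,
        "",
        f"[Read more]({report.url})",
        "",
        f"Relevance score: {report.relevance_score:.2f} | Topic: {report.best_matching_topic}",
    ])


def render_articles_section(reports) -> str:
    """Render the '# Articles' section for a list of reports."""
    return "# Articles\n\n" + "\n\n".join(render_article(r) for r in reports)


# SimpleNamespace stands in for an ArticleReport instance.
example = SimpleNamespace(
    title="Example headline",
    date="2025-01-01",
    url="https://example.com/article",
    summary="A short summary.",
    relevance_score=0.6,
    best_matching_topic="fintech startups",
)
print(render_articles_section([example]))
```

Rendering this section in code keeps URLs and scores exact, leaving the Introduction, TLDR, and Conclusion to the reporter agent.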


In [None]:
# Write up

instructions_write_up = "You are a highly efficient writer for a news team. Your sole purpose is to write up the summarized articles in a structured way."

@observe(name="reporter-agent - writing up report")
async def write_report(article_reports: list[ArticleReport]) -> str:
    request_write_up = f"Write up the following articles into a newsletter: {article_reports} \
        Follow the following template: \
        # Introduction \
        # TLDR \
        # Articles \
        # Per article: Headline, Date, Summary, Read more with Link, Relevance Score, Best Matching Topic \
        # Conclusion \
        Only return clean markdown, no other text."
    
    agent = Agent(name="reporter-agent", model=model, instructions=instructions_write_up)
        
    result = await Runner.run(agent, request_write_up)

    report_content = result.final_output

    # Save to markdown file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"newsletter_report_{timestamp}.md"
    
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(report_content)
    
    print(f"Report saved to: {filename}")
    
    return report_content


## Complete Research Pipeline

This section orchestrates the entire research process:

1. **Research Phase**: Finds relevant German startup articles using Google Search
2. **Analysis Phase**: Summarizes and evaluates each article for relevance
3. **Reporting Phase**: Generates a comprehensive newsletter report

The pipeline is written with `async`/`await`, but articles are summarized one at a time in a sequential loop. Langfuse tracing covers every step. The final output is a timestamped markdown file containing the complete newsletter.
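
Since each article is summarized independently, the per-article work could run concurrently with a cap on parallelism. The sketch below is one way this might be done using `asyncio.Semaphore` and `asyncio.gather`. `gather_limited` is a hypothetical helper, and the default limit of 3 is an arbitrary assumption.

```python
import asyncio


async def gather_limited(items, worker, max_concurrency: int = 3):
    """Run `worker(item)` for every item, at most `max_concurrency` at a time.

    Results keep the input order; items whose worker raised are dropped.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item):
        async with semaphore:
            return await worker(item)

    results = await asyncio.gather(
        *(guarded(item) for item in items), return_exceptions=True
    )
    return [r for r in results if not isinstance(r, BaseException)]


async def fake_worker(x: int) -> int:
    """Toy worker standing in for summarize_article; fails for one input."""
    if x == 2:
        raise ValueError("simulated failure")
    await asyncio.sleep(0)
    return x * 2
```

In the notebook, the loop over articles could then presumably become `article_summaries = await gather_limited(articles, summarize_article)`. Each `summarize_article` call starts its own fetch MCP server process, so a low concurrency limit seems prudent.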


In [None]:
@observe(name="News research")
async def run_research():
    articles = await research_article_urls(instructions, request)

    article_summaries = []

    for article in articles:
        summary = await summarize_article(article)
        article_summaries.append(summary)

    report_content = await write_report(article_summaries)

    return report_content

## Execution and Results

This section executes the complete research pipeline and displays the results:

- **Pipeline Execution**: Runs the full research, analysis, and reporting process
- **Result Display**: Shows the generated newsletter in markdown format
- **File Output**: Automatically saves the newsletter to a timestamped file
- **Observability**: All steps are traced and monitored through Langfuse

The final output is a comprehensive German startup newsletter ready for distribution.


In [None]:
# Notebook cells support top-level await, so the coroutine can be awaited directly
report_content = await run_research()

# Display the content
display(Markdown(report_content))