# Blog Posts Data Ingestion Pipeline

This notebook implements a data ingestion pipeline that extracts blog posts from the aymenfurter.ch website, processes them with AI, and stores them in a structured format. The pipeline uses Azure OpenAI for structured data extraction and a Model Context Protocol (MCP) server for web browsing.

### 1. Reading Secrets from Azure Key Vault
This section retrieves the necessary credentials from Azure Key Vault to authenticate with the required services.
- **MCP Subscription Key**: Authenticates requests to the Model Context Protocol (MCP) server, which is exposed through Azure API Management and used for web crawling.
- **Azure OpenAI API Key**: To authenticate with the Azure OpenAI service for data extraction.

This notebook is designed for environments such as Microsoft Fabric, where `notebookutils` provides built-in access to Key Vault secrets. Before running it, make sure the Key Vault contains the `mcp-key` and `openai-key` secrets and that the identity running the notebook has permission to read them.
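Outside Fabric, `notebookutils` is not available. The following is a minimal sketch of a fallback, assuming you export the secrets as environment variables for local runs. The helper `get_secret` and the environment variable names are hypothetical and not part of the original notebook.

```python
import importlib.util
import os


def get_secret(vault_uri: str, secret_name: str, env_var: str) -> str:
    """Read a secret via notebookutils when it is importable, else from an env var.

    Hypothetical helper: the environment-variable fallback is an assumption
    for running the notebook outside Microsoft Fabric.
    """
    if importlib.util.find_spec("notebookutils") is not None:
        import notebookutils  # available in Microsoft Fabric notebooks
        return notebookutils.credentials.getSecret(vault_uri, secret_name)
    value = os.environ.get(env_var)
    if value is None:
        raise KeyError(f"Secret {secret_name!r} not found: set the {env_var} environment variable")
    return value
```

Usage, with a hypothetical variable name: `MCP_SUBSCRIPTION_KEY = get_secret(key_vault_uri, "mcp-key", "MCP_SUBSCRIPTION_KEY")`.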

In [None]:
# Replace with your Key Vault URI; `notebookutils` is built into Microsoft Fabric notebooks.
key_vault_uri = "https://yourkeyvault.vault.azure.net/"
MCP_SUBSCRIPTION_KEY = notebookutils.credentials.getSecret(key_vault_uri, "mcp-key")
AZURE_OPENAI_API_KEY = notebookutils.credentials.getSecret(key_vault_uri, "openai-key")

### 2. Service and Tool Configuration
This section configures the clients and tools required for the data extraction process.

- **Azure OpenAI Client**: An `AzureOpenAI` client is initialized with the API key, the resource's `/openai/v1/` base URL, and the API version. This client is used to call the GPT model deployment.
- **MCP Tool**: A Model Context Protocol (MCP) tool definition is created. It lets the model perform web browsing actions through Playwright, such as navigating to URLs to fetch blog data. The tool is configured with the MCP server URL, the API Management subscription header, and `require_approval: "never"`, so the model can call the tool without waiting for manual approval.
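The base URL in the next cell is built as `f"{AZURE_OPENAI_ENDPOINT}openai/v1/"`, which only works when the endpoint ends with a slash. A small sketch of two hypothetical helpers, `v1_base_url` and `build_mcp_tool`, that normalize the URL and build the tool dictionary in the same shape the notebook uses:

```python
def v1_base_url(endpoint: str) -> str:
    """Return the /openai/v1/ base URL for an endpoint, with or without a trailing slash."""
    return endpoint.rstrip("/") + "/openai/v1/"


def build_mcp_tool(server_url: str, subscription_key: str, label: str = "mcp-server") -> dict:
    """Build an MCP tool definition in the shape used by this notebook (hypothetical helper)."""
    return {
        "type": "mcp",
        "server_label": label,
        "server_url": server_url,
        # Azure API Management reads the subscription key from this header.
        "headers": {"Ocp-Apim-Subscription-Key": subscription_key},
        # Let the model call MCP tools without an approval round-trip.
        "require_approval": "never",
    }


print(v1_base_url("https://your-endpoint.openai.azure.com"))
# https://your-endpoint.openai.azure.com/openai/v1/
```

Both `"https://your-endpoint.openai.azure.com"` and `"https://your-endpoint.openai.azure.com/"` produce the same base URL, so a missing trailing slash in configuration no longer yields a malformed address.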

In [None]:
from datetime import datetime
from openai import AzureOpenAI

AZURE_OPENAI_DEPLOYMENT = "gpt-4.1"
AZURE_OPENAI_ENDPOINT = "https://your-endpoint.openai.azure.com/"
API_VERSION = "preview"

MCP_SERVER_URL = "https://mcp-apim-your-endpoint.azure-api.net/mcp"

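# The v1 API is served under /openai/v1/ on the Azure OpenAI resource.
client = AzureOpenAI(
    api_key=AZURE_OPENAI_API_KEY,
    base_url=f"{AZURE_OPENAI_ENDPOINT.rstrip('/')}/openai/v1/",
    api_version=API_VERSION,
)

# MCP tool definition for the Responses API; the APIM subscription key is sent as a header.
mcp_tool = {
    "type": "mcp",
    "server_label": "mcp-server",
    "server_url": MCP_SERVER_URL,
    "headers": {"Ocp-Apim-Subscription-Key": MCP_SUBSCRIPTION_KEY},
    "require_approval": "never",  # let the model call tools without manual approval
}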

### 3. Defining Data Structures with Pydantic
To ensure the extracted data is structured and validated, we define Pydantic models. These models serve as schemas for the expected output from the AI model.

- **`BlogUrlResult`**: Defines the structure for the initial response containing a list of blog post URLs.
- **`BlogDetails`**: The schema for a single blog post: title, URL, full content, posting date, estimated reading time, topics, and technologies mentioned.
- **`ExtractionResult`**: A wrapper model for the result of a single extraction: a success flag, an optional error message, the `BlogDetails` object, and the URL that was processed.
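To illustrate the JSON shape these models describe, here is a standard-library sketch that parses a hand-written `ExtractionResult`-shaped payload into dataclasses. In the notebook, `client.responses.parse` does this step with the Pydantic models; the dataclasses below are stand-ins, and the payload is invented example data, not real output from the site.

```python
import json
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class BlogDetailsSketch:
    title: str
    url: str
    content: str
    posting_date: Optional[str] = None
    duration_minutes: Optional[int] = None
    topics: List[str] = field(default_factory=list)
    technologies_used: List[str] = field(default_factory=list)


@dataclass
class ExtractionResultSketch:
    extraction_successful: bool
    error_message: Optional[str] = None
    blog_details: Optional[BlogDetailsSketch] = None
    url_processed: Optional[str] = None


def parse_extraction_result(raw: str) -> ExtractionResultSketch:
    """Parse an ExtractionResult-shaped JSON string; missing optional fields get defaults."""
    data = json.loads(raw)
    details = data.get("blog_details")
    return ExtractionResultSketch(
        extraction_successful=data["extraction_successful"],
        error_message=data.get("error_message"),
        blog_details=BlogDetailsSketch(**details) if details else None,
        url_processed=data.get("url_processed"),
    )


# Invented example payload; posting_date is omitted on purpose.
example = """{
  "extraction_successful": true,
  "error_message": null,
  "url_processed": "https://example.com/post",
  "blog_details": {
    "title": "Example post",
    "url": "https://example.com/post",
    "content": "Body text",
    "duration_minutes": 5,
    "topics": ["ai"],
    "technologies_used": ["Azure OpenAI"]
  }
}"""
result = parse_extraction_result(example)
print(result.blog_details.title, result.blog_details.posting_date)  # Example post None
```

Fields the model omits fall back to their defaults, which mirrors how `Optional` fields with a `None` default behave in the Pydantic models.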

In [None]:
from pydantic import BaseModel, Field
from typing import List, Optional

class BlogUrlResult(BaseModel):
    blog_urls: List[str] = Field(..., description="List of blog post URLs extracted from the page")
    total_found: int = Field(..., description="Total number of blog post URLs found")
    extraction_successful: bool = Field(..., description="Whether the URL extraction was successful")
    error_message: Optional[str] = Field(None, description="Error message if extraction failed")

class BlogDetails(BaseModel):
    title: str = Field(..., description="The title of the blog post")
    url: str = Field(..., description="The URL of the blog post")
    content: str = Field(..., description="The full content of the blog post")
    posting_date: Optional[str] = Field(None, description="The date the blog post was published")
    duration_minutes: Optional[int] = Field(None, description="The estimated reading time in minutes")
    topics: List[str] = Field(default_factory=list, description="A list of topics or tags associated with the blog post")
    technologies_used: List[str] = Field(default_factory=list, description="A list of technologies mentioned or used in the blog post")

class ExtractionResult(BaseModel):
    extraction_successful: bool = Field(..., description="Whether extraction was successful")
    error_message: Optional[str] = None
    blog_details: Optional[BlogDetails] = None
    url_processed: Optional[str] = None

### 4. AI-Powered Extraction Functions
These functions orchestrate the calls to Azure OpenAI for each extraction task. They use `client.responses.parse`, which sends the request (including the MCP tool) and parses the model's structured output into the Pydantic model passed as `text_format`. The parsed object is available as `output_parsed`, which is `None` when the response contains no parseable output, for example when the model refuses.

- **`extract_blog_urls()`**: Instructs the AI model to navigate to the `aymenfurter.ch` articles page and extract all available blog post URLs.
- **`extract_blog_details(blog_url)`**: Takes a blog post URL, instructs the model to navigate to it, and extracts detailed information according to the `BlogDetails` model.

In [None]:
def extract_blog_urls():
    """Ask the model to browse the articles page via MCP; returns a BlogUrlResult or None."""
    prompt = "Navigate to: https://aymenfurter.ch/articles/ and extract all blog post URLs."
    return client.responses.parse(
        model=AZURE_OPENAI_DEPLOYMENT,
        input=[
            {"role": "system", "content": "You are a scraper that extracts blog post URLs from a webpage."},
            {"role": "user", "content": prompt}
        ],
        tools=[mcp_tool],
        text_format=BlogUrlResult
    ).output_parsed

def extract_blog_details(blog_url):
    """Ask the model to browse a single post via MCP; returns an ExtractionResult or None."""
    detail_prompt = f"""Navigate to: {blog_url}
Extract the blog post details. This includes the full content, posting date, duration in minutes, topics, and any technologies mentioned.
The URL of the blog post is {blog_url}."""
    return client.responses.parse(
        model=AZURE_OPENAI_DEPLOYMENT,
        input=[
            {"role": "system", "content": "Extract blog post data clearly and return structured output."},
            {"role": "user", "content": detail_prompt}
        ],
        tools=[mcp_tool],
        text_format=ExtractionResult
    ).output_parsed

### 5. Spark DataFrame Schema and Record Preparation
This section defines helper functions to prepare the extracted data for storage in a Spark DataFrame.

- **`spark_schema_from_model(model_cls)`**: Dynamically generates a Spark `StructType` schema from a Pydantic model, unwrapping `Optional[...]` annotations and storing list fields as strings. It also appends the metadata fields `extraction_status`, `error_message`, and `extraction_timestamp`.
- **`build_record(...)`**: Transforms an extracted `BlogDetails` object into a dictionary that conforms to the Spark schema, serializing list-valued fields (such as topics and technologies) to JSON strings.
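The schema builder depends on how `typing` represents optional fields: `Optional[int]` is `Union[int, None]`, so `get_origin` returns `typing.Union` (not the class of the alias object), and the non-`None` member has to be picked out of `get_args`. The following standard-library sketch shows that unwrapping and the list-to-JSON serialization done by `build_record`. The helper names are hypothetical, and Spark types are represented as plain strings so the sketch runs without PySpark.

```python
import json
from typing import List, Optional, Union, get_args, get_origin

# Spark type names as plain strings (stand-ins for StringType/IntegerType).
TYPE_NAMES = {str: "string", int: "integer"}


def unwrap_optional(tp):
    """Return X for Optional[X]; return tp unchanged otherwise."""
    if get_origin(tp) is Union:
        non_none = [arg for arg in get_args(tp) if arg is not type(None)]
        if len(non_none) == 1:
            return non_none[0]
    return tp


def spark_type_name(tp) -> str:
    inner = unwrap_optional(tp)
    if get_origin(inner) is list:
        return "string"  # lists are stored as JSON strings
    return TYPE_NAMES.get(inner, "string")


def serialize_lists(record: dict) -> dict:
    """JSON-encode list values, mirroring build_record."""
    return {k: json.dumps(v) if isinstance(v, list) else v for k, v in record.items()}


print(spark_type_name(Optional[int]))  # integer
print(spark_type_name(List[str]))      # string
print(serialize_lists({"topics": ["ai", "mcp"], "duration_minutes": 5}))
# {'topics': '["ai", "mcp"]', 'duration_minutes': 5}
```

Without the unwrapping step, `Optional[int]` would fall through to the default string type, and an integer such as `duration_minutes` would then be rejected by a `StringType` column.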

In [None]:
import json
from typing import Union, get_args, get_origin
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType

def spark_schema_from_model(model_cls):
    fields = []
    type_mapping = {
        str: StringType(),
        int: IntegerType()
    }
    for name, model_field in model_cls.model_fields.items():
        field_type = model_field.annotation
        # Unwrap Optional[X] (i.e. Union[X, None]) to X so that Optional[int] maps to IntegerType.
        if get_origin(field_type) is Union:
            non_none = [arg for arg in get_args(field_type) if arg is not type(None)]
            if len(non_none) == 1:
                field_type = non_none[0]
        # List fields are stored as JSON strings (see build_record).
        if get_origin(field_type) is list:
            spark_type = StringType()
        else:
            spark_type = type_mapping.get(field_type, StringType())
        fields.append(StructField(name, spark_type, True))

    extras = [
        ("extraction_status", StringType()),
        ("error_message", StringType()),
        ("extraction_timestamp", TimestampType())
    ]
    for name, dtype in extras:
        fields.append(StructField(name, dtype, True))
    return StructType(fields)

def build_record(extraction_timestamp, blog_details: BlogDetails):
    base = blog_details.model_dump(exclude_none=True)
    record = {}
    for k, v in base.items():
        if isinstance(v, list):
            record[k] = json.dumps(v)
        else:
            record[k] = v
    record["extraction_status"] = "success"
    record["error_message"] = None
    record["extraction_timestamp"] = extraction_timestamp
    return record

### 6. Main Execution: Extract, Transform, and Load
This is the main execution block that runs the end-to-end data ingestion pipeline.

1.  **Extract Blog URLs**: Calls `extract_blog_urls()` to get the list of blog post URLs.
2.  **Extract Blog Details**: Iterates through the list of URLs, calling `extract_blog_details()` for each one to get detailed information.
3.  **Create DataFrame**: The extracted and transformed records are used to create a Spark DataFrame.
4.  **Save to Table**: The DataFrame is written to a Delta table, overwriting any existing data. This makes the blog post data available for querying and analysis.
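Each detail extraction is a network round-trip through the model and the MCP server, so a single transient failure can stop the loop. A minimal sketch of retrying each URL with exponential backoff and recording one status row per URL, assuming a hypothetical `extract(url)` callable that returns a dict of blog fields or `None`. `with_retries` and `process_urls` are illustrative names, not part of the original notebook.

```python
import time
from datetime import datetime


def with_retries(fn, *args, attempts=3, base_delay=2.0):
    """Call fn(*args), retrying on exceptions with exponential backoff; re-raise the last error."""
    for attempt in range(1, attempts + 1):
        try:
            return fn(*args)
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))


def process_urls(urls, extract, attempts=3, base_delay=2.0):
    """Run extract(url) for each URL and return one status record per URL."""
    timestamp = datetime.now()
    records = []
    for url in urls:
        try:
            details = with_retries(extract, url, attempts=attempts, base_delay=base_delay)
            error = None if details else "no blog details returned"
        except Exception as exc:
            details, error = None, str(exc)
        record = dict(details or {"url": url})
        record.update(
            extraction_status="success" if details else "failed",
            error_message=error,
            extraction_timestamp=timestamp,
        )
        records.append(record)
    return records


# Fake extractor for illustration: fails once, then succeeds for "good" URLs only.
calls = {"n": 0}

def flaky_extract(url):
    calls["n"] += 1
    if calls["n"] == 1:
        raise ConnectionError("transient")
    return {"url": url, "title": "Example post"} if "good" in url else None

records = process_urls(["https://example.com/good", "https://example.com/bad"], flaky_extract, base_delay=0)
print([(r["url"], r["extraction_status"]) for r in records])
# [('https://example.com/good', 'success'), ('https://example.com/bad', 'failed')]
```

Only exceptions trigger a retry; a `None` result is treated as a definitive failure, since asking the model again for a page it could not parse is less likely to help than retrying a dropped connection.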

In [None]:
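from pyspark.sql import functions as F

print("STEP 1: Extracting blog URLs")
urls_result = extract_blog_urls()

if urls_result is None:
    print("STEP 1 FAILED: the model returned no parseable output")
elif not (urls_result.extraction_successful and urls_result.blog_urls):
    print(f"STEP 1 FAILED: {urls_result.error_message or 'no blog URLs found'}")
else:
    print(f"STEP 1 RESULTS: Found {urls_result.total_found} blog URLs")
    extraction_timestamp = datetime.now()
    # Deduplicate while preserving order, in case the model lists a post twice.
    to_process = list(dict.fromkeys(urls_result.blog_urls))
    records = []
    for url in to_process:
        print(f"STEP 2: Extracting blog details from {url}")
        try:
            result = extract_blog_details(url)
            blog = result.blog_details if result else None
            error = (result.error_message if result else None) or "no blog details returned"
        except Exception as exc:  # network, tool, or parsing error for this URL
            blog, error = None, str(exc)
        if blog:
            print(f"Success: {blog.title}")
            records.append(build_record(extraction_timestamp, blog))
        else:
            print(f"Failed: {url} ({error})")
            # Keep a row for the failure so it is visible in the table.
            records.append({
                "url": url,
                "extraction_status": "failed",
                "error_message": error,
                "extraction_timestamp": extraction_timestamp,
            })

    schema = spark_schema_from_model(BlogDetails)
    df = spark.createDataFrame(records, schema=schema)
    table_name = "aymenfurter_blogs"

    # Overwrite the table with the results of this run.
    df.write.mode("overwrite").saveAsTable(table_name)

    success_count = df.filter(F.col("extraction_status") == "success").count()
    total = len(to_process)
    print("Ingestion complete")
    print(f"Overwrote table {table_name}")
    print(f"Processed {total} URLs: {success_count} succeeded, {total - success_count} failed")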