# Agricultural Aid Monitoring Workflow - Modular Version

This notebook demonstrates the complete agricultural monitoring workflow using the modularized package structure.

## Overview
The workflow has been modularized into:
- **`agricultural_monitoring.config`** - Configuration and settings
- **`agricultural_monitoring.models`** - Data models and schemas
- **`agricultural_monitoring.extractors`** - Web content extraction
- **`agricultural_monitoring.processors`** - LLM processing and filtering
- **`agricultural_monitoring.workflows`** - Complete monitoring workflows
- **`agricultural_monitoring.monitoring`** - Langfuse observability

## 1. Setup and Imports

In [1]:
# Core imports
import sys
from pathlib import Path

# Add project root to path
project_root = Path().resolve()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

from agricultural_monitoring.config.settings import TARGET_URLS, setup_langfuse
from agricultural_monitoring.monitoring import run_monitored_workflow, LangfuseAnalyzer

# Setup environment
langfuse_enabled = setup_langfuse()

print("✅ Modular agricultural monitoring package loaded")
print(f"✅ Langfuse tracing: {'enabled' if langfuse_enabled else 'disabled'}")
print(f"✅ Target URLs: {len(TARGET_URLS)} configured")

# Display target URLs
for i, url in enumerate(TARGET_URLS, 1):
    print(f"   {i}. {url}")

* 'schema_extra' has been renamed to 'json_schema_extra'


✅ Modular agricultural monitoring package loaded
✅ Langfuse tracing: enabled
✅ Target URLs: 3 configured
   1. https://agriculture.gouv.fr/mots-cles/aides
   2. https://ain-rhone.msa.fr/lfp/soutien-exploitant
   3. https://www.franceagrimer.fr/rechercher-une-aide


In [2]:
precision_prompt = """
You are an expert evaluator. You will be given two lists of articles. Each article is represented by a title (and optionally, a short description). Your task is to compare the first list to the second list, and determine the number of articles in list 1 that actually appear in list 2.

Rules:
	1.	Ignore differences in punctuation or minor whitespace.
	2.	Do not count duplicates multiple times — each article should be counted at most once.
	3.	Return only the number of matching articles as an integer.
	4.	source_url must match exactly; the other attributes must be close but need not be identical.

    Score is an integer greater than or equal to 0. It represents the number of articles in list 1 that actually appear in list 2 (PRECISION).
    Give one sentence reasoning to explain the score.

    Input Format:
    {
        "list_1": [
            {
                "title": <string>,
                "description": <string>,
                "source_url": <string>
            },
            {
                "title": <string>,
                "description": <string>,
                "source_url": <string>
            }
        ]
    }
    {
        "list_2": [
            {
                "title": <string>,
                "description": <string>,
                "source_url": <string>
            },
            {
                "title": <string>,
                "description": <string>,
                "source_url": <string>
            }
        ]
    }
    Return the following format:
    {
        'score': '<PRECISION>',
        'explanation': '<brief_explanation_of_how_the_score_was_determined>'
    }
"""

recall_prompt = """
You are an expert evaluator. You will be given two lists of articles. Each article is represented by a title (and optionally, a short description). Your task is to compare the first list to the second list, and determine how many articles from list 2 don't appear in list 1.

Rules:
	1.	Ignore differences in punctuation or minor whitespace.
	2.	Do not count duplicates multiple times — each article should be counted at most once.
	3.	Return only the number of missing articles as an integer.
	4.	source_url must match exactly; the other attributes must be close but need not be identical.

    Score is an integer greater than or equal to 0. It represents the number of articles from list 2 that do not appear in list 1 (used to compute RECALL).
    Give one sentence reasoning to explain the score.

    Input Format:
    {
        "list_1": [
            {
                "title": <string>,
                "description": <string>,
                "source_url": <string>
            },
            {
                "title": <string>,
                "description": <string>,
                "source_url": <string>
            }
        ]
    }
    {
        "list_2": [
            {
                "title": <string>,
                "description": <string>,
                "source_url": <string>
            },
            {
                "title": <string>,
                "description": <string>,
                "source_url": <string>
            }
        ]
    }
    Return the following format:
    {
        'score': '<RECALL>',
        'explanation': '<brief_explanation_of_how_the_score_was_determined>'
    }
"""

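As a sanity check on the LLM judge, the two counts requested by the prompts above can also be computed deterministically. The sketch below is a simplified baseline under stated assumptions, not the notebook's evaluation method: it matches articles on exact `source_url` only and ignores the fuzzy comparison of titles and descriptions. The helper name `url_precision_recall` and the sample URLs are hypothetical.

```python
def url_precision_recall(list_1, list_2):
    """Baseline precision/recall, matching articles on exact source_url only."""
    # Deduplicate by URL so each article is counted at most once (rule 2).
    found = {article["source_url"] for article in list_1}
    expected = {article["source_url"] for article in list_2}

    matches = len(found & expected)   # the count the precision prompt asks for
    missing = len(expected - found)   # the count the recall prompt asks for

    precision = matches / len(found) if found else 0.0
    recall = 1 - missing / len(expected) if expected else 0.0
    return precision, recall


list_1 = [
    {"title": "Aide A", "source_url": "https://example.org/a"},
    {"title": "Aide B", "source_url": "https://example.org/b"},
]
list_2 = [
    {"title": "Aide A", "source_url": "https://example.org/a"},
    {"title": "Aide C", "source_url": "https://example.org/c"},
    {"title": "Aide D", "source_url": "https://example.org/d"},
    {"title": "Aide E", "source_url": "https://example.org/e"},
]

print(url_precision_recall(list_1, list_2))  # (0.5, 0.25)
```

One of the two found articles is expected (precision 1/2), and three of the four expected articles are missing (recall 1 - 3/4). A large gap between these numbers and the judge's scores may point to a prompt or parsing problem.
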
In [3]:
from typing import Type
from pydantic import BaseModel

def pydantic_to_json_schema(model: Type[BaseModel], schema_name: str = "schema", strict: bool = True) -> dict:
  json_schema = model.model_json_schema()
  no_ref_json_schema = resolve_refs(json_schema)
  formatted_json_schema = {
    "name": schema_name, 
    "strict": strict, 
    "schema": {
      "type": "object", 
      "properties": no_ref_json_schema['properties'],
      "required": no_ref_json_schema.get("required", [])
    }
  }
  
  return formatted_json_schema


def resolve_refs(schema: dict) -> dict:
  """
  Inline every `$ref` of a pydantic-generated JSON schema.

  By default, pydantic stores nested model definitions under `$defs`, outside the
  main schema, and points to them from the main schema with `$ref` entries.

  This layout is supported for JSON validation, but not by structured output APIs.
  This function replaces each `$ref` with the definition it points to.
  Note: `$defs` is removed from the `schema` argument in place.
  """
  defs = schema.pop("$defs", {})
  def resolve(obj: dict) -> dict:
      if isinstance(obj, dict):
          if "$ref" in obj:
              ref = obj["$ref"].split("/")[-1]
              return resolve(defs[ref])
          return {k: resolve(v) for k, v in obj.items()}
      elif isinstance(obj, list):
          return [resolve(v) for v in obj]
      else:
          return obj
  return resolve(schema)

from dotenv import load_dotenv
import os
from typing import Type, Dict, Any

class ChatAlbert():

  def __init__(self, pydantic_schema: Type[BaseModel]) -> None:
    load_dotenv()
    self.api_key = os.getenv("ALBERT_API_KEY")
    self.endpoint = "https://albert.api.etalab.gouv.fr/v1/chat/completions"
    self.json_schema = pydantic_to_json_schema(pydantic_schema)

  def get_header(self):
      headers = {
          "accept": "application/json",
          "Authorization": f"Bearer {self.api_key}",
          "Content-Type": "application/json",
      }
      return headers

  def get_body(self, model_name, system_prompt, user_message, temperature: float, **kwargs) -> dict:
      payload = {
          "model": model_name,
          "messages": [
              {
                  "role": "system",
                  "content": system_prompt
              },
              {
                  "role": "user",
                  "content": user_message
              }
          ],
          "temperature": temperature,
          "response_format": {
              "type": "json_schema",
              "json_schema": self.json_schema
          },
          **kwargs
      }

      return payload

  def completions(self, model_name: str, system_prompt: str, user_message: str, temperature: float = 0.2, **kwargs) -> str:
      import requests
      headers = self.get_header()
      body = self.get_body(model_name, system_prompt, user_message, temperature, **kwargs)

      response = requests.post(self.endpoint, headers=headers, json=body)
      response.raise_for_status()
      json_response = response.json()
      return json_response["choices"][0]["message"]["content"]
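
To illustrate what `resolve_refs` does to a schema, the stdlib-only sketch below inlines `$ref` entries in a hand-written schema. `inline_refs` is a hypothetical, non-mutating variant of the function above (it works on a deep copy instead of popping `$defs` from its argument), and the `nested` schema is an illustrative stand-in for what a nested pydantic model might produce.

```python
import copy


def inline_refs(schema: dict) -> dict:
    """Return a copy of `schema` with every local $ref replaced by its definition."""
    schema = copy.deepcopy(schema)   # leave the caller's dict untouched
    defs = schema.pop("$defs", {})

    def resolve(obj):
        if isinstance(obj, dict):
            if "$ref" in obj:
                # "#/$defs/Article" -> "Article"
                name = obj["$ref"].split("/")[-1]
                return resolve(defs[name])
            return {key: resolve(value) for key, value in obj.items()}
        if isinstance(obj, list):
            return [resolve(value) for value in obj]
        return obj

    return resolve(schema)


nested = {
    "type": "object",
    "properties": {
        "articles": {"type": "array", "items": {"$ref": "#/$defs/Article"}},
    },
    "required": ["articles"],
    "$defs": {
        "Article": {
            "type": "object",
            "properties": {"title": {"type": "string"}},
            "required": ["title"],
        }
    },
}

flat = inline_refs(nested)
print(flat["properties"]["articles"]["items"]["properties"])
```

This approach recurses forever on self-referencing models, so it only suits schemas without recursive definitions.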

In [4]:
from dotenv import load_dotenv
import json
import inspect

from langfuse import get_client
from langchain_openai import ChatOpenAI
from langfuse import Evaluation
from pydantic import BaseModel, Field

from agricultural_monitoring import HybridWorkflow, AgentWorkflow, LLMWorkflow

class MetricResult(BaseModel):
    explanation: str = Field(..., description="Explanation of the metric result")
    score: float = Field(..., description="Score of the metric result")

load_dotenv()
_langfuse_client = get_client()

def list_articles_llm(*, item, **kwargs):
    url = item.input["url"]
    workflow = LLMWorkflow()
    # LLMWorkflow.monitor_url is synchronous, so it is called directly
    result = workflow.monitor_url(url)
    return result.get("normalized_data", {})

async def list_articles_hybrid(*, item, **kwargs):
    url = item.input["url"]
    workflow = HybridWorkflow()
    # HybridWorkflow.monitor_url is a coroutine, so it is awaited
    result = await workflow.monitor_url(url)
    return result.get("normalized_data", {})

async def list_articles_agent(*, item, **kwargs):
    url = item.input["url"]
    workflow = AgentWorkflow()
    # AgentWorkflow.monitor_url is a coroutine, so it is awaited
    result = await workflow.monitor_url(url)
    return result.get("llm_extraction", {})

#list_tasks = (list_articles_llm, list_articles_hybrid, list_articles_agent)

def make_validate_filtered_task(workflow_cls):
    async def task(*, item, **kwargs):
        url = item.input["url"]
        workflow = workflow_cls()
        # Determine if monitor_url is async
        if inspect.iscoroutinefunction(workflow.monitor_url):
            result = await workflow.monitor_url(url)
        else:
            # Synchronous workflow: call it directly (this blocks the event loop)
            result = workflow.monitor_url(url)
        found = result.get("memory_filtered_data", {})
        filtered_articles = found.get("aides", [])
        return {"output": filtered_articles}
    task.__name__ = f"validate_filtered_articles_{workflow_cls.__name__}"
    return task

def make_validate_filtered_llm(*, item, **kwargs):
    url = item.input["url"]
    workflow = LLMWorkflow()
    # LLMWorkflow.monitor_url is synchronous, so it is called directly
    result = workflow.monitor_url(url)
    found = result.get("memory_filtered_data", {})
    filtered_articles = found.get("aides", [])
    return {"output": filtered_articles}

async def make_validate_filtered_hybrid(*, item, **kwargs):
    url = item.input["url"]
    workflow = HybridWorkflow()
    # HybridWorkflow.monitor_url is a coroutine, so it is awaited
    result = await workflow.monitor_url(url)
    found = result.get("memory_filtered_data", {})
    filtered_articles = found.get("aides", [])
    return {"output": filtered_articles}

async def make_validate_filtered_agent(*, item, **kwargs):
    url = item.input["url"]
    workflow = AgentWorkflow()
    # AgentWorkflow.monitor_url is a coroutine, so it is awaited
    result = await workflow.monitor_url(url)
    found = result.get("memory_filtered_data", {})
    filtered_articles = found.get("aides", [])
    return {"output": filtered_articles}

list_tasks = (make_validate_filtered_llm, make_validate_filtered_hybrid, make_validate_filtered_agent)

def recall_evaluator(*, input, output, expected_output, **kwargs):
    chat_albert = ChatAlbert(MetricResult)
    chat_response = chat_albert.completions(
        model_name="albert-large",
        system_prompt=recall_prompt,
        user_message=json.dumps(output) + "\n" + json.dumps(expected_output),
        temperature=0
    )
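    expected_articles = expected_output["articles_trouves"]
    try:
        json_result = json.loads(chat_response)
        # The judge returns the number of expected articles missing from the output
        recall = 1 - (json_result["score"] / len(expected_articles)) if expected_articles else 0
    except (json.JSONDecodeError, KeyError, TypeError):
        # Fall back to a zero score when the judge response cannot be parsed
        json_result = {"explanation": "Could not parse the judge response", "score": 0}
        recall = 0
    return Evaluation(name="recall", comment=json_result.get("explanation", ""), value=recall)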

def precision_evaluator(*, input, output, expected_output, **kwargs):
    chat_albert = ChatAlbert(MetricResult)
    chat_response = chat_albert.completions(
        model_name="albert-large",
        system_prompt=precision_prompt,
        user_message=json.dumps(output) + "\n" + json.dumps(expected_output),
        temperature=0
    )
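    # The tasks return {"output": [...]}; count the articles, not the dict keys
    articles = output.get("output", []) if isinstance(output, dict) else output
    try:
        json_result = json.loads(chat_response)
        precision = json_result["score"] / len(articles) if articles else 0
    except (json.JSONDecodeError, KeyError, TypeError):
        # Fall back to a zero score when the judge response cannot be parsed
        json_result = {"explanation": "Could not parse the judge response", "score": 0}
        precision = 0
    return Evaluation(name="precision", comment=json_result.get("explanation", ""), value=precision)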
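
Both evaluators depend on the judge returning JSON with a numeric `score`, which is not guaranteed. The sketch below shows one possible way to isolate that parsing step. The helper `parse_judge_count` is hypothetical and not part of the package; it returns `None` as the count whenever the reply cannot be used, leaving the caller to choose a fallback score.

```python
import json


def parse_judge_count(raw):
    """Parse a judge reply into (count, explanation); count is None on failure."""
    try:
        payload = json.loads(raw)
        count = float(payload["score"])
    except (json.JSONDecodeError, KeyError, TypeError, ValueError) as exc:
        # Covers invalid JSON, a missing "score" key, and non-numeric scores.
        return None, f"Unparseable judge response: {type(exc).__name__}"
    if count < 0:
        return None, "Judge returned a negative count"
    return count, str(payload.get("explanation", ""))


print(parse_judge_count('{"score": 2, "explanation": "two matches"}'))
print(parse_judge_count("not json"))
```

Keeping the parsing separate from the precision and recall arithmetic makes it possible to unit-test each part without calling the Albert API.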

In [None]:
# Run 1
dataset = _langfuse_client.get_dataset("filtered-article-dataset")
for task in list_tasks:
    print(f"Running experiment for workflow: {task.__name__}")
    result = dataset.run_experiment(
        name="List article experiment with " + task.__name__,
        description=f"Testing {task.__name__} for article listing task",
        task=task,
        evaluators=[recall_evaluator, precision_evaluator],
        max_concurrency=5
    )
    # Report each workflow's results inside the loop
    print(result.format())

Running experiment for workflow: make_validate_filtered_llm
🔍 Starting monitoring workflow for: https://www.bretagne.bzh/aides/?mot-clef=&profil=entreprises-et-professionnels&cloture=0&showall=0
📥 Fetching content from: https://www.bretagne.bzh/aides/?mot-clef=&profil=entreprises-et-professionnels&cloture=0&showall=0 (attempt 1)
🔍 Starting monitoring workflow for: https://www.morbihan.fr/aides-et-services/rechercher-une-aide
📥 Fetching content from: https://www.morbihan.fr/aides-et-services/rechercher-une-aide (attempt 1)
🔍 Starting monitoring workflow for: https://www.auvergnerhonealpes.fr/aides?f%5B0%5D=profil%3A3
📥 Fetching content from: https://www.auvergnerhonealpes.fr/aides?f%5B0%5D=profil%3A3 (attempt 1)
🔍 Starting monitoring workflow for: https://ain-rhone.msa.fr/lfp/soutien-exploitant
📥 Fetching content from: https://ain-rhone.msa.fr/lfp/soutien-exploitant (attempt 1)
🔍 Starting monitoring workflow for: https://agriculture.gouv.fr/mots-cles/aides
📥 Fetching content from: https
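
The task functions defined earlier call the synchronous `LLMWorkflow.monitor_url` directly. If the experiment runner schedules tasks on an event loop (an assumption about how `max_concurrency` is implemented), such a call blocks the other tasks while it runs. The sketch below shows one way to call either kind of workflow uniformly, offloading synchronous calls with `asyncio.to_thread` (Python 3.9+). `call_monitor`, `SyncWorkflow`, and `AsyncWorkflow` are hypothetical stand-ins, not classes from the package.

```python
import asyncio
import inspect


async def call_monitor(workflow, url: str) -> dict:
    """Await monitor_url whether it is a coroutine function or a plain method."""
    if inspect.iscoroutinefunction(workflow.monitor_url):
        return await workflow.monitor_url(url)
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(workflow.monitor_url, url)


class SyncWorkflow:
    """Stand-in for a workflow with a synchronous monitor_url."""

    def monitor_url(self, url):
        return {"memory_filtered_data": {"aides": [f"sync:{url}"]}}


class AsyncWorkflow:
    """Stand-in for a workflow with an asynchronous monitor_url."""

    async def monitor_url(self, url):
        return {"memory_filtered_data": {"aides": [f"async:{url}"]}}


async def main():
    return await asyncio.gather(
        call_monitor(SyncWorkflow(), "https://example.org"),
        call_monitor(AsyncWorkflow(), "https://example.org"),
    )


results = asyncio.run(main())
print([r["memory_filtered_data"]["aides"][0] for r in results])
```

Inside a notebook, where an event loop is already running, `await main()` replaces `asyncio.run(main())`.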

In [None]:
from agricultural_monitoring.workflows.agent_workflow import WebAgentStandaloneProcessor

web_agent_processor = WebAgentStandaloneProcessor()
result = await web_agent_processor.process_content("https://www.franceagrimer.fr/rechercher-une-aide")

In [None]:
result["aides_identifiees"]

In [None]:
from agricultural_monitoring.workflows.agent_workflow import DataNormalizer

data_norm = DataNormalizer()
new_result = data_norm.normalize_data(result, "https://www.franceagrimer.fr/rechercher-une-aide")

In [None]:
new_result

## 2. Quick Start - Complete Workflow

In [None]:
# Create enhanced workflow with inline link processing
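# EnhancedAgriculturalMonitoringWorkflow is not imported in the cells above;
# import it from the package before running this cell.
workflow = EnhancedAgriculturalMonitoringWorkflow()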

## 3. Test Single URL

In [None]:
# Test with one URL first
test_url = TARGET_URLS[2]
print(f"🧪 Testing workflow with: {test_url}")

result = await workflow.monitor_url(test_url)

print(f"\n📊 Results:")
print(f"Web extraction: {result['web_content']['status']}")
print(f"LLM processing: {result['llm_extraction']['status']}")
print(f"Data normalization: {result['normalized_data']['metadata']['status']}")
print(f"Memory filtering: {result.get('memory_filtered_data', {}).get('status', 'N/A')}")

# Show found aids
memory_data = result.get('memory_filtered_data', {})
if memory_data.get('status') == 'success':
    aids_found = memory_data.get('aides', [])
    before_filtering = len(result['normalized_data']['aides'])
    after_filtering = len(aids_found)
    
    print(f"\n🔍 Filtering results:")
    print(f"   Before filtering: {before_filtering} aids")
    print(f"   After filtering: {after_filtering} aids")
    print(f"   Filtered out: {memory_data.get('filtered_count', 0)} aids")
    
    if aids_found:
        print(f"\n✅ New agricultural aids found:")
        for i, aide in enumerate(aids_found[:3], 1):
            print(f"   {i}. {aide['titre_aide']}")
            print(f"      {aide['description'][:100]}...")
    else:
        print(f"\n📝 No new aids found (all filtered by memory)")
else:
    print(f"\n❌ Workflow failed at some stage")

In [None]:
for item in dataset.items:
    print(item.input, item.expected_output)

## 4. Production Monitoring - All URLs

In [None]:
# Run production monitoring with all URLs
print("🚀 Running production monitoring with all target URLs...")
print("=" * 60)

# Use the monitoring function for comprehensive tracking
results = run_monitored_workflow(workflow, TARGET_URLS)

# Detailed results analysis
successful_results = results['successful_results']
failed_results = results['failed_results']
summary = results['summary']

print(f"\n📋 DETAILED RESULTS:")
for i, result in enumerate(successful_results + failed_results, 1):
    url = result.get('web_content', {}).get('url', 'Unknown')
    status = result.get('normalized_data', {}).get('metadata', {}).get('status', 'Unknown')
    
    print(f"\n{i}. {url}")
    print(f"   Status: {status}")
    
    if status == 'success':
        aids_before = len(result['normalized_data']['aides'])
        memory_data = result.get('memory_filtered_data', {})
        aids_after = memory_data.get('new_count', aids_before) if memory_data.get('status') == 'success' else aids_before
        
        print(f"   Web extraction: ✅")
        print(f"   Content length: {len(result['web_content']['content']):,} chars")
        print(f"   Links found: {result['web_content'].get('links_count', 0)}")
        print(f"   Aids before filtering: {aids_before}")
        print(f"   Aids after filtering: {aids_after}")
        
        # Show sample aids
        final_aids = memory_data.get('aides', result['normalized_data']['aides']) if memory_data.get('status') == 'success' else result['normalized_data']['aides']
        if final_aids:
            print(f"   Sample aids:")
            for aide in final_aids[:2]:
                print(f"     • {aide['titre_aide']}")
    else:
        error = result.get('normalized_data', {}).get('metadata', {}).get('error', 'Unknown error')
        print(f"   Error: {error}")

In [None]:
from agricultural_monitoring.extractors import EnhancedWebContentExtractor
from agricultural_monitoring.processors import LLMProcessor, WebAgentProcessor
from agricultural_monitoring.processors import DataNormalizer

URL = "https://www.bretagne.bzh/aides/?mot-clef=&profil=entreprises-et-professionnels&cloture=0&showall=0"

web_extractor = EnhancedWebContentExtractor()
llm_processor = LLMProcessor()
web_agent_processor = WebAgentProcessor()
normalizer = DataNormalizer()

# Step 1: Let the web agent fetch and process the page
processed_content = await web_agent_processor.process_content(URL)

# Step 2: Normalize the processed content
normalized_content = normalizer.normalize_data(processed_content, URL)

print(processed_content)

In [None]:
import json

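# ensure_ascii=False writes accented characters as-is, so force UTF-8 on every platform
with open("test.json", "w", encoding="utf-8") as f: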
    json.dump(normalized_content, f, indent=2, ensure_ascii=False)

In [None]:
processed_content

In [None]:
from selenium import webdriver

# Take a screenshot of the page; always close the browser, even if loading fails
driver = webdriver.Chrome()
try:
    driver.get("https://www.franceagrimer.fr/rechercher-une-aide")
    driver.save_screenshot("page.png")
finally:
    driver.quit()