# Airflow DAG Generation: Evaluation

This notebook evaluates the quality of Airflow DAGs generated by different models, comparing a baseline model (Qwen 2.5 1.5B Instruct) against a fine-tuned version.

## üìã Setup Note

**This notebook is designed to run locally** (not in Colab). To set it up:

```bash
# From project root
pip install -e ".[research]"
pip install jupyter

# Launch Jupyter and select the venv kernel
jupyter notebook
```

Make sure you're using the Python kernel from your virtual environment to ensure all imports work correctly.

---

## üìä Performance Summary (Fine-tuned vs Base)

Our evaluation reveals significant improvements across multiple dimensions:

### Key Improvements
- **Syntax Validity**: ~8% reduction in syntactically invalid DAGs
- **Modern Patterns**: Strong adoption of latest Airflow syntax and operators (TaskFlow API, modern decorators)
- **Reduced Hallucinations**: Significantly fewer instances of invented imports or non-existent operators
- **Error Distribution Alignment**: The fine-tuned model's error patterns closely match real-world DAG distributions

### Notable Observations
- **Base Model**: Often generates deprecated patterns (e.g., legacy operators from Airflow 1.x)
- **Fine-tuned Model**: Occasionally hallucinates internal testing libraries seen in training data, but far less than general hallucinations in the base model
- **Syntax Accuracy**: Fine-tuned model shows consistent adherence to Python and Airflow syntax rules

---

## üî¨ Evaluation Methodology

We employ a two-pronged evaluation approach to assess both structural correctness and semantic quality:

### 1. Parser-Based Evaluation (Structural Analysis)

Uses a custom AST-based validator (`DAGValidator`) to check:
- **Syntax Correctness**: Valid Python syntax, no parse errors
- **Task ID Validation**: Unique task IDs, proper naming conventions (alphanumeric, dashes, dots, underscores)
- **Dependency Analysis**: Detection of circular dependencies in task graphs
- **DAG Structure**: Proper DAG instantiation, task definitions, and relationships

**Advantages**: Fast, deterministic, catches critical structural errors that would prevent DAG execution.

### 2. LLM-Based Evaluation (Semantic Analysis)

Uses Claude 4.5 Sonnet via the Batch API to evaluate:
- **Correctness** (1-5): Does the generated DAG logically implement the user's request?
- **Completeness** (1-5): Are all necessary imports, arguments, and task dependencies present?
- **Best Practices** (1-5): Does it follow Airflow conventions and modern patterns?

**Advantages**: Captures semantic quality, intent alignment, and code quality aspects that structural analysis misses.

**Note**: LLM evaluation requires an Anthropic API key and incurs costs. Results are saved for reproducibility.

## 1. Setup and Configuration

In [65]:
import json
import glob
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pathlib import Path
from typing import List, Dict, Any
import re

# hf
from datasets import load_dataset

# Import from installed packages
from research.lib.batch_processor import ClaudeBatchProcessor
from airflow_net.validation import DAGValidator

# Set visualization style
sns.set_theme(style="whitegrid", context="notebook", palette="viridis")

In [48]:
# API Key Configuration
import os

ANTHROPIC_API_KEY = os.environ.get('ANTHROPIC_API_KEY')

if not ANTHROPIC_API_KEY:
    print("WARNING: No ANTHROPIC_API_KEY found in environment. LLM evaluation steps will be skipped.")
    print("To enable LLM evaluation, set the ANTHROPIC_API_KEY environment variable.")

## 2. Load Data
We load the generated DAGs from the JSONL artifacts produced by the inference step and the test dataset containing the ground truth DAG files.

In [49]:
# Define paths to artifacts (relative to notebook location)
ARTIFACTS_DIR = Path("../../artifacts/finetuning/01_inference_results").resolve()

print(f"Looking for artifacts in: {ARTIFACTS_DIR}")

# Find latest inference files
base_files = list(ARTIFACTS_DIR.glob("base_model_outputs*.jsonl"))
finetuned_files = list(ARTIFACTS_DIR.glob("finetuned_model_outputs*.jsonl"))

if not base_files or not finetuned_files:
    print("WARNING: Could not find one or both inference result files.")
    print(f"Available files: {list(ARTIFACTS_DIR.glob('*.jsonl'))}")
    BASE_MODEL_FILE = None
    FINETUNED_MODEL_FILE = None
else:
    # Take the most recent one
    BASE_MODEL_FILE = sorted(base_files)[-1]
    FINETUNED_MODEL_FILE = sorted(finetuned_files)[-1]
    print(f"Selected Baseline: {BASE_MODEL_FILE.name}")
    print(f"Selected Fine-tuned: {FINETUNED_MODEL_FILE.name}")

Looking for artifacts in: /Users/andreatamburri/Desktop/airflowNet/research/artifacts/finetuning/01_inference_results
Selected Baseline: base_model_outputs_20251217_151724.jsonl
Selected Fine-tuned: finetuned_model_outputs_20251217_151724.jsonl


In [52]:
def load_jsonl(file_path):
    """Load JSONL file and extract code from messages format"""
    if not file_path or not file_path.exists():
        return []
    data = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                entry = json.loads(line)
                # Extract assistant content from messages
                messages = entry.get('messages', [])
                user_content = ''
                assistant_content = ''
                
                for msg in messages:
                    if msg['role'] == 'user':
                        user_content = msg['content']
                    elif msg['role'] == 'assistant':
                        assistant_content = msg['content']
                    if msg['role'] == 'system':
                        system_content = msg['content']
                
                data.append({
                    'system': system_content,
                    'prompt': user_content,
                    'code': assistant_content,
                    'metadata': entry.get('metadata', {})
                })
    return data

baseline_data = load_jsonl(BASE_MODEL_FILE)
finetuned_data = load_jsonl(FINETUNED_MODEL_FILE)

print(f"Loaded {len(baseline_data)} baseline samples.")
print(f"Loaded {len(finetuned_data)} fine-tuned samples.")

Loaded 412 baseline samples.
Loaded 412 fine-tuned samples.


In [56]:
def parse_hf_ds(dataset):
    """Convert HuggingFace dataset to the same structure as load_jsonl function"""
    data = []

    for entry in dataset:
        messages = entry.get('messages', [])
        system_content = ''
        user_content = ''
        assistant_content = ''

        for msg in messages:
            if msg['role'] == 'system':
                system_content = msg['content']
            elif msg['role'] == 'user':
                user_content = msg['content']
            elif msg['role'] == 'assistant':
                assistant_content = msg['content']

        data.append({
            'system': system_content,
            'prompt': user_content,
            'code': assistant_content,
            'metadata': entry.get('metadata', {})
        })

    return data

dataset = load_dataset(
    "andrea-t94/airflow-dag-dataset",
    split="test",
    download_mode="reuse_cache_if_exists"  # Use cached version if available
)

ground_data = parse_hf_ds(dataset)
print(f"Loaded {len(ground_data)} ground truth samples.")

Generating train split:   0%|          | 0/7414 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/412 [00:00<?, ? examples/s]

Generating eval split:   0%|          | 0/412 [00:00<?, ? examples/s]

Loaded 412 ground truth samples.


In [57]:
def evaluate_parser_results(data, model_name):
    """Evaluate DAG code using the DAGValidator"""
    results = []
    validator = DAGValidator()
    
    for entry in data:
        code = entry.get('code', '')
        
        # Use DAGValidator.validate_content which returns List[ValidationError]
        errors = validator.validate_content(code, source_name="<generated>")
        
        # Extract error information
        is_valid = len(errors) == 0
        error_messages = [str(e) for e in errors]
        error_types = [e.error_type for e in errors]
        
        results.append({
            'model': model_name,
            'is_valid': is_valid,
            'error_count': len(errors),
            'errors': '; '.join(error_messages) if error_messages else '',
            'has_syntax_error': any('SYNTAX_ERROR' in et or 'PARSE_ERROR' in et for et in error_types),
            'has_duplicate_task': any('DUPLICATE_TASK_ID' in et for et in error_types),
            'has_invalid_task_id': any('INVALID_TASK_ID' in et for et in error_types),
            'has_circular_dependency': any('CIRCULAR_DEPENDENCY' in et for et in error_types)
        })
    return pd.DataFrame(results)

df_ground = evaluate_parser_results(ground_data, 'Ground')
df_base = evaluate_parser_results(baseline_data, 'Baseline')
df_fine = evaluate_parser_results(finetuned_data, 'Fine-tuned')
df_parser = pd.concat([df_ground, df_base, df_fine], ignore_index=True)

# Display summary statistics
summary = df_parser.groupby('model').agg(
    valid_rate=('is_valid', 'mean'),
    syntax_errors=('has_syntax_error', 'mean'),
    duplicate_tasks=('has_duplicate_task', 'mean'),
    invalid_task_ids=('has_invalid_task_id', 'mean'),
    circular_deps=('has_circular_dependency', 'mean')
) * 100

print("Parser Evaluation Results (%):")
display(summary.round(2))


Parser Evaluation Results (%):


Unnamed: 0_level_0,valid_rate,syntax_errors,duplicate_tasks,invalid_task_ids,circular_deps
model,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Baseline,95.87,3.16,0.0,0.97,0.0
Fine-tuned,71.12,27.91,0.49,0.0,0.73
Ground,79.61,16.99,0.73,0.0,0.24


## 4. LLM-Based Evaluation with Claude

To complement the parser-based structural analysis, we employ Claude Sonnet 4.5 as an LLM judge to evaluate semantic quality across three critical dimensions:

### Evaluation Metrics

**1. Idiomatic Airflow Usage (0/1 Binary Score)**
- **Pass (1):** Code uses provider-specific operators designed for the task (e.g., `SnowflakeOperator`, `S3Operator`)
- **Fail (0):** Code wraps logic in generic `PythonOperator` with hooks instead of using native operators
- **Rationale:** Idiomatic Airflow leverages the rich ecosystem of 100+ provider packages with purpose-built operators

**2. No Hallucination/Leakage (0/1 Binary Score)**
- **Pass (1):** Clean, production-ready code using only standard Airflow libraries
- **Fail (0):** Code imports internal testing modules (e.g., `from tests_common.test_utils.system_tests`) or fabricates non-existent APIs
- **Rationale:** Model hallucinations indicate training data contamination or knowledge gaps that would cause runtime failures

**3. Instruction Adherence (0/1 Binary Score)**
- **Pass (1):** Generated DAG fully implements the requested business logic
- **Fail (0):** Missing key requirements from the user instruction
- **Rationale:** Measures the model's ability to follow specifications accurately

### Key Findings

The evaluation reveals a nuanced performance profile for the fine-tuned model:

**Strengths:**
- **Airflow Idioms:** Fine-tuned model significantly outperforms the baseline in using provider-specific operators (43% vs 11% pass rate)
- **Syntax Knowledge:** Demonstrates strong understanding of modern Airflow 3.x API patterns

**Weaknesses - Root Cause Analysis:**

1. **Instruction Following (8% pass rate vs 15% baseline):** Training dataset exhibits poor diversity in numerical parameters‚Äî85.7% of examples use "20" as a dummy value across various contexts (e.g., "insert 20 records", "20 retries", "20 seconds"). The model overfitted to this pattern and fails to generalize to different numerical specifications.

2. **Hallucination Rate (6% pass rate vs 24% baseline):** Two primary causes:
   - **Data Leakage:** Training set included internal Airflow test files that import `tests_common.test_utils.system_tests` utilities. Model reproduces these non-production imports.
   - **Incomplete Knowledge:** Limited training examples for niche libraries (e.g., `airflow.timetables`, `airflow.example_dags.plugins`) cause the model to fabricate API interactions rather than admit uncertainty.

The subsequent cells provide concrete examples demonstrating each failure mode with side-by-side comparisons between ground truth, baseline, and fine-tuned outputs.

In [None]:
EVAL_PROMPT_TEMPLATE = """
You are an expert Senior Apache Airflow Architect. Evaluate the following Airflow DAG code generated by an AI model based on a user instruction.

### Scoring Criteria & Examples

**1. Idiomatic Airflow (Score 0 or 1)**
* **Score 1 (Pass):** Uses specific Providers and Operators designed for the task.
    * *Example:* `from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator`
* **Score 0 (Fail):** Relies on generic "Pythonic" patterns where it wraps logic in a `PythonOperator` + Hook instead of using the native Operator.
    * *Example:* `def load(): hook = SnowflakeHook(...) \n PythonOperator(python_callable=load ...)`

**2. No Hallucination/Leakage (Score 0 or 1)**
* **Score 1 (Pass):** Code is clean, production-ready, and uses only standard Airflow libraries.
* **Score 0 (Fail):** Code imports internal testing modules or includes test harness boilerplate.
    * *Example:* `from tests_common.test_utils.system_tests import get_test_run`
    * *Example:* `test_run = get_test_run(dag)`

**3. Instruction Adherence (Score 0 or 1)**
* **Score 1 (Pass):** Fulfills the specific business logic requested (e.g., "load data AND validate").
* **Score 0 (Fail):** Misses a key step of the instruction.

---

### Task
USER INSTRUCTION:
{instruction}

DAG CODE:
```python
{dag_content}
```


Evaluate the code based on the criteria above. Return valid JSON only.

{{{{
  "idiomatic_airflow": {{{{ "score": 0, "reasoning": "..." }}}}
  "no_hallucination": {{{{ "score": 0, "reasoning": "..." }}}},
  "instruction_adherence": {{{{ "score": 0, "reasoning": "..." }}}}
}}}}
"""

def prepare_llm_batch_requests(dags: List[Dict],
                                model_name: str, 
                                prompt_template: str = EVAL_PROMPT_TEMPLATE) -> List[Dict]:
    """Prepare batch requests for Claude LLM evaluation.
    
    Includes all DAGs with DAG generation requests (even if they failed parser validation).
    """
    batch_requests = []
    
    print(f"Preparing requests for all {len(dags)} DAGs...")
    
    skipped_non_dag = 0
    
    for idx, dag_record in enumerate(dags):
        
        system_message = dag_record.get('system', '')
        # Only include DAG generation requests (filter out other types)
        if not system_message.startswith("You are an expert Apache Airflow developer"):
            skipped_non_dag += 1
            continue
        
        # Create prompt
        prompt = prompt_template.format(
            dag_content=dag_record.get('code', ''), 
            instruction=dag_record.get('prompt', '')
        )
        
        # Create batch request
        request = {
            "custom_id": f"{model_name}_{idx}",
            "params": {
                "model": "claude-sonnet-4-5-20250929",  # Claude Sonnet 4.5
                "max_tokens": 2000,
                "temperature": 0.0,
                "messages": [{"role": "user", "content": prompt}]
            }
        }
        
        batch_requests.append(request)
    if skipped_non_dag > 0:
        print(f"Skipped {skipped_non_dag} non-DAG generation requests")
    
    return batch_requests

print("Preparing LLM evaluation batch requests (including DAGs that failed parser validation)...")


if ANTHROPIC_API_KEY:
    processor = ClaudeBatchProcessor(api_key=ANTHROPIC_API_KEY)
    
    # Prepare batches for full datasets
    print(f"Preparing evaluation batches...")
    ground_batch_requests = prepare_llm_batch_requests(
    baseline_data, 
    "ground"
    )
    baseline_batch_requests = prepare_llm_batch_requests(
    baseline_data, 
    "baseline"
    )
    finetuned_batch_requests = prepare_llm_batch_requests(
        finetuned_data,
        "finetuned"
    )
    all_batch_requests = ground_batch_requests + baseline_batch_requests + finetuned_batch_requests
    
    print(f"‚úì Prepared {len(all_batch_requests)} evaluation requests ({len(ground_batch_requests)} ground + {len(baseline_batch_requests)} baseline + {len(finetuned_batch_requests)} fine-tuned)")
else:
    print("‚ö†Ô∏è Skipping LLM evaluation setup:")
    if not ANTHROPIC_API_KEY:
        print("  - No ANTHROPIC_API_KEY found in environment")

Preparing LLM evaluation batch requests (including DAGs that failed parser validation)...
Preparing evaluation batches...
Preparing requests for all 412 DAGs...
Skipped 70 non-DAG generation requests
Preparing requests for all 412 DAGs...
Skipped 70 non-DAG generation requests
Preparing requests for all 412 DAGs...
Skipped 70 non-DAG generation requests
‚úì Prepared 1026 evaluation requests (342 ground + 342 baseline + 342 fine-tuned)


In [None]:
# Execute Batch Processing and Parse Results
print("üöÄ Submitting batch request to Claude API...")
print(f"   This will evaluate {len(all_batch_requests)} DAGs using Claude Sonnet 4")
print(f"   Estimated cost: ~${len(all_batch_requests) * 0.015:.2f} (at $15/1M input tokens)")
print()

# Submit batch
batch_id = processor.submit_batch(all_batch_requests)
print(f"‚úì Batch submitted: {batch_id}")
print()

2025-12-30 16:44:41,775 - INFO - üöÄ Submitting batch with 824 requests...


üöÄ Submitting batch request to Claude API...
   This will evaluate 824 DAGs using Claude Sonnet 4
   Estimated cost: ~$12.36 (at $15/1M input tokens)



2025-12-30 16:44:44,569 - INFO - HTTP Request: POST https://api.anthropic.com/v1/messages/batches?beta=true "HTTP/1.1 200 OK"
2025-12-30 16:44:44,573 - INFO - ‚úÖ Batch submitted: msgbatch_01GuxFdkP6XrMjqjzxNwjCGb
2025-12-30 16:44:44,574 - INFO - ‚è≥ Waiting for batch msgbatch_01GuxFdkP6XrMjqjzxNwjCGb...
2025-12-30 16:44:44,753 - INFO - HTTP Request: GET https://api.anthropic.com/v1/messages/batches/msgbatch_01GuxFdkP6XrMjqjzxNwjCGb?beta=true "HTTP/1.1 200 OK"
2025-12-30 16:44:44,756 - INFO - üìä Status: in_progress (elapsed: 0.2s)
2025-12-30 16:44:44,757 - INFO -    Progress: 0/824 (0.0%)


‚úì Batch submitted: msgbatch_01GuxFdkP6XrMjqjzxNwjCGb

‚è≥ Waiting for batch to complete (this may take 10-30 minutes)...


2025-12-30 16:45:15,001 - INFO - HTTP Request: GET https://api.anthropic.com/v1/messages/batches/msgbatch_01GuxFdkP6XrMjqjzxNwjCGb?beta=true "HTTP/1.1 200 OK"
2025-12-30 16:45:15,005 - INFO - üìä Status: in_progress (elapsed: 30.4s)
2025-12-30 16:45:15,006 - INFO -    Progress: 0/824 (0.0%)
2025-12-30 16:45:45,259 - INFO - HTTP Request: GET https://api.anthropic.com/v1/messages/batches/msgbatch_01GuxFdkP6XrMjqjzxNwjCGb?beta=true "HTTP/1.1 200 OK"
2025-12-30 16:45:45,263 - INFO - üìä Status: in_progress (elapsed: 60.7s)
2025-12-30 16:45:45,263 - INFO -    Progress: 0/824 (0.0%)
2025-12-30 16:46:15,515 - INFO - HTTP Request: GET https://api.anthropic.com/v1/messages/batches/msgbatch_01GuxFdkP6XrMjqjzxNwjCGb?beta=true "HTTP/1.1 200 OK"
2025-12-30 16:46:15,518 - INFO - üìä Status: in_progress (elapsed: 90.9s)
2025-12-30 16:46:15,519 - INFO -    Progress: 0/824 (0.0%)
2025-12-30 16:46:45,750 - INFO - HTTP Request: GET https://api.anthropic.com/v1/messages/batches/msgbatch_01GuxFdkP6XrMjq


üì• Downloading results...


2025-12-30 16:47:46,823 - INFO - HTTP Request: GET https://api.anthropic.com/v1/messages/batches/msgbatch_01GuxFdkP6XrMjqjzxNwjCGb/results "HTTP/1.1 200 OK"
2025-12-30 16:47:47,475 - INFO - üì• Downloaded 824 items


‚úì Downloaded 824 results

üìä Parsing evaluation scores...
‚úì Parsed 489 evaluation scores
‚ö†Ô∏è 335 results had errors or couldn't be parsed

LLM Evaluation Summary (1-5 scale):


Unnamed: 0_level_0,correctness,completeness,best_practices
model,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
Baseline,1.98,1.78,1.82
Fine-tuned,1.78,1.57,1.76


In [None]:
 # Wait for completion (can take 10-30 minutes for large batches)
print("‚è≥ Waiting for batch to complete (this may take 10-30 minutes)...")
batch = processor.wait_for_batch_completion(batch_id)
print()

# Download results
print("üì• Downloading results...")
results = processor.download_batch_results(batch_id)
print(f"‚úì Downloaded {len(results)} results")
print()

# Parse results into dataframe
print("üìä Parsing evaluation scores...")
llm_eval_results = []
parse_errors = 0

for result in results:
    if result['result']['type'] == 'succeeded':
        # Parse model name and dag_id from custom_id
        # Handle format: "baseline_123" or "finetuned_123"
        custom_id = result['custom_id']
        parts = custom_id.split('_')
        model_name = parts[0]  # 'ground', 'baseline' or 'finetuned'
        dag_id = int(parts[1])
        text = result['result']['message']['content'][0]['text']
        try:
            json_match = re.search(r'\{.*\}', text, re.DOTALL)
            if json_match:
                json_str = json_match.group(0)
                eval_data = json.loads(json_str)
            else:
                raise ValueError("No JSON object found in response")
            llm_eval_results.append({
                'model': model_name,
                'dag_id': dag_id,
                'idiomatic_airflow_score': eval_data['idiomatic_airflow']['score'],
                'idiomatic_airflow_reasoning': eval_data['idiomatic_airflow']['reasoning'],
                'no_hallucination_score': eval_data['no_hallucination']['score'],
                'no_hallucination_reasoning': eval_data['no_hallucination']['reasoning'],
                'instruction_adherence_score': eval_data['instruction_adherence']['score'],
                'instruction_adherence_reasoning': eval_data['instruction_adherence']['reasoning'],
            })
        except json.JSONDecodeError:
            parse_errors += 1
    elif result['result']['type'] == 'errored':
        parse_errors += 1


df_llm = pd.DataFrame(llm_eval_results)
print(f"‚úì Parsed {len(llm_eval_results)} evaluation scores")
if parse_errors > 0:
    print(f"‚ö†Ô∏è {parse_errors} results had errors or couldn't be parsed")

# Display summary statistics
summary = df_llm.groupby('model').agg({
    'idiomatic_airflow_score': 'mean',
    'no_hallucination_score': 'mean',
    'instruction_adherence_score': 'mean'
}).round(2)

print("\nLLM Evaluation Summary (1-5 scale):")
display(summary)
df_llm.to_csv('llm_eval.csv')

2025-12-31 11:08:27,422 - INFO - ‚è≥ Waiting for batch msgbatch_01Mue6HSzr5mR9g3XR1aEST9...


‚è≥ Waiting for batch to complete (this may take 10-30 minutes)...


2025-12-31 11:08:27,676 - INFO - HTTP Request: GET https://api.anthropic.com/v1/messages/batches/msgbatch_01Mue6HSzr5mR9g3XR1aEST9?beta=true "HTTP/1.1 200 OK"
2025-12-31 11:08:27,679 - INFO - üìä Status: ended (elapsed: 0.3s)
2025-12-31 11:08:27,679 - INFO -    Progress: 1026/1026 (100.0%)
2025-12-31 11:08:27,679 - INFO - ‚úÖ Batch completed with status: ended
2025-12-31 11:08:27,680 - INFO - ‚¨áÔ∏è Downloading results...
2025-12-31 11:08:27,876 - INFO - HTTP Request: GET https://api.anthropic.com/v1/messages/batches/msgbatch_01Mue6HSzr5mR9g3XR1aEST9?beta=true "HTTP/1.1 200 OK"



üì• Downloading results...


2025-12-31 11:08:28,160 - INFO - HTTP Request: GET https://api.anthropic.com/v1/messages/batches/msgbatch_01Mue6HSzr5mR9g3XR1aEST9/results "HTTP/1.1 200 OK"
2025-12-31 11:08:28,794 - INFO - üì• Downloaded 1026 items


‚úì Downloaded 1026 results

üìä Parsing evaluation scores...
‚úì Parsed 1025 evaluation scores
‚ö†Ô∏è 1 results had errors or couldn't be parsed

LLM Evaluation Summary (1-5 scale):


Unnamed: 0_level_0,idiomatic_airflow_score,no_hallucination_score,instruction_adherence_score
model,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
baseline,0.11,0.24,0.15
finetuned,0.43,0.06,0.08
ground,0.91,0.17,0.72


### Success Case: Idiomatic Airflow Syntax

The fine-tuned model demonstrates superior knowledge of Airflow's provider ecosystem and operator patterns. This section showcases examples where the model correctly uses native operators instead of generic Python wrappers.

**Example 1 (DAG ID 3): Campaign Manager Operations**
- **Fine-tuned:** Correctly uses `GoogleCampaignManagerBatchInsertConversionsOperator`, `GoogleCampaignManagerRunReportOperator` from the Google Marketing Platform provider
- **Baseline:** Falls back to `PythonOperator` wrapping `CampaignManagerHook` calls‚Äîa non-idiomatic pattern that requires more boilerplate code

**Example 2 (DAG ID 19): Compute Engine Instance Management**
- Illustrates a common failure mode where the fine-tuned model imports the correct operators (`ComputeEngineInsertInstanceOperator`) but still wraps logic in a `@task` decorated function with `ComputeEngineHook`
- This represents partial knowledge‚Äîthe model knows the right operators exist but hasn't fully learned when to apply them directly

In [105]:
df_llm[df_llm.dag_id==3][['model','idiomatic_airflow_score','idiomatic_airflow_reasoning']]

Unnamed: 0,model,idiomatic_airflow_score,idiomatic_airflow_reasoning
2,ground,1,"The code uses native Airflow operators specifically designed for Campaign Manager tasks. It imports and uses GoogleCampaignManagerBatchInsertConversionsOperator, GoogleCampaignManagerBatchUpdateConversionsOperator, GoogleCampaignManagerInsertReportOperator, GoogleCampaignManagerRunReportOperator, GoogleCampaignManagerDownloadReportOperator, GoogleCampaignManagerDeleteReportOperator, and GoogleCampaignManagerReportSensor from airflow.providers.google.marketing_platform. These are the idiomatic, provider-specific operators rather than wrapping logic in PythonOperator with hooks."
344,baseline,0,"The code uses PythonOperator with hooks instead of native Campaign Manager operators. Airflow 3.0.0 provides google.marketing_platform.operators.campaign_manager that should be used for interacting with Campaign Manager APIs. The code wraps hook calls in Python functions rather than using dedicated operators like CampaignManagerInsertReportOperator, CampaignManagerRunReportOperator, or CampaignManagerDownloadReportOperator."
686,finetuned,1,"The code uses native Airflow operators from the Google Marketing Platform provider (GoogleCampaignManagerBatchInsertOperator, GoogleCampaignManagerGetOperator, GoogleCampaignManagerUpdateOperator, GoogleCampaignManagerListOperator, GoogleCampaignManagerDeleteOperator) rather than wrapping logic in PythonOperators with hooks. This is the idiomatic Airflow approach."


In [106]:
df_llm[df_llm.dag_id==19][['model','idiomatic_airflow_score','idiomatic_airflow_reasoning']]

Unnamed: 0,model,idiomatic_airflow_score,idiomatic_airflow_reasoning
11,ground,1,"The code uses appropriate Airflow providers and operators for the tasks. It uses ComputeEngineInsertInstanceOperator and ComputeEngineDeleteInstanceOperator for GCE instance management, SQLExecuteQueryOperator for database operations, and SQLToGoogleSheetsOperator for data transfer to Google Sheets. It also uses SSHOperator for remote commands and GoogleSheetsCreateSpreadsheetOperator for spreadsheet creation. These are all idiomatic Airflow patterns using native operators rather than wrapping logic in PythonOperators with hooks."
353,baseline,0,"The code uses several non-existent or incorrect operators. 'PostgresCreateDatabaseOperator' and 'PostgresExecuteQueryOperator' do not exist in 'airflow.providers.google.cloud.operators.postgres' - PostgreSQL operators are in 'airflow.providers.postgres.operators.postgres' (e.g., PostgresOperator). 'GoogleSheetsUploadOperator' does not exist; the correct operator is 'GoogleSheetsCreateSpreadsheetOperator' or using 'GoogleSheetsHook' with appropriate operators. Additionally, GKE (Google Kubernetes Engine) cluster operators are being used instead of Compute Engine instance operators (ComputeEngineInsertInstanceOperator), which doesn't match the instruction to create a Compute Engine instance."
695,finetuned,0,"The DAG uses a PythonOperator (via @task decorator) with ComputeEngineHook to create a Compute Engine instance instead of using the native ComputeEngineInsertInstanceOperator or ComputeEngineInsertInstanceFromTemplateOperator that are already imported but never used. The create_instance task wraps hook logic in a Python function, which is the anti-pattern described in the scoring criteria. The proper idiomatic approach would be to use ComputeEngineInsertInstanceFromTemplateOperator directly."


### Failure Case 1: Dataset Overfitting and Poor Generalization

**Problem:** The fine-tuned model exhibits severe overfitting to numerical parameters in the training data, causing instruction adherence failures.

**Root Cause Analysis (DAG ID 0):**
- **User Request:** "Insert **12** product records and validate the data load"
- **Fine-tuned Output:** Inserts **20** records instead, completely ignoring the specified quantity
- **Training Data Pattern:** 85.7% of training examples (353 out of 412) contain the number "20" as a placeholder value across diverse contexts (row counts, retry limits, timeout seconds, etc.)

**Impact:**
- The model memorized "20" as the default numerical value rather than learning to extract task-specific parameters from instructions
- This catastrophic generalization failure results in only an 8% instruction adherence pass rate vs. 15% for the baseline

**Mitigation Strategies:**
1. **Data Augmentation:** Systematically vary numerical parameters during training to prevent anchoring bias
2. **Synthetic Data Generation:** Create examples with diverse numerical ranges (1-10, 50-100, 1000+)
3. **Few-Shot Prompting:** Include examples with different numbers in the system prompt at inference time

In [None]:
pd.set_option('display.max_colwidth', None)
df_llm[df_llm.dag_id==0][['model','instruction_adherence_score','instruction_adherence_reasoning']]

Unnamed: 0,model,instruction_adherence_score,instruction_adherence_reasoning
0,ground,1,The code fully adheres to the instruction. It creates a data pipeline that loads 12 product records into a Snowflake table (setup_data task) and validates the data load by verifying the row count matches the expected count of 12 (check_num_rows task). Both required steps are implemented correctly with proper task dependencies.
342,baseline,1,"The code fulfills the user instruction completely. It creates a data pipeline that: (1) loads 12 sample product records into a Snowflake table via the load_sample_products function, and (2) validates the data load by querying the row count and comparing it to the expected count of 12 in the validate_data_load function. The tasks are properly sequenced with load_task >> validate_task dependency."
684,finetuned,0,"The instruction requires two key components: (1) insert 12 product records, and (2) validate the data load by verifying the row count. The code inserts 20 records instead of 12, and completely lacks any validation task to verify the total number of rows matches the expected count. There is no SELECT COUNT(*) query or any mechanism to check that the data was loaded correctly."


In [100]:
train_ds = load_dataset(
    "andrea-t94/airflow-dag-dataset",
    split="test",
    download_mode="reuse_cache_if_exists"  # Use cached version if available
)
# Check how many records contain "20" in the prompt
count = 0
for entry in train_ds:
    messages = entry.get('messages', [])
    for msg in messages:
        if "20" in msg['content']:
            count += 1

print(f"Records containing '20' in prompt: {count} out of {len(ground_data)} ({count/len(ground_data)*100:.1f}%)")

Generating train split:   0%|          | 0/7414 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/412 [00:00<?, ? examples/s]

Generating eval split:   0%|          | 0/412 [00:00<?, ? examples/s]

Records containing '20' in prompt: 353 out of 412 (85.7%)


### Failure Case 2: Training Data Contamination with Internal Test Utilities

**Problem:** The fine-tuned model reproduces internal Airflow testing code that should never appear in production DAGs, resulting in a 6% hallucination-free pass rate (vs. 24% baseline).

**Root Cause Analysis (DAG ID 0):**
- **Contaminated Imports:** Both ground truth and fine-tuned model generate:
  ```python
  from tests_common.test_utils.system_tests import get_test_run
  test_run = get_test_run(dag)
  ```
- **Source:** Training dataset inadvertently included internal Airflow CI/CD test files used for system testing
- **Impact:** Claude evaluator correctly flags these as hallucinations since they reference non-public APIs unavailable in production Airflow installations

**Why This Matters:**
- Code would fail at import time: `ModuleNotFoundError: No module named 'tests_common'`
- Indicates the model cannot distinguish between test scaffolding and production code patterns
- Baseline model, lacking exposure to these internals, generates cleaner production-ready code

**Mitigation Strategies:**
1. **Data Filtering:** Implement strict preprocessing to exclude all files under `tests/`, `tests_common/`, or with `test_` prefixes
2. **Validation Pipeline:** Add automated checks to flag imports from non-production namespaces before adding examples to training set
3. **Synthetic Cleaning:** Post-process training examples to remove test harness boilerplate while preserving core DAG logic

In [102]:
pd.set_option('display.max_colwidth', None)
df_llm[df_llm.dag_id==0][['model','no_hallucination_score','no_hallucination_reasoning']]

Unnamed: 0,model,no_hallucination_score,no_hallucination_reasoning
0,ground,0,The code includes test harness boilerplate at the end: 'from tests_common.test_utils.system_tests import get_test_run' and 'test_run = get_test_run(dag)'. These are internal testing modules that should not be present in production-ready code.
342,baseline,1,The code uses only standard Airflow libraries (airflow.operators.python_operator.PythonOperator and airflow.providers.snowflake.hooks.snowflake.SnowflakeHook). There are no imports from internal testing modules or test harness boilerplate. The code is clean and does not contain hallucinated or leaked test utilities.
684,finetuned,0,"The code contains test harness boilerplate that should not be in production code. Specifically, it imports 'from tests_common.test_utils.system_tests import get_test_run' and includes 'test_run = get_test_run(dag)', which are internal testing modules. Additionally, the line 'snowflake_insert_many >> TriggerRule.ALL_DONE' is incorrect syntax and appears to be malformed code."


### Failure Case 3: Hallucinations from Insufficient Coverage of Niche APIs

**Problem:** When encountering specialized Airflow features underrepresented in training data, the model fabricates plausible-looking but incorrect API usage patterns.

**Root Cause Analysis (DAG ID 2 - Custom Timetables):**

**Baseline Model Hallucinations:**
- Invents non-existent import: `from airflow.timetables.after_workday import AfterWorkdayTimetable`
- Fabricates invalid API: `task.set_upstream(None, AfterWorkdayTimetable(...))`‚Äîtimetables cannot be passed to `set_upstream()`
- **Confidence despite incorrectness:** Syntax appears legitimate, making errors harder to detect without execution

**Fine-tuned Model Hallucinations:**
- Creates invalid DAG parameter: `timetables={"after_workday": AfterWorkdayTimetable(...)}`
- Correct approach: Pass timetable directly to `schedule` parameter in Airflow 3.x
- **Shows partial knowledge:** Imports the correct class but misapplies it

**Ground Truth Issues:**
- Even correct implementation imports from `airflow.example_dags.plugins.workday`‚Äîan internal example path unsuitable for production
- Highlights data quality problems even in "ground truth" examples

**Why Custom Timetables Are Challenging:**
- **Low Frequency:** Timetables are advanced features used in <5% of production DAGs
- **API Evolution:** Timetable interface changed significantly between Airflow 2.x and 3.x
- **Documentation Gaps:** Fewer Stack Overflow examples compared to common operators

**Mitigation Strategies:**
1. **Targeted Data Collection:** Manually curate 50-100 high-quality examples for each underrepresented feature (timetables, custom operators, dynamic task mapping)
2. **Retrieval-Augmented Generation (RAG):** Supplement model with runtime access to official Airflow documentation for rare APIs
3. **Confidence Calibration:** Train model to output uncertainty markers (e.g., `# TODO: Verify timetable syntax`) when dealing with low-confidence API usage

In [None]:
pd.set_option('display.max_colwidth', None)
df_llm[df_llm.dag_id==2]

Unnamed: 0,model,dag_id,idiomatic_airflow_score,idiomatic_airflow_reasoning,no_hallucination_score,no_hallucination_reasoning,instruction_adherence_score,instruction_adherence_reasoning
1,ground,2,1,"The code uses native Airflow constructs appropriately. It uses the EmptyOperator from the standard providers package, implements a custom timetable class (AfterWorkdayTimetable) as the schedule parameter, and uses the DAG context manager. This is the idiomatic way to implement custom scheduling in Airflow 3.0.0, where timetables are passed directly to the schedule parameter rather than wrapping logic in PythonOperators.",0,"The code imports 'AfterWorkdayTimetable' from 'airflow.example_dags.plugins.workday', which is an internal example/testing path. This is not a standard production-ready import from official Airflow providers or core libraries. In a production environment, this custom timetable class should either be defined in the DAG file itself or imported from a proper custom plugins directory, not from 'airflow.example_dags.plugins'.",1,"The code fully adheres to the instruction. It constructs an Airflow DAG that showcases a custom timetable scheduling mechanism using the AfterWorkdayTimetable class, builds a simple workflow with an EmptyOperator, and demonstrates scheduling flexibility by triggering after standard work hours. All requested elements are present: custom timetable implementation, simple workflow, and illustration of scheduling flexibility."
343,baseline,2,0,"The code attempts to use a custom timetable but does so incorrectly. The proper idiomatic way to use a custom timetable in Airflow is to pass it to the DAG constructor via the 'timetable' parameter, not by calling a non-existent 'set_upstream' method with timetable parameters on a task. The correct pattern would be: DAG(..., timetable=AfterWorkdayTimetable(...)). Additionally, uses deprecated 'EmptyOperator' import path instead of 'airflow.operators.empty.EmptyOperator' for Airflow 3.0.0.",0,"The code contains multiple hallucinations: (1) 'airflow.timetables.after_workday.AfterWorkdayTimetable' does not exist in standard Airflow - this is a fabricated import path and class; (2) The method 'task.set_upstream(None, AfterWorkdayTimetable(...))' is not a valid Airflow API - set_upstream does not accept timetable parameters; (3) Missing import for 'datetime' class while using it; (4) The timetable constructor parameters shown (start_time, end_time with datetime objects) don't match any real Airflow timetable pattern.",0,"The instruction asks to 'showcase the implementation of a custom timetable scheduling mechanism using the AfterWorkdayTimetable class' and 'triggers an empty operator after standard work hours'. While the code attempts to use AfterWorkdayTimetable and includes an EmptyOperator, it fails to properly implement the custom timetable mechanism. The timetable should be passed to the DAG constructor, not applied to individual tasks. The code also uses 'schedule_interval=timedelta(days=1)' which would override any timetable setting, showing fundamental misunderstanding of how custom timetables work in Airflow."
685,finetuned,2,1,"The code uses the appropriate Airflow components idiomatically. It imports and uses the AfterWorkdayTimetable class directly, uses the EmptyOperator from the standard providers package, and follows proper DAG construction patterns. No generic PythonOperator wrappers are used where native operators exist.",0,"The code contains a significant error that suggests hallucination or confusion about Airflow 3.0.0 API. The 'timetables' parameter as a dictionary is not a valid DAG parameter in Airflow. The correct approach is to pass the timetable instance directly to the 'schedule' parameter (e.g., schedule=AfterWorkdayTimetable(...)). The code shows 'schedule=""@daily""' alongside a 'timetables' dict, which is not standard Airflow API and appears to be fabricated syntax.",0,"The instruction asks to 'showcase the implementation of a custom timetable scheduling mechanism using the AfterWorkdayTimetable class' and 'triggers an empty operator after standard work hours'. While the code attempts to use AfterWorkdayTimetable, it does so incorrectly (wrong syntax with the timetables dict). More critically, it uses 'schedule=""@daily""' which would override any custom timetable behavior. The code does not properly implement the custom timetable as the primary scheduling mechanism, failing to demonstrate the requested scheduling flexibility."
