Skip to content

Interactive LLM example based on the Airflow survey data#64824

Draft
vikramkoka wants to merge 1 commit intomainfrom
aip99_example
Draft

Interactive LLM example based on the Airflow survey data#64824
vikramkoka wants to merge 1 commit intomainfrom
aip99_example

Conversation

@vikramkoka
Copy link
Copy Markdown
Contributor

Here is a new interactive example for the common.ai provider based on public data which happens to be the Airflow 2025 Survey data.

The goal is to demonstrate an interactive LLM use case, which can be used by the developer as an example with other integrations pulling other data sets.


Was generative AI tooling used to co-author this PR?
  • [ X] Yes (please specify the tool below)

Here is a new interactive example for the common.ai provider based on public data which happens to be the Airflow 2025 Survey data.

The goal is to demonstrate an interactive LLM use case, which can be used by the developer as an example with other integrations pulling other data sets.
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds a new example DAG demonstrating an interactive LLM-driven analysis flow over the Airflow Community Survey 2025 CSV, using HITL steps for question/result review.

Changes:

  • Introduces a new “interactive survey analysis” example DAG using LLMSQLQueryOperator + AnalyticsOperator over a local CSV.
  • Adds HITL tasks to confirm the natural-language prompt and approve the returned results.
  • Documents required setup (LLM connection and cleaned CSV) and provides a minimal schema context for SQL generation.

"SURVEY_CSV_PATH",
"/opt/airflow/data/airflow-user-survey-2025.csv",
)
SURVEY_CSV_URI = f"file://{SURVEY_CSV_PATH}"
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

file://{SURVEY_CSV_PATH} produces an invalid/ambiguous file URI for absolute paths (e.g. /opt/... becomes file:////opt/...). Use a proper URI builder (e.g., pathlib.Path(...).resolve().as_uri()) or ensure the URI is formatted as file:///... for absolute paths so DataFusion reliably resolves the file across platforms.

Copilot uses AI. Check for mistakes.
Comment on lines +96 to +101
survey_datasource = DataSourceConfig(
conn_id="",
table_name="survey",
uri=SURVEY_CSV_URI,
format="csv",
)
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting conn_id="" risks being treated as a real connection ID and may trigger a lookup of an empty connection name at runtime. Prefer omitting conn_id entirely (if optional) or using None so the config unambiguously represents a file-based datasource without a connection.

Copilot uses AI. Check for mistakes.


# [START example_llm_survey_interactive]
@dag(schedule=None)
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DAG definition does not specify start_date (and typically catchup=False). In Airflow, missing start_date commonly causes DAG parsing/validation failures or inconsistent UI behavior. Add an explicit start_date and set catchup=False for this manual/interactive example DAG.

Suggested change
@dag(schedule=None)
@dag(
schedule=None,
start_date=datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc),
catchup=False,
)

Copilot uses AI. Check for mistakes.
Comment on lines +147 to +163
generate_sql = LLMSQLQueryOperator(
task_id="generate_sql",
prompt="{{ ti.xcom_pull(task_ids='prompt_confirmation')['params_input']['prompt'] }}",
llm_conn_id=LLM_CONN_ID,
datasource_config=survey_datasource,
schema_context=SURVEY_SCHEMA,
)

# ------------------------------------------------------------------
# Step 3: SQL execution via Apache DataFusion.
# ------------------------------------------------------------------
run_query = AnalyticsOperator(
task_id="run_query",
datasource_configs=[survey_datasource],
queries=["{{ ti.xcom_pull(task_ids='generate_sql') }}"],
result_output_format="json",
)
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This executes LLM-generated SQL directly. Even in an example DAG, this is effectively “untrusted code execution” and can be abused (e.g., querying other local files/URIs if DataFusion SQL functions allow it, or running very expensive queries). Add a guard step before execution (e.g., an approval/review task for the SQL, or validation that enforces a restricted pattern such as SELECT-only and referencing only the survey table), and consider setting query/resource limits if supported by AnalyticsOperator.

Copilot uses AI. Check for mistakes.
Comment on lines +171 to +173
def extract_data(raw: str) -> str:
results = json.loads(raw)
data = [row for item in results for row in item["data"]]
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract_data assumes raw is a JSON string, but depending on XCom serialization and operator implementation, run_query.output may already be a Python object (list/dict). In that case json.loads(raw) will raise a TypeError. Make this tolerant by handling both cases (string vs. already-parsed object) and validating the expected shape before iterating.

Suggested change
def extract_data(raw: str) -> str:
results = json.loads(raw)
data = [row for item in results for row in item["data"]]
def extract_data(raw: str | list[dict] | dict) -> str:
results = json.loads(raw) if isinstance(raw, str) else raw
if isinstance(results, dict):
results = [results]
if not isinstance(results, list):
raise ValueError("Expected analytics result to be a list or dict payload")
data = []
for item in results:
if not isinstance(item, dict):
raise ValueError("Expected each analytics result item to be a dict")
rows = item.get("data")
if not isinstance(rows, list):
raise ValueError("Expected each analytics result item to contain a list in 'data'")
data.extend(rows)

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants