## 🌱 ESG emissions Declarative Pipeline - Partners data

**Purpose:** Use Agent model to extract emission information from partners data (PDFs) in batch.

**Data Source:** Delta table containing PDF texts

### STEP 1 : BATCH INFERENCE

**Purpose:** Run batch inference on all documents using the information extraction Agent model.

In [0]:
CREATE OR REFRESH STREAMING TABLE partners_data_raw 
  TBLPROPERTIES (
    'delta.feature.variantType-preview' = 'supported'
  )
  AS
  WITH query_results AS (
    SELECT
      `text` AS input,
      ai_query(
        'kie-ec44258a-endpoint',
        input,
        failOnError => false
      ) AS response
    FROM STREAM(`devconnect_2025`.`esg`.`document_texts`)
  )
  SELECT
    input,
    response.result AS response,
    response.errorMessage AS error
  FROM query_results;

### STEP 2 : POST PROCESSING

**Purpose:** Process query results , handle edge cases and apply constraints on the extracted data

In [0]:
CREATE OR REFRESH MATERIALIZED VIEW partners_data_parsed (
  CONSTRAINT valid_company_name EXPECT (org_name IS NOT NULL AND length(trim(org_name)) > 1) ON VIOLATION DROP ROW,
  CONSTRAINT reasonable_scope_1_ghg EXPECT (scope_3_ghg IS NULL OR (scope_3_ghg >= 0 AND scope_3_ghg < 1e10)) ON VIOLATION DROP ROW
)
AS SELECT 
  get_json_object(response, '$.company_name') as org_name,
  CAST(
    get_json_object(response, '$.annual_revenue_usd') AS DOUBLE
  ) as annual_revenue_usd,
  CAST(
  get_json_object(response, '$.number_of_employees') 
  AS INTEGER
  ) as nb_employees,
  CAST(
    get_json_object(response, '$.scope_3_emissions_tCO2')
    AS DOUBLE
  ) as scope_3_ghg
FROM partners_data_raw
WHERE get_json_object(response, '$.company_name') IS NOT NULL
  AND error IS NULL;