In [2]:
import os
from typing import Dict, List, Optional, Any
import datetime

# from llama_index.core import VectorStoreIndex, StorageContext, load_index_from_storage
# from llama_index.core.retrievers import VectorIndexRetriever, RouterRetriever
# from llama_index.core.response_synthesizers import ResponseMode
# from llama_index.core.query_engine import RetrieverQueryEngine
# from llama_index.core import get_response_synthesizer
# from llama_index.core.query_engine import CustomQueryEngine
# from llama_index.core.retrievers import BaseRetriever
import time
import sys
sys.path.append('/Users/beckyxu/Documents/GitHub/sgd-insight-engine')

from python_backend.config import logger, POLICY_FOLDER, GCP_PROJECT_ID, GCP_LOCATION, DOCUMENTS_BUCKET
from python_backend.storage.bigquery import get_fa_from_bigquery
from python_backend.storage.gcs import ensure_bucket_exists
from python_backend.ai.models import llm, embed_model
from python_backend.document.processor import process_document, docling_reader

# Initialize global variables
_indices_dict = {}
_policy_indices_dict = {}
_policy_context_cache = None  # Cache for policy context to avoid repeated extraction

import json

INFO:httpx:HTTP Request: GET https://us-central1-aiplatform.googleapis.com/v1beta1/publishers/google/models/gemini-2.0-flash "HTTP/1.1 200 OK"
  from .autonotebook import tqdm as notebook_tqdm


In [None]:
def create_policy_docs(
    policy_links: List[str] = None, 
    ) -> Dict:
    """
    Initialize indices for policy documents using the existing ChromaDB collection.
    
    Args:
        policy_links: list of policy document links (not used when loading from existing ChromaDB).

    Returns:
        List of extracted policy documents for analysis
    """
    # 1. create combinedtext for each document
    # file 1 SDG
    file_path = policy_links[0]
    docs = docling_reader.load_data(file_path)
    text_doc1_sdg = ' '.join(doc.text.strip() for doc in docs)
    
    # file 2 SDG
    file_path = policy_links[1]
    docs = docling_reader.load_data(file_path)
    text_doc2_sdg = ' '.join(doc.text.strip() for doc in docs)
    
    # file 3 RS
    file_path = policy_links[2]
    docs = docling_reader.load_data(file_path)
    text_doc1_rs = ' '.join(doc.text.strip() for doc in docs)
    
    # file 4 RS
    file_path = policy_links[3]
    docs = docling_reader.load_data(file_path)
    text_doc2_rs = ' '.join(doc.text.strip() for doc in docs)
    
    # file 5 RS
    file_path = policy_links[4]
    docs = docling_reader.load_data(file_path)
    text_doc3_rs = ' '.join(doc.text.strip() for doc in docs)
    
    return [text_doc1_sdg, text_doc2_sdg, text_doc1_rs, text_doc2_rs, text_doc3_rs] 

def answer_question_from_document_link(document_link: str, policy_doc_list) -> Dict[str, Any]:
    """
    Answer a question based on a document link and policy indices.
    First extracts information from policy documents, then uses that to analyze the project document.
    
    Args:
        document_link: The link to the project document to analyze.
        policy_doc_list: list of extracted document text from
                
    Returns:
        Dict containing the structured analysis and relevant context.
    """
    
    # Step 1: load policy context docs

    # Step 2: Download and analyze the project document 
    logger.info(f"Processing document link: {document_link}")
    try:
        processed_doc = process_document(document_link) 
        project_doc_text = processed_doc['text_doc_fa'] 
        project_doc_id = processed_doc['file_id'] 
    except Exception as e:
        logger.error(f"Error initializing project fa doc: {str(e)}")
        return {}
    
    # Step 3: Extract data from the project fa and the policy docs 
    try:
        system_prompt = """ Use ReAct:
        1. **Reason**: Identify relevant project elements from project document.
        2. **Act**: Match to tool or indicators.
        3. **Reason**: Validate matches.
        4. **Act**: Format the response as JSON
        If information is missing or uncertain, include null values.
        """
        from python_backend.ai.models import llm
        
        # 1. Create a summary text, with measurable indicators 
        try:
            # Create prompt for the LLM
            prompt = f"""{system_prompt}
            You are analyzing a project document financial document for Sustainable Development Goals (SDGs) impact and potential application of remote sensing.
            The project document contains two main sections in the report: <finance> and <project description>.
            Based on the document text below, please answer the following question:

            *Questions*:
            - Identify quantifiable outcomes (e.g., "5 health centers built", "training for 200 farmers"). Focus on the <project description> (and <finance> if relevant).
            - What are the main objectives of the project? Focus on the <project description>.
            - What specific societal, economic, or environmental problems does it address? Focus on <project description>.
            - Who are the beneficiaries and who will be impacted? Focus on <project description>.
            - What are the anticipated short-term and long-term outcomes? Focus on <project description> and <finance>.

            *Here is the document text*:
            {project_doc_text}
            *Strictly follow this json response format*
            {{
                "project_summary": "string of brief project SDG impact summary in paragraph",
                "quantifiable_outcome_list": ["outcome_item": "string"],
                "project_summary_list": ["analysis_item": "string"]
            }}
            Create a written summary of the project based on the questions and the document text.
            """
            
            # Get response from LLM
            response = llm.complete(prompt)
            answer_summary = response.text.strip()
            
            # Store in summary
            # print(answer_summary)
            logger.info(f"Generated summary for doc")
        except Exception as e:
            logger.info(f"Fail to generate summary {str(e)}")
            answer_summary = None
            
        # 2. Create a sdg and sdg indicators json
        try:
            # Create prompt for the LLM
            prompt = f"""{system_prompt}
            You are analyzing a project document financial document for Sustainable Development Goals (SDGs) and measurable SDG indicators.
            You are given two sets of documents: 1. project document, 2. sdg indicators documents.
            The project document contains two main sections in the report: <finance> and <project description>.
            Focusing on <project description> and the sdg indicators documents, analyze:
            - What SDG goals does this project contribute to?
            - What specific SDG indicators are measurable in this project?"
            - Output a nested json with keys: "sdg_goals" and "sdg_indicators". Each key can have a list of objects. Follow the format below.

            *Here is the project document*:
            {project_doc_text}

            *Here is the sdg indicators document*:
            {policy_doc_list[0]}
            {policy_doc_list[1]}

            *Strictly follow this json response format*
            {{
                "sdg_goals": [
                    {{
                        "sdg_goal": "string",  # e.g., "6"
                        "name": "string",     # e.g., "Clean Water and Sanitation"
                        "relevance": "string" # e.g., "Provides safe water"
                    }}
                ],
                "sdg_indicators": [
                    {{
                        "sdg_indicator": "string",
                        "description" : "string",
                        "measurability" : "string"
                    }}
                ]
            }}

            Only output the json string.
            """
            # Get response from LLM
            response = llm.complete(prompt)
            answer_sdg = response.text.strip()
            
            # Store in summary
            # print(answer_sdg)
            logger.info(f"Generated SDG for doc")  
        except Exception as e:
            logger.error(f"Error generating SDG doc: {str(e)}")
            answer_sdg = None
            
        # 3. Create a imat and remote sensing capacity json
        try:
            # Create prompt for the LLM
            prompt = f"""{system_prompt}
            You are analyzing a project document financial document for potential application of remote sensing.
            You are given two sets of documents: 1. project document, 2. remote sensing tools documents.
            The project document contains two main sections in the report: <finance> and <project description>.
            Focusing on <project description> from the  project document, and the remote sensing tool documents, analyze:
            - Which remote sensing or other IMAT tools applicable for this project?
            - How is this remote sensing or IMAT tool applicable for this project?
            - Output a nested json with keys: "remote_sensing_tools". Each key can have a list of objects. Follow the format below.

            *Here is the project document text*:
            {project_doc_text}

            *Here are the remote sensing tools documents*:
            {policy_doc_list[2]}
            {policy_doc_list[3]}
            {policy_doc_list[4]}

            *Strictly follow this nested json response format*
                {{
                    "remote_sensing_tools":[
                        {{
                            "technology": "string",   # e.g., "Remote Sensing Tool A"
                            "application": "string"   # e.g., "Monitor water sources"
                        }}
                    ]
                }}
            Only output the json string.
            """
            
            # Get response from LLM
            response = llm.complete(prompt)
            answer_rs = response.text.strip()
            
            # Store in summary
            # print(answer_rs)
            logger.info(f"Generated remote sensing summary for doc")
        except Exception as e:
            logger.error(f"Error remote sensing summary doc: {str(e)}")
            answer_rs = None

        answer_summary_dict = safe_json_parse(answer_summary) if answer_summary else {}
        answer_sdg_dict = safe_json_parse(answer_sdg) if answer_sdg else {}
        answer_rs_dict = safe_json_parse(answer_rs) if answer_rs else {}
        file_id_dict = {"file_id": project_doc_id}
        combined_result = {**file_id_dict, **answer_summary_dict, **answer_sdg_dict, **answer_rs_dict}
        
        result = json.dumps(combined_result)
        # result = json.loads(result)
        return result
    
    except Exception as e:
        logger.error(f"Error processing document: {str(e)}")
        return {"answer": f"Error processing document: {str(e)}", "source_links": [document_link]}
    
import json
def safe_json_parse(response_text: str) -> dict:
    """
    Extracts JSON from a code block in an LLM response and parses it.
    
    Args:
        response_text: The text returned by the LLM (possibly with ```json ... ```)

    Returns:
        Parsed Python dictionary, or empty dict if parsing fails.
    """
    try:
        # Match JSON block wrapped in triple backticks
        match = re.search(r"```(?:json)?\s*({.*?})\s*```", response_text, re.DOTALL)
        if match:
            json_str = match.group(1)
            return json.loads(json_str)
    except Exception as e:
        print(f"Failed to extract/parse JSON: {e}")
    return {}

In [None]:
# file_path = policy_links[0]
# docs = docling_reader.load_data(file_path)
# text_doc1_sdg = ' '.join(doc.text.strip() for doc in docs)
# print(text_doc1_sdg)

In [None]:
# NOTE ONLY RUN THIS ONCE

policy_links = [
    '/Users/beckyxu/Documents/GitHub/sgd-insight-engine/python_backend/policy_docs/Global-Indicator-Framework-after-2024-refinement-English.xlsx',
    '/Users/beckyxu/Documents/GitHub/sgd-insight-engine/python_backend/policy_docs/Guidance for aligning UNOPS engagements to SDGs.docx',
    '/Users/beckyxu/Documents/GitHub/sgd-insight-engine/python_backend/policy_docs/IMAT’s Remote Sensing Capacities and Tools.docx',
    '/Users/beckyxu/Documents/GitHub/sgd-insight-engine/python_backend/policy_docs/Rescuing the SDGs with Geospatial Information (1).pdf',
    '/Users/beckyxu/Documents/GitHub/sgd-insight-engine/python_backend/policy_docs/UNOPS Geographic Information Systems (GIS).pptx'
]
policy_doc_list = create_policy_docs(policy_links)

In [None]:
document_link = "https://drive.google.com/file/d/1BnxsDqskNB2Q4KLcZ7mAK-tZ6BGXaiEY"
if not is_document_already_processed(document_link):
    result = answer_question_from_document_link(document_link, policy_doc_list)

In [None]:
print(json.dumps(result))
result1 = json.dumps(result)

----

# Continue here to figure out how to upload to BQ

In [None]:
# Define the output filename
output_filename = '/Users/beckyxu/Documents/GitHub/sgd-insight-engine/python_backend/policy_docs/test_project_data.json'

# try:
#     # 1. Load the string into a Python dictionary to validate it's proper JSON
#     #    and to allow for pretty-printing.
#     data = json.loads(result1)

#     # 2. Write the Python dictionary to a file as formatted JSON
#     #    'w' mode overwrites the file if it exists.
#     #    encoding='utf-8' is standard for JSON.
#     #    indent=4 makes the output file human-readable.
#     with open(output_filename, 'w', encoding='utf-8') as f:
#         json.dump(data, f, ensure_ascii=False, indent=4)

#     print(f"Data successfully validated, formatted, and saved to: {os.path.abspath(output_filename)}")

# except json.JSONDecodeError as e:
#     print(f"Error: The provided string is not valid JSON. {e}")
#     # Optional: Save the original (potentially invalid) string anyway
#     # with open("project_data_raw_error.txt", 'w', encoding='utf-8') as f_err:
#     #     f_err.write(json_data_string)
#     # print("Original raw string saved to project_data_raw_error.txt")

# except IOError as e:
#     print(f"Error writing file {output_filename}: {e}")
# except Exception as e:
#     print(f"An unexpected error occurred: {e}")


In [None]:
# Load it back

output_filename = '/Users/beckyxu/Documents/GitHub/sgd-insight-engine/python_backend/policy_docs/test_project_data.json'

# NOTE: The original file is formatted with a string outside with json.dump() ... '{"file_id":

filename = output_filename

try:
    with open(filename, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Now 'data' contains the JSON data as a Python dictionary
    print("JSON data loaded successfully:")
    # Print a few keys/values for verification (don't print the whole thing if it's huge)
    print(f"All data: {json.dumps(data, indent=4)}") # Print another field

except FileNotFoundError:
    print(f"Error: File '{filename}' not found.")
except json.JSONDecodeError as e:
    print(f"Error: Invalid JSON in file '{filename}': {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

In [None]:
data['file_id'] = 'https://drive.google.com/file/d/1BnxsDqskNB2Q4KLcZ7mAK-tZ6BGXaiEY'

In [None]:
from python_backend.storage.bigquery import create_bigquery_table

In [7]:
from python_backend.storage.bigquery import get_bigquery_client, ensure_processed_docs_table_exists, is_document_already_processed, mark_document_as_processed

In [4]:
from ast import main
import os
from google.cloud import bigquery
from config import logger, GCP_PROJECT_ID, BQ_FA_DATASET, BQ_FA_TABLE, GCP_REPORTS_ID 

def upload_to_bigquery(row: dict, project_id = GCP_PROJECT_ID, dataset_id = BQ_FA_DATASET, table_id = "analysis_results") -> None:
    """
    Upload a single row to a BigQuery table with Repeated Fields.
    Args:
        row: Dict[str, Any], processed row from json document analysis results to upload,
        project_id: str, GCP project id, 
        dataset_id: str, GCS dataset id, 
        table_id: str, GCS table id

    Returns:
    """
    
    bigquery_client = get_bigquery_client()
    if not bigquery_client or not ensure_processed_docs_table_exists():
        logger.warning("BigQuery client or processed documents table not initialized")
        return False
            
    table_ref = f"{project_id}.{dataset_id}.{table_id}"

    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition="WRITE_APPEND"
    )
    try:
        job = bigquery_client.load_table_from_json([row], table_ref, job_config=job_config)
        job.result()  # Wait for the job to complete.  Raises exception if it fails.
        logger.info(f"Uploaded row for {row['document_link']} to {table_ref}")
    except Exception as e:
        logger.error(f"Error uploading to BigQuery: {str(e)}")
        raise

In [None]:

schema = [
    bigquery.SchemaField("file_id", "STRING", mode="REQUIRED", description="Unique identifier for the file"),
    bigquery.SchemaField("project_summary", "STRING", mode="NULLABLE", description="A summary of the project"),
    bigquery.SchemaField(
        "quantifiable_outcome_list",
        "RECORD",
        mode="REPEATED",
        description="List of quantifiable outcomes",
        fields=[
            bigquery.SchemaField("outcome_item", "STRING", mode="NULLABLE", description="A specific quantifiable outcome item")
        ],
    ),
    bigquery.SchemaField(
        "project_summary_list",
        "RECORD",
        mode="REPEATED",
        description="List of project summary analysis items",
        fields=[
            bigquery.SchemaField("analysis_item", "STRING", mode="NULLABLE", description="An analysis item related to the project summary")
        ],
    ),
    bigquery.SchemaField(
        "sdg_goals",
        "RECORD",
        mode="REPEATED",
        description="List of Sustainable Development Goals associated with the project",
        fields=[
            bigquery.SchemaField("sdg_goal", "STRING", mode="NULLABLE", description="The SDG goal number (e.g., 16)"),
            bigquery.SchemaField("name", "STRING", mode="NULLABLE", description="The name of the SDG goal (e.g., Peace, Justice and Strong Institutions)"),
            bigquery.SchemaField("relevance", "STRING", mode="NULLABLE", description="Explanation of how the project relates to the SDG goal"),
        ],
    ),
    bigquery.SchemaField(
        "sdg_indicators",
        "RECORD",
        mode="REPEATED",
        description="List of Sustainable Development Goal indicators associated with the project",
        fields=[
            bigquery.SchemaField("sdg_indicator", "STRING", mode="NULLABLE", description="The SDG indicator number (e.g., 16.4.1)"),
            bigquery.SchemaField("description", "STRING", mode="NULLABLE", description="Description of the SDG indicator"),
            bigquery.SchemaField("measurability", "STRING", mode="NULLABLE", description="Explanation of how the indicator can be measured in relation to the project"),
        ],
    ),
    bigquery.SchemaField(
        "remote_sensing_tools",
        "RECORD",
        mode="REPEATED",
        description="List of remote sensing tools used in the project",
        fields=[
            bigquery.SchemaField("technology", "STRING", mode="NULLABLE", description="The name of the remote sensing technology or tool"),
            bigquery.SchemaField("application", "STRING", mode="NULLABLE", description="How the tool is applied in the project"),
        ],
    ),
]

In [None]:
upload_to_bigquery(data)

In [6]:
from python_backend.storage.bigquery import get_bigquery_client, is_document_already_processed, mark_document_as_processed

In [None]:
link='https://drive.google.com/file/d/1BnxsDqskNB2Q4KLcZ7mAK-tZ6BGXaiEY'
is_document_already_processed(link)

In [None]:
mark_document_as_processed(link)

In [3]:
from python_backend.auth.credentials import credentials_manager
from python_backend.config import logger, GCP_PROJECT_ID, GCP_LOCATION, BQ_FA_DATASET, BQ_FA_TABLE 

In [69]:
# Use the GCP_LOCATION from config to specify the location explicitly
bigquery_client_fa = get_bigquery_client()
if not bigquery_client_fa:
    print("error initializing client")

# TODO: CHANGE THIS TO THE ACTUAL COLNAME
query = f"""
SELECT
  Legal_Agreement,
  t0.File_URL,
  Region,
  t1.Donor
FROM
  `unops-eil-sdg-measurement-prod`.NS_Project_Legal_Agreement.TBL_Project_Legal_Agreement
  CROSS JOIN
  UNNEST(Legal_Agreement_Files) AS t0
  CROSS JOIN
  UNNEST(Donors) AS t1
WHERE
  t0.File_URL IS NOT NULL
LIMIT 2;
"""

query_job = bigquery_client_fa.query(query) 
results = query_job.result()
print(results)
# links = [row .file_url for row in results]

# links

<google.cloud.bigquery.table.RowIterator object at 0x31758f7d0>


In [None]:
result_list = list(results)
links = [row.Legal_Agreement for row in result_list]
links

In [79]:
import pandas as pd
df = results.to_dataframe()

ValueError: Please install the 'db-dtypes' package to use this function.

In [64]:
links = [row.Legal_Agreement for row in results]

ValueError: ('Iterator has already started', <google.cloud.bigquery.table.RowIterator object at 0x31758d220>)

get_fa_from_bigquery

In [15]:
def get_dataset_location(dataset_id):
    """Retrieves the location of a BigQuery dataset."""

    bigquery_client = get_bigquery_client()
    dataset_ref = bigquery_client.dataset(dataset_id)
    dataset = bigquery_client.get_dataset(dataset_ref)  # Make an API request.

    print(f"Dataset {dataset_id} is located in {dataset.location}")
    return dataset.location

In [45]:
_bigquery_client = None
from google.cloud import bigquery

def get_bigquery_client():
    """Get authenticated BigQuery client."""
    # a global variable is accessible throughout the entire module (file)
    global _bigquery_client
    if _bigquery_client is None:
        try:
            credentials = credentials_manager.get_credentials('bigquery')
            _bigquery_client = bigquery.Client(project=GCP_PROJECT_ID, credentials=credentials)
            logger.info("BigQuery client initialized")
        except Exception as e:
            logger.error(f"Error initializing BigQuery client: {str(e)}")
            return None
    return _bigquery_client

In [16]:
get_dataset_location(BQ_FA_DATASET)

Dataset NS_Project_Legal_Agreement is located in EU


'EU'