# Earth Engine Data Catalog LLM Experiment
Authors: Renee Johnston (reneejohnston@google.com), Eliot Cowan (eliotc@google.com)

Forked from [work](https://colab.sandbox.google.com/drive/1Tvdquc7gFj_bR5WVRZBol-cdfKZliP6p?resourcekey=0-B93UAfDiEcw5kzGgJLF85g) by Johnny Yip (jcyip@google.com)

### Summary
This Colab notebook uses an LLM to summarize each dataset in the Earth Engine data catalog and saves the summaries, so that they can be embedded and are better suited for a downstream search task.

**Required Setup**

You need a Generative AI API key, which you can get [here](https://aistudio.google.com/app/prompts/new_chat). Be aware that you might need to pay for use of the Generative AI API.

To save this key in the notebook, click the key icon on the left-hand side of Colab and add your key as a secret named `GOOGLE_API_KEY`. Make sure the value contains no newlines.
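
A stray newline in a pasted key is easy to miss. The following is a minimal sketch of a sanity check you could run on the secret before passing it to a client. `clean_api_key` is a hypothetical helper, not part of Colab or any Google library.

```python
def clean_api_key(raw_key: str) -> str:
    """Strips surrounding whitespace and rejects keys with embedded whitespace.

    Hypothetical helper for illustration only.
    """
    key = raw_key.strip()
    if not key:
        raise ValueError("API key is empty")
    if any(ch.isspace() for ch in key):
        raise ValueError("API key contains embedded whitespace or newlines")
    return key
```

In the notebook, the value returned by `userdata.get('GOOGLE_API_KEY')` could be passed through such a check before it is used.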

**Key steps**

*   Authenticates the user and installs required libraries.
*   Sets the GCP Project ID and location.
*   Indexing:
    *   Downloads the data catalog from GitHub.
    *   Parses Jsonnet files to extract dataset details (ID, title, spatial/temporal extents).
    *   Summarizes dataset descriptions using an LLM (Gemini-1.5-pro).
    *   Stores the summarized information.

**Intended use**

The summarized documents can be embedded and used in a semantic search. Published alongside this notebook is an example.
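
To illustrate what a semantic search over embedded summaries involves, here is a minimal sketch that ranks documents by cosine similarity to a query vector. The three-dimensional vectors and the dataset IDs are toy placeholders, not real embeddings or catalog entries, and the published example may work differently.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def rank_documents(query_vector, doc_vectors):
    """Returns document IDs sorted from most to least similar to the query."""
    scored = [
        (cosine_similarity(query_vector, vector), doc_id)
        for doc_id, vector in doc_vectors.items()
    ]
    scored.sort(reverse=True)
    return [doc_id for _, doc_id in scored]

# Toy vectors standing in for embeddings of two summaries (hypothetical IDs).
toy_vectors = {
    "EXAMPLE/DATASET_A": [0.9, 0.1, 0.0],
    "EXAMPLE/DATASET_B": [0.1, 0.9, 0.0],
}
ranking = rank_documents([1.0, 0.0, 0.0], toy_vectors)
```

In a real pipeline the vectors would come from an embedding model, and a vector store would typically replace the linear scan.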






In [None]:
#@title Authenticate

from google.colab import auth as google_auth
google_auth.authenticate_user()

In [None]:
#@title Install Python Libraries

%%capture
%%bash
pip install google_cloud_aiplatform
pip install langchain-community
pip install langchain_google_genai
pip install python-dateutil
pip install langchain
pip install GitPython
pip install jsonnet
pip install retry

## Setup

In [None]:
#@title Set GCP Project ID

project_id = "your_gcp_project" #@param {type:"string"}
location = "us-central1" #@param {type:"string"}


In [None]:
#@title LLM and embeddings for use with Langchain

from vertexai.preview.language_models import TextEmbeddingModel
from langchain_google_genai import ChatGoogleGenerativeAI
from google.colab import userdata
import vertexai

vertexai.init(project=project_id, location=location)

llm = ChatGoogleGenerativeAI(model="gemini-1.5-pro", google_api_key=userdata.get('GOOGLE_API_KEY'))
embedding_model = TextEmbeddingModel.from_pretrained("google/text-embedding-004")

## Choose save location

In [None]:
# Cached file paths
persistent_path = '.'
catalog_summary_path = f'{persistent_path}/catalog_summaries.jsonl'


## Index the Earth Engine data catalog.

**Note: you do not need to run this section unless you are building the index from scratch**

The original idea was to load the web pages at https://developers.google.com/earth-engine/datasets, but parsing the relevant text out of the page layout was quite time-consuming. It turns out that the entire data catalog is available on [GitHub](https://github.com/google/earthengine-catalog/tree/main) in Jsonnet format.

TODO(b/363343474): Load STAC JSON files from gs://earthengine-stac instead. They are updated daily and expected to be more stable. They also have some light post-processing which may be helpful.

### Fetch and parse the catalog

In [None]:
%%bash
rm -rf ./earthengine-catalog

Download the Earth Engine data catalog from GitHub

In [None]:
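# GitLoader lives in langchain-community (installed above); the
# langchain.document_loaders import path is deprecated.
from langchain_community.document_loaders import GitLoader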

loader = GitLoader(
    clone_url="https://github.com/google/earthengine-catalog",
    repo_path="./earthengine-catalog",
    branch="main"
)

data = loader.load()

Define helper functions that resolve imports when evaluating Jsonnet files

In [None]:
import os
import _jsonnet

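def try_path(directory, rel):
  """Resolves a Jsonnet import to a file path and reads the file.

  Returns (full_path, content), where content is None if the file is not found.
  Raises RuntimeError for an empty filename or a directory import.
  """
  if not rel:
    raise RuntimeError('Got invalid filename (empty string).')
  if rel[0] == '/':
    # Absolute path: use as-is.
    full_path = rel
  elif os.path.isfile(directory + rel):
    # Path relative to the directory of the importing file.
    full_path = directory + rel
  elif rel == 'terms_of_use.md':
    # Special case: this file is resolved from the LANDSAT directory.
    full_path = './earthengine-catalog/catalog/LANDSAT/' + rel
  else:
    # Otherwise, resolve relative to the catalog root.
    full_path = './earthengine-catalog/catalog/' + rel
  if full_path[-1] == '/':
    raise RuntimeError('Attempted to import a directory')

  if not os.path.isfile(full_path):
    return full_path, None
  with open(full_path, 'rb') as f:
    return full_path, f.read()

def import_callback(directory, rel):
  """Import callback passed to _jsonnet.evaluate_file."""
  full_path, content = try_path(directory, rel)
  if content is not None:
    return full_path, content
  raise RuntimeError(f'File not found: {full_path}')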
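
The lookup order used by the import helper (absolute path, then the importing file's directory, then the catalog root) can be tried out in isolation. The sketch below is a simplified stand-in, not the notebook's function: `resolve_import` and the file names are hypothetical, it uses `os.path.join` rather than string concatenation, and it omits the `terms_of_use.md` special case.

```python
import os
import tempfile

def resolve_import(importing_dir, rel, catalog_root):
    """Hypothetical simplified resolver: absolute, then sibling, then catalog root."""
    if os.path.isabs(rel):
        return rel
    sibling = os.path.join(importing_dir, rel)
    if os.path.isfile(sibling):
        return sibling
    return os.path.join(catalog_root, rel)

with tempfile.TemporaryDirectory() as root:
    dataset_dir = os.path.join(root, "SOME_PROVIDER")
    os.makedirs(dataset_dir)
    local_file = os.path.join(dataset_dir, "local.libsonnet")
    shared_file = os.path.join(root, "shared.libsonnet")
    # One file next to the importing file, one at the catalog root.
    for path in (local_file, shared_file):
        with open(path, "w") as f:
            f.write("{}")

    # A sibling file wins when it exists; otherwise the catalog root is used.
    local_ok = resolve_import(dataset_dir, "local.libsonnet", root) == local_file
    shared_ok = resolve_import(dataset_dir, "shared.libsonnet", root) == shared_file
```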


### Summarize the dataset description before generating the embedding.

In [None]:
import re  # Used by summarize_text below.
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains.summarize import load_summarize_chain
from langchain.prompts import PromptTemplate

text_splitter = RecursiveCharacterTextSplitter(
  chunk_size = 1000,
  chunk_overlap  = 200,
  length_function = len,
)

def summarize_text(text: str) -> str:
  # Collapse each newline and any whitespace that follows it into a single space
  text = re.sub(r'\n\s*', ' ', text)

  docs = text_splitter.create_documents([text])

  chain = load_summarize_chain(
    llm,
    chain_type="map_reduce",
    # map_prompt=prompt,
    # combine_prompt=prompt
  )
  return chain.run(docs)
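
To give a feel for what `chunk_size` and `chunk_overlap` mean, here is a minimal sliding-window sketch in plain Python. It is a simplification: `RecursiveCharacterTextSplitter` prefers to split on separators such as paragraph breaks and spaces, so its chunk boundaries will generally differ. `sliding_chunks` is a hypothetical helper.

```python
def sliding_chunks(text, chunk_size=1000, chunk_overlap=200):
    """Splits text into fixed-size windows that share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # The last window already reaches the end of the text.
    return chunks

# 2500 characters of repeating digits as a stand-in for a long description.
sample_text = "".join(str(i % 10) for i in range(2500))
chunks = sliding_chunks(sample_text)
chunk_lengths = [len(chunk) for chunk in chunks]
```

With the map-reduce chain, each chunk is summarized separately and the partial summaries are then combined, so the overlap helps keep sentences that straddle a boundary visible to both neighbouring chunks.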

Helper method to store generated summaries as JSONL

In [None]:
from langchain.schema import Document
import json

def save_docs_to_jsonl(docs, file_path):
  with open(file_path, 'w') as jsonl_file:
    for doc in docs:
      jsonl_file.write(doc.json() + '\n')

def load_docs_from_jsonl(file_path):
  docs = []
  with open(file_path, 'r') as jsonl_file:
    for line in jsonl_file:
      data = json.loads(line)
      obj = Document(**data)
      docs.append(obj)
  return docs
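
The JSONL layout itself is simple: one JSON object per line. The sketch below shows a round trip with plain dictionaries instead of LangChain `Document` objects. The records are made-up placeholders, and the values are kept to strings because plain `json.dumps` cannot serialize `datetime` objects.

```python
import json
import os
import tempfile

# Hypothetical records shaped like a document: text plus metadata.
records = [
    {"page_content": "Summary of a hypothetical dataset.",
     "metadata": {"id": "EXAMPLE/DATASET_A", "title": "Example A"}},
    {"page_content": "Summary of another hypothetical dataset.",
     "metadata": {"id": "EXAMPLE/DATASET_B", "title": "Example B"}},
]

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "example.jsonl")
    # Write one JSON object per line.
    with open(path, "w") as jsonl_file:
        for record in records:
            jsonl_file.write(json.dumps(record) + "\n")
    # Read the file back, skipping any blank lines.
    with open(path) as jsonl_file:
        loaded = [json.loads(line) for line in jsonl_file if line.strip()]
```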

Parse the Jsonnet files and extract the relevant information. The current iteration of this code builds each document's text from the summarized dataset description and the band descriptions, and stores the dataset ID, title, and spatial and temporal extents as metadata. To make this more useful in the future, you could also pull in additional fields such as properties.
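
As an illustration of the band formatting, the sketch below turns a `summaries` dictionary into the text that gets appended to a summary. The dictionary is a toy example that uses the `eo:bands` and `gee:classes` keys read by this notebook; the band names and descriptions are made up, and `format_bands` is a hypothetical helper that joins lines rather than appending a trailing newline.

```python
def format_bands(summaries):
    """Formats band names, descriptions, and class descriptions as plain text."""
    lines = []
    for band in summaries.get("eo:bands", []):
        lines.append(f'"{band["name"]}" represents {band["description"]}')
        if "gee:classes" in band:
            lines.append("    Classes:")
            for cls in band["gee:classes"]:
                lines.append(f'    {cls["description"]}')
    return "\n".join(lines)

# Toy input with one plain band and one classified band.
toy_summaries = {
    "eo:bands": [
        {"name": "B1", "description": "surface reflectance"},
        {
            "name": "landcover",
            "description": "land cover class",
            "gee:classes": [
                {"description": "Water"},
                {"description": "Forest"},
            ],
        },
    ]
}
band_text = format_bands(toy_summaries)
```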

In [None]:
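import re
import json
from langchain.schema import Document
import tqdm
import datetime
# Import the submodules explicitly: "import dateutil" and "import concurrent"
# alone do not guarantee that dateutil.parser and concurrent.futures are loaded.
import dateutil.parser

import concurrent.futures
import retry

def iso_to_datetime(date_str):
  """Parses an ISO 8601 string; passes None through (e.g. an open-ended interval)."""
  return dateutil.parser.isoparse(date_str) if date_str is not None else None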

# Retry on Exception rather than BaseException so that KeyboardInterrupt still stops the run.
@retry.retry(Exception, tries=5, delay=1)  # TODO: Make this exception more specific.
def _summarize_catalog_target(f):
  if re.search(r"^catalog/*/.+\.jsonnet", f.metadata['file_path']) and f.metadata['file_name'] != 'catalog.jsonnet':
      json_str = _jsonnet.evaluate_file(
        f"./earthengine-catalog/{f.metadata['file_path']}",
        import_callback=import_callback,
      )
      json_dict = json.loads(json_str)
      if 'deprecated' in json_dict and json_dict['deprecated']:
        return
      # Ignore community datasets
      if json_dict['id'].startswith('projects/'):
        return
      try:
        assert len(json_dict['extent']['temporal']['interval']) == 1, f"Non-singular interval in temporal json: {json_dict['extent']['temporal']}"
        assert set(json_dict['extent']['spatial'].keys()) == {'bbox'}, f"Non-bounding box keys in spatial json {json_dict['extent']['spatial']}"
        metadata = {
          'file_path': f.metadata['file_path'],
          'id': json_dict['id'],
          'title': json_dict['title'],
          'spatial': json_dict['extent']['spatial']['bbox'],
          'temporal': [iso_to_datetime(datetime_string) for datetime_string in json_dict['extent']['temporal']['interval'][0]],
        }
      except Exception as e:
        print(json_dict)
        # print(e.with_traceback())
        raise e
      description = summarize_text(json_dict['description'])
      # Format band descriptions and add them to the summary
      band_string = ""
      if 'summaries' in json_dict:
        if 'eo:bands' in json_dict['summaries']:
          for band in json_dict['summaries']['eo:bands']:
            band_string += f'"{band["name"]}" represents {band["description"]}\n'
            if 'gee:classes' in band:
              band_string += "    Classes:\n"
              for cls in band['gee:classes']:
                band_string += f'    {cls["description"]}\n'
      description = description + "\n\n" + band_string
      doc = Document(
          page_content=description,
          metadata=metadata
      )
      return doc

def summarize_catalog(threads=1, start_index=0):
  global data
  documents = []
  with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as pool:
    documents = list(tqdm.tqdm(pool.map(_summarize_catalog_target, data[start_index:]), total=len(data)-start_index))
  return [doc for doc in documents if doc is not None]
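
The concurrency pattern used here (map a worker over a slice of the inputs, keep the results in input order, then drop the `None` results for skipped items) can be tried without any LLM calls. In the sketch below, `process_item` and `process_all` are hypothetical stand-ins for the notebook's functions.

```python
from concurrent.futures import ThreadPoolExecutor

def process_item(item):
    """Hypothetical worker: returns None for items that should be skipped."""
    if item % 2:
        return None  # Skip odd items, like deprecated or community datasets.
    return item * 10

def process_all(items, threads=4, start_index=0):
    """Processes items[start_index:] in a thread pool and drops skipped items."""
    with ThreadPoolExecutor(max_workers=threads) as pool:
        # pool.map yields results in input order, even if workers finish out of order.
        results = list(pool.map(process_item, items[start_index:]))
    return [result for result in results if result is not None]

results = process_all(list(range(6)), threads=3, start_index=1)
```

Because `pool.map` re-raises a worker's exception when its result is consumed, a single unrecoverable failure stops the whole run. The `start_index` argument is what allows a later run to resume partway through the list.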

Write the summary texts to a file. This process can take a while.

In [None]:
if os.path.isfile(catalog_summary_path):
  documents = load_docs_from_jsonl(catalog_summary_path)
else:
  # In theory this should work with more than 8 threads, but in practice we hit Gemini 500 errors.
  # It sometimes works with 12.
  documents = summarize_catalog(threads=8)
  save_docs_to_jsonl(documents, catalog_summary_path)

for doc in documents[:10]:
  display(doc.page_content)

## Save file
The cell below uploads the summary file to a Cloud Storage bucket. Alternatively, you can download the file and place it wherever is most helpful. The JSONL file can be used in any downstream application that requires the summarized catalog.

In [None]:
from google.cloud import storage

gcp_location = "gs://your_bucket/catalog_summaries.jsonl" #@param {type:"string"}

# Split "gs://bucket/path/to/blob" into the bucket name and the blob path.
bucket_name, blob_name = gcp_location[len('gs://'):].split('/', 1)

client = storage.Client(project=project_id)
bucket = client.bucket(bucket_name, user_project=project_id)
blob = bucket.blob(blob_name)
# Upload the file written earlier rather than a hard-coded filename.
blob.upload_from_filename(catalog_summary_path)
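
Splitting a `gs://` location into a bucket name and an object path is easy to get subtly wrong, so here is a small sketch with basic validation. `split_gcs_uri` is a hypothetical helper, not part of the `google-cloud-storage` library.

```python
def split_gcs_uri(uri):
    """Splits 'gs://bucket/path/to/object' into (bucket, object_path)."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not a gs:// URI: {uri}")
    bucket, _, object_path = uri[len(prefix):].partition("/")
    if not bucket or not object_path:
        raise ValueError(f"URI must include a bucket and an object path: {uri}")
    return bucket, object_path

bucket_name, blob_name = split_gcs_uri("gs://your_bucket/catalog_summaries.jsonl")
```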