# LLM-for-Metadata-Harvesting

This notebook contains the experimental results from [P6: Groot zeegras (2023)](https://datahuiswadden.openearth.nl/geonetwork/srv/api/records/TF1TbsTxTqykP5rv6MXJEg).  
The results are shown in the output of the code cell that loops over all entries of `dataPortalURL`. Note that not all code is directly relevant to this experiment; some parts are retained for future development and elaboration.


In [1]:
from cheatsheet import CHEATSHEETS
from prompt import PROMPTS
from grobidmonkey import reader
from webutils import readWebContent, downloadAndParseXML

# Reader instance created from grobidmonkey with the 'monkey' backend.
# NOTE(review): `monkeyReader` is not referenced by any other cell visible in this notebook.
monkeyReader = reader.MonkeyReader('monkey') # or 'lxml' or 'x2d'

# Landing pages of the dataset records whose metadata is harvested.
# The harvesting loop further down iterates over every entry of this list.
dataPortalURL = [
    "https://developers.google.com/earth-engine/datasets/catalog/NASA_HLS_HLSS30_v002",
    "https://lpdaac.usgs.gov/products/mod09a1v061/",
    "https://stac.ecodatacube.eu/veg_quercus.robur_anv.eml/collection.json?.language=en",
    "https://stac.ecodatacube.eu/ndvi_glad.landsat.ard2.seasconv/collection.json?.language=en",
    "https://zenodo.org/records/8319440",
    "https://lifesciences.datastations.nl/dataset.xhtml?persistentId=doi:10.17026/dans-2bd-kskz",
    "https://www.gbif.org/dataset/4fa7b334-ce0d-4e88-aaae-2e0c138d049e",
    "https://www.gbif.org/dataset/74196cd9-7ebc-4b20-bc27-3c2d22e31ed7",
    "https://www.gbif.org/dataset/bc0acb9a-131f-4085-93ae-a46e08564ac5",
    "https://zenodo.org/records/11440456",
    "https://stac.ecodatacube.eu/blue_glad.landsat.ard2.seasconv.m.yearly/collection.json",
    "https://datahuiswadden.openearth.nl/geonetwork/srv/eng/catalog.search#/metadata/L-mHomzGRuKAHGMkUPjY9g",
    "https://datahuiswadden.openearth.nl/geonetwork/srv/eng/catalog.search#/metadata/0fe7e64b-50b3-4cee-b64a-02659fc2b6c7",
    "https://stac.ecodatacube.eu/green_glad.landsat.ard2.seasconv.m.yearly/collection.json",
    "https://datahuiswadden.openearth.nl/geonetwork/srv/api/records/A0h06_NlSEuNlium5OO3FA",
]

# Select the first portal URL; it is the single page fetched later in this cell.
url = dataPortalURL[0]


# soup = readWebContent(url)
# if soup is None:
#     raise ValueError("Failed to retrieve web content")

# # Extract text from the webpage - adjust the selector based on the webpage structure
# # This is a basic example - you might need to modify based on the specific webpage
# text = soup.get_text(separator='\n', strip=True)

# text_xml, _ = downloadAndParseXML("https://datahuiswadden.openearth.nl/geonetwork/srv/api/records/A0h06_NlSEuNlium5OO3FA/formatters/xml")
# text += "\n" + text_xml
# full_text = text
import nest_asyncio
import asyncio
from webutils import extract_full_page_text

# Apply nest_asyncio to allow asyncio.run() in Jupyter
nest_asyncio.apply()

# Run the async function
full_text = await extract_full_page_text(url)

# Optionally display or save it
print(full_text)  # Print the first 1000 characters

Skip to main content

developers.google.com uses cookies from Google to deliver and enhance the quality of its services and to analyze traffic. Learn more.

OK, got it
Earth Engine Data Catalog
/
Sign in
Home
Categories
All datasets
All tags
Landsat
MODIS
Sentinel
Publisher
Community
API Docs
HLSS30: HLS Sentinel-2 Multi-spectral Instrument Surface Reflectance Daily Global 30m 
bookmark_border
Dataset Availability
2015-11-28T00:00:00Z–2025-05-06T23:38:31Z
Dataset Provider
NASA LP DAAC
Earth Engine Snippet
ee.ImageCollection("NASA/HLS/HLSS30/v002") open_in_new
Tags
landsat nasa satellite-imagery sentinel usgs
Description
Bands
Image Properties
Terms of Use
Citations
DOIs

The Harmonized Landsat Sentinel-2 (HLS) project provides consistent surface reflectance data from the Operational Land Imager (OLI) aboard the joint NASA/USGS Landsat 8 satellite and the Multi-Spectral Instrument (MSI) aboard Europe's Copernicus Sentinel-2A satellites. The combined measurement enables global observatio

In [2]:
from openai import OpenAI
from dotenv import load_dotenv
from utils import (
    logger,
    clean_str,
    compute_mdhash_id,
    decode_tokens_by_tiktoken,
    encode_string_by_tiktoken,
    is_float_regex,
    normalize_extracted_info,
    pack_user_ass_to_openai_messages,
    split_string_by_multi_markers,
    use_llm_func_with_cache,
)
from collections import defaultdict

import tiktoken
import re
import os

# Model name used both to select the tiktoken encoding in chunk_text and as
# the `model` argument of the chat-completions calls below.
llm_model = "gpt-4"
# Load variables from a .env file into the process environment;
# OPENAI_API_KEY is read later through os.getenv.
load_dotenv()

def chunk_text(text: str, max_tokens: int = 6000) -> list[str]:
    """Split text into consecutive chunks of at most ``max_tokens`` tokens.

    The text is tokenized with the tiktoken encoding selected for
    ``llm_model``; every chunk is the decoded text of one consecutive run of
    tokens.

    Args:
        text: The text to split.
        max_tokens: Maximum number of tokens per chunk. Must be positive.

    Returns:
        The decoded chunks in their original order; an empty list when the
        text encodes to no tokens.

    Raises:
        ValueError: If ``max_tokens`` is not a positive number.
    """
    if max_tokens <= 0:
        raise ValueError(f"max_tokens must be a positive integer, got {max_tokens}")

    encoder = tiktoken.encoding_for_model(llm_model)
    tokens = encoder.encode(text)

    # Walk the token list in fixed-size windows; only the last window may be
    # shorter than max_tokens.
    # NOTE(review): boundaries are token-based, so a chunk can start or end
    # in the middle of a word or sentence — confirm this is acceptable.
    return [
        encoder.decode(tokens[start:start + max_tokens])
        for start in range(0, len(tokens), max_tokens)
    ]

def extract_entities(text: str, entity_types: list[str], special_interest: str = "") -> dict:
    """Extract metadata entities from ``text`` using the chat model.

    The text is split with ``chunk_text``; for every chunk the
    ``fill_nightly`` prompt is rendered (via ``_format_prompt``) and sent to
    the model, and the reply is split into raw records. All records are then
    handed to ``_post_processing_records`` for the final consolidation.

    Args:
        text: The page text to analyse.
        entity_types: Entity types to look for; inserted into the prompt
            template as given.
        special_interest: Extra guidance inserted into the prompt template.

    Returns:
        The mapping returned by ``_post_processing_records``; an empty
        mapping when ``text`` is empty.
    """
    if not text:
        # Nothing to chunk: skip the API calls instead of prompting the
        # model with empty input.
        return defaultdict(list)

    # Split text into chunks
    chunks = chunk_text(text, max_tokens=4000)  # Leave room for completion

    client = OpenAI(
        api_key=os.getenv("OPENAI_API_KEY")
    )

    nightly_entities_prompt = CHEATSHEETS["nightly_entity_template"].format(
        tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
        record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
    )

    # Prompt parameters shared by every chunk; only "input_text" varies.
    shared_params = {
        "language": "English",
        "tuple_delimiter": PROMPTS["DEFAULT_TUPLE_DELIMITER"],
        "record_delimiter": PROMPTS["DEFAULT_RECORD_DELIMITER"],
        "completion_delimiter": PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
        "entity_types": entity_types,
        "special_interest": special_interest,
        "nightly_entities": nightly_entities_prompt,
    }

    all_records = []

    # Process each chunk
    for chunk in chunks:
        response = client.chat.completions.create(
            model=llm_model,
            messages=[
                {
                    "role": "system",
                    "content": "You are an AI trained to extract entities (meta data fields) and relationships from text."
                },
                {
                    "role": "user",
                    "content": _format_prompt({**shared_params, "input_text": chunk})
                }
            ],
            temperature=0.0,
            max_tokens=2000
        )

        # The reply content may be missing; treat that as an empty reply.
        content = response.choices[0].message.content or ""
        print("----------------\nresponse.choices[0].message.content:\n", content)

        if not content:
            # No text to split into records for this chunk.
            continue

        # Split the reply into raw record strings and collect them
        all_records += _process_extraction_result(
            content,
            chunk_key=compute_mdhash_id(chunk),
            file_path="unknown_source"
        )

    return _post_processing_records(
        all_records,
        chunk_key="unknown_chunk",
        file_path="unknown_source"
    )
    

def _format_prompt(params: dict) -> str:
    """Render the ``fill_nightly`` cheatsheet template with ``params`` as keyword arguments."""
    return CHEATSHEETS["fill_nightly"].format(**params)

def _handle_post_processed_entity_extraction(
    record_attributes: list[str],
    chunk_key: str,
    file_path: str = "unknown_source",
): 
    """Handle the extraction of a single entity from the record attributes.
    
    Args:
        record_attributes (list[str]): The attributes of the record to process
        chunk_key (str): The key for the chunk being processed
        file_path (str): The file path for citation
        
    Returns:
        dict: A dictionary containing the extracted entity information, or None if extraction fails
    """
    if len(record_attributes) < 3 or record_attributes[0] != '"entity"':
        return None

    # Clean and validate entity name
    entity_name = clean_str(record_attributes[1]).strip('"')
    if not entity_name.strip():
        logger.warning(
            f"Entity extraction error: empty entity name in: {record_attributes}"
        )
        return None

    # Normalize entity name
    entity_name = normalize_extracted_info(entity_name, is_entity=True)

    # Clean and validate entity type
    entity_value = clean_str(record_attributes[2]).strip('"')
    if not entity_value.strip() or entity_value.startswith('("'):
        logger.warning(
            f"Entity extraction error: invalid entity type in: {record_attributes}"
        )
        return None

    return dict(
        entity_name=entity_name,
        entity_value=entity_value,
        source_id=chunk_key,
        file_path=file_path,
    )

def _handle_single_entity_extraction(
    record_attributes: list[str],
    chunk_key: str,
    file_path: str = "unknown_source",
):
    if len(record_attributes) < 4 or record_attributes[0] != '"entity"':
        return None

    # Clean and validate entity name
    entity_name = clean_str(record_attributes[1]).strip('"')
    if not entity_name.strip():
        logger.warning(
            f"Entity extraction error: empty entity name in: {record_attributes}"
        )
        return None

    # Normalize entity name
    entity_name = normalize_extracted_info(entity_name, is_entity=True)

    # Clean and validate entity type
    entity_type = clean_str(record_attributes[2]).strip('"')
    if not entity_type.strip() or entity_type.startswith('("'):
        logger.warning(
            f"Entity extraction error: invalid entity type in: {record_attributes}"
        )
        return None

    # Clean and validate description
    entity_description = clean_str(record_attributes[3])
    entity_description = normalize_extracted_info(entity_description)

    if not entity_description.strip():
        logger.warning(
            f"Entity extraction error: empty description for entity '{entity_name}' of type '{entity_type}'"
        )
        return None

    return dict(
        entity_name=entity_name,
        entity_type=entity_type,
        description=entity_description,
        source_id=chunk_key,
        file_path=file_path,
    )


def _handle_single_relationship_extraction(
    record_attributes: list[str],
    chunk_key: str,
    file_path: str = "unknown_source",
):
    if len(record_attributes) < 5 or record_attributes[0] != '"relationship"':
        return None
    # add this record as edge
    source = clean_str(record_attributes[1])
    target = clean_str(record_attributes[2])

    # Normalize source and target entity names
    source = normalize_extracted_info(source, is_entity=True)
    target = normalize_extracted_info(target, is_entity=True)

    edge_description = clean_str(record_attributes[3])
    edge_description = normalize_extracted_info(edge_description)

    edge_keywords = clean_str(record_attributes[4]).strip('"').strip("'")
    edge_source_id = chunk_key
    weight = (
        float(record_attributes[-1].strip('"').strip("'"))
        if is_float_regex(record_attributes[-1])
        else 1.0
    )
    return dict(
        src_id=source,
        tgt_id=target,
        weight=weight,
        description=edge_description,
        keywords=edge_keywords,
        source_id=edge_source_id,
        file_path=file_path,
    )

def _post_process_single_record(record: str, context_base: dict) -> tuple[str, list[str]]:
    """Process a single record by cleaning and extracting its contents.
    
    Args:
        record (str): The record string to process
        context_base (dict): Dictionary containing delimiter configuration
        
    Returns:
        tuple: (processed_record, record_attributes) where:
            - processed_record is the cleaned record string
            - record_attributes is a list of attributes split by delimiter
    """
    # Add parentheses if they don't exist
    if not record.startswith('('):
        record = f'({record})'
    if not record.endswith(')'):
        record = f'{record})'
        
    # Extract content between parentheses
    match = re.search(r"\((.*)\)", record)
    if match is None:
        print(f"Record extraction error: invalid record format in: {record}")
        return None, []
        
    processed_record = match.group(1)
    record_attributes = split_string_by_multi_markers(
        processed_record, 
        [context_base["tuple_delimiter"]]
    )
    
    return processed_record, record_attributes

def _process_extraction_result(
        result: str, chunk_key: str, file_path: str = "unknown_source"
    ):
        """Process a single extraction result (either initial or gleaning)
        Args:
            result (str): The extraction result to process
            chunk_key (str): The chunk key for source tracking
            file_path (str): The file path for citation
        Returns:
            tuple: (nodes_dict, edges_dict) containing the extracted entities and relationships
        """
        context_base = dict(
            tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
            record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
            completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
        )
        maybe_nodes = defaultdict(list)
        maybe_edges = defaultdict(list)

        records = split_string_by_multi_markers(
            result,
            [context_base["record_delimiter"], context_base["completion_delimiter"], "\n"],
        )
        return records

def _post_processing_records(all_records: list[str], chunk_key: str, file_path: str = "unknown_source"):
    """Post-process records to extract entities and relationships.
    
    This function processes the extracted records, cleaning them and extracting
    entities and relationships based on predefined rules.
    
    Returns:
        tuple: (maybe_nodes, maybe_edges) where:
            - maybe_nodes is a dictionary of extracted entities
            - maybe_edges is a dictionary of extracted relationships
    """
    context_base = dict(
        tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
        record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
        completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
    )
    
    maybe_nodes = defaultdict(list)

    merged_records = ""
    for record in all_records:
        processed_record, record_attributes = _post_process_single_record(record, context_base)
        if processed_record is None:
            continue
        
        if_entities = _handle_single_entity_extraction(
            record_attributes, chunk_key="unknown_chunk", file_path="unknown_source"
        )
        if if_entities is not None:
            entity_type = if_entities["entity_type"]
            entity_description = if_entities["description"]
            entity_name = if_entities["entity_name"]
            maybe_nodes[entity_type].append(if_entities)
            continue
    
    for entity_type, entities in maybe_nodes.items():
        entity_type = entity_type.strip('"')
        entity_description = ""
        for entity in entities:
            entity_description += entity["entity_name"] + ". " + entity["description"]
        
        merged_records += f"(\"entity\"{context_base['tuple_delimiter']}{entity_type}{context_base['tuple_delimiter']}{entity_description}){context_base['record_delimiter']}\n"

    client = OpenAI(
        api_key=os.getenv("OPENAI_API_KEY")
    )
    
    response = client.chat.completions.create(
        model=llm_model,
        messages=[
            {
                "role": "system",
                "content": "You are an AI trained to extract entities (meta data fields) and relationships from text."
            },
            {
                "role": "user",
                "content": CHEATSHEETS["post_processing"].format(
                    language="English",
                    tuple_delimiter=context_base["tuple_delimiter"],
                    record_delimiter=context_base["record_delimiter"],
                    completion_delimiter=context_base["record_delimiter"],
                    input_entities=merged_records,
                )
            }
        ],
        temperature=0.0,
        max_tokens=2000
    )

    result = response.choices[0].message.content

    records = split_string_by_multi_markers(
        result,
        [context_base["record_delimiter"], context_base["completion_delimiter"], "\n"],
    )

    print("----------------\nresponse.choices[0].message.content:\n", result)
    final_nodes = defaultdict(list)

    for record in records:
        print(f"Processing record: {record}")
        # Add parentheses if they don't exist
        if not record.startswith('('):
            record = f'({record})'
        if not record.endswith(')'):
            record = f'{record})'
        record = re.search(r"\((.*)\)", record)
        if record is None:
            print(
                f"Record extraction error: invalid record format in: {record}"
            )
            continue
        record = record.group(1)
        record_attributes = split_string_by_multi_markers(
            record, [context_base["tuple_delimiter"]]
        )

        if_entities = _handle_post_processed_entity_extraction(
            record_attributes, chunk_key, file_path
        )
        if if_entities is not None:
            final_nodes[if_entities["entity_name"]].append(if_entities)
            continue

    return final_nodes


# special_interest = CHEATSHEETS.get("special_interests", "Focus on metadata fields and their relationships")
# output_nodes = extract_entities(
#     text=full_text, 
#     entity_types=PROMPTS["DEFAULT_ENTITY_TYPES"], 
#     special_interest=special_interest
# )

In [None]:
from tqdm import tqdm
for url in tqdm(dataPortalURL):
    # Apply nest_asyncio to allow asyncio.run() in Jupyter
    nest_asyncio.apply()

    # Run the async function
    full_text = await extract_full_page_text(url)

    # Optionally display or save it
    special_interest = CHEATSHEETS.get("special_interests", "Focus on metadata fields and their relationships")
    output_nodes = extract_entities(
        text=full_text, 
        entity_types=PROMPTS["DEFAULT_ENTITY_TYPES"], 
        special_interest=special_interest
    )
    # Create a dictionary to store entity_type: [(entity_name, description), ...]
    entity_type_map = {}

    for entity_group in output_nodes.values():
        for item in entity_group:
            entity_name = item.get('entity_name')
            entity_value = item.get('entity_value')

            # Initialize the list for this entity_type if not already present
            if entity_name not in entity_type_map:
                entity_type_map[entity_name] = []

            # Append the (entity_name, description) pair
            entity_type_map[entity_name].append(entity_value)

    # Example: print results and save to a file with separator and URL
    # get current date
    from datetime import datetime
    now = datetime.now()
    date_str = now.strftime("%Y-%m-%d")

    output_file_path = "outputs/" + date_str + "entity_type_map.txt"

    with open(output_file_path, "a") as file:  # Use "a" to append to the file
        separator = "\n" + "=" * 50 + "\n"  # Separator for each run
        file.write(separator)
        file.write(f"Source URL: {url}\n")
        file.write(separator)
        
        for entity_type, values in entity_type_map.items():
            print(f"{entity_type}:")
            file.write(f"{entity_type}:\n")
            for value in values:
                print(f"  - {value}")
                file.write(f"  - {value}\n")


Title:
  - EOD – eBird Observation Dataset, WMR - TMAP: Ecotopenkaart, Wadden viewer, Commission Regulation (EU) No 1089/2010
Description:
  - eBird is a collective enterprise that develops cooperative partnerships among experts in various fields, aiming to increase data quantity and control for data quality issues. It is a major source of biodiversity data. Voor een goed beheer van gebieden zoals de Waddenzee is een consistent en eenduidig inzicht in de veranderingen van de diverse landschappelijke eenheden van groot belang. This data set is conformant with the INSPIRE Implementing Rules for the interoperability of spatial data sets and services.
Unique Identifier:
  - A0h06_NlSEuNlium5OO3FA, Data afkomstig van WMR tbv TMAP
Metadata language:
  - Dutch
Responsible organization metadata:
  - Wageningen Marine Research
Data contact point:
  - info@wur.nl
Resource type:
  - Dataset
Spatial coverage:
  - Worldwide, Waddenzee
Metadata date:
  - 2016-11-01, 2010-12-08
Landing page:
  - http

In [None]:
def parse_metadata_block(text: str) -> tuple[str, dict]:
    """
    Parse a metadata block and return the source URL and a dictionary of metadata fields.

    Recognized lines (after stripping surrounding whitespace):
        - ``Source URL: <url>`` sets the source URL.
        - ``<Field>:`` starts a new field.
        - ``- (<value>, <description>)`` adds a (value, description) pair to
          the current field.
        - ``- <value>`` adds ``(<value>, "")`` to the current field.
    Empty lines, separator lines starting with ``===`` and any other line
    are ignored.

    Args:
        text (str): Text block containing metadata information

    Returns:
        tuple: (source_url, metadata_dict) where metadata_dict contains field types as keys
        and lists of (value, description) tuples as values
    """
    url_prefix = 'Source URL:'
    metadata = {}
    source_url = ""
    current_field = None

    for line in text.split('\n'):
        line = line.strip()

        # Skip empty lines and separator lines
        if not line or line.startswith('==='):
            continue

        # Extract source URL
        if line.startswith(url_prefix):
            source_url = line[len(url_prefix):].strip()
            continue

        # Anything that is not a list entry is either a field header or noise
        if not line.startswith('-'):
            if line.endswith(':'):
                current_field = line[:-1]  # Remove trailing colon
                metadata[current_field] = []
            continue

        # List entries only count once a field header has been seen
        if current_field is None or not line.startswith('- '):
            continue

        entry = line[2:].strip()  # Drop the "- " marker (line is already stripped)
        if not entry:
            continue

        if entry.startswith('(') and entry.endswith(')') and ',' in entry:
            # "(value, description)" form: split on the first comma only
            value, desc = entry[1:-1].split(',', 1)
            metadata[current_field].append((value.strip(), desc.strip()))
        else:
            # Plain value without a description; commas stay part of the value
            metadata[current_field].append((entry, ""))

    return source_url, metadata

def read_metadata_file(file_path: str) -> list[tuple[str, dict]]:
    """
    Load a metadata file and parse every block it contains.

    The content is split on the 50-character ``=`` separator line. The
    pieces are expected to be a leading piece followed by pairs of
    (source-URL piece, fields piece), i.e. an odd number of pieces.

    Args:
        file_path (str): Path to the metadata file

    Returns:
        list: (source_url, metadata_dict) tuples, one per block that has a
        source URL

    Raises:
        ValueError: If the number of pieces is even
    """
    with open(file_path, 'r') as handle:
        raw = handle.read()

    pieces = raw.split('\n==================================================\n')

    print(len(pieces))

    if len(pieces) % 2 == 0:
        raise ValueError("The file content does not have the expected format.")

    # Pair pieces 1+2, 3+4, ... (piece 0 precedes the first separator).
    paired = [
        pieces[index] + "\n" + pieces[index + 1]
        for index in range(1, len(pieces) - 1, 2)
    ]

    parsed = []
    for chunk in paired:
        if not chunk.strip():
            continue
        source, fields = parse_metadata_block(chunk)
        # Only keep blocks with a source URL
        if source:
            parsed.append((source, fields))

    return parsed

# Example usage:
# The path follows the "outputs/<YYYY-MM-DD>entity_type_map.txt" pattern used
# by the harvesting loop; the date is hard-coded to one specific run.
file_path = "outputs/2025-05-20entity_type_map.txt"
metadata_blocks = read_metadata_file(file_path)

# Print example of first block
if metadata_blocks:
    # NOTE: `url` and `metadata` are module-level names, so this overwrites
    # any `url` left behind by an earlier cell.
    url, metadata = metadata_blocks[0]
    print(f"Source URL: {url}")
    print("\nExample fields:")
    for field, values in list(metadata.items())[:3]:  # Show first 3 fields
        print(f"\n{field}:")
        # Each entry is unpacked as a (value, description) tuple, the shape
        # that parse_metadata_block appends.
        for value, desc in values:
            print(f"  - {value}: {desc}")

In [None]:
# Display every parsed (source_url, metadata) tuple as the cell output.
metadata_blocks


In [6]:
# create_dill_pickle.py
import dill

class MyModel:
    """Minimal example class that is serialized with dill below."""
    def get_team(self):
        """Return the hard-coded team name."""
        return "Awesome Hackers"

# Instantiate the example class that gets serialized.
# NOTE(review): this cell looks unrelated to the metadata-harvesting
# experiment — confirm whether it should stay in this notebook.
model = MyModel()

# Save with dill
with open("my_model.pkl", "wb") as f:
    # Write the instance to my_model.pkl using pickle protocol 4.
    dill.dump(model, f, protocol=4)

print("Pickle file created using dill: my_model.pkl")

Pickle file created using dill: my_model.pkl


In [4]:
pip install dill

Note: you may need to restart the kernel to use updated packages.
