In [1]:
import os
import sys
from datetime import datetime
import pandas as pd
from typing import List, Dict, Any, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed

import httpx
from loguru import logger

In [2]:
BASE_NOMAD_URL = "http://nomad-lab.eu/prod/v1/api/v1"
OUTPUT_DIR = "data/nomad"

In [3]:
def test_nomad_connection(base_url: str ="http://nomad-lab.eu/prod/v1/api/v1") -> bool:
    """ Test connection to the NOMAD API.

    Parameters:
    -----------
    base_url : str
        The base URL of the NOMAD API. Default is "http://nomad-lab.eu/prod/v1/api/v1".

    Returns:
    --------
    bool
        True if the connection is successful, False otherwise.
    """
    logger.info("Testing connection to NOMAD API...")
    try:
        r = httpx.get(f"{base_url}/entries", timeout=5)
        if r.status_code == 200:
            logger.success("Connected to NOMAD API successfully !")
            return True
    except httpx.RequestException:
        logger.error("Failed to connect to NOMAD API.")
        return False

test_nomad_connection()

[32m2025-10-27 16:47:40.427[0m | [1mINFO    [0m | [36m__main__[0m:[36mtest_nomad_connection[0m:[36m14[0m - [1mTesting connection to NOMAD API...[0m
[32m2025-10-27 16:47:42.026[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mtest_nomad_connection[0m:[36m18[0m - [32m[1mConnected to NOMAD API successfully ![0m


True

In [4]:
def fetch_entries_md_related() -> Tuple[List[Dict[str, Any]], str]:
    """
    Fetch all Molecular Dynamics (MD)-related entries from the NOMAD API.

    Returns
    -------
    Tuple[List[Dict[str, Any]], str]:
        - A list of entries related to Molecular Dynamics workflows (JSON objects).
        Returns an empty list if the request fails.
        - The current timestamp in ISO 8601 format (e.g., '2023-03-05T22:01:12').

    """
    logger.info("Fetching Molecular Dynamics related entries from NOMAD API...")
    # Current timestamp in ISO format
    fetch_time: str = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

    try:
        # Build the request URL with a query filtering for 'MolecularDynamics' workflow
        url = (
            f"{BASE_NOMAD_URL}/entries/export"
            "?owner=public"
            "&json_query=%7B%22results.method.workflow_name%22%3A%22MolecularDynamics%22%7D"
        )

        # Perform the HTTP GET request with a long timeout to accommodate large data (usually take less than 3 minutes)
        response = httpx.get(url, timeout=1000)
        response.raise_for_status()

        # Parse JSON data
        entries_md = response.json()
        logger.success(f"Fetched {len(entries_md)} MD-related entries from NOMAD successfully !")
        return entries_md, fetch_time
    
    except httpx.HTTPError as e:
        logger.error(f"HTTP error occurred: {e}")
        return [], fetch_time
   

#nomad_data, fetch_time = fetch_entries_md_related()

In [None]:
source = "NOMAD"
source_id = f"https://nomad-lab.eu/prod/v1/gui/search/entries?entry_id={nomad_data[0]['entry_id']}"
doi = nomad_data[0]['references'][1]
title = nomad_data[0]['datasets'][0]['dataset_name']
date_creation = nomad_data[0]['datasets'][0]['dataset_create_time']
date_last_modification = nomad_data[0]['datasets'][0]['dataset_modified_time']
nb_files = len(nomad_data[0]['files'])
file_names = nomad_data[0]['files']
authors = [author_info['name'] for author_info in nomad_data[0]['authors']]
license = nomad_data[0]['license']
description = nomad_data[0]['comment']
file_analysises = nomad_data[0]['results']

In [5]:
def parse_entry_metadata(data: Dict[str, Any], fetch_time: str) -> Dict[str, Any]:
    """
    Parse relevant metadata fields from a single NOMAD entry JSON.

    Parameters
    ----------
    data : Dict[str, Any]
        JSON response for a single NOMAD entry.
    fetch_time : str
        Timestamp when the data was fetched.

    Returns
    -------
    Dict[str, Any]
        Flattened metadata dictionary for one entry.
    """
    entry_id = data.get("entry_id")
    dataset = data.get("datasets", [{}])[0]

    return {
        "source": "NOMAD",
        "source_id": f"https://nomad-lab.eu/prod/v1/gui/search/entries?entry_id={entry_id}",
        "doi": data.get("references"),
        "title": dataset.get("dataset_name"),
        "date_creation": dataset.get("dataset_create_time"),
        "date_last_modification": dataset.get("dataset_modified_time"),
        "date_last_crawled": fetch_time,
        "nb_files": len(data.get("files", [])),
        "file_names": data.get("files", []),
        "authors": [a.get("name") for a in data.get("authors", [])],
        "license": data.get("license"),
        "description": data.get("comment"),
        "file_analyses": data.get("results"),
    }


#dict = parse_entry_metadata(nomad_data[0], fetch_time)

In [6]:
def parse_nomad_dataset_parallel(nomad_data: List[Dict[str, Any]], fetch_time: str, max_workers: int = 8) -> pd.DataFrame:
    """
    Parse all NOMAD entries in parallel and return a combined DataFrame.

    Parameters
    ----------
    nomad_data : List[Dict[str, Any]]
        List of NOMAD entry JSON objects.
    fetch_time : str
        Timestamp when data was fetched.
    max_workers : int, optional
        Maximum number of threads to use for parallel parsing (default is 8).

    Returns
    -------
    pd.DataFrame
        DataFrame containing parsed metadata for all entries.
    """
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(parse_entry_metadata, entry, fetch_time): entry for entry in nomad_data}
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                print(f"Error parsing entry: {e}")

    return pd.DataFrame(results)


def save_nomad_metadata(df: pd.DataFrame, output_dir: str = "data/nomad", filename: str = "nomad_metadata.parquet") -> str:
    """
    Save parsed NOMAD metadata DataFrame to a local file.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame containing parsed NOMAD metadata.
    output_dir : str, optional
        Directory to store the output file (default is 'data/nomad').
    filename : str, optional
        Output filename (default is 'nomad_metadata.parquet').

    Returns
    -------
    str
        Path to the saved file.
    """
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, filename)
    df.to_parquet(output_path, index=False)
    print(f"✅ NOMAD metadata saved to: {output_path}")
    return output_path

In [None]:
source = "NOMAD"
source_id = f"https://nomad-lab.eu/prod/v1/gui/search/entries?entry_id={nomad_data[0]['entry_id']}"
doi = nomad_data[0]['references'][1]
title = nomad_data[0]['datasets'][0]['dataset_name']
date_creation = nomad_data[0]['datasets'][0]['dataset_create_time']
date_last_modification = nomad_data[0]['datasets'][0]['dataset_modified_time']
nb_files = len(nomad_data[0]['files'])
file_names = nomad_data[0]['files']
authors = [author_info['name'] for author_info in nomad_data[0]['authors']]
license = nomad_data[0]['license']
description = nomad_data[0]['comment']
file_analysises = nomad_data[0]['results']


In [7]:
def scrap_nomad_data():
    """ Scrap molecular dynamics datasets and files from NOMAD """
    logger.info("Starting Nomad data scraping...")

    if test_nomad_connection:
        # Define output directory
        output_dir = os.path.join("data", "nomad")
        os.makedirs(output_dir, exist_ok=True)

        # Fetch NOMAD entries metadata
        nomad_data, fetch_time = fetch_entries_md_related()
        if nomad_data == []:
            logger.warning("No data fetched from NOMAD.")
            return
        # Parse NOMAD entries metadata in parallel
        nomad_metadata_df = parse_nomad_dataset_parallel(nomad_data, fetch_time)

        # Save parsed metadata to local file
        save_nomad_metadata(nomad_metadata_df, output_dir=output_dir)
    
        logger.success(f"Scrapped NOMAD data successfully and saved to {output_dir} !")

    else:
        logger.error("Cannot scrap data, no connection to NOMAD API.")
        sys.exit()

scrap_nomad_data()

[32m2025-10-27 16:48:02.526[0m | [1mINFO    [0m | [36m__main__[0m:[36mscrap_nomad_data[0m:[36m3[0m - [1mStarting Nomad data scraping...[0m
[32m2025-10-27 16:48:02.530[0m | [1mINFO    [0m | [36m__main__[0m:[36mfetch_entries_md_related[0m:[36m13[0m - [1mFetching Molecular Dynamics related entries from NOMAD API...[0m
[32m2025-10-27 16:50:03.353[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mfetch_entries_md_related[0m:[36m31[0m - [32m[1mFetched 15934 MD-related entries from NOMAD successfully ![0m


Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error parsing entry: list index out of range
Error pars

[32m2025-10-27 16:50:33.295[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mscrap_nomad_data[0m:[36m21[0m - [32m[1mScrapped NOMAD data successfully and saved to data/nomad ![0m


✅ NOMAD metadata saved to: data/nomad/nomad_metadata.parquet
