## Seismic Data Bank Metadata Ingestion to OSDU-custom schema

This notebook documents, in summarized form, how SDB metadata records are ingested into custom schemas in OSDU.

Note that this summary only shows how one activity (seismicsurveys) was constructed. So far, several entities have been ingested into OSDU, including: seismicsurveys, seismicprojects, seismicpoststackdatasets, binsetgrids, and poststackcubes.

All these activities (Azure Functions) are run by an Azure Function orchestrator, which is itself invoked by a timer-triggered Azure Function.

For more details, check the official repository [osdu_seismic_metadata_ingestion](https://github.com/equinor/osdu_seismic_metadata_ingestion).

In [1]:
import os
import requests

# Adding .env file variables as environment variables
from dotenv import load_dotenv
load_dotenv()

True

#### Seismic Survey Entity Activity

In [2]:
"""Let's create the activity that will ingest the SDB seismic_surveys entity data.
The process should be repeated in a similar manner for each SDB entity.
A few changes might be expected from entity to entity."""

import pycountry

"""Utility functions to be used by main()"""

def get_country_code(country_name):
    """Return the two-letter (alpha-2) country code for a country name.

    The name is resolved with ``pycountry.countries.search_fuzzy`` and the
    first match in the returned list is used.

    Args:
        country_name: Free-text country name, e.g. ``"Norway"``.

    Returns:
        The ``alpha_2`` code of the first fuzzy match, or the string
        ``"Not Identifiable"`` when the name is missing, not a string, blank,
        or cannot be matched.
    """
    # Source records can carry null/blank fields. A fuzzy search is only
    # meaningful for a non-empty string, so fall back to the sentinel instead
    # of letting one bad record abort the whole ingestion run.
    if not isinstance(country_name, str) or not country_name.strip():
        return "Not Identifiable"
    try:
        matches = pycountry.countries.search_fuzzy(country_name)
        # Indexing stays inside the try: IndexError is a LookupError subclass,
        # so an empty result list also maps to the sentinel.
        return matches[0].alpha_2
    except LookupError:
        return "Not Identifiable"

def get_records_comm_attributes(commit_sha, survey_name, county_of_origin, schema_kind, owners, viewers, legaltags):
    """Build the record skeleton shared by the ingested SDB entities.

    Args:
        commit_sha: Value stored under ``tags["commitId"]``.
        survey_name: Survey name; its first two characters decide ownership.
        county_of_origin: Country name, converted with ``get_country_code``.
        schema_kind: Value stored under ``"kind"``.
        owners: Value stored under ``acl["owners"]``.
        viewers: Value stored under ``acl["viewers"]``.
        legaltags: Value stored under ``legal["legaltags"]``.

    Returns:
        A dict with the keys ``kind``, ``acl``, ``legal``, ``tags`` and an
        empty ``data`` dict for the caller to fill in.
    """
    # Surveys whose name starts with one of these prefixes are tagged as
    # Equinor-owned; everything else is tagged as supplier-owned.
    equinor_prefixes = ("ST", "EQ", "NH", "SG")
    owner = "Equinor" if survey_name[:2] in equinor_prefixes else "Supplier"

    acl_section = {"owners": owners, "viewers": viewers}

    legal_section = {
        "legaltags": legaltags,
        "otherRelevantDataCountries": ["NO"],
        "status": "compliant",
    }

    tags_section = {
        "dataID": "",
        "exportControl": "No",
        "EqnrSource": "SDB",
        "securityClassification": "Internal",
        "personalData": "Yes - limited personal data",
        "intellectualPropertyRight": f"{owner} owned",
        "legalOwnership": owner,
        "inside": "No",
        "businessCriticalData": "No",
        "soxCriticalData": "No",
        "countryOfOrigin": get_country_code(county_of_origin),
        "commitId": commit_sha,
    }

    return {
        "kind": schema_kind,
        "acl": acl_section,
        "legal": legal_section,
        "tags": tags_section,
        "data": {},
    }

In [3]:
"""Seismic Survey Metadata Ingestion Main Function."""

import json
from libs.osdu_service.osdu_http_client import OsduHttpClient
import json

# Batch-size threshold used by main() when sending records to the OSDU storage endpoint.
MAX_ENTRIES_PER_REQUEST = 400

def _fetch_sdb_access_token(session):
    """Exchange the offline (refresh) token for an SDB access token."""
    access_token_uri = os.environ['ds_security_url'] + '/protocol/openid-connect/token'
    body = {
        "grant_type": "refresh_token",
        "refresh_token": os.environ['sbd_refresh_token'],
        "client_id": 'enterprise-search',
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}

    # Keyword arguments matter here: the third positional parameter of
    # Session.post is ``json``, so passing ``headers`` positionally would not
    # send them as headers at all.
    response = session.post(access_token_uri, data=body, headers=headers, timeout=10)
    # Fail with a clear HTTP error rather than a KeyError on the JSON body.
    response.raise_for_status()

    sdb_access_token = response.json()['access_token']
    print(f"1.  iEnergy access_token response code: {response}")
    return sdb_access_token


def _fetch_surveys(session, sdb_access_token, dimension, lastrundatetime):
    """Download every seismic survey of ``dimension`` from the SDB API, page by page."""
    sdb_base_url = os.environ['sdb_metadata_url']
    request_kwargs = {
        'headers': {
            'Authorization': f'Bearer {sdb_access_token}',
            'Accept': '*/*',
        },
        'timeout': 10,
    }

    odata_filter = f"surveyDimension eq '{dimension}'"
    if lastrundatetime is not None:
        # Incremental run: only surveys updated at or after the last run.
        odata_filter += f" and updateDate ge datetime'{lastrundatetime}'"
    relative_path = f"seismicsurveys?$format=json&$filter={odata_filter}&$inlinecount=allpages"

    surveys = []
    while relative_path:
        response = session.get(f"{sdb_base_url}/{relative_path}", **request_kwargs)
        response.raise_for_status()
        # strict=False tolerates control characters inside JSON strings.
        page = json.loads(response.content, strict=False)
        # A page without a "value" list contributes nothing instead of crashing.
        surveys.extend(page.get('value') or [])
        # The service returns the path of the next page, if there is one.
        relative_path = page.get('odata.nextLink')
    return surveys


def _build_survey_record(survey, data_partition_id):
    """Wrap one SDB survey dict in the OSDU custom-schema record envelope."""
    rec = get_records_comm_attributes(
        commit_sha=None,
        survey_name=survey['surveyName'],
        county_of_origin=survey['countryName'],
        schema_kind="eqn:iEnergy--seismicsurveys:1.0.0",
        owners="Group of users having owning rights on the data",
        viewers="Group of users having viewing rights on the data",
        legaltags="Legal taggs attached to each record",
    )
    rec["id"] = f"{data_partition_id}:seismicsurveys:{survey['surveyId']}"
    # Every SDB attribute is copied verbatim into the record payload.
    rec["data"].update(survey)
    return rec


def _send_records_in_batches(records, data_partition_id, batch_size):
    """PUT ``records`` to OSDU in chunks of ``batch_size``; return how many were sent without a request error."""
    if not records:
        # Nothing to send: do not even create a client.
        return 0

    osdu_client = OsduHttpClient(data_partition_id, client_type="public-client")
    count = 0
    for start in range(0, len(records), batch_size):
        batch = records[start:start + batch_size]
        try:
            # NOTE: per the message below, this endpoint is deliberately
            # invalid in this notebook so that nothing is actually ingested.
            osdu_client.app_put_returning_json(
                "/storage/v2/records__",
                json.dumps(batch)
            )
            count += len(batch)
        except requests.exceptions.RequestException as exp:
            print(f"3.   A wrong OSDU storage endpoint was introduced on-purpose when sending {len(batch)} records to OSDU.", exp)
    return count


def main(lastrundatetime: str) -> tuple:
    """Ingest SDB 3D seismic-survey metadata into the OSDU custom schema.

    Steps:
        0. [POST] Request an SDB access token using the refresh token.
        1. [GET] Download the seismic surveys from the SDB API, following
           pagination.
        2. Convert every survey into a record in OSDU custom-schema format.
        3. [PUT] Send the records to OSDU in batches of at most
           ``MAX_ENTRIES_PER_REQUEST`` records, each record exactly once.

    Args:
        lastrundatetime: Timestamp string; only surveys whose ``updateDate``
            is at or after it are fetched. ``None`` fetches all 3D surveys.

    Returns:
        A ``(records, count)`` tuple: all records that were built, and the
        number of records whose PUT request raised no request error.
    """
    dimension = '3D'
    osdu_data_partition_id = 'npequinor-dev'

    # The context manager closes the HTTP session once SDB has been read.
    with requests.session() as session:
        session.headers.update({"User-Agent": "osdu/sismic_metadata"})
        sdb_access_token = _fetch_sdb_access_token(session)
        surveys = _fetch_surveys(session, sdb_access_token, dimension, lastrundatetime)

    print(f"2.  {len(surveys)} {dimension} seismic surveys found in SDB edited since: {lastrundatetime}")

    records = [_build_survey_record(survey, osdu_data_partition_id) for survey in surveys]
    count = _send_records_in_batches(records, osdu_data_partition_id, MAX_ENTRIES_PER_REQUEST)

    return records, count

# Run the ingestion for surveys updated since the given timestamp; keep the built records and the sent-record count.
records, count = main('2020-01-01T07:41:34.585')

1.  iEnergy access_token response code: <Response [200]>
2.  134 3D seismic surveys found in SDB edited since: 2020-01-01T07:41:34.585
3.   A wrong OSDU storage endpoint was introduced on-purpose when sending 134 records to OSDU. 


In [4]:
"""Let's have a look at one of the records we intend to ingest into OSDU."""

# Pretty-print the first record returned by main() as indented JSON.
print(f"Custom schem seismic surveys record: {json.dumps(records[0], indent=4)}")

Custom schem seismic surveys record: {
    "kind": "eqn:iEnergy--seismicsurveys:1.0.0",
    "acl": {
        "owners": "Group of users having owning rights on the data",
        "viewers": "Group of users having viewing rights on the data"
    },
    "legal": {
        "legaltags": "Legal taggs attached to each record",
        "otherRelevantDataCountries": [
            "NO"
        ],
        "status": "compliant"
    },
    "tags": {
        "dataID": "",
        "exportControl": "No",
        "EqnrSource": "SDB",
        "securityClassification": "Internal",
        "personalData": "Yes - limited personal data",
        "intellectualPropertyRight": "Equinor owned",
        "legalOwnership": "Equinor",
        "inside": "No",
        "businessCriticalData": "No",
        "soxCriticalData": "No",
        "countryOfOrigin": "GB",
        "commitId": null
    },
    "data": {
        "surveyId": "1508709832",
        "s1TableId": null,
        "operator": "EQUINOR",
        "mergedSurv