## Connector demo
Compatible with EDC v0.10.1.

Most examples are based on the EDC samples: https://github.com/eclipse-edc/Samples

In [None]:
import requests
import json

from dataspace_apis import *

### Demo setup

In [None]:

# Switch between the localhost deployment (True) and the PaaS deployment (False).
IS_LOCALHOST_DEPLOYMENT = True

# Base address and container host names used by the local deployment.
LOCALHOST = "http://localhost"
CONSUMER_CONTAINER = "http://consumer-connector"
PROVIDER_CONTAINER = "http://provider-connector"
FEDERATED_CATALOG_BASE_CONTAINER = "http://federated-catalog"
CONSUMER_BACKEND_CONTAINER = "http://consumer-backend"

# Other PaaS environments, kept for reference (uncomment to switch):
#
# PROVIDER_URL = "https://ad4gd-provider-edc.dashboard-siba.store"
# CONSUMER_URL = "https://consumer-edc-connector.apps.dcw1.paas.psnc.pl"
# FEDERATED_CATALOG_BASE_URL = "https://federatedcatalog-edc-connector.apps.dcw1.paas.psnc.pl"
# CONSUMER_BACKEND_URL = "https://consback-edc-connector.apps.dcw1.paas.psnc.pl"
#
# PROVIDER_URL = "https://provider-edc-connector.apps.dcw1.paas.psnc.pl"
# CONSUMER_URL = "https://consumer-edc-connector.apps.dcw1.paas.psnc.pl"
# FEDERATED_CATALOG_BASE_URL = "https://federatedcatalog-edc-connector.apps.dcw1.paas.psnc.pl"

if IS_LOCALHOST_DEPLOYMENT:
    # Locally every service is reached through the same host; the ports are
    # appended where the endpoint URLs are built.
    PROVIDER_URL = CONSUMER_URL = FEDERATED_CATALOG_BASE_URL = LOCALHOST
else:
    PROVIDER_URL = "https://provider-edc-connector.apps.paas-dev.psnc.pl"
    CONSUMER_URL = "https://consumer-edc-connector.apps.paas-dev.psnc.pl"
    FEDERATED_CATALOG_BASE_URL = "https://federatedcatalog-edc-connector.apps.paas-dev.psnc.pl"

# The consumer backend base URL is the same in both modes.
CONSUMER_BACKEND_URL = "https://consback-edc-connector.apps.paas-dev.psnc.pl"

In [None]:
# Build the per-service endpoint URLs for the selected deployment mode.
if IS_LOCALHOST_DEPLOYMENT:
    # Local deployment: every endpoint is exposed on its own port.
    PROVIDER_API = f"{PROVIDER_URL}:19191/api"
    PROVIDER_CONTROL = f"{PROVIDER_URL}:19192/control"
    PROVIDER_MANAGEMENT = f"{PROVIDER_URL}:19193/management"
    PROVIDER_PROTOCOL = f"{PROVIDER_URL}:19194/protocol"
    PROVIDER_PUBLIC = f"{PROVIDER_URL}:19291/public"

    CONSUMER_API = f"{CONSUMER_URL}:29191/api"
    CONSUMER_CONTROL = f"{CONSUMER_URL}:29192/control"
    CONSUMER_MANAGEMENT = f"{CONSUMER_URL}:29193/management"
    CONSUMER_PROTOCOL = f"{CONSUMER_URL}:29194/protocol"
    CONSUMER_PUBLIC = f"{CONSUMER_URL}:29291/public"

    # The EDR callback uses the backend's container name rather than localhost.
    CONSUMER_BACKEND_EDR = f"{CONSUMER_BACKEND_CONTAINER}:4000/edr-endpoint"

    FEDERATED_CATALOG_URL = f"{FEDERATED_CATALOG_BASE_URL}:9181/catalog"
else:
    # Remote deployment: endpoints are plain paths under each base URL.
    PROVIDER_API = f"{PROVIDER_URL}/api"
    PROVIDER_CONTROL = f"{PROVIDER_URL}/control"
    PROVIDER_MANAGEMENT = f"{PROVIDER_URL}/management"
    PROVIDER_PROTOCOL = f"{PROVIDER_URL}/protocol"
    PROVIDER_PUBLIC = f"{PROVIDER_URL}/public"

    CONSUMER_API = f"{CONSUMER_URL}/api"
    CONSUMER_CONTROL = f"{CONSUMER_URL}/control"
    CONSUMER_MANAGEMENT = f"{CONSUMER_URL}/management"
    CONSUMER_PROTOCOL = f"{CONSUMER_URL}/protocol"
    CONSUMER_PUBLIC = f"{CONSUMER_URL}/public"

    CONSUMER_BACKEND_EDR = f"{CONSUMER_BACKEND_URL}/edr-endpoint"
    FEDERATED_CATALOG_URL = f"{FEDERATED_CATALOG_BASE_URL}/catalog"

In [None]:
"""
The vars below are useful when working on local deployment
(or at least when no routings are specified for connectors)
Docker containers have their own localhost, which is not the host
machine's localhost.

In some requests there is a 'counterPartyAddress' which contains
localhost. This one will be solved as containers' internal localhost
but not the localhost of the host machine and connectors won't connect
to each other.

Hence we substitute the "localhost" with containers' names (if they
contain "localhost", otherwise urls remain unchanged).
If routings are specified correctly on PaaS, those lines aren't
required.

We leave those vars here anyway, just not to complicate 
any of the requests later in the demo. 
"""

def _provider_internal(url):
    """Swap the localhost base for the provider container name; other URLs pass through unchanged."""
    return url.replace(LOCALHOST, PROVIDER_CONTAINER)


# Provider endpoints as addressed from other containers.
provider_control_internal = _provider_internal(PROVIDER_CONTROL)
provider_public_internal = _provider_internal(PROVIDER_PUBLIC)
provider_protocol_internal = _provider_internal(PROVIDER_PROTOCOL)

# Headers sent with every request in this demo.
default_headers = {
    "Content-Type": "application/json",
    "x-api-key": "edc",
}

### Connection check

In [None]:
def check_health(coreApiUrl, timeout=10):
    """
    Query a connector's health endpoint and print what it answers.

    Parameters:
        coreApiUrl: base URL of the connector API; "/check/health/" is appended.
        timeout: seconds to wait for the connector before giving up, so an
            unreachable connector cannot hang the notebook.

    Raises:
        requests.HTTPError: when the endpoint answers with a 4xx/5xx status.
        requests.RequestException: on connection problems or timeout.
    """
    health_url = f"{coreApiUrl}/check/health/"
    print(health_url)
    response = requests.get(health_url, headers=default_headers, timeout=timeout)
    # Print the body before failing so the reason for an unhealthy answer is visible.
    print(response.json() if response.ok else response.text)
    # An error status must not pass silently as "healthy".
    response.raise_for_status()

# Probe the provider first, then the consumer; the final message is only
# reached when neither call raised.
for core_api in (PROVIDER_API, CONSUMER_API):
    check_health(core_api)
print("They're Alive!")

### Populating the provider

This step is not required, since the connectors self-register their data plane on startup; it is provided only as an example.

In [None]:
def register_data_plane_instance_for_provider():
    """
    POST a data plane registration for the provider to its management API.

    The payload points at the provider's container-internal control and
    public endpoints. Returns the response object from requests.post.
    """
    dataplane_payload = {
        "@context": {
            "edc": "https://w3id.org/edc/v0.0.1/ns/"
        },
        "@id": "provider-dataplane",
        "url": f"{provider_control_internal}/transfer",
        "allowedSourceTypes": ["HttpData"],
        "allowedDestTypes": ["HttpProxy", "HttpData"],
        "properties": {
            "https://w3id.org/edc/v0.0.1/ns/publicApiUrl": f"{provider_public_internal}/"
        },
    }

    return requests.post(
        url=f"{PROVIDER_MANAGEMENT}/v2/dataplanes",
        headers=default_headers,
        data=json.dumps(dataplane_payload),
    )

# Send the registration request and show the HTTP status code it returned.
register_data_plane_instance_for_provider().status_code

In [None]:
# Fixed asset id; later cells reuse it for the contract definition and the
# consumer-side lookups.
asset_id = "test-asset"

# create_asset is not defined in this notebook; presumably it comes from the
# dataspace_apis star import and returns the HTTP response - confirm there.
asset = create_asset(asset_id, PROVIDER_MANAGEMENT, default_headers)

# expected 200 or 409 (already exists) because the @id is fixed in the example
asset.status_code

In [None]:
# Fixed policy id; reused below when the contract definition is created.
policy_id = "test-policy"

# Create the policy with an empty permissions list (no rules attached).
# create_policy is not defined in this notebook; presumably it comes from the
# dataspace_apis star import - confirm there.
policy = create_policy(policy_id, PROVIDER_MANAGEMENT, default_headers, permissions=[])


"""
example with rule:

allowed_policy_region = "pl"
allowed_region_rule = {
    "action": "use", 
    "constraint": { 
        "@type": "AtomicConstraint", 
        "leftOperand": "https://w3id.org/edc/v0.0.1/ns/regionLocation", 
        "operator": "odrl:eq", 
        "rightOperand": allowed_policy_region 
    }
}
policy = create_policy(policy_id, PROVIDER_MANAGEMENT, default_headers, permissions=[allowed_region_rule])
"""

# expected 200 or 409 (already exists) because the @id is fixed in the example
print(policy.status_code)

Contract definition - Links an asset with a policy

In [None]:
# Fixed id for the contract definition that ties asset_id to policy_id.
contract_definition_id = "test-contract-definition"

# create_contract_definition is not defined in this notebook; presumably it
# comes from the dataspace_apis star import - confirm there.
contract_definition = create_contract_definition(contract_definition_id, PROVIDER_MANAGEMENT, asset_id, policy_id, default_headers)

# expected 200 or 409 (already exists) because the @id is fixed in the example
contract_definition.status_code

In [None]:
def _as_list(value):
    """Wrap a single value in a list; pass lists through; map None to an empty list."""
    if value is None:
        return []
    return value if isinstance(value, list) else [value]


def get_offer_id(fetched_catalog, asset_id):
    """
    Find the id of the offer (policy) attached to an asset in a catalog.

    Parameters:
        fetched_catalog: one catalog object or a list of catalog objects.
            "dcat:dataset" and "odrl:hasPolicy" may each hold a single
            object or a list of objects.
        asset_id: "@id" of the dataset to look for.

    Returns:
        The "@id" of the first policy of the first matching dataset that has
        one, or None when no such dataset exists. Catalogs without datasets
        and datasets without policies are skipped instead of raising.
    """
    for catalog in _as_list(fetched_catalog):
        for dataset in _as_list(catalog.get("dcat:dataset")):
            if dataset.get("@id") != asset_id:
                continue
            policies = _as_list(dataset.get("odrl:hasPolicy"))
            if policies:
                return policies[0]["@id"]
    return None

# Look for a contract the consumer already holds for the asset
def check_existing_negotiation(asset_id : str):
    """
    Return the "@id" of the first contract listed by the consumer whose
    "assetId" equals asset_id.

    Returns None when nothing matches, or when the listing request does not
    answer with status 200 (the status code is printed in that case).
    """
    contracts_response = get_contracts(CONSUMER_MANAGEMENT, default_headers)
    if contracts_response.status_code == 200:
        return next(
            (
                entry["@id"]
                for entry in contracts_response.json()
                if entry["assetId"] == asset_id
            ),
            None,
        )
    print("Error fetching negotiations:", contracts_response.status_code)
    return None

# Check if there is an existing transfer for the given asset, contract agreement and callback
def check_existing_transfer(asset_id : str, contract_id : str, callback_address : str):
    """
    Return the "@id" of the first consumer transfer that matches the asset,
    the contract agreement and the callback address.

    "callbackAddresses" is accepted both as a single object and as a list of
    objects, the same way get_offer_id treats its inputs. A transfer matches
    when any of its callback entries has a "uri" equal to callback_address.
    Transfers that lack one of the inspected keys are skipped.

    Returns None when nothing matches, or when the listing request does not
    answer with status 200 (the status code is printed in that case).
    """
    response = get_transfers(CONSUMER_MANAGEMENT, default_headers)
    if response.status_code != 200:
        print("Error fetching transfers:", response.status_code)
        return None

    for transfer in response.json():
        if transfer.get("assetId") != asset_id or transfer.get("contractId") != contract_id:
            continue
        callbacks = transfer.get("callbackAddresses") or []
        if isinstance(callbacks, dict):
            # A single callback may arrive as a bare object instead of a list.
            callbacks = [callbacks]
        if any(isinstance(cb, dict) and cb.get("uri") == callback_address for cb in callbacks):
            return transfer["@id"]
    return None

### Working on consumer side

Fetch catalog, negotiate and perform transfer

In [None]:
# Consumer-side flow: fetch the catalog, obtain a contract agreement for
# asset_id (reusing an existing one when found), then request a
# consumer-pull transfer (or reuse an existing matching transfer).
import time
# 1. fetch catalog
fetched_catalog = fetch_catalog(FEDERATED_CATALOG_URL, default_headers)
print(f"1. Catalog fetch: {fetched_catalog.status_code}")

# 2. check if there is an existing negotiation
existing_negotiation = check_existing_negotiation(asset_id)
# Stays "" when no agreement id could be obtained below.
contract_agreement_id = ""

if existing_negotiation is not None:
    # 2A. Use existing negotiation
    contract_agreement_id = existing_negotiation
    print(f"Negotiation for asset \"{asset_id}\" already exists")
else:
    # Look up the offer for the asset in the fetched catalog.
    offer_id = get_offer_id(fetched_catalog.json(), asset_id)
    print("OfferId:", offer_id)

    # 2B. Negotiate a contract
    negotiated_contract = negotiate_contract(
        offer_id, CONSUMER_MANAGEMENT, provider_protocol_internal, [], default_headers)
    # If the policy has rules, pass them instead of the empty list, e.g.
    # negotiate_contract(offer_id, CONSUMER_MANAGEMENT, provider_protocol_internal, [allowed_region_rule], default_headers)

    print(f"Negotiate a contract: {negotiated_contract.status_code}")
    print(negotiated_contract.json())

    # Wait a minute for the negotiation to be finalized (the automatic interval
    # process takes at most 60 seconds to load new contracts).
    print("... Waiting for negotiation to save ...")
    time.sleep(60)

    # Fetch the agreement for the negotiation that was just started.
    contract_negotiation_id = negotiated_contract.json()["@id"]
    contract_agreement = get_contract_agreement_id(contract_negotiation_id, CONSUMER_MANAGEMENT, default_headers)
    print(contract_agreement.json())

    # The agreement id is only taken over when the response carries one.
    if ("contractAgreementId" in contract_agreement.json()):
        contract_agreement_id = contract_agreement.json()["contractAgreementId"]

print("[@AgreementId]:", contract_agreement_id)

# Check for existing transfer for given asset and agreement
existing_transfer = check_existing_transfer(asset_id, contract_agreement_id, CONSUMER_BACKEND_EDR)

# 3. transfer asset
if existing_transfer is not None:
    print(f"Requested transfer already exists: {existing_transfer}")
    requested_transfer = get_transfer(existing_transfer, CONSUMER_MANAGEMENT, default_headers)
else:
    requested_transfer = request_consumer_pull_transfer(
        "provider",
        CONSUMER_MANAGEMENT,
        CONSUMER_BACKEND_EDR,
        provider_protocol_internal,
        contract_agreement_id,
        default_headers
    )

print(f"3. Transfer an asset: {requested_transfer.status_code}")
print(requested_transfer.json())

## Transfer with query params to PSNC MinIO

In [None]:
# Same flow as the plain consumer transfer, but the EDR callback address
# carries a query string.
import time

# 1. fetch catalog
fetched_catalog = fetch_catalog(FEDERATED_CATALOG_URL, default_headers)
print(f"1. Catalog fetch: {fetched_catalog.status_code}")

# 2. check if there is an existing negotiation
existing_negotiation = check_existing_negotiation(asset_id)
# Stays "" when no agreement id could be obtained below.
contract_agreement_id = ""

if existing_negotiation is not None:
    # 2A. Use existing negotiation
    contract_agreement_id = existing_negotiation
    # Report an existing negotiation only when one was actually found.
    print(f"Negotiation for asset \"{asset_id}\" already exists")
else:
    offer_id = get_offer_id(fetched_catalog.json(), asset_id)
    print(offer_id)

    # 2B. negotiate a contract
    negotiated_contract = negotiate_contract(
        offer_id, CONSUMER_MANAGEMENT, provider_protocol_internal, [], default_headers)
    # If the policy has rules, pass them instead of the empty list, e.g.
    # negotiate_contract(offer_id, CONSUMER_MANAGEMENT, provider_protocol_internal, [allowed_region_rule], default_headers)

    print(f"2. Negotiate a contract: {negotiated_contract.status_code}")
    print(negotiated_contract.json())

    # Wait a minute for the negotiation to be finalized (the automatic interval
    # process takes at most 60 seconds to load new contracts).
    print("... Waiting for negotiation to save ...")
    time.sleep(60)

    contract_negotiation_id = negotiated_contract.json()["@id"]
    contract_agreement = get_contract_agreement_id(contract_negotiation_id, CONSUMER_MANAGEMENT, default_headers)
    print(contract_agreement.json())

    # The agreement id is only taken over when the response carries one.
    if ("contractAgreementId" in contract_agreement.json()):
        contract_agreement_id = contract_agreement.json()["contractAgreementId"]

print("[@ContractAgreementId]:", contract_agreement_id)

# 3. transfer asset (this will save the asset to PSNC MinIO)
query_params = "?url=false"
consumer_backend_url_with_query_params = CONSUMER_BACKEND_EDR + query_params

# Check for existing transfer for given asset and agreement
existing_transfer = check_existing_transfer(asset_id, contract_agreement_id, consumer_backend_url_with_query_params)

if existing_transfer is not None:
    print(f"Requested transfer already exists: {existing_transfer}")
    requested_transfer = get_transfer(existing_transfer, CONSUMER_MANAGEMENT, default_headers)
else:
    requested_transfer = request_consumer_pull_transfer(
        "provider",
        CONSUMER_MANAGEMENT,
        consumer_backend_url_with_query_params,
        provider_protocol_internal,
        contract_agreement_id,
        default_headers
    )

print(f"3. Transfer an asset: {requested_transfer.status_code}")
print(requested_transfer.json())

## Local transfer

In [None]:
# Local transfer: obtain an agreement and a transfer as in the previous
# cells, then fetch the transfer's data address and pull the data directly.
# Imported here so the cell also runs when the earlier cells were skipped.
import time

# 1. fetch catalog
fetched_catalog = fetch_catalog(FEDERATED_CATALOG_URL, default_headers)
print(f"1. Catalog fetch: {fetched_catalog.status_code}")

# 2. check if there is an existing negotiation
existing_negotiation = check_existing_negotiation(asset_id)
print(f"Negotiation for asset \"{asset_id}\": {existing_negotiation}")
# Stays "" when no agreement id could be obtained below.
contract_agreement_id = ""

if existing_negotiation is not None:
    # 2A. Use existing negotiation
    contract_agreement_id = existing_negotiation
else:
    offer_id = get_offer_id(fetched_catalog.json(), asset_id)
    print(offer_id)
    # 2B. negotiate a contract
    negotiated_contract = negotiate_contract(
        offer_id, CONSUMER_MANAGEMENT, provider_protocol_internal, [], default_headers)
    # If the policy has rules, pass them instead of the empty list, e.g.
    # negotiate_contract(offer_id, CONSUMER_MANAGEMENT, provider_protocol_internal, [allowed_region_rule], default_headers)

    print(f"2. Negotiate a contract: {negotiated_contract.status_code}")
    print(negotiated_contract.json())

    # Wait a minute for the negotiation to be finalized (the automatic interval
    # process takes at most 60 seconds to load new contracts).
    print("... Waiting for negotiation to save ...")
    time.sleep(60)

    contract_negotiation_id = negotiated_contract.json()["@id"]
    contract_agreement = get_contract_agreement_id(contract_negotiation_id, CONSUMER_MANAGEMENT, default_headers)
    print(contract_agreement.json())

    # The agreement id is only taken over when the response carries one.
    if ("contractAgreementId" in contract_agreement.json()):
        contract_agreement_id = contract_agreement.json()["contractAgreementId"]

print("[@ContractAgreementId]:", contract_agreement_id)

# 3. transfer asset
# NOTE(review): the original comment here said "this will save the asset to
# PSNC MinIO"; it looks copied from the MinIO cell - confirm what url=false
# means for the local transfer.
query_params = "?url=false"
consumer_backend_url_with_query_params = CONSUMER_BACKEND_EDR + query_params

# Check for existing transfer for given asset and agreement
existing_transfer = check_existing_transfer(asset_id, contract_agreement_id, consumer_backend_url_with_query_params)

if existing_transfer is not None:
    print(f"Requested transfer already exists: {existing_transfer}")
    requested_transfer = get_transfer(existing_transfer, CONSUMER_MANAGEMENT, default_headers)
else:
    requested_transfer = request_consumer_pull_transfer(
        "provider",
        CONSUMER_MANAGEMENT,
        consumer_backend_url_with_query_params,
        provider_protocol_internal,
        contract_agreement_id,
        default_headers
    )

print("... Waiting for transfer to save ...")
time.sleep(10)

print(f"3. Transfer an asset: {requested_transfer.status_code}")
print(requested_transfer.json())

print("4. Get credentials")
# Take the id out first: reusing double quotes inside a double-quoted
# f-string is a SyntaxError before Python 3.12.
transfer_id = requested_transfer.json()["@id"]
print(f"{CONSUMER_MANAGEMENT}/v3/edrs/{transfer_id}/dataaddress")
transfer_credentials = get_transfer_data_credentials(CONSUMER_MANAGEMENT, transfer_id, default_headers)

# Build the URL used to pull the data from the returned endpoint.
proxy_path = "formats/html"
query_params = "url=true"
public = f"{transfer_credentials['endpoint']}/{proxy_path}?{query_params}"

if IS_LOCALHOST_DEPLOYMENT:
    # The endpoint names the provider container; swap in localhost so the
    # notebook host can reach it.
    public = public.replace(PROVIDER_CONTAINER, LOCALHOST)

data = get_data_locally(public, transfer_credentials['authorization'])
# Prefer the decoded text; fall back to the raw bytes when it is empty.
if (data.text):
    print(data.text)
else:
    print(data.content)
    