## Connector demo
updated to EDC v. 0.7.0

based on https://github.com/jhalasinski/connector-local and https://github.com/eclipse-edc/Samples

In [None]:
import requests
import json

### Demo setup

In [None]:
# Deployment selection.
# Previously the local URLs below always overwrote the PaaS URLs, so the PaaS
# values were dead assignments. The switch makes the choice explicit while
# keeping the local deployment as the default.
# NOTE(review): the endpoint definitions later in this notebook append local
# port numbers; they must be adjusted as well when switching to PaaS.
USE_LOCAL_DEPLOYMENT = True

# for paas:
PROVIDER_URL = "https://provider-edc-connector..."
CONSUMER_URL = "https://consumer-edc-connector..."

# Headers sent with the management API requests of this demo.
# NOTE(review): the API key is hard-coded here; avoid committing a real key.
default_headers = {
    "Content-Type": "application/json",
    "X-API-KEY": "fifi-info-zaneta",
}

## for local:
LOCALHOST = "http://localhost"
CONSUMER_CONTAINER = "http://consumer-connector"
PROVIDER_CONTAINER = "http://provider-connector"

if USE_LOCAL_DEPLOYMENT:
    CONSUMER_URL = LOCALHOST
    PROVIDER_URL = LOCALHOST

In [None]:
# Alternative, path-routed endpoints without explicit ports (presumably for
# the PaaS deployment -- TODO confirm). Swap with the port-based block below
# to use them.
# PROVIDER_API = f"{PROVIDER_URL}/api"
# PROVIDER_CONTROL = f"{PROVIDER_URL}/control"
# PROVIDER_MANAGEMENT = f"{PROVIDER_URL}/management"
# PROVIDER_PROTOCOL = f"{PROVIDER_URL}/protocol"
# PROVIDER_PUBLIC = f"{PROVIDER_URL}/public"

# CONSUMER_API = f"{CONSUMER_URL}/api"
# CONSUMER_CONTROL = f"{CONSUMER_URL}/control"
# CONSUMER_MANAGEMENT = f"{CONSUMER_URL}/management"
# CONSUMER_PROTOCOL = f"{CONSUMER_URL}/protocol"
# CONSUMER_PUBLIC = f"{CONSUMER_URL}/public"

# Port-based endpoints of the provider connector (one port per API context).
PROVIDER_API = "{}:19191/api".format(PROVIDER_URL)
PROVIDER_CONTROL = "{}:19192/control".format(PROVIDER_URL)
PROVIDER_MANAGEMENT = "{}:19193/management".format(PROVIDER_URL)
PROVIDER_PROTOCOL = "{}:19194/protocol".format(PROVIDER_URL)
PROVIDER_PUBLIC = "{}:19291/public".format(PROVIDER_URL)

# Port-based endpoints of the consumer connector.
CONSUMER_API = "{}:29191/api".format(CONSUMER_URL)
CONSUMER_CONTROL = "{}:29192/control".format(CONSUMER_URL)
CONSUMER_MANAGEMENT = "{}:29193/management".format(CONSUMER_URL)
CONSUMER_PROTOCOL = "{}:29194/protocol".format(CONSUMER_URL)
CONSUMER_PUBLIC = "{}:29291/public".format(CONSUMER_URL)


In [None]:
"""
The vars below are useful when working on local deployment
(or at least when no routings are specified for connectors)
Docker containers have their own localhost, which is not the host
machine's localhost.

In some requests there is a 'counterPartyAddress' which contains
localhost. Inside a container this resolves to the container's own
localhost, not the localhost of the host machine, so the connectors
won't connect to each other.

Hence we substitute the "localhost" with containers' names (if they
contain "localhost", otherwise urls remain unchanged).
If routings are specified correctly on PaaS, those lines aren't
required.

We leave those vars here anyway, just not to complicate 
any of the requests later in the demo. 
"""

def _to_provider_container_url(url):
    """Return *url* with every occurrence of LOCALHOST replaced by PROVIDER_CONTAINER."""
    return url.replace(LOCALHOST, PROVIDER_CONTAINER)


# Provider endpoints as seen from inside the container network; URLs that do
# not contain LOCALHOST are left unchanged by the replacement.
provider_control_intern = _to_provider_container_url(PROVIDER_CONTROL)
provider_public_intern = _to_provider_container_url(PROVIDER_PUBLIC)
provider_protocol_internal = _to_provider_container_url(PROVIDER_PROTOCOL)


### Conn Check

In [None]:
# Call the provider's health endpoint and show the parsed JSON reply.
provider_health_url = f"{PROVIDER_API}/check/health/"
print(provider_health_url)
rp = requests.get(provider_health_url).json()
print(rp)

In [None]:
# Call the health endpoints of both connectors and print the parsed replies.
# The final message is reached only if both requests returned parseable JSON;
# the content of the replies is not inspected.
provider_health_url = f"{PROVIDER_API}/check/health/"
consumer_health_url = f"{CONSUMER_API}/check/health/"

print(provider_health_url)
rp = requests.get(provider_health_url).json()
print(rp)

print()

print(consumer_health_url)
rc = requests.get(consumer_health_url).json()
print(rc)

print()
print("They're Alive!")

### Populating the provider

In [None]:
def register_data_plane_instance_for_provider():
    """Register the provider's data plane instance through the management API.

    Posts a data plane description with the id ``provider-dataplane`` to
    ``{PROVIDER_MANAGEMENT}/v2/dataplanes``. The transfer URL and the public
    API URL are built from the container-internal provider addresses.

    Returns:
        The ``requests.Response`` of the POST request.
    """
    # The module-level URLs are only read here, so no ``global`` is required.
    dataplane_description = {
        "@context": {
            "edc": "https://w3id.org/edc/v0.0.1/ns/"
        },
        "@id": "provider-dataplane",
        "url": f"{provider_control_intern}/transfer",
        "allowedSourceTypes": ["HttpData"],
        "allowedDestTypes": ["HttpProxy", "HttpData"],
        "properties": {
            "https://w3id.org/edc/v0.0.1/ns/publicApiUrl": f"{provider_public_intern}/"
        },
    }

    return requests.post(
        url=f"{PROVIDER_MANAGEMENT}/v2/dataplanes",
        headers=default_headers,
        data=json.dumps(dataplane_description),
    )

"""
Execute this only once per connector unless you know why you need
to repeat it. I don't know exactly why, but it is apparently
necessary, and running it repeatedly may, for example, cause
unexpected contract offers to appear.

It's not harmful at all; it just has the potential to confuse the dev.
"""

register_data_plane_instance_for_provider().status_code

In [None]:
# Create a demo asset on the provider through the management API.
create_an_asset_on_the_provider_side = requests.post(
    url=f"{PROVIDER_MANAGEMENT}/v3/assets",
    headers=default_headers,
    data=json.dumps(
        {
            "@context": {
                "edc": "https://w3id.org/edc/v0.0.1/ns/"
            },
            "@id": "asset-workshop-demo-by-api",
            "properties": {
                "name": "test of asset, just to have another one on board",
                "contenttype": "application/json",
            },
            # Where the provider fetches the actual data from.
            "dataAddress": {
                "name": "Test asset",
                "baseUrl": "https://jsonplaceholder.typicode.com/users",
                "type": "HttpData",
            },
        }
    ),
)

# Expect 200, or 409 (already exists) since the example uses a fixed @id.
create_an_asset_on_the_provider_side.status_code

In [None]:
from pprint import pprint

# Create a policy definition with empty permission, prohibition and
# obligation lists on the provider.
create_policy_on_the_provider = requests.post(
    url=f"{PROVIDER_MANAGEMENT}/v2/policydefinitions",
    headers=default_headers,
    data=json.dumps(
        {
            "@context": {
                "edc": "https://w3id.org/edc/v0.0.1/ns/",
                "odrl": "http://www.w3.org/ns/odrl/2/",
            },
            "@id": "aPolicy",
            "policy": {
                "@context": "http://www.w3.org/ns/odrl.jsonld",
                "@type": "Set",
                "odrl:permission": [],
                "odrl:prohibition": [],
                "odrl:obligation": [],
            },
        }
    ),
)

# Dump all attributes of the response object for inspection.
pprint(vars(create_policy_on_the_provider))

# Expect 200, or 409 (already exists) since the example uses a fixed @id.
print(create_policy_on_the_provider.status_code)

In [None]:
# Create a contract definition that uses "aPolicy" as both access and
# contract policy and has an empty asset selector.
create_a_contract_definition_on_provider = requests.post(
    url=f"{PROVIDER_MANAGEMENT}/v2/contractdefinitions",
    headers=default_headers,
    data=json.dumps(
        {
            "@context": {
                "edc": "https://w3id.org/edc/v0.0.1/ns/"
            },
            "@id": "3",
            "accessPolicyId": "aPolicy",
            "contractPolicyId": "aPolicy",
            "assetsSelector": [],
        }
    ),
)

# Expect 200, or 409 (already exists) since the example uses a fixed @id.
create_a_contract_definition_on_provider.status_code

### Working on consumer side

In [None]:
# Ask the consumer connector to request the provider's catalog. The provider
# is addressed by its container-internal protocol URL.
fetch_catalog_on_consumer_side = requests.post(
    url=f"{CONSUMER_MANAGEMENT}/v2/catalog/request",
    headers=default_headers,
    data=json.dumps(
        {
            "@context": {
                "edc": "https://w3id.org/edc/v0.0.1/ns/"
            },
            "@type": "CatalogRequest",
            "counterPartyAddress": provider_protocol_internal,
            "protocol": "dataspace-protocol-http",
        }
    ),
)

fetch_catalog_on_consumer_side.status_code

In [None]:
# get_dataplanes_or_sth = requests.get(
#     headers=default_headers,
#     url=f"{PROVIDER_MANAGEMENT}/v2/dataplanes"
# )
# get_dataplanes_or_sth.json()

In [None]:
## a quick lookup: show the parsed JSON body of the catalog response
fetch_catalog_on_consumer_side.json()

In [None]:
from pprint import pprint

# Asset that the contract request below names as the offer's "target".
TARGET_ASSET_ID = "assetId"

dcat_dataset = fetch_catalog_on_consumer_side.json()["dcat:dataset"]

# "dcat:dataset" may be a single object or a list of objects; normalise it.
_datasets = dcat_dataset if isinstance(dcat_dataset, list) else [dcat_dataset]

# Prefer the dataset describing the targeted asset, so that the offer id and
# the "target" of the request belong together. Previously the first dataset
# was always used, whatever asset it described.
# NOTE(review): assumes a dataset's "@id" is the asset id -- confirm against
# the catalog response of the EDC version in use.
_selected_dataset = next(
    (
        entry
        for entry in _datasets
        if isinstance(entry, dict) and entry.get("@id") == TARGET_ASSET_ID
    ),
    None,
)
if _selected_dataset is None:
    # Fall back to the previous behaviour (first dataset in the catalog).
    print(f"WARNING: no dataset with @id '{TARGET_ASSET_ID}' in the catalog, using the first one")
    _selected_dataset = _datasets[0]

_policy = _selected_dataset["odrl:hasPolicy"]

# A dataset may carry one policy (object) or several (list); take the first.
offer_id = (_policy[0] if isinstance(_policy, list) else _policy)["@id"]

print(_policy)
print(offer_id)

# Start the contract negotiation on the consumer connector.
negociate_a_contract = requests.post(
    url=f"{CONSUMER_MANAGEMENT}/v2/contractnegotiations",
    headers=default_headers,
    data=json.dumps(
        {
            "@context": {
                "edc": "https://w3id.org/edc/v0.0.1/ns/"
            },
            "@type": "ContractRequest",
            "counterPartyAddress": provider_protocol_internal,
            "protocol": "dataspace-protocol-http",
            "policy": {
                "@context": "http://www.w3.org/ns/odrl.jsonld",
                "@id": f"{offer_id}",
                "@type": "Offer",
                "assigner": "provider",
                "target": TARGET_ASSET_ID,
            },
        }
    ),
)

# Dump all attributes of the response object for inspection.
pprint(vars(negociate_a_contract))

negociate_a_contract.status_code

In [None]:
# Show the parsed JSON body of the contract negotiation response.
negociate_a_contract.json()

In [None]:
contract_negotiation_id = negociate_a_contract.json()["@id"]

def wait_for_negotiation_to_reach_finalized_state(
    negotiation_id=None, poll_interval=1.0, max_wait_seconds=None
):
    """Block until the contract negotiation reaches the "FINALIZED" state.

    Contract negotiation may take some time to finish. This function polls the
    consumer's management API and returns once the negotiation is finalized.
    It returns nothing; its only job is to wait until the connector is ready
    for further work.

    Args:
        negotiation_id: Id of the negotiation to watch. Defaults to the
            module-level ``contract_negotiation_id``.
        poll_interval: Seconds to sleep between two polls.
        max_wait_seconds: Upper bound for the total waiting time. ``None``
            (the default) waits without a time limit.

    Raises:
        RuntimeError: If the negotiation reports the "TERMINATED" state.
        TimeoutError: If ``max_wait_seconds`` elapses before finalization.
    """
    from time import monotonic, sleep

    if negotiation_id is None:
        negotiation_id = contract_negotiation_id

    deadline = None if max_wait_seconds is None else monotonic() + max_wait_seconds
    status_url = f"{CONSUMER_MANAGEMENT}/v2/contractnegotiations/{negotiation_id}"

    while True:
        # Same headers as the other management requests (includes the API key).
        response = requests.get(url=status_url, headers=default_headers)
        print(response.status_code)

        try:
            body = response.json()
        except ValueError:
            # A body that is not JSON (yet); the JSON decode errors raised by
            # requests are ValueError subclasses. Keep polling.
            body = None

        if isinstance(body, dict):
            # Elsewhere in this notebook the state is read from the "state"
            # key; the prefixed "edc:state" key is still accepted.
            state = body.get("state", body.get("edc:state"))
            if state == "FINALIZED":
                return
            # NOTE(review): "TERMINATED" is presumably the terminal failure
            # state of a negotiation -- confirm for the EDC version in use.
            if state == "TERMINATED":
                raise RuntimeError(
                    f"contract negotiation {negotiation_id} was terminated: {body}"
                )

        if deadline is not None and monotonic() >= deadline:
            raise TimeoutError(
                f"contract negotiation {negotiation_id} not finalized "
                f"within {max_wait_seconds} seconds"
            )
        sleep(poll_interval)

contract_negotiation_id

In [None]:
# Fetch the current details of the contract negotiation from the consumer
# and show its state.
getting_contract_agreement_id = requests.get(
    url=f"{CONSUMER_MANAGEMENT}/v2/contractnegotiations/{contract_negotiation_id}",
    headers=default_headers,
)
getting_contract_agreement_id.json()["state"]

In [None]:
def request_provider_push_transfer(
    contract_agreement_id: str,
    asset_id: str = "assetId",
    destination_url: str = "http://consumer-backend:4000/api/consumer/store",
) -> requests.Response:
    """Ask the consumer connector to start a provider-push transfer.

    Posts a transfer request of type ``HttpData-PUSH`` to
    ``{CONSUMER_MANAGEMENT}/v2/transferprocesses``.

    Args:
        contract_agreement_id: Id of the contract agreement covering the asset.
        asset_id: Id of the asset to transfer.
        destination_url: HTTP endpoint the data is pushed to.

    Returns:
        The ``requests.Response`` of the POST request.
    """
    transfer_request = {
        "@context": {
            "edc": "https://w3id.org/edc/v0.0.1/ns/"
        },
        "@type": "TransferRequestDto",
        "connectorId": "provider",
        "connectorAddress": f"{provider_protocol_internal}",
        "contractId": f"{contract_agreement_id}",
        "assetId": asset_id,
        "managedResources": False,
        "protocol": "dataspace-protocol-http",
        "transferType": "HttpData-PUSH",
        "dataDestination": {
            "type": "HttpData",
            "baseUrl": destination_url,
        },
    }

    return requests.post(
        url=f"{CONSUMER_MANAGEMENT}/v2/transferprocesses",
        # Use the shared headers so the API key is sent, as in the pull variant.
        headers=default_headers,
        data=json.dumps(transfer_request),
    )

def request_consumer_pull_transfer(contract_agreement_id: str) -> requests.Response:
    """Ask the consumer connector to start a consumer-pull transfer.

    Posts a transfer request of type ``HttpData-PULL`` with an ``HttpProxy``
    data destination to ``{CONSUMER_MANAGEMENT}/v2/transferprocesses``.

    Args:
        contract_agreement_id: Id of the contract agreement covering the asset.

    Returns:
        The ``requests.Response`` of the POST request.
    """
    transfer_request = {
        "@context": {
            "@vocab": "https://w3id.org/edc/v0.0.1/ns/"
        },
        "@type": "TransferRequestDto",
        "connectorId": "provider",
        "connectorAddress": f"{provider_protocol_internal}",
        "contractId": f"{contract_agreement_id}",
        "assetId": "assetId",
        "protocol": "dataspace-protocol-http",
        "transferType": "HttpData-PULL",
        "dataDestination": {"type": "HttpProxy"},
    }

    return requests.post(
        url=f"{CONSUMER_MANAGEMENT}/v2/transferprocesses",
        headers=default_headers,
        data=json.dumps(transfer_request),
    )


In [None]:
# Read the agreement id from the negotiation details fetched earlier.
contract_agreement_id = getting_contract_agreement_id.json()["contractAgreementId"]

# Push variant (disabled); the pull variant below is the one used.
# provider_push_transfer = request_provider_push_transfer(contract_agreement_id)
consumer_pull_transfer = request_consumer_pull_transfer(contract_agreement_id)

In [None]:
# Show the parsed JSON body of the transfer request response.
consumer_pull_transfer.json()

In [None]:
pull_transfer_id = consumer_pull_transfer.json()["@id"]

"""
Don't be surprised if the transfer remains in the "Started" state.
It may stay that way even when the transfer is completed.
"""

# Query the state of the pull transfer process on the consumer.
requests.get(
    url=f"{CONSUMER_MANAGEMENT}/v2/transferprocesses/{pull_transfer_id}/state",
    headers=default_headers,
).json()

In [None]:
# Show the parsed JSON body of the transfer request response again.
consumer_pull_transfer.json()

In [None]:
# Fetch the full transfer process record, identified by the "@id" returned
# when the transfer was requested.
transfer_process_id = consumer_pull_transfer.json()["@id"]

check_transfer_status = requests.get(
    f"{CONSUMER_MANAGEMENT}/v2/transferprocesses/{transfer_process_id}",
    headers=default_headers,
)

check_transfer_status.json()