In [11]:

import json
import pprint

import requests




# Testing HL Api for PUSH transfer

---

## Consts

In [12]:
data_space_provider = "http://127.0.0.1:1234"
data_space_consumer = "http://127.0.0.1:1235"
context_broker = "http://127.0.0.1:1026"
streaming_processor = "http://0.0.0.0:5000/data"

catalog_id = ""
dataservice_id = ""
agreement_id = ""
agreement_pid = ""

## Setup Catalog, Dataservice and agreements

In [13]:
payload = {
    "foaf:homepage": "My catalog in Dataspace provider",
    "dct:title": "My catalog in Dataspace provider"
}
headers = {
    "Content-Type": "application/json",
}
url = data_space_provider + "/api/v1/catalogs"
response = requests.request("POST", url, headers=headers, data=json.dumps(payload))
response_as_json = response.json()
catalog_id = response_as_json["@id"]

pprint.pprint(response.json())

{'@context': 'https://w3id.org/dspace/2024/1/context.json',
 '@id': 'urn:uuid:94edeecb-c7a0-4f68-8156-c2f6b81c223d',
 '@type': 'dcat:Catalog',
 'dcat:dataset': [],
 'dcat:keyword': '',
 'dcat:service': [],
 'dcat:theme': '',
 'dct:conformsTo': None,
 'dct:creator': None,
 'dct:description': [],
 'dct:identifier': 'urn:uuid:94edeecb-c7a0-4f68-8156-c2f6b81c223d',
 'dct:issued': '2025-01-29T10:55:06.962070',
 'dct:modified': None,
 'dct:title': 'My catalog in Dataspace provider',
 'dspace:extraFields': None,
 'dspace:participantId': None,
 'foaf:homepage': 'My catalog in Dataspace provider',
 'odrl:hasPolicy': None}


In [14]:
suscription_payload = {
    "description": "Air Quality Unit Subscription",
    "subject": {
        "entities":[
            {"idPattern":".*","type":"AirQualityUnit"}],
        "condition": {
            "attrs":["CO"]
        }
    },
    "notification": {
        "http": {
            "url":"$data_url",
        },
    },
    "expires":"2040-01-01T14:00:00.00Z",
    "throttling":5
}
payload = {
    "dcat:endpointURL": context_broker + "/v2/subscriptions",
    "dcat:endpointDescription": json.dumps(suscription_payload),
}
headers = {
    "Content-Type": "application/json",
}
url = data_space_provider + "/api/v1/catalogs/" + catalog_id + "/data-services"
response = requests.request("POST", url, headers=headers, data=json.dumps(payload))
response_as_json = response.json()
dataservice_id = response_as_json["@id"]

pprint.pprint(response.json())

{'@context': 'https://w3id.org/dspace/2024/1/context.json',
 '@id': 'urn:uuid:ca4d81f5-48fa-4309-9310-4fce6db70268',
 '@type': 'dcat:DataService',
 'dcat:endpointDescription': '{"description": "Air Quality Unit Subscription", '
                             '"subject": {"entities": [{"idPattern": ".*", '
                             '"type": "AirQualityUnit"}], "condition": '
                             '{"attrs": ["CO"]}}, "notification": {"http": '
                             '{"url": "$data_url"}}, "expires": '
                             '"2040-01-01T14:00:00.00Z", "throttling": 5}',
 'dcat:endpointURL': 'http://127.0.0.1:1026/v2/subscriptions',
 'dcat:keyword': '',
 'dcat:theme': '',
 'dct:conformsTo': None,
 'dct:creator': None,
 'dct:description': [],
 'dct:identifier': 'urn:uuid:ca4d81f5-48fa-4309-9310-4fce6db70268',
 'dct:issued': '2025-01-29T10:55:07.301643',
 'dct:modified': None,
 'dct:title': None,
 'dspace:extraFields': None,
 'odrl:hasPolicy': None}


In [15]:
payload = {
    "dataServiceId": dataservice_id
}
headers = {
    "Content-Type": "application/json",
}
url = data_space_provider + "/api/v1/agreements"
response = requests.request("POST", url, headers=headers, data=json.dumps(payload))
response_as_json = response.json()
agreement_id = response_as_json["agreement_id"]

pprint.pprint(response.json())

{'agreement_id': 'urn:uuid:f05acf43-7551-46aa-9ffb-6d5311120ced',
 'data_service_id': 'urn:uuid:ca4d81f5-48fa-4309-9310-4fce6db70268',
 'identity': None,
 'identity_token': None}


## Setup transfer

In [16]:
url = data_space_consumer + "/api/v1/setup-transfer"

payload = ""
headers = {}
response = requests.request("POST", url, headers=headers, data=payload)
callbackAddress = response.json()["callbackAddress"]
callbackId = response.json()["callbackId"]
consumerPid = response.json()["consumerPid"]

pprint.pprint(response.json())

{'callbackAddress': 'http://127.0.0.1:1235/urn:uuid:66376dbe-0c2d-4a8e-a76d-71fc1c3cfb74',
 'callbackId': 'urn:uuid:66376dbe-0c2d-4a8e-a76d-71fc1c3cfb74',
 'consumerPid': 'urn:uuid:085a2d59-523a-4a7d-84ab-4be9ae005a9c'}


## Request transfer

In [17]:
url = data_space_consumer + "/api/v1/request-transfer"

payload = json.dumps({
    "agreementId": agreement_id,
    "format": "ngsi-ld+push",
    "dataAddress": {
        "@type": "dspace:DataAddress",
        "dspace:endpoint": streaming_processor,
        "dspace:endpointType": "HTTP",
        "dspace:endpointProperties": []
    },
    "callbackAddress": callbackAddress,
    "callbackId": callbackId,
    "consumerPid": consumerPid
})
headers = {
  'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

pprint.pprint(response.json())


{'consumerPid': 'urn:uuid:085a2d59-523a-4a7d-84ab-4be9ae005a9c',
 'transferProcess': {'@context': 'https://w3id.org/dspace/2024/1/context.json',
                     '@type': 'dspace:TransferProcess',
                     'dspace:consumerPid': 'urn:uuid:085a2d59-523a-4a7d-84ab-4be9ae005a9c',
                     'dspace:providerPid': 'urn:uuid:48bf7701-897f-4179-803c-d8ab5c646163',
                     'dspace:state': 'dspace:REQUESTED'}}


#### For seeing how the push service is working, please check the logs in the streaming-testing-service

## Server

In [19]:
from flask import Flask, request, jsonify
import json
import time

app = Flask(__name__)

@app.route('/data', methods=['POST'])
def receive_data():
    data = json.loads(request.get_data())
    timestamp = int(time.time() * 1000)
    filename = f"data/fiware_stream_{timestamp}.json"
    with open(filename, 'w') as f:
        json.dump(data, f)

    return jsonify({"status": "Data received"}), 200

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000
 * Running on http://192.168.1.130:5000
[33mPress CTRL+C to quit[0m
127.0.0.1 - - [29/Jan/2025 11:56:06] "POST /data HTTP/1.1" 200 -
127.0.0.1 - - [29/Jan/2025 11:56:11] "POST /data HTTP/1.1" 200 -
127.0.0.1 - - [29/Jan/2025 11:56:16] "POST /data HTTP/1.1" 200 -
127.0.0.1 - - [29/Jan/2025 11:56:21] "POST /data HTTP/1.1" 200 -
127.0.0.1 - - [29/Jan/2025 11:56:26] "POST /data HTTP/1.1" 200 -
