In [1]:
import requests
import json

# 定义请求URL
url = "http://localhost:1026/v2/subscriptions"

# 定义请求头
headers = {
    "Content-Type": "application/json",
    "fiware-service": "room_service",
    "fiware-servicepath": "/"
}

# 定义请求体数据
data = {
    "description": "Test subscription",
    "subject": {
        "entities": [
            {
                "idPattern": ".*",
                "type": "Room"
            }
        ],
        "condition": {
            "attrs": ["temperature"]
        }
    },
    "notification": {
        "http": {
            "url": "http://quantumleap:8668/v2/notify"
        },
        "attrs": ["temperature"],
        "metadata": ["dateCreated", "dateModified"]
    },
    "throttling": 5
}

# 发送POST请求
try:
    response = requests.post(url, headers=headers, json=data)
    
    # 打印响应状态码
    print(f"Status Code: {response.status_code}")
    
    # 打印响应头
    print(f"Response Headers: {dict(response.headers)}")
    
    # 打印响应内容
    if response.text:
        print(f"Response Content: {response.text}")
    else:
        print("No response content")
        
    # 检查请求是否成功
    if response.status_code == 201:
        print("✅ 订阅创建成功!")
    else:
        print(f"❌ 请求失败，状态码: {response.status_code}")
        
except requests.exceptions.RequestException as e:
    print(f"❌ 请求异常: {e}")
except Exception as e:
    print(f"❌ 发生错误: {e}")

Status Code: 201
Response Headers: {'Connection': 'Keep-Alive', 'Content-Length': '0', 'Location': '/v2/subscriptions/68a48ccf243167701250296c', 'Fiware-Correlator': '6b3eee0c-7d0a-11f0-9645-0242ac140006', 'Date': 'Tue, 19 Aug 2025 14:40:15 GMT'}
No response content
✅ 订阅创建成功!


In [3]:
import requests
import json

# 定义请求URL - 创建实体
url = "http://localhost:1026/v2/entities"

# 定义请求头
headers = {
    "Content-Type": "application/json",
    "fiware-service": "room_service",
    "fiware-servicepath": "/"
}

# 定义请求体数据 - 房间实体
data = {
    "id": "urn:ngsi-ld:Room:002",
    "type": "Room",
    "temperature": {
        "type": "Number",
        "value": 26.5
    }
}

# 发送POST请求
try:
    response = requests.post(url, headers=headers, json=data)
    
    # 打印响应状态码
    print(f"Status Code: {response.status_code}")
    
    # 打印响应头
    print(f"Response Headers: {dict(response.headers)}")
    
    # 打印响应内容
    if response.text:
        print(f"Response Content: {response.text}")
    else:
        print("No response content")
        
    # 检查请求是否成功
    if response.status_code == 201:
        print("✅ 房间实体创建成功!")
    elif response.status_code == 422:
        print("⚠️ 实体可能已存在")
    else:
        print(f"❌ 请求失败，状态码: {response.status_code}")
        
except requests.exceptions.RequestException as e:
    print(f"❌ 请求异常: {e}")
except Exception as e:
    print(f"❌ 发生错误: {e}")

Status Code: 201
Response Headers: {'Connection': 'Keep-Alive', 'Content-Length': '0', 'Location': '/v2/entities/urn:ngsi-ld:Room:002?type=Room', 'Fiware-Correlator': 'c04f1840-7d0a-11f0-b249-0242ac140006', 'Date': 'Tue, 19 Aug 2025 14:42:37 GMT'}
No response content
✅ 房间实体创建成功!
