# EXTRACT Joint Demonstrator WP2+WP4
## Propagate Inference Data Over Data Catalog and SkyStore - Consumer Notebook

### Set import path and import libraries

In [1]:
import sys
import os

# Put the parent of the current working directory at the front of the import
# path so the local package imported below can be found. Change the relative
# part if the notebook lives somewhere else in the project tree.
project_root = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
if sys.path.count(project_root) == 0:
    sys.path.insert(0, project_root)


In [2]:
import torch
import json
import requests
import json
import numpy as np
import boto3
import io
from botocore.config import Config
from catalogue.consumer import DataConsumer
from nuvla.api import Api as Nuvla
from dotenv import load_dotenv
from pathlib import Path
from pprint import pprint

### Load environment variables from .env

In [None]:
# The .env file is looked up one directory above the working directory;
# its entries are loaded into the process environment read by later cells.
env_path = Path('..', '.env')
load_dotenv(dotenv_path=env_path)


INGRESS_PORT: 80


### Function for parsing notification data

In [4]:
def get_relevant_data(msg: dict) -> "tuple[str, str, str]":
    """Extract the S3 location and download link from a notification message.

    Args:
        msg: Notification payload. The fields read are
            ``msg["data-object"]["bucket"]``, ``msg["data-object"]["object"]``
            and ``msg["link"]["uri"]``.

    Returns:
        A ``(bucket, file_name, uri)`` tuple. A field that is missing or null
        in the message is returned as an empty string.
    """
    # `or {}` / `or ""` also cover keys that are present but null; a plain
    # `.get(key, default)` would hand back None and break the chained lookup.
    data_object = msg.get("data-object") or {}
    link = msg.get("link") or {}

    bucket = data_object.get("bucket") or ""
    file_name = data_object.get("object") or ""
    uri = link.get("uri") or ""

    # NOTE: the link can be a presigned URL that embeds a signature; clear the
    # cell output before sharing the notebook.
    print(f"Bucket: {bucket}")
    print(f"File Name: {file_name}")
    print(f"Download Link: {uri}")
    return bucket, file_name, uri


In [5]:
# Nuvla endpoint and API key pair, taken from the environment.
# Note that the secret is read from the NUVLA_SECRET variable.
NUVLA_ENDPOINT = os.environ.get("NUVLA_ENDPOINT", "https://nuvla.io")
NUVLA_KEY = os.environ.get("NUVLA_KEY", "")
NUVLA_KEY_SECRET = os.environ.get("NUVLA_SECRET", "")

# MQTT settings for the DMF Data Catalog notifications.
# Each value falls back to the default shown when the variable is unset.
TOPIC = os.environ.get("MQTT_TOPIC", "test-topic")
MQTT_BROKER: str = os.environ.get("MQTT_BROKER", "91.134.104.104")
MQTT_PORT: int = int(os.environ.get("MQTT_PORT", 1883))


### Connect to Nuvla and wait for DMF notification (MQTT)

In [6]:
# Log in to Nuvla with the API key pair; only HTTP 201 is treated as success.
nuvla: Nuvla = Nuvla(endpoint=NUVLA_ENDPOINT, insecure=False)
resp = nuvla.login_apikey(NUVLA_KEY, NUVLA_KEY_SECRET)
if not resp or resp.status_code != 201:
    pprint("Failed to login to Nuvla. Please check your credentials.")
    # `resp` may be None (or otherwise lack a body) on this path, so only
    # print the response content when there is one.
    failure_body = getattr(resp, "content", None)
    if failure_body is not None:
        pprint(failure_body)
    # Raise instead of calling exit(): an exception stops this cell (and a
    # "Run All") with a visible error, whereas exit() asks the kernel itself
    # to shut down.
    raise RuntimeError("Nuvla login failed; check NUVLA_KEY / NUVLA_SECRET.")

# Subscribe to the notification topic on the configured MQTT broker.
consumer: DataConsumer = DataConsumer(
    nuvla=nuvla,
    topic=TOPIC,
    host=MQTT_BROKER,
    port=MQTT_PORT
)

consumer.listen()

# Take the next notification from the consumer's queue.
# NOTE(review): get() is called without a timeout; presumably it blocks until
# a message arrives -- confirm against DataConsumer.link_queue.
msg = consumer.link_queue.get()


Connected to broker
'Data record ID: data-record/de045b5a-969b-4dce-a555-b03343632cb5'


### Load inference data based on S3 info from notification

In [7]:
# Pull bucket, object key and download link out of the notification
bucket_name, key, download_link = get_relevant_data(msg)

# S3 client for S3-Proxy #2; endpoint and credentials come from the environment
s3_settings = {
    "endpoint_url": os.getenv("S3_ENDPOINT"),
    "aws_access_key_id": os.getenv("ACCESS_KEY"),
    "aws_secret_access_key": os.getenv("SECRET_KEY"),
}
s3 = boto3.client('s3', **s3_settings)

# Fetch the object named in the notification
response = s3.get_object(Bucket=bucket_name, Key=key)

# Read the whole body into memory and deserialize it with torch on the CPU
# (use map_location='cuda' to load onto a GPU instead).
# NOTE(review): torch.load unpickles the payload, and the object location comes
# from an incoming notification. If the file only holds plain tensors, consider
# passing weights_only=True to avoid executing arbitrary pickled code.
payload = response['Body'].read()
buffer = io.BytesIO(payload)
array = torch.load(buffer, map_location='cpu')


Bucket: inference-data
File Name: reduced_tronchetto_array_1751299367.pt
Download Link: http://91.134.11.99:8002/inference-data/reduced_tronchetto_array_1751299367.pt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20250630T160251Z&X-Amz-SignedHeaders=host&X-Amz-Expires=899&X-Amz-Credential=extract%2F20250630%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Signature=6b6d59db7320883d9d306253dd1c800355f59dd8136c6eab43efdc7bec5343b8


  array = torch.load(buffer, map_location='cpu')  # or 'cuda' if needed


### Prepare inference request

In [8]:
# Build the request body for the predictor: a single input entry holding the
# loaded array, wrapped in the top-level "inputs" list.
inputs = {
    "name": "input1",
    "shape": array.shape,
    "datatype": "FP32",  # the datatype is declared as float32 regardless of the array's own dtype
    "data": array.tolist(),
}
message_data = {"inputs": [inputs]}

### Invoke inference service with request

In [None]:
# Call the predictor's /v2/models/<model>/infer endpoint through the ingress.

# Timeouts in seconds. Without a timeout, requests waits indefinitely on an
# unresponsive endpoint. The read timeout can be overridden via INFERENCE_TIMEOUT.
CONNECT_TIMEOUT_S = 10
READ_TIMEOUT_S = float(os.getenv("INFERENCE_TIMEOUT", "300"))

service_hostname=os.getenv("SERVICE_HOSTNAME")
model_name=os.getenv("MODEL_NAME")
ingress_ip=os.getenv("INGRESS_IP")
ingress_port=os.getenv("INGRESS_PORT")

# Fail fast on missing settings; otherwise the URL would silently become
# "http://None:None/..." or the Host header would be unusable.
required_settings = {
    "SERVICE_HOSTNAME": service_hostname,
    "MODEL_NAME": model_name,
    "INGRESS_IP": ingress_ip,
    "INGRESS_PORT": ingress_port,
}
missing_settings = [name for name, value in required_settings.items() if not value]
if missing_settings:
    raise RuntimeError(
        f"Missing required environment variables: {', '.join(missing_settings)}"
    )

predictor_url = f"http://{ingress_ip}:{ingress_port}/v2/models/{model_name}/infer"
request_headers = {
    "Content-Type": "application/json",
    "Accept": "application/json",
    # The request goes to the ingress address; the Host header carries the service hostname.
    "Host": service_hostname,
}
response = requests.post(
    predictor_url,
    headers=request_headers,
    data=json.dumps(message_data),
    timeout=(CONNECT_TIMEOUT_S, READ_TIMEOUT_S),
)
# Surface HTTP errors here rather than as a confusing parse failure later on.
response.raise_for_status()

INGRESS_PORT: 80


### Parse inference response and print result

In [10]:
# Decode the JSON body of the inference response
response_message = json.loads(response.text)

# Take the data of the first output entry as a float32 array and show it
first_output = response_message["outputs"][0]
output1 = np.array(first_output['data'], dtype=np.float32)
print(output1)

[2.2801923e+08 2.2591114e+08 2.2561734e+08 2.2002538e+08 2.1993592e+08]
