@@ -0,0 +1,8 @@
# Requires Python 3.12 or above.
azure-ai-ml==1.30.0
azure-identity==1.25.1
azureml-mlflow==1.60.0.post1
huggingface-hub==1.1.5
matplotlib==3.10.7
mlflow==2.22.2
ipykernel
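
For reference, these pins come together through an MLClient; a minimal sketch, assuming DefaultAzureCredential can authenticate and with placeholder workspace identifiers:

from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient

ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace>",
)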
@@ -0,0 +1,166 @@
import os
import json
import subprocess
import pandas as pd
from tempfile import TemporaryDirectory
from azure.ai.ml import MLClient
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes


def register_dataset(ml_client: MLClient, dataset_name: str, file_path: str):
    """Register a dataset in Azure ML."""
    data_asset = Data(
        name=dataset_name,
        path=file_path,
        type=AssetTypes.URI_FILE,
        description="FinQA dataset",
        tags={"source": "https://github.com/czyssrs/FinQA"},
        version="1",
    )

    registered_data = ml_client.data.create_or_update(data_asset)
    print(f"Registered dataset {registered_data.name}.")
    return registered_data


def download_finqa_dataset(src: str, target_dir: str = "data/raw"):
    """Download the raw FinQA dataset and convert it to jsonl."""
    with TemporaryDirectory() as tmpdir:
        print(f"Cloning raw FinQA dataset to {tmpdir} ...")
        subprocess.run(["git", "clone", src, tmpdir], check=True)
        os.makedirs(target_dir, exist_ok=True)
        print("Converting FinQA dataset to jsonl format ...")
        dataset_dir = os.path.join(tmpdir, "dataset")
        filenames = ["train.json", "dev.json", "test.json"]
        for filename in filenames:
            target_file_name = filename.split(".")[0] + ".jsonl"
            convert_to_jsonl(
                current_path=os.path.join(dataset_dir, filename),
                target_path=os.path.join(target_dir, target_file_name),
            )


def convert_to_jsonl(current_path: str, target_path: str):
    """Convert a FinQA dataset file to jsonl format."""
    with open(current_path, "r") as rf, open(target_path, "w") as wf:
        items = json.loads(rf.read())
        for item in items:
            wf.write(json.dumps(item) + "\n")
    print(f"Converted {current_path} to {target_path}.")


def prepare_finqa_dataset(
    ml_client: MLClient, data_dir: str = "data", register_datasets: bool = False
) -> tuple[str, str, str]:
    """Prepare the FinQA dataset for training and evaluation."""
    # VERL selects its rule-based reward function by data source, so tag the data
    # with a supported source; gsm8k's "####" answer extraction matches the
    # prompt format built below.
    data_source = "openai/gsm8k"

    # download and convert dataset
    raw_data_dir = os.path.join(data_dir, "raw")
    FINQA_GIT_REPO = "https://github.com/czyssrs/FinQA"
    download_finqa_dataset(src=FINQA_GIT_REPO, target_dir=raw_data_dir)
    train_dataset_path = os.path.join(raw_data_dir, "train.jsonl")
    test_dataset_path = os.path.join(raw_data_dir, "test.jsonl")
    valid_dataset_path = os.path.join(raw_data_dir, "dev.jsonl")

    def format_list_to_string(data_list: list):
        """Convert a list to a newline-separated string."""
        if not data_list:
            return ""
        if isinstance(data_list, str):
            return data_list
        return "\n".join(str(item) for item in data_list)

    def format_table(table_list: list):
        """Format table data as a string."""
        if not table_list:
            return ""
        table_str = "\nTable:\n"
        for row in table_list:
            if isinstance(row, list):
                table_str += " | ".join(str(cell) for cell in row) + "\n"
            else:
                table_str += str(row) + "\n"
        return table_str

    def map_fn(example: pd.Series, idx: int, split: str):
        """Transform one example into the VERL-style record format."""
        pre_instruction = "Please answer the following financial question based on the context provided."
        post_instruction = (
            'Let\'s think step by step and output the final answer after "####".'
        )
        qa = example.get("qa", {})
        question = qa.get("question", "")
        answer = qa.get("answer", qa.get("exe_ans", ""))
        gold_evidence = "\n".join(qa.get("gold_inds", {}).values())
        pre_text = format_list_to_string(example.get("pre_text", []))
        post_text = format_list_to_string(example.get("post_text", []))
        table = format_table(example.get("table", [])).strip()
        # Build prompt content according to the specified schema
        prompt_content = "\n\n".join(
            [
                pre_instruction,
                "Context: " + pre_text,
                gold_evidence,
                post_text,
                table,
                "Question: " + question,
                post_instruction,
            ]
        )
        data = {
            "data_source": data_source,
            "prompt": [
                {
                    "role": "user",
                    "content": prompt_content,
                }
            ],
            "ability": "financial_reasoning",
            "reward_model": {"style": "rule", "ground_truth": answer},
            "extra_info": {
                "index": idx,
                "answer": answer,
                "question": question,
                "split": split,
            },
        }
        return data

    # load datasets
    train_dataset = pd.read_json(train_dataset_path, lines=True)
    test_dataset = pd.read_json(test_dataset_path, lines=True)
    valid_dataset = pd.read_json(valid_dataset_path, lines=True)

    # map datasets
    train_dataset = train_dataset.apply(
        lambda x: map_fn(x, x.name, split="train"), axis=1
    )
    test_dataset = test_dataset.apply(lambda x: map_fn(x, x.name, split="test"), axis=1)
    valid_dataset = valid_dataset.apply(
        lambda x: map_fn(x, x.name, split="valid"), axis=1
    )

    # save locally as jsonl
    train_dataset_path = os.path.join(data_dir, "train.jsonl")
    test_dataset_path = os.path.join(data_dir, "test.jsonl")
    valid_dataset_path = os.path.join(data_dir, "valid.jsonl")
    train_dataset.to_json(train_dataset_path, orient="records", lines=True)
    test_dataset.to_json(test_dataset_path, orient="records", lines=True)
    valid_dataset.to_json(valid_dataset_path, orient="records", lines=True)

    # register datasets and return asset IDs when available
    if register_datasets:
        train_data = register_dataset(ml_client, "finqa_train", train_dataset_path)
        test_data = register_dataset(ml_client, "finqa_test", test_dataset_path)
        valid_data = register_dataset(ml_client, "finqa_valid", valid_dataset_path)
        if (
            (train_data and train_data.id)
            and (test_data and test_data.id)
            and (valid_data and valid_data.id)
        ):
            return train_data.id, test_data.id, valid_data.id

    # fall back to local paths
    return train_dataset_path, test_dataset_path, valid_dataset_path
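

A minimal sketch of driving the pipeline end to end; the workspace identifiers are placeholders, and register_datasets=True would return asset IDs instead of local paths:

if __name__ == "__main__":
    from azure.identity import DefaultAzureCredential

    ml_client = MLClient(
        DefaultAzureCredential(),
        "<subscription-id>",
        "<resource-group>",
        "<workspace>",
    )
    train_path, test_path, valid_path = prepare_finqa_dataset(
        ml_client, data_dir="data", register_datasets=False
    )
    print(train_path, test_path, valid_path)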
@@ -0,0 +1,200 @@
import uuid
import requests
from typing import Optional
from azure.ai.ml import MLClient
from azure.ai.ml.entities import (
    EndpointAuthKeys,
    ManagedOnlineEndpoint,
    ManagedOnlineDeployment,
    KubernetesOnlineEndpoint,
    KubernetesOnlineDeployment,
    ProbeSettings,
    OnlineRequestSettings,
)


def get_default_probe_settings() -> ProbeSettings:
    """Get default probe settings for deployments."""
    # Probes are APIs exposed by the deployment that tell the framework whether
    # it is healthy and ready to receive traffic.
    return ProbeSettings(
        initial_delay=1400,  # seconds; large model weights can take a while to load
        period=30,
        timeout=2,
        success_threshold=1,
        failure_threshold=30,
    )


def get_default_request_settings() -> OnlineRequestSettings:
    """Get default request settings for deployments."""
    # Request settings control the timeout and concurrent requests per instance.
    return OnlineRequestSettings(
        request_timeout_ms=90000,
        max_concurrent_requests_per_instance=4,
    )


def create_managed_deployment(
    ml_client: MLClient,
    model_asset_id: str,  # Asset ID of the model to deploy
    instance_type: str,  # Supported instance type for managed deployment
    environment_asset_id: Optional[str] = None,  # Asset ID of the serving engine to use
    endpoint_name: Optional[str] = None,
    endpoint_description: str = "Sample endpoint",
    endpoint_tags: dict = {},
    deployment_name: Optional[str] = None,
    deployment_env_vars: dict = {},
) -> str:
    """Create a managed deployment."""
    guid = str(uuid.uuid4())[:8]  # Unique suffix to avoid name collisions
    endpoint_name = endpoint_name or "rl-endpoint"
    endpoint_name = f"{endpoint_name}-{guid}"  # Unique names prevent collisions and allow parallel experiments
    deployment_name = deployment_name or "default"

    # Use the AzureML endpoint abstraction for traffic management and auth
    endpoint = ManagedOnlineEndpoint(
        name=endpoint_name,
        auth_mode="key",
        description=endpoint_description,
        tags=endpoint_tags,
    )

    print(f"Creating endpoint: {endpoint_name}")
    # The endpoint object triggers creation of the actual endpoint in the AML workspace.
    ml_client.online_endpoints.begin_create_or_update(endpoint).wait()

    # Use the deployment abstraction for scaling, versioning, and isolation
    deployment = ManagedOnlineDeployment(
        name=deployment_name,
        endpoint_name=endpoint_name,
        model=model_asset_id,
        instance_type=instance_type,
        instance_count=1,
        environment=environment_asset_id,
        environment_variables=deployment_env_vars,
        liveness_probe=get_default_probe_settings(),
        readiness_probe=get_default_probe_settings(),
        request_settings=get_default_request_settings(),
    )

    print("Creating deployment (this can take 15-20 minutes)...")
    ml_client.online_deployments.begin_create_or_update(deployment).wait()

    # Route all traffic to the new deployment for immediate use
    endpoint.traffic = {deployment_name: 100}
    ml_client.online_endpoints.begin_create_or_update(endpoint).result()

    print(f"Endpoint ready: {endpoint_name}")

    return endpoint_name


def create_kubernetes_deployment(
    ml_client: MLClient,
    model_asset_id: str,  # Asset ID of the model to deploy
    environment_asset_id: str,  # Asset ID of the serving engine to use
    instance_type: str,  # Kubernetes supports fractional node usage, down to the GPU level
    compute_name: str,  # Name of the compute that will be used for endpoint creation
    endpoint_name: Optional[str] = None,
    endpoint_description: str = "Sample endpoint",
    endpoint_tags: dict = {},
    deployment_name: Optional[str] = None,
    deployment_env_vars: dict = {},
    model_mount_path: str = "/var/model-mount",
) -> str:
    """Create an endpoint using Kubernetes."""

    print("🌐 Creating endpoint...")

    guid = str(uuid.uuid4())[:8]  # Unique suffix to avoid name collisions
    endpoint_name = endpoint_name or "rl-endpoint"
    endpoint_name = f"{endpoint_name}-{guid}"  # Unique names prevent collisions and allow parallel experiments
    deployment_name = deployment_name or "default"

    # Use the AzureML endpoint abstraction for traffic management and auth
    endpoint = KubernetesOnlineEndpoint(
        name=endpoint_name,
        auth_mode="key",
        compute=compute_name,
        description=endpoint_description,
        tags=endpoint_tags,
    )

    print(f"Creating endpoint: {endpoint_name}")
    # The endpoint object triggers creation of the actual endpoint in the AML workspace.
    ml_client.online_endpoints.begin_create_or_update(endpoint).wait()

    # Use the deployment abstraction for scaling, versioning, and isolation
    deployment = KubernetesOnlineDeployment(
        name=deployment_name,
        endpoint_name=endpoint_name,
        model=model_asset_id,
        model_mount_path=model_mount_path,
        instance_type=instance_type,
        instance_count=1,
        environment=environment_asset_id,
        environment_variables=deployment_env_vars,
        liveness_probe=get_default_probe_settings(),
        readiness_probe=get_default_probe_settings(),
        request_settings=get_default_request_settings(),
    )

    print("Creating deployment (this can take 15-20 minutes)...")
    ml_client.online_deployments.begin_create_or_update(deployment).wait()

    # Route all traffic to the new deployment for immediate use
    endpoint.traffic = {deployment_name: 100}
    ml_client.online_endpoints.begin_create_or_update(endpoint).result()

    print(f"Endpoint ready: {endpoint_name}")

    return endpoint_name
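

Both factories share a calling convention; a minimal sketch with hypothetical asset IDs and an example GPU SKU (swap in real assets from your workspace and a SKU your quota allows):

endpoint_name = create_managed_deployment(
    ml_client,
    model_asset_id="azureml:finqa-grpo-model:1",  # hypothetical model asset
    instance_type="Standard_NC24ads_A100_v4",  # example GPU SKU
    environment_asset_id="azureml:vllm-serving-env:1",  # hypothetical serving environment
    endpoint_tags={"project": "finqa-rl"},
)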


def test_deployment(ml_client: MLClient, endpoint_name: str):
    """Run a test request against a deployed endpoint and print the result."""
    print("Testing endpoint...")
    # Retrieve the endpoint URI and API key to authenticate the test request
    scoring_uri = ml_client.online_endpoints.get(endpoint_name).scoring_uri
    if not scoring_uri:
        raise ValueError("Scoring URI not found for endpoint.")

    api_keys = ml_client.online_endpoints.get_keys(endpoint_name)
    if not isinstance(api_keys, EndpointAuthKeys) or not api_keys.primary_key:
        raise ValueError("API key not found for endpoint.")

    # Use a realistic financial question to verify model reasoning and output format
    payload = {
        "messages": [
            {
                "role": "user",
                "content": """Please answer the following financial question:

Context: A company has revenue of $1,000,000 and expenses of $750,000.

Question: What is the profit margin as a percentage?
Let's think step by step and put the final answer after ####.""",
            }
        ],
        "max_tokens": 512,
        "temperature": 0.7,
    }

    # Set headers for JSON content and bearer authentication
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_keys.primary_key}",
    }

    response = requests.post(scoring_uri, json=payload, headers=headers)

    if response.status_code == 200:
        result = response.json()
        # Extract the model response from the OpenAI-style completion
        if "choices" in result and len(result["choices"]) > 0:
            answer = result["choices"][0]["message"]["content"]
            print("Response received")
            print(f"\n{'=' * 60}")
            print(answer)
            print(f"{'=' * 60}\n")
        return result
    else:
        print(f" ✗ Error: {response.status_code}")
        print(f" {response.text}")
        return None
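

Chaining creation and verification; assumes ml_client and the endpoint_name returned by one of the factories above:

result = test_deployment(ml_client, endpoint_name)
if result is None:
    raise RuntimeError(f"Endpoint {endpoint_name} failed the smoke test.")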