In [84]:
import requests
import json

KEYS_TO_EXTRACT = ['creationTimeStamp', 'id', 'name', 'publishName', 'publishType']

def read_json(file_path):
    with open(file_path, 'r') as file:
        return json.load(file)

def extract_items_as_json(data, keys_to_extract):
    if 'items' in data and isinstance(data['items'], list):
        extracted_items = [
            {
                key: item[key] for key in keys_to_extract if key in item
            }
            for item in data['items']
        ]
        return extracted_items  # Return the extracted items as a Python dictionary
    else:
        return []  # Return an empty list if 'items' is not in data or not a list

class SASDeployClient:
    def __init__(self):
        self.base_url = 'https://sasserver.demo.sas.com'
        self.headers = read_json('headers.json')

    def send_request(self, type, url, payload=None):
        if payload is None:
            return requests.request(
                type, 
                url, 
                headers=self.headers, 
                verify='../trustedcerts.pem')
        else:
            return requests.request(
                type, 
                url, 
                headers=self.headers, 
                data=payload, 
                verify='../trustedcerts.pem')
    
    def get_joblist(self, payload):
        request_url = self.base_url + "/jobExecution/jobs"
        response = self.send_request("GET", request_url, payload)
        
        if response.status_code == 200:
            return response.json()
        else:
            print(f"Failed to retrieve job list: {response.status_code}")
            return None

    def get_published_flows(self):
        request_url = self.base_url + "/modelPublish/models"
        response = self.send_request("GET", request_url)

        data = response.json()

        result = extract_items_as_json(data, KEYS_TO_EXTRACT)
        
        if response.status_code == 200:
            return result
        else:
            print(f"Failed to retrieve job list: {response.status_code}")
            return None

    def import_json_obj(self):
        request_url = self.base_url + "/transfer/importJobs"
        response = self.send_request("POST", request_url)

        return response

In [89]:
app = SASDeployClient()

data = app.get_published_flows()
type(data)

str

In [90]:
json.loads(data)

[{'creationTimeStamp': '2024-02-23T22:41:10.962773Z',
  'id': 'c1a711ad-f667-4249-8ad9-9f50f5aa12d7',
  'name': 'lbzx3kdnf4fhfzjdgdeqqxjq7fm',
  'publishName': 'lbzx3kdnf4fhfzjdgdeqqxjq7fm',
  'publishType': 'mas'},
 {'creationTimeStamp': '2024-02-23T22:41:43.315328Z',
  'id': 'd4a46783-3901-4c58-a3cb-cb874f95618c',
  'name': 'lmedt22s6fzaztjs5unrc2ryfzq',
  'publishName': 'lmedt22s6fzaztjs5unrc2ryfzq',
  'publishType': 'mas'},
 {'creationTimeStamp': '2024-06-04T08:46:21.013332Z',
  'id': '70b7a117-5c84-43fb-9b7b-822f04a0c2a1',
  'name': 'ml1_income_lgbm',
  'publishName': 'ml1_income_lgbm',
  'publishType': 'mas'},
 {'creationTimeStamp': '2024-06-04T08:46:47.484047Z',
  'id': '3a54ee03-dfcf-4731-9d88-b6ae840faa1e',
  'name': 'ml2_cluster_kmeans',
  'publishName': 'ml2_cluster_kmeans',
  'publishType': 'mas'},
 {'creationTimeStamp': '2024-06-04T08:47:15.423911Z',
  'id': '546d33e9-18a3-400a-a026-3266b9ed9164',
  'name': 'RB_Macro_PD_Calibrated__Pipeline_1_',
  'publishName': 'RB_Macro_

In [None]:
app = SASDeployClient()

payload = json.dumps({
  "jobDefinition": {
    "type": "validateDecision"
  },
  "arguments": {
    "decisionId": "63f49f1e-8ce0-4f35-9a44-0f941c4a7a38",
    "decisionRevisionId": "003e50fa-473a-472a-b101-1ef7f4133178",
    "reset": "false"
  }
})

app.get_joblist(payload)

In [None]:
https://sasserver.demo.sas.com/decisions/flows/