In [6]:
import requests
import concurrent.futures
import re
import numpy as np
import json
import ast
import pandas as pd
import glob
import os
import logging

### Transcript and Prompt Set Import

In [7]:
# Prompt import - 41 prompts
df_prompts_X = pd.read_csv(r"C:\Users\belie\OneDrive\Documents\GitHub\AutoQA-FunctionDev\X\X Prompts.csv")

# Transcript import
def read_files():
    # Get all files from 'Call Transcripts_X' folder
    files = glob.glob(r'C:\Users\belie\OneDrive\Documents\GitHub\AutoQA-FunctionDev\X\Call Transcripts_X/*.txt')
    
    # Read all filenames and file contents
    filenames = []
    data = []
    
    for file in files:
        filenames.append(file)  # Store the filename
        with open(file, 'r', encoding='utf-8') as f:
            data.append(f.read())  # Store the file content
    
    return filenames, data

# Get filenames and transcripts
filenames, transcripts = read_files()
# Clean filenames to remove .txt extension
filenames = [os.path.splitext(os.path.basename(file))[0] for file in filenames]

### Prompt Setup

In [8]:
# system and user prompt setup
system_prompt = f"""
Answer questions based on the interaction between a call-center agent and a customer.
Ensure the output follows the JSON format below:
{{
    "Question #": "[Question Number]",
    "Answer": "[Yes/No/NA/Good/Average/Needs Improvement]"
}}
Interaction:
{transcripts[0]}
"""
# "Justification": "[Provide your justification based on the interaction]"

questions = df_prompts_X['Prompt'].tolist()

user_prompt = ""
for i, question in enumerate(questions, 1):
    user_prompt += f"""
    Question {i}: {question}
    Answer the following in the format provided:
    {{
        "Question #": "{i}",
        "Answer": "[Yes/No/NA/Good/Average/Needs Improvement]"
    }}
    """

In [9]:
# Token Length
import tiktoken

def count_tokens(text):
    encoder = tiktoken.encoding_for_model("gpt-4")
    tokens = encoder.encode(text)
    return len(tokens)

count_tokens(f"{system_prompt} {user_prompt}")

27027

### Authentication and Payload Setup

In [10]:
import requests
import json
import time

# Step 1: Define your variables
url = "https://api.runpod.ai/v2/k9fanpyhc2z8ya/run"
headers = {"Authorization": "Bearer rpa_ARG4EDO1OIMKM70C4J04YBVR1685WN3VB46AFUSU1c54vp", "Content-Type": "application/json"}

payload = {"input": {
    "messages": [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ],
    "sampling_params": {"temperature": 0.7, "max_tokens": 32768}
}}

### Inferencing

#### Single Inference with polling

In [12]:
# Step 2: Send the job request
response = requests.post(url, headers=headers, json=payload)
if response.status_code != 200:
    print("Failed to submit job:", response.text)
    exit()

response_json = response.json()
job_id = response_json["id"]
print(f"Job submitted. ID: {job_id}")

# Step 3: Poll the status endpoint
status_url = f"https://api.runpod.ai/v2/k9fanpyhc2z8ya/status/{job_id}"

while True:
    status_response = requests.get(status_url, headers=headers)
    status_data = status_response.json()

    print("Current status:", status_data["status"])

    if status_data["status"] == "COMPLETED":
        print("Job completed.")
        print("Output:", json.dumps(status_data["output"], indent=2))
        break
    elif status_data["status"] == "FAILED":
        print("Job failed.")
        print("Details:", status_data)
        break
    elif status_data["status"] == "CANCELLED":
        print("Job cancelled.")
        print("Details:", status_data)
        break
    else:
        time.sleep(1)  # Wait 1 seconds before polling again


Job submitted. ID: 719b41b8-067e-4d37-8246-eade2b0beee3-u2
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: 

#### Multi Iterative Inferencing - one request sent at a time

In [None]:
# Iteratively send requests
job_ids = []
number_of_requests = 1000

for i in range(0, number_of_requests):
    response = requests.post(url, headers=headers, json=payload)
    if response.status_code != 200:
        print("Failed to submit job:", response.text)
        exit()
    response_json = response.json()
    job_id = response_json["id"]
    job_ids.append(job_id)
    print(f"Job submitted. ID: {job_id}")

#### Batch Inferencing - 1000 Requests at a time with 2.5 minutes of sleep in between

Setup

In [None]:
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
import json

TOTAL_REQUESTS = 1000 # adjust based on load of single inference
MAX_WORKERS = 200 

job_ids = []

def send_request(i, max_retries=5):
    attempt = 0
    while attempt < max_retries:
        try:
            response = requests.post(url, headers=headers, json=payload)
            if response.status_code == 200:
                return response.json()["id"]
            elif response.status_code == 429:
                print(f"[{i}] 429 Too Many Requests — backing off for 60s (attempt {attempt+1})")
                time.sleep(60)
                attempt += 1
            else:
                print(f"[{i}] Failed: {response.status_code} - {response.text}")
                return None
        except Exception as e:
            print(f"[{i}] Exception: {e}")
            time.sleep(5)
            attempt += 1
    print(f"[{i}] Failed after {max_retries} retries.")
    return None

def send_requests():
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(send_request, i): i for i in range(TOTAL_REQUESTS)}
        for future in as_completed(futures):
            job_id = future.result()
            if job_id:
                job_ids.append(job_id)
            if len(job_ids) % 100 == 0:
                print(f"{len(job_ids)} jobs submitted...")
    end_time = time.time()
    print(f"\n✅  Submitted {len(job_ids)} jobs in {end_time - start_time:.2f} seconds.")

Batch Requests Sent Here

In [None]:
factor = 10 
# Sending 'factor' batches of 1000 requests to avoid overwhelming the server
for i in range(0, factor):
    print(f"Starting batch {i+1} of {1000} requests...")
    send_requests()
    print(f"Sleeping for 150 seconds before next batch...")
    time.sleep(150)

### Logic App Setup

#### Step 1: Single Job Submission

In [15]:
import requests
import tiktoken
import logging

def validate_inputs(transcript, questions):
    if not transcript or not isinstance(transcript, str):
        raise ValueError("Invalid transcript: Must be a non-empty string.")
    if not questions or not isinstance(questions, list) or not all(isinstance(q, str) for q in questions):
        raise ValueError("Invalid questions: Must be a list of strings.")
    logging.info("Input validation passed.")

def create_prompts(transcript, questions):
    logging.info("Creating system and user prompts...")
    system_prompt = f"""
    Answer questions based on the interaction between a call-center agent and a customer.
    Ensure the output follows the JSON format below:
    {{
        "Question #": "[Question Number]",
        "Answer": "[Yes/No/NA/Good/Average/Needs Improvement]"
    }}
    Interaction:
    {transcript}
    """
    user_prompt = ""
    for i, question in enumerate(questions, 1):
        user_prompt += f"""
        Question {i}: {question}
        Answer the following in the format provided:
        {{
            "Question #": "{i}",
            "Answer": "[Yes/No/NA/Good/Average/Needs Improvement]"
        }}
        """
    return system_prompt.strip(), user_prompt.strip()

def determine_max_tokens(system_prompt, user_prompt):
    ALLOWED_TOKEN_LENGTHS = [1024, 2048, 4096, 8192, 16384, 32768, 65536]
    logging.info("Encoding prompt for token length...")
    try:
        tokens = tiktoken.encoding_for_model("gpt-4").encode(f"{system_prompt} {user_prompt}")
    except Exception as e:
        raise RuntimeError(f"Token encoding failed: {str(e)}")

    token_length = len(tokens)
    logging.info(f"Total token length: {token_length}")
    if token_length > ALLOWED_TOKEN_LENGTHS[-1]:
        raise ValueError(f"Token limit exceeded. Token Length: {token_length}. Max allowed: {ALLOWED_TOKEN_LENGTHS[-1]}")
    
    for allowed in ALLOWED_TOKEN_LENGTHS:
        if token_length <= allowed:
            logging.info(f"Using max_tokens = {allowed}")
            return allowed
    return ALLOWED_TOKEN_LENGTHS[-1]

def build_payload(system_prompt, user_prompt, max_tokens):
    logging.info("Building request payload...")
    return {
        "input": {
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            "sampling_params": {
                "temperature": 0.7,
                "max_tokens": max_tokens
            }
        }
    }

def send_post_request(payload):
    RUNPOD_URL = "https://api.runpod.ai/v2/k9fanpyhc2z8ya/run"
    AUTH_TOKEN = "Bearer rpa_ARG4EDO1OIMKM70C4J04YBVR1685WN3VB46AFUSU1c54vp"
    headers = {
        "Authorization": AUTH_TOKEN,
        "Content-Type": "application/json"
    }
    logging.info("Sending POST request to RunPod API...")
    try:
        response = requests.post(RUNPOD_URL, headers=headers, json=payload)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Job submission failed: {str(e)}")
    except Exception as e:
        raise RuntimeError(f"Unexpected error during response handling: {str(e)}")

def extract_job_id(response_json):
    job_id = response_json.get("id")
    if not job_id:
        raise ValueError(f"Job submission failed: Missing job ID in response: {response_json}")
    return job_id

# Main function
def submit_job(transcript, questions):
    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format='[%(asctime)s] %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler("job_submission.log"),
            logging.StreamHandler()
        ]
    )
    try:
        validate_inputs(transcript, questions)
        system_prompt, user_prompt = create_prompts(transcript, questions)
        max_tokens = determine_max_tokens(system_prompt, user_prompt)
        payload = build_payload(system_prompt, user_prompt, max_tokens)
        response_json = send_post_request(payload)
        job_id = extract_job_id(response_json)
        logging.info(f"Job submitted successfully. ID: {job_id}")
        return job_id
    except Exception as e:
        logging.error(f"Error in submit_job: {str(e)}")
        return f"Error: {str(e)}"


In [16]:
submit_job(transcripts[0], questions)

[2025-04-30 15:16:15,025] INFO - Input validation passed.
[2025-04-30 15:16:15,025] INFO - Creating system and user prompts...
[2025-04-30 15:16:15,027] INFO - Encoding prompt for token length...
[2025-04-30 15:16:15,047] INFO - Total token length: 27028
[2025-04-30 15:16:15,048] INFO - Using max_tokens = 32768
[2025-04-30 15:16:15,049] INFO - Building request payload...
[2025-04-30 15:16:15,050] INFO - Sending POST request to RunPod API...
[2025-04-30 15:16:15,333] INFO - Job submitted successfully. ID: d674f41a-2468-4988-8565-9a97dcd0fec4-u2


'd674f41a-2468-4988-8565-9a97dcd0fec4-u2'

#### Step 2: Status Polling

In [19]:
import requests
import time
import logging

def validate_job_id(job_id):
    if not job_id or not isinstance(job_id, str):
        raise ValueError("Invalid job_id: Must be a non-empty string.")

def build_status_url(job_id):
    return f"https://api.runpod.ai/v2/k9fanpyhc2z8ya/status/{job_id}"

def get_job_status(status_url, headers):
    try:
        response = requests.get(status_url, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Status check failed: {str(e)}")
    except ValueError as e:
        raise RuntimeError(f"Invalid JSON in response: {str(e)}")

def process_status_response(status_data):
    status = status_data.get("status")
    if not status:
        raise ValueError("Missing 'status' field in response.")
    logging.info(f"Current status: {status}")

    if status == "COMPLETED":
        logging.info("Job completed.")
        return "COMPLETED"
    elif status == "FAILED":
        logging.error(f"Job failed. Details: {status_data}")
        return "FAILED"
    elif status == "CANCELLED":
        logging.warning(f"Job cancelled. Details: {status_data}")
        return "CANCELLED"
    return None  # Job still in progress

# Main polling function
def status_poll(job_id):
    AUTH_TOKEN = "Bearer rpa_ARG4EDO1OIMKM70C4J04YBVR1685WN3VB46AFUSU1c54vp"
    POLL_INTERVAL_SECONDS = 60
    # Setup logging (reuse if already configured elsewhere)
    logging.basicConfig(
        level=logging.INFO,
        format='[%(asctime)s] %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler("job_submission.log"),
            logging.StreamHandler()
        ]
    )
    try:
        validate_job_id(job_id)
        status_url = build_status_url(job_id)
        headers = {
            "Authorization": AUTH_TOKEN,
            "Content-Type": "application/json"
        }

        logging.info(f"Polling status for job ID: {job_id}")
        while True:
            status_data = get_job_status(status_url, headers)
            final_status = process_status_response(status_data)
            if final_status:
                return final_status
            logging.info(f"Job still in progress or in queue. Sleeping for {POLL_INTERVAL_SECONDS} seconds...")
            time.sleep(POLL_INTERVAL_SECONDS)

    except Exception as e:
        logging.error(f"Error in status_poll: {str(e)}")
        return f"Error: {str(e)}"


In [20]:
status_poll('d674f41a-2468-4988-8565-9a97dcd0fec4-u2')

[2025-04-30 15:17:08,274] INFO - Polling status for job ID: d674f41a-2468-4988-8565-9a97dcd0fec4-u2
[2025-04-30 15:17:08,418] INFO - Current status: IN_PROGRESS
[2025-04-30 15:17:08,419] INFO - Job still in progress or in queue. Sleeping for 60 seconds...
[2025-04-30 15:18:08,596] INFO - Current status: COMPLETED
[2025-04-30 15:18:08,597] INFO - Job completed.


'COMPLETED'

#### Step 3: Response Acquisition and Validation

In [21]:
import requests
import logging
import re
import ast

# Helper Functions
def extract_json_from_response(raw_response):
    try:
        def extract_think_content(response_text):
            think_match = re.search(r'<think>(.*?)</think>', response_text, re.DOTALL)
            think_content = think_match.group(1).strip() if think_match else "No structured thought content available."
            remaining_text = re.sub(r'<think>.*?</think>', '', response_text, flags=re.DOTALL).strip()
            return think_content, remaining_text

        def clean_and_dict(text):
            cleaned_text = re.sub(r'(?s).*?({.*?})\s*```$', r'\1', text).strip()
            return ast.literal_eval(cleaned_text)

        think_content, final_ans = extract_think_content(raw_response)
        logging.info(f"Extracted final answer segment: {final_ans}")
        cleaned_str = re.sub(r'^[^\{]*\{', '{', final_ans, count=1)
        logging.info(f"Cleaned JSON string: {cleaned_str}")
        return clean_and_dict(cleaned_str)

    except Exception as e:
        logging.error(f"Failed to extract single JSON from response: {str(e)}")
        return f"Error extracting JSON: {str(e)}"

def extract_jsons_from_response(raw_response):
    try:
        def extract_think_content(response_text):
            think_match = re.search(r'<think>(.*?)</think>', response_text, re.DOTALL)
            think_content = think_match.group(1).strip() if think_match else "No structured thought content available."
            remaining_text = re.sub(r'<think>.*?</think>', '', response_text, flags=re.DOTALL).strip()
            return think_content, remaining_text

        def clean_and_dict(text):
            cleaned_text = re.sub(r'^[^\{]*\{', '{', text, count=1)
            return ast.literal_eval(cleaned_text)

        think_content, final_ans = extract_think_content(raw_response)
        json_pattern = r'\{.*?\}'
        json_matches = re.findall(json_pattern, final_ans, flags=re.DOTALL)

        json_dicts = []
        for json_str in json_matches:
            try:
                cleaned_str = re.sub(r'^[^\{]*\{', '{', json_str, count=1)
                json_dict = clean_and_dict(cleaned_str)
                json_dicts.append(json_dict)
            except Exception as inner_e:
                logging.warning(f"Failed to parse a JSON block: {str(inner_e)}")

        logging.info(f"Extracted {len(json_dicts)} JSON blocks from response.")
        return json_dicts

    except Exception as e:
        logging.error(f"Failed to extract multiple JSONs from response: {str(e)}")
        return []

def check_job_status(job_id):
    endpoint_id = "k9fanpyhc2z8ya"
    try:
        url = f"https://api.runpod.ai/v2/{endpoint_id}/status/{job_id}"
        headers = {"Authorization": f"Bearer rpa_ARG4EDO1OIMKM70C4J04YBVR1685WN3VB46AFUSU1c54vp"}
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logging.error(f"Error fetching job status for {job_id}: {str(e)}")
        return {}

def validate_resp(job_id, num_of_questions):
    # Logging setup (reuse existing config if already defined)
    logging.basicConfig(
        level=logging.INFO,
        format='[%(asctime)s] %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler("job_submission.log"),
            logging.StreamHandler()
        ]
    )
    try:
        status = check_job_status(job_id)
        output = status.get('output')
        if not output:
            raise ValueError("No output field found in job response.")

        response_text = output[0].get('choices', [{}])[0].get('tokens', [None])[0]
        if not response_text:
            raise ValueError("Missing response tokens from job output.")

        jsons = extract_jsons_from_response(response_text)
        if len(jsons) == num_of_questions:
            logging.info(f"Job {job_id} returned a valid response with {len(jsons)} items.")
            return jsons
        else:
            logging.warning(f"Job {job_id} returned {len(jsons)} items, expected {num_of_questions}.")
            return f"Job {job_id} returned an invalid response."

    except Exception as e:
        logging.error(f"Error validating response for job {job_id}: {str(e)}")
        return f"Error validating response: {str(e)}"


In [22]:
validate_resp('d674f41a-2468-4988-8565-9a97dcd0fec4-u2', len(questions))

[2025-04-30 15:18:54,897] INFO - Extracted 41 JSON blocks from response.
[2025-04-30 15:18:54,897] INFO - Job d674f41a-2468-4988-8565-9a97dcd0fec4-u2 returned a valid response with 41 items.


[{'Question #': '1', 'Answer': 'Yes'},
 {'Question #': '2', 'Answer': 'Yes'},
 {'Question #': '3', 'Answer': 'Yes'},
 {'Question #': '4', 'Answer': 'Yes'},
 {'Question #': '5', 'Answer': 'Good'},
 {'Question #': '6', 'Answer': 'Yes'},
 {'Question #': '7', 'Answer': 'Yes'},
 {'Question #': '8', 'Answer': 'Yes'},
 {'Question #': '9', 'Answer': 'No'},
 {'Question #': '10', 'Answer': 'Yes'},
 {'Question #': '11', 'Answer': 'No'},
 {'Question #': '12', 'Answer': 'Yes'},
 {'Question #': '13', 'Answer': 'Yes'},
 {'Question #': '14', 'Answer': 'Yes'},
 {'Question #': '15', 'Answer': 'No'},
 {'Question #': '16', 'Answer': 'No'},
 {'Question #': '17', 'Answer': 'Yes'},
 {'Question #': '18', 'Answer': 'Yes'},
 {'Question #': '19', 'Answer': 'Good'},
 {'Question #': '20', 'Answer': 'Needs Improvement'},
 {'Question #': '21', 'Answer': 'Needs Improvement'},
 {'Question #': '22', 'Answer': 'No'},
 {'Question #': '23', 'Answer': 'Needs Improvement'},
 {'Question #': '24', 'Answer': 'Needs Improvement