In [None]:
import ast
import asyncio
import aiohttp
import os
import json as js
import glob
import re
import base64
from itertools import product
from itertools import cycle
import time
from pathlib import Path

import logging

In [2]:
class CustomError(RuntimeError):
    """Error raised by the helpers in this notebook (invalid arguments, missing data files, filtered responses)."""

In [None]:
# Request logs are appended to logs/async_requests.log and mirrored to the stream handler.
log_file = Path("logs") / "async_requests.log"
log_file.parent.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    handlers=[logging.FileHandler(log_file, mode="a"), logging.StreamHandler()],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

In [None]:
logging.basicConfig(level=logging.INFO)

MAX_RETRIES = 3
RETRY_DELAY = 3 #DELAY BY SECONDS

RETRY_FILE = "retry_gpt_global.txt"

SEMAPHORE = asyncio.Semaphore(300) # Decrease if toooo big

endpoints = "YOUR ENDPOINT"
api_keys = "YOUR API KEY"

endpoint_cycle = cycle(endpoints)
api_key_cycle = cycle(api_keys)

def get_next_endpoint_and_key():
    return next(endpoint_cycle), next(api_key_cycle)

In [5]:
def extract_ids(directory):
    """Return the sorted, de-duplicated integers that prefix entries of DIRECTORY named ``<digits>_...``."""
    leading_number = re.compile(r'^(\d+)_')
    hits = (leading_number.match(entry) for entry in os.listdir(directory))
    # Entries without a numeric prefix produce no match and are ignored.
    return sorted({int(hit.group(1)) for hit in hits if hit})

In [None]:
def find_graph_data_by_pk(p_alias:str, size:str, index:int, fmt="edge_list") -> str:
    """Find the path of graph data file for (FMT, P_ALIAS, SIZE, INDEX)

    Looks for ``data/{SIZE}_{term}_graphs/{FMT}/{P_ALIAS}/{INDEX}_{P_ALIAS}_*.txt`` where
    ``term`` is "general" for the "SBM" alias and "special" otherwise.

    Returns the first matching path in sorted order.
    Raises CustomError when SIZE is not "exlarge"/"small" or when no file matches.
    """
    if size not in ["exlarge", "small"]:
        raise CustomError(f"Invalid size {size}.")

    term = "general" if p_alias == "SBM" else "special"

    directory = f"data/{size}_{term}_graphs/{fmt}/{p_alias}"
    # Sorted so the result does not depend on the order in which the OS lists files.
    files = sorted(glob.glob(f"{directory}/{index}_{p_alias}_*.txt"))

    for file in files:
        # Check only the file name: the directory part may use OS-specific separators
        # and may contain characters that are special in a regular expression.
        match = re.match(r"(\d+)_.*\.txt$", os.path.basename(file))
        if match and int(match.group(1)) == index:
            return file

    raise CustomError(f"Filename not found for {fmt} {p_alias} {size} {index}")

In [None]:
def read_graph_from_edge_list(filename:str):
    """Read a whitespace-separated edge list and return (sorted nodes, sorted edges).

    Every non-blank line of FILENAME must hold exactly two integers ``node1 node2``.
    Blank or whitespace-only lines (for example a trailing newline) are skipped.

    Returns a tuple ``(nodes, edges)``: the sorted list of distinct node ids and the
    sorted list of ``(node1, node2)`` tuples, duplicates included.
    Raises ValueError when a non-blank line does not contain exactly two integers.
    """
    edges = []
    nodes = set()

    with open(filename, 'r') as file:
        for line in file:
            fields = line.split()
            if not fields:
                # Nothing to parse; unpacking an empty line would raise ValueError.
                continue
            node1, node2 = map(int, fields)
            edges.append((node1, node2))
            nodes.update((node1, node2))

    return sorted(nodes), sorted(edges)

In [8]:
def get_p_alias(pattern: str) -> str:
    """Map a pattern name to its alias: "clustered graph" becomes "SBM", other valid names are returned unchanged.

    Raises CustomError when PATTERN is not one of the known pattern names.
    """
    known_patterns = ("Cycle", "Star", "Path", "Grid", "clustered graph")
    if pattern not in known_patterns:
        raise CustomError(f"Invalid pattern {pattern}")
    return "SBM" if pattern == "clustered graph" else pattern

In [9]:
def encode_image(image_path):
    """Read IMAGE_PATH as bytes and return its Base64 encoding as a str."""
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")

def prepare_data(p_alias:str, size:str, i:int):
    """Load the two rendered layouts (iter10 and iter50) of graph I.

    Returns ``(images, pos)``: ``images`` holds the Base64-encoded PNGs read from
    ``images/...`` and ``pos`` the parsed JSON files read from ``layouts/...``,
    each in the order [iter10, iter50].
    """
    term = "general" if p_alias == "SBM" else "special"
    sub_path = f"{size}_{term}_graphs/edge_list/{p_alias}"
    stems = [f"{i}_{p_alias}_iter10", f"{i}_{p_alias}_iter50"]

    # Both images are encoded before any layout file is opened.
    images = [encode_image(f"images/{sub_path}/{stem}.png") for stem in stems]

    pos = []
    for stem in stems:
        with open(f"layouts/{sub_path}/{stem}.json", "r") as layout_file:
            pos.append(js.load(layout_file))

    return images, pos


In [None]:
def parse_graph_to_str_el(f):
    """Read the edge-list file F and return (node list, ``str()`` of the edge list)."""
    node_list, edge_list = read_graph_from_edge_list(f)
    edges_as_text = str(edge_list)
    return node_list, edges_as_text

async def perform_test(session, test_pks:list, p_alias:str, ids:list, debug=False, virtual=True, console=True):
    """Build one request per graph id for a single (model, size, modality, task) combination.

    Parameters:
        session: HTTP session handed through to ``create_completion``.
        test_pks: 4-item sequence ``(model, size, modality, task)``.
        p_alias: graph family alias; "community" is only accepted for "SBM".
        ids: mapping from size name to the graph ids to process (indexed as ``ids[size]``).
        debug: when truthy only the first id is processed.
        virtual: when truthy nothing is sent; request payloads are built instead.
        console: in virtual mode, print each payload.

    Returns a list: un-awaited ``create_completion`` coroutines when VIRTUAL is false,
    otherwise the payload dicts produced by ``v_completion``.
    Raises CustomError when the task is not valid for P_ALIAS.
    """
    valid_requirements = ["edge", "distance", "community"] if p_alias == "SBM" else ["edge", "distance"]

    model, size, modality, task = test_pks

    if task not in valid_requirements:
        raise CustomError(f"Invalid requirement {task} for {p_alias}.")

    tasks = []
    for position, i in enumerate(ids[size]):
        # Stop before touching any file of the second graph: a debug run must not
        # fail because of data it never intends to process.
        if debug and position >= 1:
            break

        f = find_graph_data_by_pk(p_alias, size, i)
        _nodes, graph_data = parse_graph_to_str_el(f)
        image, coordinates = prepare_data(p_alias, size, i)

        content = generate_prompt_q3(modality, True, task, image, coordinates, graph_data)
        if not virtual:
            tasks.append(create_completion(session, content, modality, p_alias, model, size, i, task, True, debug=debug))
        else:
            v_c = v_completion(content, model)
            tasks.append(v_c)
            if console:
                print(v_c)
    return tasks

In [None]:
async def do_major_task(d_p_aliases:list=None, d_modalities:list=None, d_ids:list=None, d_model=None, d_size=None, d_tasks:list=None, debug=False, virtual=True, console=True):
    """Build, and optionally send, every request of the selected experiment grid.

    Each ``d_*`` argument narrows the corresponding default list; ``None`` keeps the default.

    Parameters:
        d_p_aliases: graph family aliases (subset of Cycle, Grid, Path, Star, SBM).
        d_modalities: input modalities (subset of image, data+pos, image+data+pos).
        d_ids: graph ids to use for every size instead of scanning the data directories.
        d_model: a single model name.
        d_size: a single size ("small" or "exlarge").
        d_tasks: tasks to run; "community" is only accepted when every selected alias is "SBM".
            When omitted, "edge" and "distance" run for all aliases and "community" is added for "SBM".
        debug, virtual, console: forwarded to ``perform_test``.

    Side effects: prints the request count when CONSOLE is false. When VIRTUAL is false the
    requests are awaited, RETRY_FILE is rewritten with one line per failed request and the
    failures are logged.
    Raises CustomError for any invalid selection.
    """
    p_aliases = ["Cycle", "Grid", "Path", "Star", "SBM"]
    models = ["gpt-4o-2024-11-20", "gemini-2.0-flash-001"]
    sizes = ["small", "exlarge"]
    modalities = ["image","data+pos","image+data+pos"]
    tasks = ["edge", "distance"]
    potential_tasks = ["edge", "distance", "community"]

    if d_model is not None:
        if d_model not in models:
            raise CustomError(f"Invalid d_model {d_model}.")
        models = [d_model]

    if d_size is not None:
        if d_size not in sizes:
            raise CustomError(f"Invalid d_size {d_size}.")
        sizes = [d_size]

    # Resolve the aliases first: the task validation below depends on them.
    if d_p_aliases is not None:
        for d_p_alias in d_p_aliases:
            if d_p_alias not in p_aliases:
                raise CustomError(f"Invalid p_alias {d_p_alias}, which not in {p_aliases}.")
        p_aliases = d_p_aliases

    if d_tasks is not None:
        for d_req in d_tasks:
            if d_req not in potential_tasks:
                raise CustomError(f"Invalid tasks {d_tasks}, which {d_req} not in {potential_tasks}.")
            # An explicit task list is applied to every alias, so "community" is only
            # valid when all selected aliases are the clustered ("SBM") family.
            if d_req == "community" and any(alias != "SBM" for alias in p_aliases):
                raise CustomError("Invalid requirement community for non-clustered graph.")
        tasks = d_tasks

    if d_modalities is not None:
        for d_modality in d_modalities:
            if d_modality not in modalities:
                raise CustomError(f"Invalid d_modality {d_modality}, which not in {modalities}.")
        modalities = d_modalities

    timeout_seconds = 180
    session_timeout = aiohttp.ClientTimeout(total=None,sock_connect=timeout_seconds,sock_read=timeout_seconds)
    coroutines = []
    async with aiohttp.ClientSession(timeout=session_timeout)as session:
        for p_alias in p_aliases:
            term = "general" if p_alias == "SBM" else "special"

            # Only the selected sizes are resolved, and directories are scanned
            # only when the caller did not pin the ids explicitly.
            ids = dict()
            for size in sizes:
                if d_ids is not None:
                    ids[size] = sorted(d_ids)
                else:
                    ids[size] = extract_ids(f"data/{size}_{term}_graphs/edge_list/{p_alias}")

            test_pks = list(product(models, sizes, modalities, tasks))
            if d_tasks is None and p_alias == "SBM":
                test_pks.extend(list(product(models, sizes, modalities, ["community"])))
            for test_pk in test_pks:
                coroutines.extend(await perform_test(session, test_pk,
                                                p_alias=p_alias, ids=ids,
                                                debug=debug, virtual=virtual, console=console))
        if not console:
            print(f"Request count {len(coroutines)}.")
        if not virtual:
            results = await asyncio.gather(*coroutines)

            failed_requests = [res for res in results if "error" in res]
            # Opening in "w" mode resets the file even when nothing failed.
            with open(RETRY_FILE, "w") as f:
                for req in failed_requests:
                    model, p_alias, size, i, task, modality = req["PK"]
                    f.write(f"Faild,{task},{p_alias}_{size}_{i}_{modality}_results.txt\n")
            logging.info(f"Failed Requests: {failed_requests}")

            

In [None]:
def generate_prompt_q3(input_modality:str, is_global:bool, task:str, image=None, coordinates:list=None, graphdata:str=None, **kwargs):

    if type(is_global) != bool:
        raise CustomError(f"IS_GLOBAL must be bool, now {type(is_global)}.")
    
    match input_modality:
        case "image":
            if image is None:
                raise CustomError("Input image is None when input_modality set to \"image\".")
        case "data+pos":
            if coordinates is None or type(coordinates) != list:
                raise CustomError("Invalid input coordinates when input_modality set to \"data+pos\"")
            if graphdata is None or type(graphdata) != str:
                raise CustomError("Invalid input graphdata when input_modality set to \"data+pos\"")
        case "image+data+pos":
            if image is None:
                raise CustomError("Input image is None when input_modality set to \"image+data+pos\".")
            if coordinates is None or type(coordinates) != list:
                raise CustomError("Invalid input coordinates when input_modality set to \"image+data+pos\"")
            if graphdata is None or type(graphdata) != str:
                raise CustomError("Invalid input graphdata when input_modality set to \"image+data+pos\"")
        case _:
            raise CustomError(f"Invalid input_modality {input_modality}")        
    
    if is_global:
        match task:
            case "edge":
                s_q = "Which has the fewest number of edge crossings?"
                s_answer = "Answer 1 or 2. Put your final answer in a json block, use a field \"Answer\" to present your answer."
                s_input = "\n<edge_data>"
            case "distance":
                s_q = "Which better preserves graph-theoretic distance?"
                s_answer = "Answer 1 or 2. Put your final answer in a json block, use a field \"Answer\" to present your answer."
                s_input = "\n<subgraph_data>"
            case "community":
                s_q = "Which keeps the community structure visually clearer"
                s_answer = "Answer 1 or 2. Put your final answer in a json block, use a field \"Answer\" to present your answer."
                s_input = "\n<graph_layout>"
            case _:
                raise CustomError(f"TASK must be one of [\"edge\", \"distance\", \"community\"]")
    
        match input_modality:
            case "image":
                s_input_instruction = "image"
                s_input = ""
                s_graphdata = ""
                s_pos = ""
            case "data+pos":
                s_input_instruction = "graph data and node coordinates"
                s_graphdata = f"\n{graphdata}"
                s_pos = f"\n{coordinates[0]}\n{coordinates[1]}"
            case "image+data+pos":
                s_input_instruction = "image, graph data and node coordinates"
                s_graphdata = f"\n{graphdata}"
                s_pos = f"\n{coordinates[0]}\n{coordinates[1]}"
            case _:
                raise CustomError(f"Invalid input_modality {input_modality}")
    
        prompt = f"Given two layouts of a graph in the {s_input_instruction} format. {s_q} {s_answer}{s_input}{s_graphdata}{s_pos}"
    else:
        match input_modality:
            case "image":
                s_input_instruction = "image"
                s_graphdata = ""
                s_pos = ""
            case "data+pos":
                s_input_instruction = "graph data and node coordinates"
                s_graphdata = f"\n{graphdata}"
                s_pos = f"\n{coordinates[0]}"
            case "image+data+pos":
                s_input_instruction = "image, graph data and node coordinates"
                s_graphdata = f"\n{graphdata}"
                s_pos = f"\n{coordinates[0]}"
            case _:
                raise CustomError(f"Invalid input_modality {input_modality}")

        match task:
            case "edge":
                s_q = f"Given two edges in {s_input_instruction} format , determine whether they intersect. Answer either \"Yes\" or \"No\"."
                s_input = "\n<edge_data>"
                s_anwer = "\nPut your final answer in a json block, use a field \"Answer\" to present your answer."
            case "distance":
                source, destination = kwargs["source"], kwargs["destination"]
                s_q = f"Given a subgraph in {s_input_instruction} format. For the specified source node {source} and target node {destination} , assess whether maintains the consistency between Euclidean distance and graph theoretic distance." 
                s_input = "\n<subgraph_data>"
                s_anwer = "\nAnswer \"Euclidean distance is greater than graph-theoretic distance\", or\"Euclidean distance equals to graph-theoretic distance\" or \"Euclidean distance is less than graph-theoretic distance."\
                " Put your final answer in a json block, use a field \"Answer\" to present your answer."
            case "community":
                s_q = f"Given a clustered graph layout in {s_input_instruction} format,  infer the number of visible communities."
                s_input = "\n<graph_layout>"
                s_anwer = "\nYour answer shall be an Integer. Put your final answer in a json block, use a field \"Answer\" to present your answer."
            case _:
                raise CustomError(f"TASK must be one of [\"edge\", \"distance\", \"community\"]")
        
        if input_modality == "image":
            s_input = ""

        prompt = f"{s_q}{s_input}{s_graphdata}{s_pos}{s_anwer}"
    
    
    content = [{
                    "type": "text",
                    "text": prompt,
        }]
    
    if input_modality == "image" or input_modality == "image+data+pos":
        if is_global:
            if type(image) != list:
                raise CustomError("Two images expected.")
            content.append({
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{image[0]}"},
                    "detail": "high",
                })
            content.append({
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{image[1]}"},
                    "detail": "high",
                })
        else:
            if type(image) != str:
                raise CustomError("Only 1 image expected.")
            content.append({
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{image}"},
                    "detail": "high",
                })


    return content

In [13]:
def parse_result(rs, storage_file):
    """Write the text RS to STORAGE_FILE as UTF-8, replacing any previous content.

    Raises TypeError when RS is not a string. The check runs before the file is
    opened, so a rejected result never leaves an empty or truncated file behind.
    """
    if not isinstance(rs, str):
        raise TypeError(f"write() argument must be str, not {type(rs).__name__}")

    # Explicit encoding: the text may contain characters the platform default cannot encode.
    with open(storage_file, "w", encoding="utf-8") as f:
        f.write(rs)

In [None]:
async def create_completion(session, content:list, modality:str, p_alias:str, model:str, size:str, i:int, task:int, is_gloabl:bool, debug=False, attempt=1, official=False):
    """Send one chat-completion request and store the answer, retrying on failure.

    Parameters:
        session: aiohttp client session used for the POST.
        content: user-message content of the request.
        modality, p_alias, model, size, i, task: identify the request; they are used in the
            output paths, the log messages and the returned "PK" tuple. TASK is annotated
            as int, but the caller in this file passes the task name as a string.
        is_gloabl: selects the "global" or "local" result directories.
        debug: when truthy results go to the "-debug" directories.
        attempt: number of the first attempt (1-based); up to MAX_RETRIES attempts are made.
        official: when truthy a content-filtered response is not retried.

    Each attempt takes the next endpoint/key pair and waits ``RETRY_DELAY * attempt``
    seconds before the following one. The full JSON response is written to the
    ``full_reses_*`` tree and the answer text to the ``results_*`` tree.

    Returns ``{"Success": PK}`` or ``{"error": reason, "PK": PK, "attempts": n}`` where
    PK is ``(model, p_alias, size, i, task, modality)``.
    """
    s_gloabl = "global" if is_gloabl else "local"
    s_debug = "-debug" if debug else ""
    sub_path = f"{model}/{size}/{p_alias}/{task}/{modality}"

    dir_path = f"results_{s_gloabl}{s_debug}/{sub_path}" # modify structure if inappropriate
    full_res_dir_path = f"full_reses_{s_gloabl}{s_debug}/{sub_path}"

    os.makedirs(dir_path, exist_ok=True)
    os.makedirs(full_res_dir_path, exist_ok=True)

    storage_path = f"{dir_path}/{p_alias}_{size}_{i}_{task}_results.txt"
    full_res_path = f"{full_res_dir_path}/{p_alias}_{size}_{i}_{task}_results.json"

    model_alias = "deepseek-chat" if model == "deepseek-v3" else model

    payload = {
                "model": model_alias,
                "max_tokens": 8000,
                "temperature": 0,
                "messages": [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user",
                "content": content
            }],
                }

    pk = (model, p_alias, size, i, task, modality)
    tag = f"{model}, {p_alias}, {size}, {i}, {task}, {modality}"

    # Iterating (instead of recursing) keeps IS_GLOABL, DEBUG and OFFICIAL intact across retries.
    while True:
        model_url, key = get_next_endpoint_and_key()
        headers = {
            "Authorization": f"Bearer {key}",
            "Content-Type": "application/json",
        }

        error = None          # reason reported if this attempt turns out to be the last one
        give_up_now = False   # True when the failure must not be retried
        exhausted_log = f"Request failed after {MAX_RETRIES} attempts: {tag}"

        try:
            # The semaphore only guards the network exchange, so a request that is
            # backing off does not occupy a slot.
            async with SEMAPHORE:
                async with session.post(
                    url=f"{model_url}/v1/chat/completions",
                    json=payload,
                    headers=headers
                ) as response:
                    status = response.status
                    body = await response.read() if status == 200 else b""
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            logging.warning(f"Attempt  {tag}, excepts for {e}")
            error = str(e)
            exhausted_log = f"Request failed after {MAX_RETRIES} attempts:  {tag}"
        else:
            if status != 200:
                logging.warning(f"Attempt  {tag}, failed for {status}")
                error = "MAX_RETRY"
                exhausted_log = f"Request failed after {MAX_RETRIES} attempts:  {tag}"
            else:
                try:
                    result = js.loads(body.decode("utf-8"))
                except (js.JSONDecodeError, UnicodeDecodeError) as e:
                    logging.warning(f"JSONDecodeError: {e}, {p_alias}, {model}, {size}, {i}, {task}, {modality}")
                    error = "None response"
                else:
                    with open(full_res_path, "w") as f:
                        js.dump(result, f)
                    try:
                        choice = result['choices'][0]
                        finish_reason = choice['finish_reason']
                        if finish_reason == "stop":
                            parse_result(choice['message']['content'], storage_path)
                            return {"Success": pk}
                    except (TypeError, KeyError, IndexError) as e:
                        # Malformed or content-less response: retry instead of letting
                        # the exception abort every other pending request.
                        logging.warning(f"{e}, {tag}")
                        error = "No content in response"
                    else:
                        if model == "gpt-4o-2024-11-20" and finish_reason == "content_filter":
                            logging.warning(f"Content filtered: {tag}")
                            # Drop a stale answer left by an earlier run.
                            if os.path.isfile(storage_path):
                                os.remove(storage_path)
                            error = "Content filtered."
                            give_up_now = bool(official)
                            exhausted_log = f"Request failed for content filtered: {tag}"
                        else:
                            # Any other finish reason is reported immediately, without retry.
                            print(finish_reason, tag)
                            return {"error": finish_reason, "PK": pk, "attempts": attempt}

        if give_up_now or attempt >= MAX_RETRIES:
            logging.error(exhausted_log)
            return {"error": error, "PK": pk, "attempts": attempt}

        await asyncio.sleep(RETRY_DELAY * attempt)
        attempt += 1
        

In [15]:
def v_completion(content:list, model: str):
    """Build, without sending it, the chat-completion payload for CONTENT and MODEL.

    The model name "deepseek-v3" is replaced by "deepseek-chat"; any other name is used as is.
    """
    if model == "deepseek-v3":
        model_alias = "deepseek-chat"
    else:
        model_alias = model

    system_message = {"role": "system", "content": "You are a helpful assistant."}
    user_message = {"role": "user", "content": content}

    return {
        "model": model_alias,
        "max_tokens": 8000,
        "temperature": 0,
        "messages": [system_message, user_message],
    }

In [None]:
#Example usage:
# Dry run for the "Path" graphs: virtual=True builds the request payloads without
# sending them, and console=False prints only the request count instead of each payload.
await do_major_task(d_p_aliases=["Path"], debug=False, virtual=True, console=False)