In [120]:
from azure.storage.blob import BlobServiceClient
import json
from math import ceil
from datetime import datetime
import os


In [121]:
# Task label: part of the output folder path, the batch file names and every task id.
TASK_NAME = "source_parsing_v0"
# Deployment name: sent as the "model" field of each batch request body.
DEPLOYMENT_NAME = "gpt-4o--batch-2"

In [122]:
# Root folder that holds one sub-folder per crawl run.
source_root = "../local_tests_data/source_raw_content/"

# The run id is derived from the number of entries currently in the root folder.
run_n = len(os.listdir(source_root))
RUNID = f"RUNID_{run_n}"
RUN_TIME = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

# Input folder of this run, and the folder the batch files are written to.
INPUT_DATA_PATH = f"../local_tests_data/source_raw_content/{RUNID}/"
OUTPUT_DATA_PATH = f"../local_tests_data/azure_openai_batch_processing_files/{RUNID}/{TASK_NAME}/BATCHFILES/"
os.makedirs(OUTPUT_DATA_PATH, exist_ok=True)

print(f"Run ID: {RUNID} at {RUN_TIME}")

Run ID: RUNID_1 at 2025-05-29 16:18:03


In [123]:
def get_source_raw_contents(input_dir=None):
    """Load every ``*.json`` file found directly inside a folder.

    Args:
        input_dir: Folder to scan. When omitted, the module-level
            ``INPUT_DATA_PATH`` is used (resolved at call time).

    Returns:
        A list with the parsed content of each JSON file, ordered by file
        name. Files without a ``.json`` suffix are ignored. An empty folder
        yields an empty list.

    Raises:
        FileNotFoundError: If the folder does not exist.
        json.JSONDecodeError: If a ``.json`` file is not valid JSON.
    """
    if input_dir is None:
        input_dir = INPUT_DATA_PATH

    source_raw_contents = []
    # os.listdir returns entries in arbitrary order; sort for reproducible runs.
    for file in sorted(os.listdir(input_dir)):
        if not file.endswith(".json"):
            continue
        # JSON is UTF-8 by specification; do not rely on the platform default.
        with open(os.path.join(input_dir, file), 'r', encoding='utf-8') as f:
            source_raw_contents.append(json.load(f))
    return source_raw_contents


In [124]:
# Read all JSON records found in this run's input folder.
source_raw_contents = get_source_raw_contents()

In [125]:
# Inspect the loaded records (bare expression, so the notebook displays it).
source_raw_contents

[{'url': 'https://thenextweb.com/',
  'name': 'the_next_web',
  'crawl_time': '2025-05-29 12:38:03'},
 {'url': 'https://techcrunch.com/latest/',
  'name': 'techcrunch',
  'raw_jina_content': '{"data":null,"code":451,"name":"SecurityCompromiseError","status":45102,"message":"Domain techcrunch.com blocked until Thu May 29 2025 10:39:32 GMT+0000 (Coordinated Universal Time) due to previous abuse found on https://techcrunch.com/2025/05/19/sylndr-with-fresh-15-7m-allows-users-to-buy-sell-finance-and-service-used-cars-in-egypt: DDoS attack suspected: Too many requests","readableMessage":"SecurityCompromiseError: Domain techcrunch.com blocked until Thu May 29 2025 10:39:32 GMT+0000 (Coordinated Universal Time) due to previous abuse found on https://techcrunch.com/2025/05/19/sylndr-with-fresh-15-7m-allows-users-to-buy-sell-finance-and-service-used-cars-in-egypt: DDoS attack suspected: Too many requests"}',
  'crawl_time': '2025-05-29 12:38:03'},
 {'url': 'https://www.businessinsider.es/tecnolo

In [126]:
# Batch sizing: each batch file holds at most `prompts_per_batch_job` requests.
prompts_per_batch_job = 200
n_to_process = len(source_raw_contents)
n_batch_jobs = ceil(n_to_process / prompts_per_batch_job)
print(f"Creating {n_batch_jobs} batch files")

Creating 1 batch files


In [127]:
# System message placed first in the "messages" list of every batch request.
system_prompt = """
Extract an article list from the following page content. Do not make up any information that's not in the provided text. If the provided content text contains no articles list (for instance due to a 'page not found' error), return an empty list.

You must adhere to the provided criteria and schema.
"""

In [128]:
def format_task_jsonl_line(task_id, deployment_name, user_input, system_message=None):
    """Build the request object for one line of a batch JSONL file.

    The returned dict is meant to be serialised with ``json.dumps`` by the
    caller. Message contents are therefore passed through unchanged:
    ``json.dumps`` already escapes line breaks, and escaping them here as well
    would make the model receive literal backslash-n sequences instead of
    real newlines.

    Args:
        task_id: Value stored under ``custom_id``.
        deployment_name: Value stored under ``body.model``.
        user_input: Text sent as the user message.
        system_message: Text sent as the system message. When omitted, the
            module-level ``system_prompt`` is used (resolved at call time).

    Returns:
        A dict with ``custom_id``, ``method``, ``url`` and ``body``. The body
        requests a strict JSON-schema response named ``ArticleLinksList``: an
        object with one ``article_links_list`` array whose items carry
        ``title``, ``url``, ``keywords`` and ``language``.
    """
    if system_message is None:
        system_message = system_prompt

    # Schema of a single extracted article entry, referenced through $defs.
    article_link_schema = {
        "type": "object",
        "properties": {
            "title": {"type": "string"},
            "url": {"type": "string"},
            "keywords": {
                "type": "array",
                "items": {"type": "string"},
            },
            "language": {"type": "string"},
        },
        "required": ["title", "url", "keywords", "language"],
        "additionalProperties": False,
    }

    response_format = {
        "type": "json_schema",
        "json_schema": {
            "name": "ArticleLinksList",
            "strict": True,
            "schema": {
                "type": "object",
                "properties": {
                    "article_links_list": {
                        "type": "array",
                        "items": {"$ref": "#/$defs/article_link"},
                    },
                },
                "$defs": {"article_link": article_link_schema},
                "required": ["article_links_list"],
                "additionalProperties": False,
            },
        },
    }

    return {
        "custom_id": task_id,
        "method": "POST",
        "url": "/chat/completions",
        "body": {
            "model": deployment_name,
            "messages": [
                {"role": "system", "content": system_message},
                {"role": "user", "content": user_input},
            ],
            "response_format": response_format,
        },
    }

In [129]:
def generate_jsonl_lines(chunk_id, chunk_sources):
    """Yield one newline-terminated JSONL request line per usable source.

    Args:
        chunk_id: Index of the chunk being processed. Kept for interface
            compatibility; it is not used to build the lines.
        chunk_sources: Iterable of source records (dicts). Each record must
            have a ``name``; records without ``raw_jina_content`` are skipped.

    Yields:
        A JSON string ending in a newline, built from
        ``format_task_jsonl_line`` with the task id
        ``{RUNID}--{TASK_NAME}--{source name}``.
    """
    for source_info in chunk_sources:
        source_name = source_info['name']
        raw_jina_content = source_info.get('raw_jina_content')
        if raw_jina_content is None:
            # Some records carry no crawled content; there is nothing to
            # send for them, so report and skip instead of raising KeyError.
            print(f"Skipping source '{source_name}': no raw_jina_content")
            continue
        task_id = f"{RUNID}--{TASK_NAME}--{source_name}"
        request = format_task_jsonl_line(
            task_id=task_id,
            deployment_name=DEPLOYMENT_NAME,
            user_input=raw_jina_content,
        )
        yield json.dumps(request) + "\n"
    

In [130]:
# Write one JSONL batch file per chunk of sources.
for i in range(n_batch_jobs):
    print(i)
    # Slicing clamps at the end of the list, so the last chunk may be shorter.
    first = i * prompts_per_batch_job
    chunk = source_raw_contents[first:first + prompts_per_batch_job]
    batchfilename = f"{RUNID}--{TASK_NAME}_BATCHFILE_{i}.jsonl"

    with open(os.path.join(OUTPUT_DATA_PATH, batchfilename), "w") as f:
        f.writelines(generate_jsonl_lines(chunk_id=i, chunk_sources=chunk))

0
