# Preparations

In [91]:
#import packages
from typing import Any, List, Mapping, Optional
import json
import urllib
import os
import requests
import numexpr as ne
import re
import numpy as np
from fractions import Fraction

In [92]:
def load_textfile(file_name):
    """Loads file content as a string, handling errors gracefully."""
    try:
        with open(file_name, 'r') as file:
            return file.read().strip()  # Strip removes unwanted newlines
    except FileNotFoundError:
        print(f"Error: File '{file_name}' not found.")
    except Exception as e:
        print(f"Error: {e}")

# Load API key and set as environment variable
if (api_key := load_textfile("gpt_api_key.pswrd")):  #you must use your API key for OpenAI in the pswrd file
    os.environ['OPENAI_API_KEY'] = api_key

# Create an instance of LLM object

In [93]:
class OpenAI_LLM:
    """
    A class to interact with OpenAI's GPT-based API for generating text responses.
    
    Attributes:
        api_key (str): OpenAI API key for authentication.
        model (str): The model to be used (default is "gpt-4o").
        temperature (float): Controls randomness in responses (lower is more deterministic).
        max_tokens (int): Maximum number of tokens allowed in the response.
        endpoint (str): API endpoint for making chat completion requests.
    """

    def __init__(self, api_key: str, model: str = "gpt-4o", temperature: float = 0.0, max_tokens: int = 15000):
        """
        Initializes the OpenAI_LLM instance with API credentials and model settings.

        Args:
            api_key (str): The OpenAI API key.
            model (str, optional): The model to use for generating responses. Defaults to "gpt-4o".
            temperature (float, optional): The degree of randomness in responses (0.0 = deterministic, 1.0 = high randomness). Defaults to 0.1.
            max_tokens (int, optional): The maximum number of tokens in the generated response. Defaults to 15000.
        """
        self.api_key = api_key
        self.model = model
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.endpoint = "https://api.openai.com/v1/chat/completions"  # OpenAI API endpoint

    def generate_response(self, prompt: str) -> str:
        """
        Generates a response from the OpenAI API based on the provided prompt.

        Args:
            prompt (str): The input prompt for the AI model.

        Returns:
            str: The AI-generated response.

        Raises:
            Exception: If the API request fails or returns an error.
        """
        # Define request headers for authentication and content type
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        # Define request payload with model settings and user input
        payload = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],  # Structured conversation format
            "temperature": self.temperature,
            "max_tokens": self.max_tokens
        }

        try:
            # Send a POST request to OpenAI's API with the payload
            response = requests.post(self.endpoint, json=payload, headers=headers)

            # Ensure the response status is OK (200)
            if response.status_code == 200:
                response_data = response.json()  # Parse JSON response
                return response_data['choices'][0]['message']['content']  # Extract and return the AI's response
            
            else:
                # Handle API errors gracefully by raising an exception with details
                raise Exception(f"API request failed with status code {response.status_code}: {response.text}")

        except requests.RequestException as e:
            # Handle network-related errors (e.g., timeout, connectivity issues)
            raise Exception(f"Network error while communicating with OpenAI API: {e}")

In [94]:
#instanceiate llm object
llm = OpenAI_LLM(api_key=api_key)

# Definition of Prompt Template

In [95]:
def math_prompt_template(math_query):
    instruction = f"""
**Prompt:**

You are given a mathematical problem described in text form. Your task is to convert 
this problem into a `numexpr` input string, which is a format used for evaluating 
mathematical expressions efficiently in Python. 
Follow these steps:

1. break the solution down into small meaningful steps. The steps must be as small as possible.
2. each step should have only one output variable if possible
3. specify the steps in a json structure with specifying the step, description of the step, 
   input variables, equation and output variables
4. be aware, in json no inline arithmetic is allowed. 


**Example 1:**

**Textual Problem:**  
"The supply of a ship with a crew of 15 men is designed for 40 days.
How long can the ship stay at sea in total, if after 28 days, 5 men and 1 woman come aboard, 
and the woman consumes one-third less provisions per day than a man?"

**JSON structure:**  
{{
"steps": [
    {{
    "step": 1,
    "description": "Calculate the total initial provision of the ship in person-days.",
    "input_variables": {{
        "initial_person_count": 15,
        "initial_days": 40
    }},
    "equation": "initial_person_count * initial_days",
    "output_variables": {{
        "total_provision": 0
    }}
    }},
    {{
    "step": 2,
    "description": "Determine the remaining provision after 28 days with 15 persons.",
    "input_variables": {{
        "initial_person_count": 15,
        "elapsed_days": 28,
        "total_provision": 600
    }},
    "equation": "total_provision - (initial_person_count * elapsed_days)",
    "output_variables": {{
        "remaining_provision": 0
    }}
    }},
    {{
    "step": 3,
    "description": "Calculate the effective number of persons after reinforcements.",
    "input_variables": {{
        "remaining_person_count": 15,
        "new_person_count": 5,
        "additional_person_fraction": "2/3"
    }},
    "equation": "remaining_person_count + new_person_count + (2 / 3)",
    "output_variables": {{
        "effective_person_count": 0
    }}
    }},
    {{
    "step": 4,
    "description": "Determine the additional days the ship can remain at sea with the remaining provision.",
    "input_variables": {{
        "remaining_provision": 180,
        "effective_person_count": 20.67
    }},
    "equation": "remaining_provision / effective_person_count",
    "output_variables": {{
        "additional_days": 0
    }}
    }},
    {{
    "step": 5,
    "description": "Calculate the total duration the ship can remain at sea.",
    "input_variables": {{
        "elapsed_days": 28,
        "additional_days": 8.71
    }},
    "equation": "elapsed_days + additional_days",
    "output_variables": {{
        "total_duration": 0
    }}
    }}
]
}}

**Instructions:**

Now, apply the same process to the following problem:

***Textual Problem:***
{math_query}
"""
    return instruction

# Import mathematical task and 1st llm call to get calculation instructions

TASK selection

In [96]:
task = load_textfile("task_piplinecalc.txt")
#task = load_textfile("task_probabilitycalc.txt")
print(task)

A pipeline of 4,000 meters is to be laid by 22 workers in 64 workdays, with each day consisting of 8 hours. After 10 workdays, 3 workers drop out. How many overtime hours would each of the remaining workers need to work per day to complete the now 5,000-meter pipeline on schedule (by the 64th workday)?


In [97]:
# Get the response from GPT API
response = llm.generate_response(math_prompt_template(task))

In [98]:
#extract from the llm response the step wise calculation instructions
match = re.search(r'json(.*)', response, re.DOTALL)
if match:
    response = re.search(r'json(.*)', response, re.DOTALL)
    response = str(response.group(0)).strip()
    response = re.search(r'\{.*\}', response, re.DOTALL)
    response = str(response.group(0)).strip()
else:
    response = re.search(r'\{.*\}', response, re.DOTALL)
    response = str(response.group(0)).strip()

In [99]:
print(response)

{
  "steps": [
    {
      "step": 1,
      "description": "Calculate the total work required in worker-hours to lay the original 4,000-meter pipeline.",
      "input_variables": {
        "pipeline_length": 4000,
        "total_workers": 22,
        "total_days": 64,
        "hours_per_day": 8
      },
      "equation": "total_workers * total_days * hours_per_day",
      "output_variables": {
        "total_worker_hours": 0
      }
    },
    {
      "step": 2,
      "description": "Calculate the work rate in meters per worker-hour for the original pipeline.",
      "input_variables": {
        "pipeline_length": 4000,
        "total_worker_hours": 11264
      },
      "equation": "pipeline_length / total_worker_hours",
      "output_variables": {
        "work_rate": 0
      }
    },
    {
      "step": 3,
      "description": "Calculate the total work required in worker-hours to lay the new 5,000-meter pipeline.",
      "input_variables": {
        "new_pipeline_length": 5000,
     

# 2nd llm call to correct possible json errors

In [100]:
def json_prompt_template(json_input):
    corrected_json = f"""
**Prompt:**

You are expert for json files. The input string is a json. 
Change the strin in order to match all requirements to a json file.
Return only the raw json within {{}}

**Example 1:**

**Input string:**  
a
{{
    "glossary": {{
        "title": "example glossary",
		"GlossDiv": {{
            "title": "S",
			"GlossList": {{
                "GlossEntry": {{
                    "ID": "SGML",
					"SortAs": "SGML",
					"GlossTerm": "Standard Generalized Markup Language",
				"Acronym": "SGML",
					"Abbrev": "ISO 8879:1986",
					"GlossDef": {{
                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
						"GlossSeeAlso": ["GML", "XML"]
                    }},
					"GlossSee": "markup"
                }}
            }}
        }}
    }}
}}

**corrected_json:**  
{{
    "glossary": {{
        "title": "example glossary",
		"GlossDiv": {{
            "title": "S",
			"GlossList": {{
                "GlossEntry": {{
                    "ID": "SGML",
					"SortAs": "SGML",
					"GlossTerm": "Standard Generalized Markup Language",
				    "Acronym": "SGML",
					"Abbrev": "ISO 8879:1986",
					"GlossDef": {{
                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
						"GlossSeeAlso": ["GML", "XML"]
                    }},
					"GlossSee": "markup"
                }}
            }}
        }}
    }}
}}


{json_input}
"""
    return corrected_json

In [101]:
#2nd llm call to correct json syntax
response = llm.generate_response(json_prompt_template(response))
#extract the stepwise calculation instructions, located between {}
response = re.search(r'\{.*\}', response, re.DOTALL)
response = str(response.group(0)).strip() 
#convert json into dictionary
response_dict = json.loads(response)

In [102]:
# Function to safely convert values to float
def convert_to_float(value):
    if isinstance(value, (int, float)):  
        return float(value)  # Already a number
    
    if isinstance(value, str):
        if "/" in value:  # Detect fraction format
            try:
                return float(Fraction(value))  # Convert fraction string to float
            except ValueError:
                raise ValueError(f"❌ Invalid fraction: {value}")
        else:
            try:
                return float(value)  # Convert numeric string
            except ValueError:
                raise ValueError(f"❌ Invalid number format: {value}")
    
    raise TypeError(f"❌ Unsupported type: {type(value)} for value {value}")

In [103]:
#define an overall dictionary for all coming variables
global_input_variables = {}
total_steps = len(response_dict['steps'])
#loop over all steps, suggested by the llm
for step in range(total_steps):
    #store input variables from step in overall dictionary
    input_variables =  response_dict['steps'][step]['input_variables']
    for key, value in input_variables.items():
        global_input_variables[key] = convert_to_float(value)
    #get calculation instruction
    equation = response_dict['steps'][step]['equation']
    # visualize stepwise calculation
    print(f"step {step}\n{response_dict['steps'][step]['description']}")
    print(f"NumExpr({equation})")

    #pick up the input variables needed for calculations in this step
    input_next_step = {}
    for key, value in input_variables.items():
        input_next_step[key] = global_input_variables[key]

    #execute mathematical instructions with NumExpr
    print(f"inputnextstep: {input_next_step}")
    result = ne.evaluate(equation, local_dict=input_next_step)
    #print(f"result: {result}")
    #checking the expected output variables
    output_variables =  response_dict['steps'][step]['output_variables']
    #store the calculated outputs into the expected variables
    for key, value in output_variables.items():
        global_input_variables[key] = result.item()


# Extract the final result after all steps
final_result = result.item()

# Simulate larger font by using block-style formatting
highlighted_result = f"\n\033[1;32m" + "▓" * 50 + f"\n🎯  FINAL RESULT: {final_result}  🎯\n" + "▓" * 50 + "\033[0m"

print(highlighted_result)

step 0
Calculate the total work required in worker-hours to lay the original 4,000-meter pipeline.
NumExpr(total_workers * total_days * hours_per_day)
inputnextstep: {'pipeline_length': 4000.0, 'total_workers': 22.0, 'total_days': 64.0, 'hours_per_day': 8.0}
step 1
Calculate the work rate in meters per worker-hour for the original pipeline.
NumExpr(pipeline_length / total_worker_hours)
inputnextstep: {'pipeline_length': 4000.0, 'total_worker_hours': 11264.0}
step 2
Calculate the total work required in worker-hours to lay the new 5,000-meter pipeline.
NumExpr(new_pipeline_length / work_rate)
inputnextstep: {'new_pipeline_length': 5000.0, 'work_rate': 0.355}
step 3
Calculate the remaining workdays after 10 workdays have passed.
NumExpr(total_days - elapsed_days)
inputnextstep: {'total_days': 64.0, 'elapsed_days': 10.0}
step 4
Calculate the remaining worker-hours needed after 10 workdays.
NumExpr(new_total_worker_hours - elapsed_worker_hours)
inputnextstep: {'new_total_worker_hours': 1408