# More complex prompts

As well as simple plaintext prompts, we can specify the full JSON of an OpenAI request. This lets us use more advanced features such as multiple messages (e.g. a system and a user prompt) and [structured outputs](https://platform.openai.com/docs/guides/structured-outputs).

Let's solve a slightly more complicated problem - extracting the names, ages, and sex of patients visiting a doctor from clinical notes:

In [9]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # for table pretty printing

data = [
    {
        "record_id": "7384711",
        "date": "2025-01-01",
        "notes": "Patient John Doe, a 45-year-old male, presents with a history of hypertension and complains of shortness of breath"
    },
    {
        "record_id": "579110",
        "date": "2025-01-02",
        "notes": "Mrs. Smith, a 60-year-old female, has been experiencing severe back pain for the past month. She mentions no history of recent injuries."
    },
    {
        "record_id": "8664564",
        "date": "2025-01-05",
        "notes": "Patient is a 30-year-old male who presents with symptoms consistent with the flu. No known chronic conditions are reported."
    },
]

df = spark.createDataFrame(data)
df

date,notes,record_id
2025-01-01,"Patient John Doe,...",7384711
2025-01-02,"Mrs. Smith, a 60-...",579110
2025-01-05,Patient is a 30-y...,8664564


Instead of a simple plaintext prompt, let's build a JSON request that includes both a system and a user prompt and enforces a JSON output format:

In [74]:
import pyspark.sql.functions as F

df = df.withColumn(
    "prompt",
    F.concat(
        F.lit("""\
        {
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-3.5-turbo-0125",
                "messages": [
                    {
                        "role": "system",
                        "content": [
                            {
                                "type": "text",
                                "text": "You are an expert name, age, and sex extractor."
                            }
                        ]
                    },
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": "Extract the patient name, age, and sex from the following clinical note. \\nReturn a JSON format with the keys ['name', 'age', 'sex']:\\n\\n"""),
        F.col("notes"),
        F.lit(""" "
                            }
                        ]
                    }
                ],
                "temperature": 1,
                "max_tokens": 2048,
                "top_p": 1,
                "frequency_penalty": 0,
                "presence_penalty": 0,
                "response_format": { "type": "json_object" }
            }
        }""")
    ),
)
df

date,notes,record_id,prompt
2025-01-01,"Patient John Doe,...",7384711,{\n ...
2025-01-02,"Mrs. Smith, a 60-...",579110,{\n ...
2025-01-05,Patient is a 30-y...,8664564,{\n ...
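
The concatenation above produces valid JSON only while `notes` contains no double quotes, backslashes, or newlines. A minimal sketch of a more defensive alternative, assuming the same request shape, is to build the request as a Python dict and serialise it with `json.dumps`, which escapes those characters. The helper name `build_prompt` is hypothetical and not part of `pyspark_batch_ai`, and the sampling parameters are omitted for brevity:

```python
import json

SYSTEM_TEXT = "You are an expert name, age, and sex extractor."
USER_TEXT = (
    "Extract the patient name, age, and sex from the following clinical note. \n"
    "Return a JSON format with the keys ['name', 'age', 'sex']:\n\n"
)


def build_prompt(note: str, model: str = "gpt-3.5-turbo-0125") -> str:
    """Return the request JSON for one clinical note (hypothetical helper)."""
    request = {
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": model,
            "messages": [
                {"role": "system", "content": [{"type": "text", "text": SYSTEM_TEXT}]},
                {"role": "user", "content": [{"type": "text", "text": USER_TEXT + note}]},
            ],
            "response_format": {"type": "json_object"},
        },
    }
    # json.dumps escapes quotes, backslashes, and newlines inside the note,
    # so the result always parses back as JSON.
    return json.dumps(request)


prompt = build_prompt('Patient says "it hurts" when breathing.')
json.loads(prompt)  # raises ValueError if the prompt were not valid JSON
```

In Spark this could be applied per row by wrapping it in a UDF, for example `F.udf(build_prompt, T.StringType())`, at some performance cost compared with built-in column functions.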


Now we set up the OpenAI client:

In [None]:
from openai import OpenAI
client = OpenAI()

Now let's import the `pyspark_batch_ai` library and process the table. We set `prompt_is_json=True`, which tells the library to treat the prompt column as a full JSON request, and pass a `response_schema`, which lets Spark parse the OpenAI output into a struct:

In [81]:
from pyspark_batch_ai import process_dataframe
import pyspark.sql.types as T

df_with_result = process_dataframe(
    df,
    client,
    prompt_is_json=True,
    response_schema=T.StructType([
            T.StructField("name", T.StringType()),
            T.StructField("age", T.StringType()),
            T.StructField("sex", T.StringType())
    ])
).drop("prompt")
df_with_result

2025-01-10 12:42:43.149 | INFO     | pyspark_batch_ai.core:process_dataframe:120 - Detected output format: json
2025-01-10 12:42:43.318 | INFO     | pyspark_batch_ai.core:_submit_and_process:366 - Total number of jobs to run: 1
2025-01-10 12:42:45.084 | INFO     | pyspark_batch_ai.core:_submit_and_process:374 - Currently running: 1, Jobs left in queue: 0
2025-01-10 12:43:45.781 | INFO     | pyspark_batch_ai.core:monitor_batches:319 - Batch ID: batch_678115c4e6188190981d0beac0c5df0e, Status changed from validating to in_progress
2025-01-10 12:44:46.046 | INFO     | pyspark_batch_ai.core:monitor_batches:319 - Batch ID: batch_678115c4e6188190981d0beac0c5df0e, Status changed from in_progress to completed

date,notes,record_id,response
2025-01-01,"Patient John Doe,...",7384711,"{John Doe, 45, male}"
2025-01-02,"Mrs. Smith, a 60-...",579110,"{Mrs. Smith, 60, ..."
2025-01-05,Patient is a 30-y...,8664564,"{Patient, 30, male}"
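
To make the role of `response_schema` concrete, here is a minimal pure-Python sketch of the kind of parsing it implies: the model's JSON text is decoded and only the declared fields are kept. This is an illustration under assumptions, not the library's actual implementation, and the helper `parse_response` is hypothetical. Missing keys become `None`, roughly analogous to the nulls Spark produces for fields absent from the parsed JSON:

```python
import json

# Field names mirror the StructType passed as response_schema above.
FIELDS = ("name", "age", "sex")


def parse_response(raw, fields=FIELDS):
    """Decode a JSON reply and keep only the declared fields (hypothetical helper)."""
    try:
        data = json.loads(raw)
    except (TypeError, ValueError):
        data = None  # malformed or missing reply
    if not isinstance(data, dict):
        data = {}
    # Values are converted to str because every field is declared as StringType.
    return {f: (None if data.get(f) is None else str(data[f])) for f in fields}


print(parse_response('{"name": "John Doe", "age": 45, "sex": "male"}'))
# {'name': 'John Doe', 'age': '45', 'sex': 'male'}
```

Declaring `age` as a string, as the schema above does, avoids a parse failure if the model returns a quoted number; it can be cast to an integer afterwards if needed.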


You can then access `response.name`, `response.age`, etc., or split them into separate columns with:

In [82]:
df_with_result.select("*", "response.*").drop("response")

date,notes,record_id,name,age,sex
2025-01-01,"Patient John Doe,...",7384711,John Doe,45,male
2025-01-02,"Mrs. Smith, a 60-...",579110,Mrs. Smith,60,female
2025-01-05,Patient is a 30-y...,8664564,Patient,30,male
