# Azure OpenAI for Big Data

The Azure OpenAI service can solve a wide range of natural language tasks by prompting the completion API. To make it easier to scale your prompting workflows from a few examples to large datasets, we have integrated the Azure OpenAI service with the distributed machine learning library [SynapseML](https://www.microsoft.com/en-us/research/blog/synapseml-a-simple-multilingual-and-massively-parallel-machine-learning-library/). This integration lets you use the [Apache Spark](https://spark.apache.org/) distributed computing framework to process millions of prompts with the OpenAI service. This tutorial shows how to apply large language models at distributed scale using Azure OpenAI and Azure Synapse Analytics.

## Step 1: Prerequisites

The key prerequisites for this quickstart are a working Azure OpenAI resource and an Apache Spark cluster with SynapseML installed. We suggest creating a Synapse workspace, but Azure Databricks, HDInsight, Spark on Kubernetes, or even a Python environment with the `pyspark` package will also work.

1. An Azure OpenAI resource – request access [here](https://customervoice.microsoft.com/Pages/ResponsePage.aspx?id=v4j5cvGGr0GRqy180BHbR7en2Ais5pxKtso_Pz4b1_xUOFA5Qk1UWDRBMjg0WFhPMkIzTzhKQ1dWNyQlQCN0PWcu) before [creating a resource](https://docs.microsoft.com/en-us/azure/cognitive-services/openai/how-to/create-resource?pivots=web-portal#create-a-resource)
1. [Create a Synapse workspace](https://docs.microsoft.com/en-us/azure/synapse-analytics/get-started-create-workspace) or use Databricks
1. [Create a serverless Apache Spark pool](https://docs.microsoft.com/en-us/azure/synapse-analytics/get-started-analyze-spark#create-a-serverless-apache-spark-pool) or use Databricks


## Step 2: Import this guide as a notebook

The next step is to add this code to your Spark cluster. You can either create a notebook in your Spark platform and copy the code into it to run the demo, or download the notebook and import it into Synapse Analytics.

1. Install SynapseML on your cluster. See the installation instructions for Synapse at the bottom of [the SynapseML website](https://microsoft.github.io/SynapseML/). Note that this requires pasting an additional cell at the top of the notebook you just imported.
1. Connect your notebook to a cluster and follow along, editing and running the cells below.

## Step 3: Fill in your service information

Next, edit the cell in the notebook to point to your service. In particular, set the `service_name`, `deployment_name`, `location`, and `key` variables to match those of your OpenAI service. If you are using AAD, you don't need to set the key, but you need to install the `azure-identity` package (`pip install azure-identity`) to run the authentication flow.

In [None]:
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

# Fill in the following lines with your service information
service_name = ""
deployment_name = "gpt-35-turbo"
deployment_name_embeddings = "text-embedding-ada-002"
deployment_name_embeddings_query = "text-similarity-ada-001"

key = ""  # please replace this with your key as a string. 

assert key is not None and service_name is not None
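
The assertion above only rules out `None`, so it still passes when the variables are left as empty strings. As a minimal sketch, a stricter check could validate the resource name before building the endpoint URL that the clients in the following steps use. The helper name `build_endpoint` and its character rule are assumptions made for illustration; they are not part of SynapseML or the Azure SDK.

```python
import re


def build_endpoint(service_name: str) -> str:
    """Build the endpoint URL in the same format this guide passes to setUrl."""
    name = service_name.strip()
    # Assumed sanity rule (hypothetical): letters, digits, and hyphens only.
    if not re.fullmatch(r"[A-Za-z0-9-]+", name):
        raise ValueError(f"service_name looks unset or malformed: {service_name!r}")
    return "https://{}.openai.azure.com/".format(name)


# "my-openai-resource" is a made-up resource name.
print(build_endpoint("my-openai-resource"))
```

Failing early here gives a clearer message than a connection error raised later from inside a Spark job.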

## Step 4: Create the OpenAICompletion Apache Spark Client

To apply the OpenAI Completion service to a dataframe, create an `OpenAICompletion` object, which serves as a distributed client. Parameters of the service can be set either to a single value or to a column of the dataframe, using the appropriate setters on the `OpenAICompletion` object. Here we set `maxTokens` to 2000. A token is around 4 characters, and this limit applies to the sum of the prompt and the result.

In [None]:
from synapse.ml.cognitive import OpenAICompletion

completion = (
    OpenAICompletion(stop= "<|im_end|>")
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setUrl("https://{}.openai.azure.com/".format(service_name))
    .setMaxTokens(2000)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)
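
Because the `maxTokens` limit covers the prompt and the result together, it can help to estimate how much room a prompt leaves for the completion. The sketch below uses the four-characters-per-token rule of thumb from the text. It is an approximation rather than a real tokenizer, and the function names and the 500-token reserve are hypothetical.

```python
CHARS_PER_TOKEN = 4  # rule of thumb from the text above, not an exact tokenizer


def estimate_tokens(text: str) -> int:
    """Roughly estimate the token count of a string, rounding up."""
    return -(-len(text) // CHARS_PER_TOKEN)


def fits_budget(prompt: str, max_tokens: int = 2000, reserve: int = 500) -> bool:
    """Check that the prompt leaves at least `reserve` tokens for the completion."""
    return estimate_tokens(prompt) + reserve <= max_tokens


# Made-up prompt used only to exercise the helpers.
prompt = "summarize the following text:\n" + "word " * 1000
print(estimate_tokens(prompt), fits_budget(prompt))
```

A check like this can be run over a sample of prompts before submitting a large job, so that overly long inputs are truncated or split ahead of time.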

#### For AAD authentication

Option 1: Use DeviceCodeCredential to log in interactively using a user token

In [None]:
# If you use AAD as the authentication method, make sure you are assigned the Cognitive Services User role
from azure.identity import DeviceCodeCredential
from synapse.ml.cognitive import OpenAICompletion

interactive_credential = DeviceCodeCredential()  # add tenant_id="YOUR_TENANT_ID" if you have more than one tenant
token = interactive_credential.get_token("https://cognitiveservices.azure.com/.default")

completion = (
    OpenAICompletion(stop= "<|im_end|>", AADToken=token.token)
    .setDeploymentName(deployment_name)
    .setUrl("https://{}.openai.azure.com/".format(service_name))
    .setMaxTokens(2000)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

Option 2: Use DefaultAzureCredential to acquire token using AAD Service Principal

In [None]:
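# TENANT_ID, OPENAI_APP_ID, and OPENAI_APP_SECRET are placeholders: define them with your service principal's values first
os.environ["AZURE_TENANT_ID"] = TENANT_ID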
os.environ["AZURE_CLIENT_ID"] = OPENAI_APP_ID
os.environ["AZURE_CLIENT_SECRET"] = OPENAI_APP_SECRET

In [None]:
from azure.identity import DefaultAzureCredential
from synapse.ml.cognitive import OpenAICompletion

# Request credential
default_credential = DefaultAzureCredential()
token = default_credential.get_token("https://cognitiveservices.azure.com/.default")

completion = (
    OpenAICompletion(stop= "<|im_end|>", AADToken=token.token)
    .setDeploymentName(deployment_name)
    .setUrl("https://{}.openai.azure.com/".format(service_name))
    .setMaxTokens(2000)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)
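
Both options pass the token to the client as a fixed string, so a job that runs longer than the token's lifetime may start to fail authentication. The object returned by `get_token` carries an `expires_on` timestamp alongside `token`. The sketch below uses a stand-in class with those two fields to decide when a new token should be requested. The class name, the function name, and the five-minute margin are assumptions for illustration.

```python
import time
from typing import NamedTuple, Optional


class AccessTokenLike(NamedTuple):
    """Stand-in with the two fields used here: the token string and its expiry."""

    token: str
    expires_on: int  # POSIX timestamp in seconds


def needs_refresh(
    tok: AccessTokenLike, margin_seconds: int = 300, now: Optional[float] = None
) -> bool:
    """Return True when the token expires within `margin_seconds`."""
    current = time.time() if now is None else now
    return tok.expires_on - current <= margin_seconds


# Made-up token that expires one hour from now.
example = AccessTokenLike(token="<redacted>", expires_on=int(time.time()) + 3600)
print(needs_refresh(example))
```

When the check returns `True`, calling `get_token` again and rebuilding the client with the new `token.token` value is one way to continue.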

## Step 5: Create a dataset for summary

Next, create a dataframe consisting of a series of rows, with one prompt per row. 

You can also load data directly from ADLS or other databases. For more information on loading and preparing Spark dataframes, see the [Apache Spark data loading guide](https://spark.apache.org/docs/latest/sql-data-sources.html).

In [None]:
from pyspark.sql.functions import lit, col, concat

import pandas as pd
request = "Generate a customer support transcript between A HR/Payroll customer representative and an employee of a company. Just generate the transcript without adding any comment as the output is used by another system as is"
pandas_df = pd.DataFrame({"prompt":[f"<|im_start|>system\nAssistant is a large language model trained by OpenAI.\n<|im_end|>\n<|im_start|>user\n{request}<|im_end|>\n<|im_start|>assistant\n"]*6})
data_df = spark.createDataFrame(pandas_df)
completed_df = completion.transform(data_df).cache()

out = completed_df.select(
    col("completions.choices.text").getItem(0).alias("text"),
)
out.write.mode("overwrite").saveAsTable("adp_customer_trans")
display(out)
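
The prompt string in the cell above follows a fixed layout: a system turn, a user turn, and an opening assistant turn, each delimited by `<|im_start|>` and `<|im_end|>`. As a small sketch, the same layout can be produced by a helper so that it is not retyped for every request. The function name `build_chatml_prompt` is hypothetical.

```python
DEFAULT_SYSTEM = "Assistant is a large language model trained by OpenAI."


def build_chatml_prompt(user_message: str, system_message: str = DEFAULT_SYSTEM) -> str:
    """Assemble a prompt in the same layout as the literal strings in this guide."""
    return (
        f"<|im_start|>system\n{system_message}\n<|im_end|>\n"
        f"<|im_start|>user\n{user_message}<|im_end|>\n"
        "<|im_start|>assistant\n"
    )


print(build_chatml_prompt("summarize the following text:\nHello, I need help."))
```

The prompt ends with an open assistant turn, and the client is configured with `stop="<|im_end|>"`, so generation stops where the assistant's reply ends.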

## Step 6: Transform the dataframe with the OpenAICompletion Client

Now that you have the dataframe and the completion client, you can transform your input dataset and add a column called `completions` containing all of the information the service returns. We select just the text for simplicity.

In [None]:
request = "summarize the following text"
sum_df = spark.table("adp_customer_trans")
sum_df = sum_df.withColumn('prompt',concat(lit(f"<|im_start|>system\nAssistant is a large language model trained by OpenAI.\n<|im_end|>\n<|im_start|>user\n{request}:\n"), col('text'), lit("<|im_end|>\n<|im_start|>assistant\n")))
summary_result = completion.transform(sum_df.select(col("prompt"))).cache()



In [None]:
display(summary_result.select(
    col("completions.choices.text").getItem(0).alias("summary"),
))
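
The column path `completions.choices.text` followed by `getItem(0)` picks the text of the first choice in each response. The plain-Python sketch below mirrors that lookup on a dictionary. The dictionary shape is inferred from the column path used in this guide and the helper name is hypothetical; it is not the SynapseML schema definition.

```python
from typing import Optional


def first_choice_text(completions: Optional[dict]) -> Optional[str]:
    """Return the text of the first choice, or None if nothing was returned."""
    if not completions:
        return None
    choices = completions.get("choices") or []
    if not choices:
        return None
    return choices[0].get("text")


# Made-up response used only for illustration.
response = {"choices": [{"text": "The employee reported an incorrect paycheck."}]}
print(first_choice_text(response))
print(first_choice_text(None))  # a row without a completion yields None
```

When a summary is missing, inspecting the `error` column configured with `setErrorCol` is a reasonable first step.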

## Additional Usage Examples

### Improve throughput with request batching 

The example above makes several requests to the service, one for each prompt. To complete multiple prompts in a single request, use batch mode. First, create a dataframe with a list of prompts per row. Then, on the `OpenAICompletion` object, call `setBatchPromptCol("batchPrompt")` instead of `setPromptCol("prompt")`.

**Note:** As of this writing, a single request is limited to 20 prompts, with a hard limit of 2048 "tokens", or approximately 1500 words.

In [None]:
import pandas as pd
chat_script = """
Customer: Hi, I'm having some issues with my payroll. Can you help me? 
Customer Service Representative: Yes, of course. Can you tell me more about the problem you are having? 
Customer: Well, my paycheck seems to be incorrect. I'm not sure what's going on. 
Customer Service Representative: I'm sorry to hear that. Let's take a look. Can you tell me your company name and your employee ID number? 
Customer: Sure. My company is XYZ Corporation, and my employee ID is 12345. 
Customer Service Representative: Thank you. Let me pull up your information. Okay, I see that you were paid last Friday. Is that the paycheck you are referring to? 
Customer: Yes, that's the one. 
Customer Service Representative: Okay, let me take a closer look. I see that your gross pay is correct, but there seem to be some deductions that are higher than usual. Can you tell me if anything has changed recently, like your tax withholding or benefits enrollment? 
Customer: No, I don't think so. I haven't made any changes to my benefits, and I don't remember changing my tax withholding either. 
Customer Service Representative: Okay, let me check your account settings. It looks like your tax withholding was changed to a higher rate last month. Did you make that change yourself? 
Customer: No, I didn't. I'm not sure how that could have happened. 
Customer Service Representative: It's possible that it was an error or a miscommunication with your HR department. I can help you correct it now. Can you confirm your current tax filing status and the number of allowances you want to claim? 
Customer: Yes, I'm single and I want to claim one allowance. 
Customer Service Representative: Okay, let me make that change for you. It should reflect on your next paycheck. Is there anything else I can help you with? 
Customer: Actually, while we're on the topic of taxes, I have a question about my W-2 form. 
Customer Service Representative: Sure, what's your question? 
Customer: I received my W-2 form in the mail, but I noticed that my social security number is incorrect. What should I do? 
Customer Service Representative: Oh no, I'm sorry to hear that. We will need to correct that as soon as possible. Can you confirm your correct social security number? 
Customer: Yes, it's 123-45-6789. 
Customer Service Representative: Thank you. I will update your records and issue a corrected W-2 form to you. Is there anything else you need assistance with today? 
Customer: Actually, yes. I was supposed to receive a bonus in my last paycheck, but I didn't see it on there. Can you help me figure out what happened? 
Customer Service Representative: Yes, let me check your payroll history. Okay, it looks like the bonus was not included in your last paycheck. I apologize for the error. I can process the bonus payment for you now and it should be reflected on your next paycheck. 
Customer: Thank you, I appreciate that. I also have a question about my 401(k) plan. How do I enroll in the plan and make changes to my contributions? 
Customer Service Representative: Great question. To enroll in the plan, you will need to speak with your HR department and fill out the necessary forms. Once you are enrolled, you can make changes to your contributions by logging in to your ADP account online or through our mobile app. Would you like me to assist you with that? 
Customer: Yes, please. 
(Customer Service Representative walks the customer through the process of logging in and making changes to their 401(k) contributions) 
Customer Service Representative: Okay, your changes have been saved. Is there anything else I can help you with today? 
Customer: Yes, actually. I have a question about my time off accrual. How do I check how much time I have left? 
Customer Service Representative: That's a great question. You can check your time off accrual and balance by logging in to your ADP account and viewing your pay statements. Your time off balance should be listed there. Do you need help logging in? 
Customer: No, I think I can handle that. Thank you. 
Customer Service Representative: You're welcome. Is there anything else I can help you with today? 
Customer: Yes, I have a question about my direct deposit. Can I split my paycheck between two bank accounts? 
Customer Service Representative: Yes, you can split your paycheck between two or more bank accounts. You will need to speak with your HR department to set up the split deposit and provide them with the necessary information for each account. Once the split deposit is set up, it should be reflected on your next paycheck. Is there anything else I can help you with today? 
Customer: No, that's all for now. Thank you for your help. 
Customer Service Representative: You're welcome. Don't hesitate to call us back if you have any more questions or concerns. Have a great day!
"""
pandas_df = pd.DataFrame({"batchPrompt":[[f"<|im_start|>system\nAssistant is a large language model trained by OpenAI.\n<|im_end|>\n<|im_start|>user\nSummarize the following conversation\n{item}<|im_end|>\n<|im_start|>assistant\n" for item in [chat_script]*5]]})
batch_df = spark.createDataFrame(pandas_df)
display(batch_df)
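
With more prompts than the 20-prompt limit noted above, the list has to be split across several rows, since each row becomes one request. A minimal sketch of that split in plain Python follows. The function name `chunk_prompts` is hypothetical and the default of 20 simply restates the limit from the note.

```python
from typing import List


def chunk_prompts(prompts: List[str], max_per_request: int = 20) -> List[List[str]]:
    """Split prompts into consecutive lists of at most `max_per_request` items."""
    if max_per_request < 1:
        raise ValueError("max_per_request must be at least 1")
    return [
        prompts[start : start + max_per_request]
        for start in range(0, len(prompts), max_per_request)
    ]


# 45 made-up prompts become three rows of 20, 20, and 5 prompts.
rows = chunk_prompts([f"prompt {i}" for i in range(45)])
print([len(row) for row in rows])
```

Each inner list can then be used as one `batchPrompt` value, for example by passing the result to `pd.DataFrame({"batchPrompt": rows})`.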

Next, create the `OpenAICompletion` object. Because the column is of type `Array[String]`, set the batchPrompt column rather than the prompt column.

In [None]:
batch_completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setUrl("https://{}.openai.azure.com/".format(service_name))
    .setMaxTokens(2000)
    .setBatchPromptCol("batchPrompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

In [None]:
# Alternative to the previous cell for AAD authentication; it reuses the `token` acquired earlier.
# Make sure you are assigned the Cognitive Services User role.
from synapse.ml.cognitive import OpenAICompletion

batch_completion = (
    OpenAICompletion(stop= "<|im_end|>", AADToken=token.token)
    .setDeploymentName(deployment_name)
    .setUrl("https://{}.openai.azure.com/".format(service_name))
    .setMaxTokens(2000)
    .setBatchPromptCol("batchPrompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

When you call `transform`, one request is made per row. Since each row contains multiple prompts, each request is sent with all of the prompts in that row. The results contain one row for each row of the input dataframe.

In [None]:
completed_batch_df = batch_completion.transform(batch_df)
display(completed_batch_df)

### Using an automatic minibatcher

If your data is in column format, you can transpose it to row format using SynapseML's `FixedMiniBatchTransformer`.

In [None]:
from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI
sum_df = spark.table("adp_customer_trans")
sum_df = sum_df.withColumn('prompt',concat(lit(f"<|im_start|>system\nAssistant is a large language model trained by OpenAI.\n<|im_end|>\n<|im_start|>user\n{request}:\n"), col('text'), lit("<|im_end|>\n<|im_start|>assistant\n")))

completed_autobatch_df = (
    sum_df.coalesce(
        2
    )  # Use at most 2 partitions so that our little 6-row dataframe is grouped into batches of up to 3 rows; you can remove this step for large datasets
    .mlTransform(FixedMiniBatchTransformer(batchSize=3))
    .withColumnRenamed("prompt", "batchPrompt")
    .mlTransform(batch_completion)
)

display(completed_autobatch_df)
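
To make the transposition concrete, the sketch below groups a list of rows into fixed-size batches in which every column becomes a list of values. This is a plain-Python illustration of the idea under simplifying assumptions (a single in-memory list, no partitions); it is not the implementation of `FixedMiniBatchTransformer`, and the function name is hypothetical.

```python
from typing import Any, Dict, List


def fixed_minibatch(rows: List[Dict[str, Any]], batch_size: int) -> List[Dict[str, List[Any]]]:
    """Group rows into batches; each output row holds one list per column."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batches = []
    for start in range(0, len(rows), batch_size):
        chunk = rows[start : start + batch_size]
        # Transpose: one list per column, keeping the row order within the batch.
        batches.append({key: [row[key] for row in chunk] for key in chunk[0]})
    return batches


# Six made-up rows become two batched rows of three prompts each.
rows = [{"prompt": f"p{i}"} for i in range(6)]
for batch in fixed_minibatch(rows, batch_size=3):
    print(batch)
```

After batching, the `prompt` column holds a list of strings per row, which is why the cell above renames it to `batchPrompt` before calling the batch client.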

# OpenAI Embeddings

In this section we generate embeddings for a dataset of reviews with `OpenAIEmbedding`. We then use t-SNE to reduce the dimensionality of the embeddings from 1536 to 2, so that they can be plotted in a 2D scatter plot.

In [None]:
from synapse.ml.cognitive import OpenAIEmbedding

embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name_embeddings)
    .setUrl("https://{}.openai.azure.com/".format(service_name))
    .setTextCol("combined")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

In [None]:
import pyspark.sql.functions as F

df = spark.read.options(inferSchema="True", delimiter=",", header=True).csv(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/fine_food_reviews_1k.csv"
)

df = df.withColumn(
    "combined",
    F.format_string("Title: %s; Content: %s", F.trim(df.Summary), F.trim(df.Text)),
)

display(df)

In [None]:
from pyspark.sql.functions import col

completed_df = embedding.transform(df).cache()
display(completed_df)

## Retrieve embeddings

In [None]:
import numpy as np

matrix = np.array(completed_df.select("embeddings").collect())[:, 0, :]
matrix.shape

## Reduce dimensionality
We reduce the dimensionality to 2 dimensions using t-SNE decomposition.

In [None]:
import pandas as pd
from sklearn.manifold import TSNE
import numpy as np

# Create a t-SNE model and transform the data
tsne = TSNE(
    n_components=2, perplexity=15, random_state=42, init="random", learning_rate=200
)
vis_dims = tsne.fit_transform(matrix)
vis_dims.shape

## Plot the embeddings
We colour each review by its star rating, ranging from red for negative reviews to green for positive reviews.

We can observe a decent data separation even in the reduced 2 dimensions.

In [None]:
import matplotlib.pyplot as plt
import matplotlib
import numpy as np

scores = np.array(completed_df.select("Score").collect()).reshape(-1)

colors = ["red", "darkorange", "gold", "turquoise", "darkgreen"]
x = [x for x, y in vis_dims]
y = [y for x, y in vis_dims]
color_indices = scores - 1

colormap = matplotlib.colors.ListedColormap(colors)
plt.scatter(x, y, c=color_indices, cmap=colormap, alpha=0.3)
for score in [0, 1, 2, 3, 4]:
    avg_x = np.array(x)[scores - 1 == score].mean()
    avg_y = np.array(y)[scores - 1 == score].mean()
    color = colors[score]
    plt.scatter(avg_x, avg_y, marker="x", color=color, s=100)

plt.title("Amazon ratings visualized in language using t-SNE")
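
The loop in the cell above marks the average position of each star rating with an `x`. The same per-rating averages can be computed in one pass, as in the plain-Python sketch below. It also skips ratings that have no reviews, whereas the NumPy mean of an empty selection would be `nan`. The function name and the sample points are made up for illustration.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def centroids_by_score(
    points: Iterable[Tuple[float, float]], scores: Iterable[int]
) -> Dict[int, Tuple[float, float]]:
    """Average the 2D points of each score; scores with no points are omitted."""
    totals = defaultdict(lambda: [0.0, 0.0, 0])  # sum of x, sum of y, count
    for (x, y), score in zip(points, scores):
        acc = totals[score]
        acc[0] += x
        acc[1] += y
        acc[2] += 1
    return {score: (sx / n, sy / n) for score, (sx, sy, n) in totals.items()}


# Made-up 2D points with their star ratings.
print(centroids_by_score([(0.0, 0.0), (2.0, 2.0), (1.0, 5.0)], [1, 1, 5]))
```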

## Use embeddings to build a semantic search index

Note that for some OpenAI models, users should use separate models for embedding documents and queries. These models are denoted by the "-doc" and "-query" suffixes respectively.

In [None]:
embedding_query = (
    OpenAIEmbedding()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name_embeddings_query)
    .setUrl("https://{}.openai.azure.com/".format(service_name))
    .setTextCol("query")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

## Create a dataframe of search queries

Note: The ID columns in the document and query dataframes should have the same data type.

In [None]:
query_df = (
    spark.createDataFrame(
        [
            (
                0,
                "desserts",
            ),
            (
                1,
                "disgusting",
            ),
        ]
    )
    .toDF("id", "query")
    .withColumn("id", F.col("id").cast("int"))
)

## Generate embeddings for queries

In [None]:
completed_query_df = embedding_query.transform(query_df).cache()

## Build index for fast retrieval

In [None]:
from synapse.ml.nn import KNN

knn = (
    KNN()
    .setFeaturesCol("embeddings")
    .setValuesCol("id")
    .setOutputCol("output")
    .setK(10)
)  # top-k for retrieval

knn_index = knn.fit(completed_df)

## Retrieve results

In [None]:
df_matches = knn_index.transform(completed_query_df).cache()

df_result = (
    df_matches.withColumn("match", F.explode("output"))
    .join(df, df["id"] == F.col("match.value"))
    .select("query", F.col("combined"), "match.distance")
)

display(df_result)
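
Conceptually, the retrieval step ranks document embeddings by how close they are to each query embedding and keeps the top `k`. The brute-force sketch below shows that idea in plain Python. This guide does not state which distance measure the SynapseML `KNN` index uses, so cosine similarity is an assumption chosen for illustration, and the function names and toy vectors are hypothetical.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(
    query: Sequence[float], documents: Dict[int, Sequence[float]], k: int = 10
) -> List[Tuple[int, float]]:
    """Return the k document ids most similar to the query, best first."""
    scored = [(doc_id, cosine_similarity(query, vec)) for doc_id, vec in documents.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy 2D "embeddings" keyed by document id.
docs = {0: [1.0, 0.0], 1: [0.0, 1.0], 2: [1.0, 1.0]}
print(top_k([1.0, 0.1], docs, k=2))
```

A linear scan like this compares the query with every document, which is the cost that a fitted index is meant to reduce on large collections.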