In [None]:
%%configure -f
{
  "name": "synapseml",
  "conf": {
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.0,org.apache.spark:spark-avro_2.12:3.3.1",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
      "spark.yarn.user.classpath.first": "true",
      "spark.sql.parquet.enableVectorizedReader": "false",
      "spark.sql.legacy.replaceDatabricksSparkAvro.enabled": "true"
  }
}

In [None]:
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

if running_on_synapse():
    # On Synapse use the platform's `display`; elsewhere the notebook's own `display`
    # (presumably the IPython built-in) is used.
    from notebookutils.visualization import display

# Fill in the following lines with your service information.
# The service name and key can also be supplied through the environment variables
# below, so the secret never has to be pasted into (and saved with) the notebook.
service_name = os.environ.get("AZURE_OPENAI_SERVICE_NAME", "xxxxxxx")  # please replace this with your Service Name
deployment_name = "text-davinci-003"
deployment_name_embeddings = "text-search-ada-doc-001"
deployment_name_embeddings_query = "text-search-ada-query-001"

# Plain assignment: do not chain-assign into `find_secret`, which would replace the
# function imported above with this string.
key = os.environ.get("AZURE_OPENAI_API_KEY", "xxxxxxxx")  # please replace this with your key as a string

# Fail early on empty values (e.g. an environment variable set to "").
assert key and service_name, "Set `service_name` and `key` before running the rest of the notebook."

In [None]:
# Example prompts for the completion service, one single-column row per prompt.
example_prompts = [
    ("Hello my name is",),
    ("The best code is code thats",),
    ("SynapseML is ",),
]
df = spark.createDataFrame(example_prompts).toDF("prompt")

In [None]:
from synapse.ml.cognitive import OpenAICompletion

# Per-row completion: read the prompt from "prompt", write the service response to
# "completions" and any request failure to "error".
completion_url = f"https://{service_name}.openai.azure.com/"
completion = OpenAICompletion()
completion = (
    completion.setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setUrl(completion_url)
    .setMaxTokens(200)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

In [None]:
from pyspark.sql.functions import col

completed_df = completion.transform(df)
completed_df = completed_df.cache()

# Show each prompt, its error (if any) and the text of the first returned choice.
first_choice_text = col("completions.choices.text").getItem(0).alias("text")
display(completed_df.select(col("prompt"), col("error"), first_choice_text))

In [None]:
# Each row carries a whole list of prompts that is sent to the service as one batch.
batch_prompt_rows = [
    (["The time has come", "Pleased to", "Today stocks", "Here's to"],),
    (["The only thing", "Ask not what", "Every litter", "I am"],),
]
batch_df = spark.createDataFrame(batch_prompt_rows).toDF("batchPrompt")

In [None]:
# Batched completion: reads a list of prompts per row from "batchPrompt" instead of a
# single prompt; output and error columns match the per-row transformer.
batch_completion_url = f"https://{service_name}.openai.azure.com/"
batch_completion = OpenAICompletion()
batch_completion = (
    batch_completion.setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setUrl(batch_completion_url)
    .setMaxTokens(200)
    .setBatchPromptCol("batchPrompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

In [None]:
# Run the batched prompts through the service and cache the result before showing it.
completed_batch_df = batch_completion.transform(batch_df)
completed_batch_df = completed_batch_df.cache()
display(completed_batch_df)

In [None]:
from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI

# Batch the single-prompt `df` automatically instead of building list-valued rows by hand.
# `.mlTransform` is presumably made available by the `FluentAPI` import above.
completed_autobatch_df = (
    # Force a single partition so that our little 3-row dataframe ends up in one batch
    # (FixedMiniBatchTransformer groups at most 4 consecutive rows of a partition);
    # you can remove this step for large datasets.
    df.coalesce(1)
    .mlTransform(FixedMiniBatchTransformer(batchSize=4))
    # After batching, "prompt" holds a list of prompts per row, matching the list-valued
    # "batchPrompt" column that `batch_completion` reads.
    .withColumnRenamed("prompt", "batchPrompt")
    .mlTransform(batch_completion)
)

display(completed_autobatch_df)

In [None]:
# Few-shot translation prompts: each shows one worked source/English pair, then leaves
# the final "English:" line open for the model to complete. The French example is
# written in correct French so the demonstration does not teach malformed input.
translate_df = spark.createDataFrame(
    [
        ("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
        (
            "French: Quelle heure est-il à Montréal? \nEnglish: What time is it in Montreal? \nFrench: Où est le poulet? \nEnglish:",
        ),
    ]
).toDF("prompt")

display(completion.transform(translate_df))

In [None]:
# A single few-shot question-answering prompt: one answered example, then an open "A:".
qa_prompt = (
    "Q: Where is the Grand Canyon?\n"
    "A: The Grand Canyon is in Arizona.\n"
    "\n"
    "Q: What is the weight of the Burj Khalifa in kilograms?\n"
    "A:"
)
qa_df = spark.createDataFrame([(qa_prompt,)]).toDF("prompt")

display(completion.transform(qa_df))

In [None]:
from synapse.ml.cognitive import OpenAIEmbedding

# Document-side embedding: embeds the "combined" text column into "embeddings",
# recording any request failure in "error".
embedding_url = f"https://{service_name}.openai.azure.com/"
embedding = OpenAIEmbedding()
embedding = (
    embedding.setSubscriptionKey(key)
    .setDeploymentName(deployment_name_embeddings)
    .setUrl(embedding_url)
    .setTextCol("combined")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

In [None]:
import pyspark.sql.functions as F

# Load the 1k fine-food reviews sample (header row, inferred column types).
reviews_csv_path = "wasbs://publicwasb@mmlspark.blob.core.windows.net/fine_food_reviews_1k.csv"
df = spark.read.options(inferSchema="True", delimiter=",", header=True).csv(reviews_csv_path)

# The text to embed: trimmed summary and body merged into one string per review.
df = df.withColumn(
    "combined",
    F.format_string(
        "Title: %s; Content: %s",
        F.trim(F.col("Summary")),
        F.trim(F.col("Text")),
    ),
)

display(df)

In [None]:
from pyspark.sql.functions import col

# Embed every review; cached because the following cells read it several times.
completed_df = embedding.transform(df)
completed_df = completed_df.cache()
display(completed_df)

In [None]:
import numpy as np

# Collect every embedding to the driver and stack them into a 2-D array for t-SNE.
# NOTE(review): relies on numpy turning the collected single-field Rows into an array of
# shape (n_rows, 1, dim); `[:, 0, :]` drops the single-field axis.
# Assumes no row has a null "embeddings" value (e.g. from a failed request) — TODO confirm.
matrix = np.array(completed_df.select("embeddings").collect())[:, 0, :]
matrix.shape

In [None]:
import pandas as pd
from sklearn.manifold import TSNE
import numpy as np

# Project the embeddings down to two dimensions with t-SNE; the fixed random_state
# keeps the layout the same across runs.
tsne = TSNE(
    n_components=2,
    perplexity=15,
    init="random",
    learning_rate=200,
    random_state=42,
)
vis_dims = tsne.fit_transform(matrix)
vis_dims.shape

In [None]:
import matplotlib.pyplot as plt
import matplotlib
import numpy as np

# Color each review's 2-D t-SNE point by its rating and mark each rating's centroid.
# NOTE(review): `scores` and `matrix` come from two separate collects of the cached
# `completed_df`; this assumes both return rows in the same order — TODO confirm.
scores = np.array(completed_df.select("Score").collect()).reshape(-1)

# Assumes "Score" is an integer star rating in 1..5 — one palette color per rating.
colors = ["red", "darkorange", "gold", "turquoise", "darkgreen"]
x = vis_dims[:, 0]
y = vis_dims[:, 1]
color_indices = scores - 1  # ratings 1..5 -> palette indices 0..4

colormap = matplotlib.colors.ListedColormap(colors)
fig, ax = plt.subplots()
# Pin the color scale to the palette's full index range. Without vmin/vmax, scatter
# rescales `c` to the observed min/max, so colors would no longer match the centroid
# markers whenever a rating is absent from the data.
ax.scatter(
    x, y, c=color_indices, cmap=colormap, vmin=0, vmax=len(colors) - 1, alpha=0.3
)
for score, color in enumerate(colors):
    in_class = color_indices == score
    if not in_class.any():
        # No reviews with this rating: skip instead of plotting a NaN centroid.
        continue
    ax.scatter(x[in_class].mean(), y[in_class].mean(), marker="x", color=color, s=100)

ax.set_title("Amazon ratings visualized in language using t-SNE")
ax.set_xlabel("t-SNE dimension 1")
ax.set_ylabel("t-SNE dimension 2")
plt.show()

In [None]:
# Query-side embedding: same service, but the query deployment, reading the "query" column.
embedding_query_url = f"https://{service_name}.openai.azure.com/"
embedding_query = OpenAIEmbedding()
embedding_query = (
    embedding_query.setSubscriptionKey(key)
    .setDeploymentName(deployment_name_embeddings_query)
    .setUrl(embedding_query_url)
    .setTextCol("query")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

In [None]:
# Search queries with an integer id column (Python ints are cast down to Spark int).
search_queries = [(0, "desserts"), (1, "disgusting")]
query_df = spark.createDataFrame(search_queries).toDF("id", "query")
query_df = query_df.withColumn("id", F.col("id").cast("int"))

In [None]:
# Embed the queries with the query-side transformer and cache them for the KNN lookup.
completed_query_df = embedding_query.transform(query_df)
completed_query_df = completed_query_df.cache()

In [None]:
from synapse.ml.nn import KNN

# Nearest-neighbour index over the review embeddings. Each indexed point carries the
# review's "id" value, which comes back in the "output" matches of a lookup.
knn = (
    KNN()
    .setFeaturesCol("embeddings")
    .setValuesCol("id")
    .setOutputCol("output")
    .setK(10)  # top-k for retrieval
)

knn_index = knn.fit(completed_df)

In [None]:
df_matches = knn_index.transform(completed_query_df).cache()

# One row per (query, neighbour): join each match back to its review to show the text
# alongside the distance.
exploded_matches = df_matches.withColumn("match", F.explode("output"))
df_result = exploded_matches.join(df, df["id"] == F.col("match.value")).select(
    "query", F.col("combined"), "match.distance"
)

display(df_result)