### About
This notebook reads data from an Azure SQL table, runs a custom LLM prompt against gpt-35-turbo (0613), and saves the results into a new Azure SQL table.

In [43]:
# Print the Spark version of the current session; the SynapseML and MS SQL connector
# packages used in this notebook are Spark-3.3-specific builds, so this must match.
print(sc.version)

StatementMeta(sparkpool1, 8, 42, Finished, Available)

3.3.1.5.2-100223822


### Imports

In [44]:
from synapse.ml.cognitive import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import concat, lit, array, udf, col, to_json

StatementMeta(sparkpool1, 8, 43, Finished, Available)

### SynapseML installation

Ref: https://microsoft.github.io/SynapseML/

(Installation instructions are at the bottom of the web page.)

In [45]:
#One-off

# Loads SynapseML 0.11.2 (Spark 3.3 build) into the session via the Synapse %%configure magic.
# To use: uncomment the block below and run this cell once.
# NOTE(review): cell magics generally must be the first line of a cell, so these comment
# lines above "%%configure" likely need to be removed when uncommenting — confirm in Synapse.
# NOTE(review): "-f" presumably forces a session restart so the new packages are picked up —
# confirm against the Synapse %%configure documentation.
# %%configure -f
# {
#   "name": "synapseml",
#   "conf": {
#       "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.2-spark3.3",
#       "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
#       "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
#       "spark.yarn.user.classpath.first": "true",
#       "spark.sql.parquet.enableVectorizedReader": "false"
#   }
# }

StatementMeta(sparkpool1, 8, 44, Finished, Available)

### Note

Using the Spark MS SQL Connector with Spark 3.3

<b>Error</b>: the data source `com.microsoft.sqlserver.jdbc.spark` cannot be found (the connector JAR is not available on the pool).

<b>Resolution</b>: 
Download the connector JAR to your computer from: https://repo1.maven.org/maven2/com/microsoft/azure/spark-mssql-connector_2.12/1.3.0-BETA/spark-mssql-connector_2.12-1.3.0-BETA.jar

Then upload it to the workspace (Manage -> Workspace packages -> Upload).

Then add the package to the Spark pool (Manage -> Apache Spark pools -> select the pool name -> click ... -> Packages, and select the uploaded workspace package).


### SQL Connection Configuration

Ref: https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/data-sources/apache-spark-sql-connector

Ref: https://learn.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver16

#### AKV
Azure Key Vault (AKV) is added as a linked service to the workspace, and the workspace managed identity (MI) has been granted access to the AKV.

In [46]:
# --- Azure SQL connection settings ---------------------------------------------
# JDBC server URL pattern: "jdbc:sqlserver://<AzureSQLServerName>.database.windows.net:1433"
servername = "jdbc:sqlserver://<<Azure SQL Server>>.database.windows.net:1433"
dbname = "<<Azure SQL DB>>"
url = f"{servername};databaseName={dbname};"

# Source: a derived-table query returning only rows with a non-empty review.
# Adjust or remove the TOP clause to process more rows.
dbtable = "(SELECT TOP 20 * FROM dbo.HotelReviews WHERE LEN([Review Comments]) > 0) AS HotelReviews"
# Destination table for the LLM results.
dbtable_out = "HotelReviews_Out"

# SQL credentials are fetched from Azure Key Vault rather than hardcoded.
user = mssparkutils.credentials.getSecret('<<Azure Key Vault>>','AZSQLUSR')
password = mssparkutils.credentials.getSecret('<<Azure Key Vault>>','AZSQLPWD')


StatementMeta(sparkpool1, 8, 45, Finished, Available)

### Read from SQL DB

In [47]:
# Load the source rows through the Microsoft SQL Spark connector.
print("Reading data from a SQL server table ")
jdbcDF = (
    spark.read
    .format("com.microsoft.sqlserver.jdbc.spark")
    .options(url=url, dbtable=dbtable, user=user, password=password)
    .load()
)

# Preview a few of the loaded rows.
display(jdbcDF.limit(5))


StatementMeta(sparkpool1, 8, 46, Finished, Available)

Reading data from a SQL server table 


SynapseWidget(Synapse.DataFrame, 040f52e5-dc9e-4436-9f80-015e0b7a0d85)

### OpenAI Configuration

Ref: https://microsoft.github.io/SynapseML/docs/Explore%20Algorithms/OpenAI/

In [48]:
from synapse.ml.core.platform import find_secret

# Azure OpenAI resource name and API key come from Key Vault; the deployment is named inline.
service_name = mssparkutils.credentials.getSecret('<<Azure Key Vault>>','OPENAISERVICENAME')
key = mssparkutils.credentials.getSecret('<<Azure Key Vault>>','OPENAIAPIKEY')
deployment_name = "<<AOAI Deployment Name>>" #e.g. "gpt-35-turbo(0613)"

# Stop early if either secret came back empty-handed (None).
assert all(secret is not None for secret in (key, service_name))

StatementMeta(sparkpool1, 8, 47, Finished, Available)

### Transformation steps

In [49]:
# System message: the persona sent with every chat request.
base_system_message = "You are the AI analyst and lead the change management team in the hospitality industry."

# User prompt template. Each row's review text is concatenated directly after the final
# "Provided review comments:" line when the "messages" column is built.
# Grammar fixes ("to be keep doing" -> "to keep doing", missing articles) keep the
# instructions unambiguous for the model.
base_prompt = """You are provided with a review comment submitted by a guest after their stay at the hotel.
Perform a thorough analysis of the comments written by our guests.

If the comment has a positive sentiment, identify what we need to keep doing.
If the comment has a negative sentiment, identify what we need to improve.
If the comment has a neutral sentiment, say "N/A".

Format your findings as per structure below:
{
    "sentiment":"positive" / "negative" / "neutral",
    "identified_action": "",
    "action_type": "continue" / "improve" / "N/A"
}

Only return one finding per review comment.

Provided review comments:
"""

StatementMeta(sparkpool1, 8, 48, Finished, Available)

In [50]:
def make_message(role1, content1, role2, content2):
    """Build a two-message chat history for the chat completion API.

    Each (role, content) pair becomes a Row with fields ``role``, ``content`` and
    ``name``, where ``name`` repeats the role.

    Returns:
        list[Row]: the two messages, in argument order.
    """
    pairs = ((role1, content1), (role2, content2))
    return [Row(role=role, content=content, name=role) for role, content in pairs]


# Every message field is a nullable string.
make_message_udf = udf(
    make_message,
    ArrayType(
        StructType(
            [StructField(field, StringType(), True) for field in ("role", "content", "name")]
        )
    ),
)

# messages = [system persona, base prompt + this row's review text]
jdbcDF = jdbcDF.withColumn(
    "messages",
    make_message_udf(
        lit("system"),
        lit(base_system_message),
        lit("user"),
        concat(lit(base_prompt), col("Review Comments")),
    ),
)

display(jdbcDF.limit(5))

StatementMeta(sparkpool1, 8, 49, Finished, Available)

SynapseWidget(Synapse.DataFrame, f3f67c75-9893-4767-82f7-a1e61914ee17)

In [51]:
# SynapseML transformer that sends each row's "messages" to the Azure OpenAI deployment.
# Replies go to "chat_completions"; per-row request errors go to "error".
chat_completion = OpenAIChatCompletion()
# Authentication and target deployment.
chat_completion = (
    chat_completion.setSubscriptionKey(key)
    .setCustomServiceName(service_name)
    .setDeploymentName(deployment_name)
)
# Generation settings.
chat_completion = chat_completion.setTemperature(0.5).setMaxTokens(200)
# Input/output column wiring.
chat_completion = (
    chat_completion.setMessagesCol("messages")
    .setOutputCol("chat_completions")
    .setErrorCol("error")
)

StatementMeta(sparkpool1, 8, 50, Finished, Available)

In [52]:
# Run the chat completions over every review.
# transform() only builds a lazy plan: without cache(), every later action (each display
# and the final SQL write) would re-send all requests to Azure OpenAI. That costs extra
# calls and, with temperature 0.5, the rows written to SQL could differ from those shown.
chat_completionDF = chat_completion.transform(jdbcDF).cache()
display(chat_completionDF)

StatementMeta(sparkpool1, 8, 51, Finished, Available)

SynapseWidget(Synapse.DataFrame, d6362ae1-800a-4687-be3b-76c1b28d0527)

In [53]:
# Show each prompt alongside the text of the model's reply (or replies).
display(
    chat_completionDF.select(
        col("messages"),
        col("chat_completions.choices.message.content"),
    )
)

StatementMeta(sparkpool1, 8, 52, Finished, Available)

SynapseWidget(Synapse.DataFrame, 27366bc4-203c-444e-966e-0be5c3a6a0b4)

In [54]:
# Inspect the nested output schema (messages / error / chat_completions) before flattening for SQL.
chat_completionDF.printSchema()

StatementMeta(sparkpool1, 8, 53, Finished, Available)

root
 |-- Date: string (nullable = true)
 |-- Hotel: string (nullable = true)
 |-- Highlights: string (nullable = true)
 |-- Review Comments: string (nullable = true)
 |-- messages: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- role: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- error: struct (nullable = true)
 |    |-- response: string (nullable = true)
 |    |-- status: struct (nullable = true)
 |    |    |-- protocolVersion: struct (nullable = true)
 |    |    |    |-- protocol: string (nullable = true)
 |    |    |    |-- major: integer (nullable = false)
 |    |    |    |-- minor: integer (nullable = false)
 |    |    |-- statusCode: integer (nullable = false)
 |    |    |-- reasonPhrase: string (nullable = true)
 |-- chat_completions: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- object: string (nullable = true)
 |    |-- created: str

### Save output to Azure SQL

In [55]:
# Serialize the nested array/struct columns to JSON strings and drop the originals.
# NOTE(review): presumably required because the SQL connector cannot write nested types.
chat_completionDF = chat_completionDF.withColumns(
    {
        "messages_json": to_json(col("messages")),
        "chat_completions_json": to_json(col("chat_completions")),
        "error_json": to_json(col("error")),
    }
).drop("messages", "chat_completions", "error")

StatementMeta(sparkpool1, 8, 54, Finished, Available)

In [56]:
# Preview the flattened rows exactly as they will be written to the SQL output table.
display(chat_completionDF)

StatementMeta(sparkpool1, 8, 55, Finished, Available)

SynapseWidget(Synapse.DataFrame, c833ab4f-db59-455a-b54a-d78569b7cf1c)

In [57]:
# Write the results to dbtable_out, replacing any existing table ("overwrite" mode).
# Spark surfaces connector/JDBC failures as Py4JJavaError or pyspark's AnalysisException,
# never ValueError, so catch Exception. Report the failure and re-raise so the cell
# (and any pipeline running this notebook) is marked failed. The success message is
# printed only when the write actually completed.
try:
    (
        chat_completionDF.write
        .format("com.microsoft.sqlserver.jdbc.spark")
        .mode("overwrite")
        .option("url", url)
        .option("dbtable", dbtable_out)
        .option("user", user)
        .option("password", password)
        .save()
    )
except Exception as error:
    print("MSSQL Connector write failed", error)
    raise
else:
    print("MSSQL Connector write(overwrite) succeeded  ")

StatementMeta(sparkpool1, 8, 56, Finished, Available)

MSSQL Connector write(overwrite) succeeded  
