# Exercise 6.1 - Analyze Text Data with prebuilt AI models

## Exercise outline

1) **Load new data with reviews of taxi trips. Join it with the existing table**
2) **Explore and clean data**
3) **Call prebuilt AI Services to add new features to the data**
4) **Use Azure OpenAI to enrich text data**

Important requirement: The following code was written for Runtime 1.2. It may generate errors when run on Runtime 1.3.

In [None]:
## Lakehouse Names
BRONZE_LAKEHOUSE = "bronzerawdata"
SILVER_LAKEHOUSE = "silvercleansed"
GOLD_LAKEHOUSE = "goldcurated"

#Table Names
RAW_DATA_TABLE = "green201501"
RAW_REVIEW_TABLE = "review"
CLENSED_REVIEW_TABLE = "taxireview_cleansed"
ENRICHED_TABLE = "taxiwithreview"


# SET SQL Variables to explore data using Spark SQL
spark.sql(f"SET BRONZE_LAKEHOUSE ={BRONZE_LAKEHOUSE}")
spark.sql(f"SET SILVER_LAKEHOUSE ={SILVER_LAKEHOUSE}")
spark.sql(f"SET GOLD_LAKEHOUSE ={GOLD_LAKEHOUSE}")


## Step 1: Load reviews data and join them with Taxi data.

This part of the exercise focuses on additional data containing reviews of taxi trips. We will join the reviews with the existing data, then explore and clean this additional dataset.

In [None]:
##Read the additional data. You can generate the code below by dragging the file to the notebook cell.

review_df = spark.read.parquet("Files/reviews.parquet")
display(review_df)

In [None]:
## Change the date format in the review dataframe. Explore the values in review_df and nyc_taxi_df to see for yourself that the date formats don't match.
from pyspark.sql.functions import unix_timestamp, from_unixtime

review_df = review_df.withColumn('pickup_datetime', from_unixtime(unix_timestamp('pickup_datetime', 'MM/dd/yyyy hh:mm:ss a')))
review_df = review_df.withColumn('dropoff_datetime', from_unixtime(unix_timestamp('dropoff_datetime', 'MM/dd/yyyy hh:mm:ss a')))
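
To see what the pattern `'MM/dd/yyyy hh:mm:ss a'` does to a single value, here is a minimal plain-Python sketch that runs without Spark. The `strptime` format string is the Python counterpart of the Spark pattern; the helper name `normalize_timestamp` and the sample values are made up for illustration.

```python
from datetime import datetime

# Python counterpart of the Spark pattern 'MM/dd/yyyy hh:mm:ss a'
# (12-hour clock with an AM/PM marker).
REVIEW_FORMAT = "%m/%d/%Y %I:%M:%S %p"


def normalize_timestamp(raw: str) -> str:
    """Parse a 12-hour timestamp and return it as 'yyyy-MM-dd HH:mm:ss'."""
    return datetime.strptime(raw, REVIEW_FORMAT).strftime("%Y-%m-%d %H:%M:%S")


# Hypothetical sample values, not taken from the dataset.
print(normalize_timestamp("01/15/2015 07:05:39 PM"))  # 2015-01-15 19:05:39
print(normalize_timestamp("01/01/2015 12:00:00 AM"))  # 2015-01-01 00:00:00
```

The output layout matches the default string format produced by `from_unixtime`, which is what makes the review timestamps comparable with the taxi timestamps in the join.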


In [None]:
## save the review dataframe to the table in bronze lakehouse
review_df.write.format("delta").mode("overwrite").saveAsTable(f"{BRONZE_LAKEHOUSE}.{RAW_REVIEW_TABLE}")

After displaying the reviews, read a few of them. You can read a full review by clicking on a field in the output.

The reviews were generated with the help of an LLM, so they can seem a little unnatural.

Now let's join the reviews with the taxi data.

In [None]:
## Reading taxi data to dataframe


nyc_taxi_df = spark.sql(f"SELECT * FROM {BRONZE_LAKEHOUSE}.{RAW_DATA_TABLE}")


display(nyc_taxi_df)

In [None]:
## Joining the data
from pyspark.sql.functions import col

# Create aliases for your DataFrames
df1_alias = nyc_taxi_df.alias("df1")
df2_alias = review_df.alias("df2")

# Define the join condition using the aliases
join_condition = [
    col("df1.VendorID") == col("df2.vendorid"),
    col("df1.lpep_pickup_datetime") == col("df2.pickup_datetime"),
    col("df1.lpep_dropoff_datetime") == col("df2.dropoff_datetime"),
]

# Perform the join using the aliases
result_df = df1_alias.join(df2_alias, join_condition, how='left')  # You can use other join types like 'inner', 'right', 'outer', etc.

# Drop one of vendorid columns.
result_df = result_df.drop(col("df2.vendorid"))

display(result_df)

Let's check whether the join is correct by printing the number of rows in the original dataset and in the one enriched with reviews.

In [None]:
print(nyc_taxi_df.count())
print(result_df.count())
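
Why compare the two counts? A left join keeps every row from the left side, so the counts match only if no trip matches more than one review. The plain-Python sketch below illustrates this on a few made-up rows; `left_join`, `trip_key` and the sample data are hypothetical and only mimic the composite join key used above.

```python
from collections import defaultdict


def left_join(left, right, key):
    """Left join two lists of dicts; unmatched left rows get review=None."""
    index = defaultdict(list)
    for row in right:
        index[key(row)].append(row)
    joined = []
    for row in left:
        matches = index.get(key(row))
        if matches:
            # One output row per matching right-side row.
            joined.extend({**row, **match} for match in matches)
        else:
            joined.append({**row, "review": None})
    return joined


def trip_key(row):
    """Composite key: vendor plus pickup and dropoff timestamps."""
    return (row["vendorid"], row["pickup"], row["dropoff"])


# Made-up sample rows for illustration only.
trips = [
    {"vendorid": 1, "pickup": "2015-01-01 10:00:00", "dropoff": "2015-01-01 10:20:00"},
    {"vendorid": 2, "pickup": "2015-01-01 11:00:00", "dropoff": "2015-01-01 11:15:00"},
    {"vendorid": 2, "pickup": "2015-01-02 09:00:00", "dropoff": "2015-01-02 09:30:00"},
]
reviews_unique = [
    {"vendorid": 1, "pickup": "2015-01-01 10:00:00",
     "dropoff": "2015-01-01 10:20:00", "review": "Great ride"},
]
reviews_with_duplicate = reviews_unique + [
    {"vendorid": 1, "pickup": "2015-01-01 10:00:00",
     "dropoff": "2015-01-01 10:20:00", "review": "Too slow"},
]

print(len(trips))                                               # 3
print(len(left_join(trips, reviews_unique, trip_key)))          # 3
print(len(left_join(trips, reviews_with_duplicate, trip_key)))  # 4
```

If the second number printed by the notebook cell is larger than the first, some trips matched several reviews and the join key is not unique on the review side.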


And how many reviews do we have?

In [None]:

taxi_with_reviews = result_df.filter(col("review").isNotNull())
print(taxi_with_reviews.count())

In [None]:
### Save the results to a new delta table
taxi_with_reviews.write.format("delta").mode("overwrite").saveAsTable(f"{SILVER_LAKEHOUSE}.{CLENSED_REVIEW_TABLE}")

## Step 2: Explore data and add new features with prebuilt AI Services

In this step we will use prebuilt AI Services. This feature is still in preview, but it simplifies access to AI solutions in Azure: for basic AI use cases in Fabric there is no need to deploy services or configure connections. You can call Text Analytics, Azure OpenAI and other services directly from Fabric notebooks without additional configuration.

In our scenario we will start by detecting the language of the reviews with Azure Text Analytics. The library that lets us do this is, once again, SynapseML.



##### Import SynapseML

In [None]:
import synapse.ml.core
from synapse.ml.cognitive.language import AnalyzeText
from pyspark.sql.functions import col

In [None]:
#define TextAnalytics model for language detection.

model = (AnalyzeText()
        .setTextCol("review")  #name of the analyzed column
        .setKind("LanguageDetection") ## type of task
        .setOutputCol("response"))

result = model.transform(taxi_with_reviews)\
        .withColumn("documents", col("response.documents"))\
        .withColumn("Language", col("documents.detectedLanguage.name"))

In [None]:
enriched_df = result.select(col("review"),col("documents"),col("Language"))
display(enriched_df)

In [None]:
## let's check the distribution of languages in the reviews:
display(enriched_df.groupBy("language").count())

In [None]:
## now let's analyse key phrases

model = (AnalyzeText()
        .setTextCol("review")
        .setKind("KeyPhraseExtraction")
        .setOutputCol("response"))

result = model.transform(taxi_with_reviews)\
        .withColumn("documents", col("response.documents"))\
        .withColumn("keyPhrases", col("documents.keyPhrases"))

display(result.select("review", "keyPhrases"))

In [None]:
### OK, let's try sentiment analysis

model = (AnalyzeText()
        .setTextCol("review")
        .setKind("SentimentAnalysis")
        .setOutputCol("response"))

result = model.transform(taxi_with_reviews)\
        .withColumn("documents", col("response.documents"))\
        .withColumn("sentiment", col("documents.sentiment"))

display(result.select("review", "sentiment"))

In [None]:
display(result.groupBy("sentiment").count())

We tried different methods available in Text Analytics in Azure. Another service prebuilt in Fabric is Azure AI Translator.

Let's translate the reviews to two languages, English and Polish, with the help of prebuilt AI Translator.

In [None]:
from synapse.ml.cognitive.translate import *
from pyspark.sql.functions import flatten

In [None]:
translate = (Translate()
    .setTextCol("review")
    .setToLanguage(["en","pl"])
    .setOutputCol("translation")
    .setConcurrency(5))

result = translate.transform(taxi_with_reviews)\
        .withColumn("translation", flatten(col("translation.translations")))\
        .withColumn("translation", col("translation.text"))

display(result.select("review", "translation"))
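
The two `withColumn` calls above first flatten a nested array and then keep only the translated text. The plain-Python sketch below mirrors those two steps on one value. The nested shape (a list of items, each with a `translations` list of `to`/`text` entries) is an assumption inferred from the column paths used in the cell, and the sample sentences are made up.

```python
from itertools import chain

# Assumed shape of one value in the "translation" column before flattening.
translation = [
    {
        "translations": [
            {"to": "en", "text": "The ride was great."},
            {"to": "pl", "text": "Przejazd był świetny."},
        ]
    }
]

# Step 1: col("translation.translations") yields a list of lists,
# which flatten() turns into a single list.
nested = [item["translations"] for item in translation]
flat = list(chain.from_iterable(nested))

# Step 2: col("translation.text") keeps only the translated strings.
texts = [entry["text"] for entry in flat]

print(texts)
```

The result holds one string per target language, in the order the translations appear in the response.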

Both Text Analytics and AI Translator are available even on an F2 capacity.

In the next step we will use Azure OpenAI Service to analyse our data. Azure OpenAI Service also comes with some models prebuilt in Fabric, but it is available only on capacities of F64 or higher.

Let's start with the completion task. In this scenario the LLM suggests the next words and sentences for every input text. We will provide the reviews and look at what text-davinci-003 suggests as a continuation of each one.

In [None]:
from synapse.ml.cognitive.openai import OpenAICompletion
from pyspark.sql.functions import col

taxi_top = taxi_with_reviews.limit(10)

deployment_name = "text-davinci-003" # deployment_name could be text-davinci-003 or code-cushman-002
completion = (
    OpenAICompletion()
    .setDeploymentName(deployment_name)
    .setMaxTokens(200)
    .setPromptCol("review")
    .setErrorCol("error")
    .setOutputCol("completions")
)

completed_df = completion.transform(taxi_top).cache()
display(
    completed_df.select(
        col("review"),
        col("error"),
        col("completions.choices.text").getItem(0).alias("text"),
    )
)

The completion task relies on older GPT models such as text-davinci-003. More opportunities arise when we use the chat completion task.

Chat completion lets us use GPT-3.5 or GPT-4. It tends to give more reliable answers because we can specify a system message alongside the user prompt. To pass this information to the prebuilt model, we first need to prepare our data.

The model expects the data to be in the following format:

[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": here values from column with text}
]
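
The format above can be sketched in plain Python as a small helper that builds the message list for one text value. The function name `build_messages` and the sample review are hypothetical; the Spark version used later in this notebook builds the same structure with `Row` objects.

```python
import json


def build_messages(review, system_message="You are a helpful assistant."):
    """Return the system/user message pair for a single review text."""
    return [
        {"role": "system", "content": system_message},
        {"role": "user", "content": review},
    ]


# Made-up review text for illustration.
messages = build_messages("The driver was friendly and the car was clean.")
print(json.dumps(messages, indent=2))
```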

We will start by analysing an example dataframe.

In [None]:

import synapse.ml.core
from synapse.ml.services.openai import *

In [None]:
from pyspark.sql import Row
from pyspark.sql.types import *


def make_message(role, content):
    return Row(role=role, content=content, name=role)


chat_df = spark.createDataFrame(
    [
        (
            [
                make_message(
                    "system", "You are an AI chatbot with red as your favorite color"
                ),
                make_message("user", "Whats your favorite color"),
            ],
        ),
        (
            [
                make_message("system", "You are very excited"),
                make_message("user", "How are you today"),
            ],
        ),
    ]
).toDF("messages")


chat_completion = (
    OpenAIChatCompletion()
    .setDeploymentName("gpt-35-turbo-16k") # deploymentName could be one of {gpt-35-turbo, gpt-35-turbo-16k, gpt-4}
    .setMessagesCol("messages")
    .setErrorCol("error")
    .setOutputCol("chat_completions")
)

display(
    chat_completion.transform(chat_df).select(
        "messages", "chat_completions.choices.message.content"
    )
)

Now let's try to achieve similar results for Taxi Dataset, by preparing the dataframe.

In [None]:
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType


# Define the schema for the Row structure
row_schema = ArrayType(StructType([
    StructField("role", StringType(), False),
    StructField("content", StringType(), False),
    StructField("name", StringType(), False)
]))

# Define the UDF to create the desired structure
@udf(row_schema)
def create_structure(review):
    return [
        Row(role='system', content='You are a helpful assistant that returns the sentiment of the message. You respond with one word: positive, mixed, negative', name='system'),
        Row(role='user', content=review, name='user')
    ]

# Add the new column with the structured data using the UDF
taxi_with_reviews_sentiment = taxi_with_reviews.withColumn("prompt", create_structure("review"))

# Show the result
display(taxi_with_reviews_sentiment)

The problem we tried to solve with OpenAI is similar to the one we solved with Text Analytics: sentiment analysis. The key part of our code is the system message. In our case it is:

        Row(role='system', content='You are a helpful assistant that returns the sentiment of the message. You respond with one word: positive, mixed, negative', name='system'),

You can customise this message however you want, and with this simple code tackle multiple problems such as sentiment analysis, entity recognition, classification, or generating responses to reviews.
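
One way to organise such experiments is to keep the system messages in a dictionary keyed by task, so that only the task name changes between runs. This is a minimal plain-Python sketch: `SYSTEM_MESSAGES`, `build_prompt` and the two extra message texts are hypothetical, and only the sentiment message is taken from this notebook.

```python
# Hypothetical catalogue of system messages, one per task.
SYSTEM_MESSAGES = {
    "sentiment": (
        "You are a helpful assistant that returns the sentiment of the message. "
        "You respond with one word: positive, mixed, negative"
    ),
    "classification": (
        "You classify taxi reviews. "
        "You respond with one word: driver, vehicle, price, route, other"
    ),
    "reply": "You are a helpful assistant that responds to the review of taxi trip.",
}


def build_prompt(task, review):
    """Build the role/content/name records for one review and one task."""
    if task not in SYSTEM_MESSAGES:
        raise ValueError(f"Unknown task: {task!r}")
    return [
        {"role": "system", "content": SYSTEM_MESSAGES[task], "name": "system"},
        {"role": "user", "content": review, "name": "user"},
    ]


prompt = build_prompt("classification", "The fare was far too high for such a short trip.")
for message in prompt:
    print(message["role"], "->", message["content"])
```

The records use the same three fields (`role`, `content`, `name`) as the `Row` objects returned by `create_structure`.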


Finally, it's time to release your own creativity. Try different system messages on the code below.

In [None]:
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType


# Define the schema for the Row structure
row_schema = ArrayType(StructType([
    StructField("role", StringType(), False),
    StructField("content", StringType(), False),
    StructField("name", StringType(), False)
]))

# Define the UDF to create the desired structure
@udf(row_schema)
def create_structure(review):
    return [
        Row(role='system', content='You are a helpful assistant that returns the sentiment of the message. You respond with one word: positive, mixed, negative', name='system'),
        Row(role='user', content=review, name='user')
    ]

# Add the new column with the structured data using the UDF
responses = taxi_with_reviews.withColumn("prompt", create_structure("review"))

# Show the result
display(responses)
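
When a prompt like this is sent to a chat model, the reply is not guaranteed to be exactly one lowercase word, even if the system message asks for it. The sketch below shows one possible way to map free-form replies onto the three expected labels before grouping or counting them. The helper `normalize_sentiment` and the sample replies are hypothetical and not part of SynapseML.

```python
ALLOWED_LABELS = ("positive", "mixed", "negative")


def normalize_sentiment(reply):
    """Map a free-form model reply to one of the allowed labels, or None."""
    if not reply:
        return None
    # Drop surrounding whitespace, case differences and trailing punctuation.
    cleaned = reply.strip().lower().strip(".!\"' ")
    if cleaned in ALLOWED_LABELS:
        return cleaned
    # Accept a longer reply only if it mentions exactly one label.
    found = [label for label in ALLOWED_LABELS if label in cleaned]
    return found[0] if len(found) == 1 else None


# Made-up replies for illustration.
for reply in ["Positive.", " negative ", "The sentiment is mixed", "I am not sure"]:
    print(repr(reply), "->", normalize_sentiment(reply))
```

Replies that cannot be mapped come back as `None`, which keeps them visible as a separate group instead of silently merging them into one of the labels.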

Use the code below to create one dataframe with the results of translation, key phrase extraction, language detection and sentiment analysis.

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

taxi_with_reviews = taxi_with_reviews.withColumn("review_id", monotonically_increasing_id())


In [None]:
from pyspark.sql.functions import col, flatten, monotonically_increasing_id, udf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, Row


# Translation transformation
translate = (Translate()
    .setTextCol("review")
    .setToLanguage(["en", "pl"])
    .setOutputCol("translation")
    .setConcurrency(5))

translation_result = translate.transform(taxi_with_reviews)\
    .withColumn("translation", flatten(col("translation.translations")))\
    .withColumn("translation", col("translation.text"))\
    .select("review_id", "translation")

# Key phrase extraction transformation
key_phrase_model = (AnalyzeText()
    .setTextCol("review")
    .setKind("KeyPhraseExtraction")
    .setOutputCol("response"))

key_phrase_result = key_phrase_model.transform(taxi_with_reviews)\
    .withColumn("documents", col("response.documents"))\
    .withColumn("keyPhrases", col("documents.keyPhrases"))\
    .select("review_id", "keyPhrases")

# Language detection transformation
language_model = (AnalyzeText()
    .setTextCol("review")
    .setKind("LanguageDetection")
    .setOutputCol("response"))

language_result = language_model.transform(taxi_with_reviews)\
    .withColumn("documents", col("response.documents"))\
    .withColumn("language", col("documents.detectedLanguage.name"))\
    .select("review_id", "language")

# Sentiment analysis transformation
sentiment_model = (AnalyzeText()
    .setTextCol("review")
    .setKind("SentimentAnalysis")
    .setOutputCol("response"))

sentiment_result = sentiment_model.transform(taxi_with_reviews)\
    .withColumn("documents", col("response.documents"))\
    .withColumn("sentiment", col("documents.sentiment"))\
    .select("review_id", "sentiment")

# Join all the results based on the 'review_id'
final_result = taxi_with_reviews.join(translation_result, "review_id", "inner")\
    .join(key_phrase_result, "review_id", "inner")\
    .join(language_result, "review_id", "inner")\
    .join(sentiment_result, "review_id", "inner")\
    .select("review_id", "review", "translation", "keyPhrases", "language", "sentiment")  # keep review_id for the later join with the OpenAI responses

# Display the final result
display(final_result)

Finally, add the response to each review generated by the OpenAI model. Feel free to have some fun with the system message.

In [None]:
from pyspark.sql.functions import col, flatten, monotonically_increasing_id, udf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, Row


# Define the schema for the Row structure
row_schema = ArrayType(StructType([
    StructField("role", StringType(), False),
    StructField("content", StringType(), False),
    StructField("name", StringType(), False)
]))

# Define the UDF to create the desired structure
@udf(row_schema)
def create_structure(review):
    return [
        Row(role='system', content='You are a helpful assistant that responds to the review of taxi trip.', name='system'),
        Row(role='user', content=review, name='user')
    ]

# Add the new column with the structured data using the UDF
df_with_new_column = taxi_with_reviews.withColumn("prompt", create_structure(col("review")))

# Define the OpenAIChatCompletion transformation
chat_completion = (
    OpenAIChatCompletion()
    .setDeploymentName("gpt-35-turbo-16k")  # Adjust the deploymentName as needed
    .setMessagesCol("prompt")
    .setErrorCol("error")
    .setOutputCol("chat_completions")
)

# Transform the DataFrame to include chat completions
df_with_chat_completions = chat_completion.transform(df_with_new_column.limit(2))


# Extract the relevant information from the chat completions to add as a new column
df_with_openai_response = df_with_chat_completions\
    .withColumn("openai_response", col("chat_completions.choices.message.content"))

# Now, integrate with the previous transformations (translation, key phrase extraction, language detection, sentiment analysis)


# Join the OpenAI response with the final_result DataFrame based on the 'review_id'
final_result_with_openai = final_result.join(df_with_openai_response.select("review_id", "openai_response"), "review_id", "inner")

# Display the final DataFrame with all transformations and the OpenAI model response
display(final_result_with_openai.select("review", "translation", "keyPhrases", "language", "sentiment", "openai_response"))

In [None]:
## save the final DataFrame to gold lakehouse

final_result_with_openai.write.mode("overwrite").format("delta").saveAsTable(f"{GOLD_LAKEHOUSE}.{ENRICHED_TABLE}")
print(f"Results saved to delta table: {ENRICHED_TABLE}")