# **Multi-Service Pipeline**

This example is a bit more advanced: it shows how you can use the notion of a "Pipeline" to string together multiple Cognitive Services API calls.

In this example we combine the Text Sentiment and Translate API calls by wrapping them in a PipelineModel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.PipelineModel.html). The same result could be achieved by calling these APIs sequentially, but the pipeline is a more elegant way to package all the necessary API calls into a single `transform` operation on the DataFrame.
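To make the idea concrete, here is a minimal pure-Python sketch. It is not SynapseML or Spark code: the hypothetical `SimplePipeline` class just applies an ordered list of stages in one `transform` call, and `fake_sentiment` / `fake_translate` are placeholder stages that call no API. It shows that running the stages through a pipeline gives the same result as chaining the calls by hand.

```python
from typing import Callable, Dict, List

Row = Dict[str, str]
Stage = Callable[[List[Row]], List[Row]]


class SimplePipeline:
    """Toy analogue of a fitted pipeline: an ordered list of stages applied by one call."""

    def __init__(self, stages: List[Stage]) -> None:
        self.stages = stages

    def transform(self, rows: List[Row]) -> List[Row]:
        # Each stage receives the output of the previous stage.
        for stage in self.stages:
            rows = stage(rows)
        return rows


# Hypothetical stand-ins for the API stages. They call no service; they only
# add a column so the effect of each stage is visible.
def fake_sentiment(rows: List[Row]) -> List[Row]:
    return [{**r, "sentimentResult": "scored"} for r in rows]


def fake_translate(rows: List[Row]) -> List[Row]:
    return [{**r, "translation": r["Review"].upper()} for r in rows]


rows = [{"Review": "great bike"}, {"Review": "flat tire"}]
sequential = fake_translate(fake_sentiment(rows))
piped = SimplePipeline([fake_sentiment, fake_translate]).transform(rows)
assert piped == sequential
```

The benefit is packaging rather than new behavior: the stage order and settings live in one object that can be applied to any DataFrame with a single call.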

# ***Prerequisites***

1. A provisioned Synapse workspace with a Spark 3.1 pool to run this notebook on.
1. A key vault to store the secret (the Cognitive Services key) needed to call the APIs.
1. A provisioned Cognitive Services multi-service resource.

To start, we'll import the necessary modules and set up values for resources such as the storage account, container, and key vault that we'll use throughout the rest of the example.

```python
import pyspark
from pyspark.sql.functions import col
from pyspark.ml import PipelineModel

from notebookutils import mssparkutils
import synapse.ml
from synapse.ml.cognitive import *
from synapse.ml.stages import FixedMiniBatchTransformer, FlattenBatch, DropColumns

# Name of the storage account associated with the Synapse workspace
storageAccountName = "<insert storage account name>"
# Container in that storage account. You can also enter a new name here,
# and Synapse will create that container for you automatically.
containerName = "<insert container name>"
# Name of your key vault
keyVaultName = "<insert keyvault name>"
# Region where your Cognitive Services resource is deployed (lowercase, e.g. eastus)
serviceLocation = "eastus"
```



Now that the libraries and variables are set, load the sample CSV file. Because no schema is supplied, Spark reads every column as a string, which suits the text column we pass to the APIs.

The file has multiple columns, but the one we're most interested in is Review. We'll use the Text Sentiment API to detect the general sentiment of this text.
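The path in the next cell follows the standard ADLS Gen2 URI pattern, `abfss://<container>@<account>.dfs.core.windows.net/<path>`. A small helper like the hypothetical `abfss_uri` below keeps that string-building in one place; `mycontainer` and `mystorageaccount` are placeholder names, not values from this example.

```python
def abfss_uri(container: str, account: str, path: str) -> str:
    """Build an ADLS Gen2 URI: abfss://<container>@<account>.dfs.core.windows.net/<path>."""
    # Strip a leading slash so callers can pass "file.csv" or "/file.csv".
    return f"abfss://{container}@{account}.dfs.core.windows.net/{path.lstrip('/')}"


# Placeholder container and account names, for illustration only.
uri = abfss_uri("mycontainer", "mystorageaccount", "bike reviews.csv")
print(uri)  # abfss://mycontainer@mystorageaccount.dfs.core.windows.net/bike reviews.csv
```

In the notebook the equivalent call would be `abfss_uri(containerName, storageAccountName, "bike reviews.csv")`.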

```python
# Path to the sample file in the workspace's linked ADLS Gen2 storage account
filePath = "abfss://" + containerName + "@" + storageAccountName + ".dfs.core.windows.net/bike reviews.csv"
# Load the sample CSV file. Because it is in the storage account provisioned
# with the workspace, no keys are necessary.
df = spark.read.format("csv").option("header", "true").option("quote", '"').load(filePath)

# Inspect the first rows of the DataFrame
display(df.limit(10))
```


To use the Cognitive Services APIs, we must first provide some configuration: the format of the data, and the locations and keys of our dependent resources.

This code assumes you've stored your Cognitive Services key in Key Vault. If you want to take a shortcut, you can hardcode the key here, but leaving keys inline in code is not a best practice.
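If you run similar code outside Synapse, where `mssparkutils` is not available, one way to still keep the key out of the source is to fall back to an environment variable. The sketch below is an assumption, not part of this example: `get_cog_services_key` and the `COG_SERVICES_KEY` variable name are hypothetical, and the secret reader is passed in as a function so the lookup can be swapped or faked.

```python
import os
from typing import Callable, Optional


def get_cog_services_key(
    get_secret: Optional[Callable[[str, str], str]],
    vault_name: str,
    secret_name: str,
    env_var: str = "COG_SERVICES_KEY",  # hypothetical variable name
) -> str:
    """Prefer the key vault; otherwise read an environment variable; never a literal in code."""
    if get_secret is not None:
        # e.g. mssparkutils.credentials.getSecret inside a Synapse notebook
        return get_secret(vault_name, secret_name)
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"No key vault reader given and {env_var} is not set")
    return key
```

Inside Synapse the call would be `get_cog_services_key(mssparkutils.credentials.getSecret, keyVaultName, "cogsvcskey")`, which does the same lookup as the next cell.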

```python
# Retrieve the key for the Cognitive Services multi-service resource from Key Vault
cogServicesKey = mssparkutils.credentials.getSecret(keyVaultName, "cogsvcskey")

pipe = PipelineModel(stages=[
  # Group rows into batches of 10 so the API stages can send several texts per request
  FixedMiniBatchTransformer().setBatchSize(10),
  # Score the sentiment of the Review text
  TextSentiment()
    .setSubscriptionKey(cogServicesKey)
    .setLocation(serviceLocation)   # use a lowercase region name, e.g. southcentralus
    .setOutputCol("sentimentResult")
    .setErrorCol("error1")
    .setLanguage("en")
    .setTextCol("Review"),
  # Translate to English with profanity marked, which acts as a profanity filter
  Translate()
    .setLocation(serviceLocation)
    .setSubscriptionKey(cogServicesKey)
    .setToLanguage(["en"])
    .setProfanityAction("Marked")
    .setOutputCol("translation")
    .setErrorCol("error2")
    .setTextCol("Review"),
  # Drop the error columns, then unpack the batches back into one row per review
  DropColumns().setCols(["error1", "error2"]),
  FlattenBatch()])

# Replace null reviews with empty strings before calling the APIs
results = pipe.transform(df.na.fill("", ["Review"]))

display(results)
```
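The pipeline above wraps the two API stages between `FixedMiniBatchTransformer` and `FlattenBatch`. Conceptually, the first groups consecutive rows into fixed-size batches (in Spark this is done within each partition) and the second unpacks them again, so the output still has one row per review. The plain-Python sketch below illustrates only that grouping and flattening; `fixed_mini_batches` and `flatten_batches` are hypothetical helpers, not the SynapseML implementation.

```python
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def fixed_mini_batches(items: List[T], batch_size: int) -> List[List[T]]:
    """Group consecutive items into lists of at most batch_size (the last may be shorter)."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def flatten_batches(batches: Iterable[List[T]]) -> List[T]:
    """Undo the batching: one output element per original item, order preserved."""
    return [item for batch in batches for item in batch]


reviews = [f"review {n}" for n in range(23)]
batches = fixed_mini_batches(reviews, 10)
print([len(b) for b in batches])  # [10, 10, 3]
assert flatten_batches(batches) == reviews
```

Batching trades a little latency per row for fewer HTTP requests, which matters when each request has overhead and rate limits.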


The results from the previous cell show that each API returns a JSON response. The sentiment response has a top-level element that gives the overall sentiment and confidence scores for the review. It also includes an array with the sentiment and confidence scores of each sentence. In addition, the Translate API can translate to multiple target languages, so it returns an array of translated text.

For our purposes we translated to only one language, and we care only about the overall sentiment. Therefore we need only the first translation result and the top-level attributes of the sentiment response. To make the results easier to read, we'll do some post-processing to flatten the response.

In this case we'll overwrite the original Review column, because the intent of calling the Translate API was to act as a profanity filter. You can see the sanitized text, with profanity marked as ***, in the results. The sentiment attributes are also "promoted" to top-level columns in the DataFrame.
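The `withColumn` calls in the next cell amount to reading a few nested fields per row. As a plain-Python illustration, the hypothetical `promote_fields` function below does the same on a dictionary whose nesting mirrors the column paths used there (`sentimentResult.document...` and `translation.translations[0].text`). The sample row is made up for illustration, trimmed to the fields the cell reads, and is not real API output.

```python
from typing import Any, Dict


def promote_fields(row: Dict[str, Any]) -> Dict[str, Any]:
    """Lift document-level sentiment to top-level keys and replace Review
    with the first translation, mirroring the withColumn calls."""
    doc = row["sentimentResult"]["document"]
    scores = doc["confidenceScores"]
    out = dict(row)  # shallow copy; the input row is left unchanged
    out["sentiment"] = doc["sentiment"]
    out["positive"] = scores["positive"]
    out["neutral"] = scores["neutral"]
    out["negative"] = scores["negative"]
    # Only one target language was requested, so take the first translation.
    out["Review"] = row["translation"]["translations"][0]["text"]
    return out


# Made-up sample row, shaped like the columns the next cell reads.
sample = {
    "Review": "original text",
    "sentimentResult": {
        "document": {
            "sentiment": "positive",
            "confidenceScores": {"positive": 0.9, "neutral": 0.08, "negative": 0.02},
            "sentences": [],
        }
    },
    "translation": {"translations": [{"text": "sanitized text", "to": "en"}]},
}
flat = promote_fields(sample)
print(flat["sentiment"], flat["Review"])  # positive sanitized text
```

In Spark, `col("translation.translations").getField("text")[0]` plays the role of `translations[0]["text"]` here.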


```python
from pyspark.sql.functions import col

# Promote the document-level sentiment fields to columns and replace Review
# with the first (and only) translation, which has profanity marked
results = (results
      .withColumn("sentiment", col("sentimentResult.document.sentiment"))
      .withColumn("positive", col("sentimentResult.document.confidenceScores.positive"))
      .withColumn("neutral", col("sentimentResult.document.confidenceScores.neutral"))
      .withColumn("negative", col("sentimentResult.document.confidenceScores.negative"))
      .withColumn("Review", col("translation.translations").getField("text")[0])
)

display(results)
```
