# Tutorial: Build intelligent applications using Azure Cognitive Services in Microsoft Machine Learning for Apache Spark

In this article, you learn how to use Microsoft Machine Learning for Apache Spark ([SynapseML](https://github.com/microsoft/SynapseML)) to create machine learning applications.
SynapseML expands the distributed machine learning solution of Apache Spark by adding many deep learning and data science tools, such as [Azure Cognitive Services](../../cognitive-services/big-data/cognitive-services-for-big-data.md), [OpenCV](https://opencv.org/), [LightGBM](https://github.com/Microsoft/LightGBM), and more.

SynapseML allows you to build powerful and highly scalable predictive and analytical models from various Spark data sources.





<img width="200"  src="https://mmlspark.blob.core.windows.net/graphics/Readme/cog_services_on_spark_2.svg">

This tutorial covers samples using [Azure Cognitive Services](https://azure.microsoft.com/services/cognitive-services/) in SynapseML for:

- Text Analytics - get the sentiment (or mood) of a set of sentences.
- Computer Vision - get the tags (one-word descriptions) associated with a set of images.
- Bing Image Search - search the web for images related to a natural language query.
- Anomaly Detector - detect anomalies within time series data.
- Speech to Text - convert streams or files of spoken audio to text.

If you don't have an Azure subscription, [create a free account before you begin](https://azure.microsoft.com/free/).


## Prerequisites 

- [Azure Synapse Analytics workspace](https://docs.microsoft.com/azure/synapse-analytics/get-started-create-workspace) with an Azure Data Lake Storage Gen2 storage account configured as the default storage. You need the *Storage Blob Data Contributor* role on the Data Lake Storage Gen2 file system that you work with.
- Spark pool in your Azure Synapse Analytics workspace. For details, see [Create a Spark pool in Azure Synapse](https://docs.microsoft.com/azure/synapse-analytics/get-started-analyze-spark).
- Pre-configuration steps described in the tutorial [Configure Cognitive Services in Azure Synapse](https://docs.microsoft.com/azure/synapse-analytics/machine-learning/tutorial-configure-cognitive-services-synapse).

## Get started
To get started, import SynapseML and configure the service keys.


from synapse.ml.cognitive import *
from notebookutils import mssparkutils

# A general Cognitive Services key for Text Analytics and Computer Vision (or use separate keys that belong to each service)
cognitive_service_key = mssparkutils.credentials.getSecret("ezzat-keyvault", "ez-cog-lang")
cognitive_service_translator_key = mssparkutils.credentials.getSecret("ezzat-keyvault", "ez-cog-translate")

# A Bing Search v7 subscription key
#bingsearch_service_key = mssparkutils.credentials.getSecret("ADD_YOUR_KEY_VAULT_NAME", "ADD_YOUR_BING_SEARCH_KEY","ADD_YOUR_KEY_VAULT_LINKED_SERVICE_NAME")
# An Anomaly Detector subscription key
#anomalydetector_key = mssparkutils.credentials.getSecret("ADD_YOUR_KEY_VAULT_NAME", "ADD_YOUR_ANOMALY_KEY","ADD_YOUR_KEY_VAULT_LINKED_SERVICE_NAME")


## Text analytics sample

The [Text Analytics](https://docs.microsoft.com/azure/cognitive-services/text-analytics/) service provides several algorithms for extracting intelligent insights from text. For example, we can find the sentiment of a given input text. The service returns scores between 0.0 and 1.0, where low scores indicate negative sentiment and high scores indicate positive sentiment. This sample uses three simple sentences and returns the sentiment for each.


### Sentiment analysis

from pyspark.sql.functions import col

# Create a dataframe that's tied to its column names
df_sentences = spark.createDataFrame([
  ("I am so happy today, its sunny!", "en-US"), 
  ("this is a dog", "en-US"), 
  ("I am frustrated by this rush hour traffic!", "en-US") 
], ["text", "language"])

# Run the Text Analytics service with options
sentiment = (TextSentiment()
    .setTextCol("text")
    .setLocation("eastus") # Set the location of your cognitive service
    .setSubscriptionKey(cognitive_service_key)
    .setOutputCol("sentiment")
    .setErrorCol("error")
    .setLanguageCol("language"))

# Show the results of your text query in a table format
#display(sentiment.transform(df_sentences))
#sentiment.transform(df_sentences).printSchema()
display(sentiment.transform(df_sentences).select("text","language", "sentiment.document.sentiment","sentiment.document.confidenceScores.*"))
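
To make the selected columns easier to interpret, here is a minimal sketch in plain Python (no Spark required) that picks the label with the highest confidence score. It assumes the columns produced by `sentiment.document.confidenceScores.*` are named `positive`, `neutral`, and `negative`; that naming is an assumption about the service response, not something this tutorial shows. The helper name `dominant_sentiment` and the sample values are hypothetical.

```python
from typing import Dict


def dominant_sentiment(confidence_scores: Dict[str, float]) -> str:
    """Return the label with the highest confidence score.

    Expects a mapping such as {"positive": 0.9, "neutral": 0.08, "negative": 0.02}.
    The key names are an assumption, not confirmed by the tutorial.
    """
    if not confidence_scores:
        raise ValueError("confidence_scores must not be empty")
    return max(confidence_scores, key=confidence_scores.get)


# Hypothetical scores for illustration only (not real service output).
example_scores = {"positive": 0.90, "neutral": 0.08, "negative": 0.02}
print(dominant_sentiment(example_scores))
```

In a notebook, the same idea could be applied to rows collected from the transformed dataframe, provided the column names match.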


## Opinion mining

import pyspark.sql.functions as f

# Create a dataframe that's tied to its column names
df = spark.createDataFrame([
  ("Bill Clinton is so happy today, its sunny!", "en-US"),
  ("The room was great, but the staff was unfriendly", "en-US"),
  ("The cognitive services on spark aint bad", "en-US"),
], ["text", "language"])

# Run the Text Analytics service with options
sentiment = (TextSentiment()
            .setLocation("eastus") # Set the location of your cognitive service
            .setSubscriptionKey(cognitive_service_key)
            .setTextCol("text")
            .setOutputCol("sentiment")
            .setErrorCol("error")
            .setLanguageCol("language")
            .setOpinionMining(True)
            )

# Show the results of your text query in a table format
#display(sentiment.transform(df))
display(sentiment.transform(df)
      .select("text","language", "sentiment.document.*")
      .withColumn("sentences",f.explode_outer("sentences"))
      .select(f.col("text").alias("original_text"),"sentiment"
            ,f.col("sentences.sentiment").alias("sentences_sentiment")
            ,f.col("sentences.confidenceScores").alias("sentences_conf_score")
            ,f.col("sentences.text").alias("sentences_text")
            ,f.col("sentences.targets").alias("sentences_targets")
            ,f.col("sentences.assessments").alias("sentences_assessments")
      )
     
)

## Language detector

# Create a dataframe that's tied to its column names
df = spark.createDataFrame([
  ("Hello World",),
  ("Bonjour tout le monde",),
  ("Bonjour tout le monde, my name is John",),
  ("La carretera estaba atascada. Había mucho tráfico el día de ayer.",),
  ("你好",),
  ("こんにちは",),
  (":) :( :D",)
], ["text",])

# Run the Text Analytics service with options
language = (LanguageDetector()
            .setLocation("eastus") # Set the location of your cognitive service
            .setSubscriptionKey(cognitive_service_key)
            .setTextCol("text")
            .setOutputCol("language")
            .setErrorCol("error")
            )

# Show the results of your text query in a table format
display(language.transform(df).select("text","language.document.detectedLanguage.*"))


## Translate text

from synapse.ml.cognitive import *
import pyspark.sql.functions as f


translate = (Translate()
            .setLocation("eastus") # Set the location of your cognitive service
            .setSubscriptionKey(cognitive_service_translator_key)
            .setTextCol("text")
            .setFromLanguageCol("lang")
            .setToLanguage(["en-US"])
            .setOutputCol("translation")
            .setConcurrency(5))


df = spark.createDataFrame([
  (["Hello, what is your name?", "Bye"],"en-US"),
  (["Bonjour, comment ca va?"],"fr"),
  (["Como estas?",],"es"),
], ["text","lang"])

display(translate.transform(df)
        .withColumn("translation",f.explode_outer("translation"))
        .withColumn("translations",f.explode_outer("translation.translations"))
        .select("text","lang",f.col("translations.text").alias("translated_text"),)
        )
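
The two `explode_outer` calls above flatten a nested result: one entry per input string, each holding a list of translations. The following plain-Python sketch mimics that flattening so the shape change is easier to follow. The helper `explode_outer` here is a hypothetical stand-in for the Spark function, and the sample rows are invented for illustration; only the `translations` and `text` field names are taken from the code above.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def explode_outer(rows: List[Row], column: str) -> List[Row]:
    """Emit one row per array element; keep one row with None if the array is null or empty."""
    exploded = []
    for row in rows:
        for item in (row.get(column) or [None]):
            exploded.append({**row, column: item})
    return exploded


# Hypothetical rows, not real service output.
rows = [
    {"text": ["Bonjour", "Merci"],
     "translation": [
         {"translations": [{"text": "Hello"}]},
         {"translations": [{"text": "Thank you"}]},
     ]},
    {"text": ["???"], "translation": None},  # e.g. a row whose request failed
]

step1 = explode_outer(rows, "translation")
# Pull the nested array into its own column, as the second withColumn does.
for row in step1:
    row["translations"] = row["translation"]["translations"] if row["translation"] else None
step2 = explode_outer(step1, "translations")

flat = [(r["text"], r["translations"]["text"] if r["translations"] else None) for r in step2]
for original, translated in flat:
    print(original, "->", translated)
```

Unlike `explode`, `explode_outer` keeps rows whose array is null or empty, which is why the last hypothetical row survives with a `None` translation instead of disappearing.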

from synapse.ml.cognitive import *
df = spark.createDataFrame([
  ("happy", "en-US"),
  ("unfriendly", "en-US"),
  ("unpredictable", "en"),
], ["text", "language"])


dictionaryLookup = (DictionaryLookup()
                  .setSubscriptionKey(cognitive_service_translator_key)
                  .setLocation("eastus")
                  .setFromLanguageCol("language")
                  .setToLanguage("es")
                  .setTextCol("text")
                  .setOutputCol("result"))
display(dictionaryLookup
    .transform(df)
    .withColumn("translations", f.flatten(col("result.translations")))
    .withColumn("normalizedTarget", col("translations.normalizedTarget"))
    .select("text","language","normalizedTarget")
    
    )

from synapse.ml.cognitive import *
df = (spark.createDataFrame([
  ("fly", "volar"),("happy", "feliz"),("unpredictable", "impredecible")
], ["text", "translation"])
    .withColumn("textAndTranslation", f.array(f.struct(col("text"), col("translation")))))


dictionaryExamples = (DictionaryExamples()
                  .setSubscriptionKey(cognitive_service_translator_key)
                  .setLocation("eastus")
                  .setFromLanguage("en")
                  .setToLanguage("es")
                  .setTextAndTranslationCol("textAndTranslation")
                  .setOutputCol("result"))
display(dictionaryExamples
    .transform(df)
    .withColumn("examples", f.explode(f.flatten(col("result.examples"))))
    .select("text","translation","result","examples.*"))


## Entity detector

import pyspark.sql.functions as f


df = spark.createDataFrame([
    ("1", "Microsoft released Windows 10"),
    ("2", "In 1975, Bill Gates III and Paul Allen founded the company.")
], ["id", "text"])

entity = (EntityDetector()
        .setLocation("eastus") # Set the location of your cognitive service
        .setSubscriptionKey(cognitive_service_key)
        .setLanguage("en")
        .setOutputCol("replies")
        .setErrorCol("error"))

entity.transform(df).printSchema()
display(entity.transform(df)
        .select("text","replies.document.*")
        .withColumn("entities",f.explode("entities"))
        .select("text","entities.name","entities.language","entities.matches","entities.id","entities.url",)
        #.withColumn("matches",f.explode("matches"))
        #.select("text","name","matches.*","id","url",)
        )

root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- error: struct (nullable = true)
 |    |-- response: string (nullable = true)
 |    |-- status: struct (nullable = true)
 |    |    |-- protocolVersion: struct (nullable = true)
 |    |    |    |-- protocol: string (nullable = true)
 |    |    |    |-- major: integer (nullable = false)
 |    |    |    |-- minor: integer (nullable = false)
 |    |    |-- statusCode: integer (nullable = false)
 |    |    |-- reasonPhrase: string (nullable = true)
 |-- replies: struct (nullable = true)
 |    |-- statistics: struct (nullable = true)
 |    |    |-- documentsCount: integer (nullable = false)
 |    |    |-- validDocumentsCount: integer (nullable = false)
 |    |    |-- erroneousDocumentsCount: integer (nullable = false)
 |    |    |-- transactionsCount: integer (nullable = false)
 |    |-- document: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- entities: array (nullable = true)


## Key phrase extractor

df = spark.createDataFrame([
    ("en", "Hello world. This is some input text that I love."),
    ("fr", "Bonjour tout le monde"),
    ("es", "La carretera estaba atascada. Había mucho tráfico el día de ayer.")
], ["lang", "text"])

keyPhrase = (KeyPhraseExtractor()
            .setLocation("eastus") # Set the location of your cognitive service
            .setSubscriptionKey(cognitive_service_key)
            .setLanguageCol("lang")
            .setOutputCol("replies")
            .setErrorCol("error"))

display(keyPhrase.transform(df).select("text","lang","replies.document.*"))


## Named Entity Recognition


df = spark.createDataFrame([
    ("1", "en", "I had a wonderful trip to Seattle last week."),
    ("2", "en", "I visited Space Needle 2 times.")
], ["id", "language", "text"])

ner = (NER()
        .setLocation("eastus") # Set the location of your cognitive service
        .setSubscriptionKey(cognitive_service_key)
        .setLanguageCol("language")
        .setOutputCol("replies")
        .setErrorCol("error")
        )

display(ner.transform(df)
        .select("text","language","replies.document.*")
        .withColumn("entities",f.explode("entities"))
        .select("text","language","entities.*")
        )


## Personally Identifiable Information (PII)

df = spark.createDataFrame([
    ("1", "en", "My SSN is 859-98-0987"),
    ("2", "en", "Your ABA number - 111000025 - is the first 9 digits in the lower left hand corner of your personal check."),
    ("3", "en", "Is 998.214.865-68 your Brazilian CPF number?")
], ["id", "language", "text"])

pii = (PII()
    .setLocation("eastus") # Set the location of your cognitive service
    .setSubscriptionKey(cognitive_service_key)
    .setLanguageCol("language")
    .setOutputCol("replies")
    .setErrorCol("error"))

display(pii.transform(df)
        .select("text","language","replies.document.*")
        #.withColumn("entities",f.explode("entities"))
        .select("text","redactedText","language","entities")

)
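
The sample above selects both `redactedText` and `entities`. As a rough illustration of how the two could relate, the sketch below masks character spans in a string. It assumes each entity carries `offset` and `length` fields and that redaction replaces the matched characters with asterisks; both are assumptions about the service output, not confirmed by this tutorial. The helper `mask_spans` and the sample entity are hypothetical, and the offsets are treated as plain Python string indices.

```python
from typing import Dict, List


def mask_spans(text: str, entities: List[Dict[str, int]], mask_char: str = "*") -> str:
    """Replace each (offset, length) span in text with mask characters."""
    chars = list(text)
    for entity in entities:
        start = entity["offset"]
        end = min(start + entity["length"], len(chars))
        for i in range(start, end):
            chars[i] = mask_char
    return "".join(chars)


sample = "My SSN is 859-98-0987"
# Hypothetical entity span covering the number; not real service output.
entities = [{"offset": 10, "length": 11}]
print(mask_spans(sample, entities))
```

In practice the service already returns the redacted string, so a helper like this is only useful for checking that entity spans line up with the original text.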


## Text Analytics for Health

df = spark.createDataFrame(
    [
        ("20mg of ibuprofen twice a day",),
        ("1tsp of Tylenol every 4 hours",),
        ("6-drops of Vitamin B-12 every evening",),
    ],
    ["text"],
)

healthcare = (
    AnalyzeHealthText()
    .setLocation("eastus") # Set the location of your cognitive service
    .setSubscriptionKey(cognitive_service_key)
    .setLanguage("en")
    .setOutputCol("response")
)

display(healthcare.transform(df)
            .select("text","response.document.*")
        )


## Clean up resources
To ensure the Spark instance is shut down, end any connected sessions (notebooks). The pool shuts down when the **idle time** specified in the Apache Spark pool is reached. You can also select **stop session** from the status bar at the upper right of the notebook.

![stopsession](https://adsnotebookrelease.blob.core.windows.net/adsnotebookrelease/adsnotebook/image/stopsession.png)

## Next steps

* [Check out Synapse sample notebooks](https://github.com/Azure-Samples/Synapse/tree/main/MachineLearning) 
* [SynapseML GitHub Repo](https://github.com/microsoft/SynapseML)
