# Streaming de tweets con Azure Databricks y su despliegue en Power BI

#### Fuentes:

Structured Streaming with Azure Databricks into Power BI & Cosmos DB

https://github.com/giulianorapoz/DatabricksStreamingPowerBI

Tutorial: Anomaly detection on streaming data using Azure Databricks

https://docs.microsoft.com/en-us/azure/cognitive-services/anomaly-detector/tutorials/anomaly-detection-streaming-databricks


Tutorial: Sentiment analysis on streaming data using Azure Databricks

https://docs.microsoft.com/en-us/azure/azure-databricks/databricks-sentiment-analysis-cognitive-services


Power BI

https://docs.microsoft.com/en-us/azure/databricks/integrations/bi/power-bi

Power BI Connects to Azure Databricks

https://towardsdatascience.com/power-bi-connects-to-azure-databricks-44bea6731be7



In [5]:
# %load_ext watermark
%watermark -a "Christian Castro" -u -d -p numpy,pandas,matplotlib
%watermark -a "Propiedad de DataIntelligence"

Christian Castro 
last updated: 2020-04-27 

numpy 1.18.1
pandas 1.0.1
matplotlib 3.1.3
Propiedad de DataIntelligence


## 1 Introducción

En este notebook revisaremos el concepto de Streaming Estructurado con **Azure Databricks** y en el cómo se puede conectar directamente con Power BI, lo que permite la visualización y análisis avanzados.

Construiremos una ruta de consumo de datos directamente con Azure Databricks lo que nos permitirá transmitir datos a un clúster de **Apache Spark** en tiempo casi real. Mostraremos algunas de las capacidades de análisis a las que se puede llamar directamente desde Databricks utilizando la API de Text Analytics, luego conectaremos Databricks directamente a Power BI para treas de visualización y análisis. 

Se deja como tarea la lectura y escritura directamente desde Databricks  en CosmosDB como almacenamiento persistente y uso posterior.

## 2 Configuración de nuestros recursos. 

Necesitaremos lo siguiente:
* Un espacio de nombres en Azure.
* Un espacio de trabajo de Databricks y un cluster Apache Spark para ejecutar nuestros nuestros notebooks.
* Un Event Hub, para que Databricks envíe los datos.
* Una cuenta de servicios cognitivos, para acceder a la API de Text Analytics.
* Una cuenta de Twitter, para obtener un streaming de datos.
* Power BI Desktop para visualizar y analizar los datos. 
* (Opcional) Una base de datos CosmosDB, para almacenar datos de forma persistente.

### 2.1 Crear un **espacio de nombres**

* Dá click en las tres líneas horizontales de la esquina superior izquierda de Azure, dá click en **Crear un recurso** y en el cuadro de búsqueda ingresa **Event Hub**.

* Dá click en el ícono **Event Hubs** y luego en **Crear**. Ésto no creará un event hub inmediatamente, sino que primero se requerirá la creación de un **espacio de nombres**. Es por ello ue llegarán a la ventana **Crear un espacio de nombres**.

* En **Crear un espacio de nombres** sólo ingresa datos en la pestaña de **Datos básicos** (lo que no se indica que modifiques déjalo tal cual, así en toda la guía):

    * Suscripción: Suscripción de Azure 1
        *Grupo de recursos: Crea uno nuevo: (nuevogrupo2)

    * Nombre de espacio de nombres: tweetstopowerbi8
    
    * Ubicación: Busca una ubicación diferente (existen cuotas que entrega Azure por región que puede provocarte errores más adelante): (US) Centro-Sur de EE.UU.
    
    * Plan de tarifa: Estándar

Espere hasta que salga el mensaje: Validación correcta.

Dá click en **Crear**.

Surgirá un mensaje: **La implementación está en curso**
Espere hasta que aparezca el mensaje: **Se completó la implementación**

Dá click a **Ir al recurso**

Entramos al Espacio de nombres de Event Hubs llamado: tweetstopowerbi8.

### 2.2 Crear un Event Hub

En el espacio de nombres tweetstopowerbi8, da click a **+ Event Hub**. Dale un nombre: tweetstopowerbi8 y luego click a **Crear**.

Ahora el **Event Hub** está listo para funcionar, y tenemos todas las cadenas de conexión necesarias para que Databricks envíe datos.

fig 2

En el espacio de nombres tweetstopowerbi8, en la lista desplegable de la izquierda, en el listado de Configuración da click a **Directivas de acceso compartido** y en la ventana de la derecha a **RootManageSharedAccessKey**

Se cargará automáticamente la **Directiva SAS**

Copia en un txt las claves y ls cadenas de conexión:

Clave principal
fpKYGIGLA3QSCNihV5I8e3Sv/mCfmb3ZUgZQocKCXUc=

Clave secundaria
Aak8snk7d89yQULitOuoWtpct6OeYmhpaieRdpIe4mM=

Cadena de conexión: clave principal
Endpoint=sb://tweetstopowerbi8.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=fpKYGIGLA3QSCNihV5I8e3Sv/mCfmb3ZUgZQocKCXUc=
    
Cadena de conexión: clave secundaria   
Endpoint=sb://tweetstopowerbi8.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Aak8snk7d89yQULitOuoWtpct6OeYmhpaieRdpIe4mM=

Ahora el **Event Hub** está listo para funcionar, y tenemos todas las cadenas de conexión necesarias para que Databricks envíe datos.

### 2.3 Construir un Workspace Databricks

Busque **Azure Databricks** en **+ Crear un recurso** al hacer click a las tres lineas horizontales de la esquina superior izquierda. Da click en **Crear**. 

* Suscripción: Suscripción de Azure 1
    *Grupo de recursos: Crea uno nuevo: nuevogrupo2

* Nombre de espacio de nombres: tweetstopowerbi8
    
* Ubicación: Busca una ubicación diferente (existen cuotas que entrega Azure por región que puede provocarte errores más adelante): (US) Centro-Sur de EE.UU.
    
* Plan de tarifa: **Premium** Ésto es muy importante, pues si no elegimos premiun no vamos a obtener la cadena de conexión jdbc.

Dá click en revisar y crear.

Espera a que se te indique: **Validación correcta** y da click a **Crear**.

Se inicializará la implementación.

Cuando la implementación esté completa, damos click a **Ir al recurso**

Le damos click  Iniciar área de trabajo, lo que nos redirecionará al portal de Azure Databrick. Necesitaremos ingresar las credenciales de Azure nuevamente.

#### 2.3.1 Construyendo Clusters.

Ya estamos en Azure Databricks!

Le damos clik a **New Cluster** debajo de la franja **Common Tasks**.

Acepte todos los demás valores predeterminados que no sean los siguientes:

Ingrese un nombre para el clúster: tweetstopowerbi8

Databricks Runtime Version: Runtime 6.4 (Scala 2.11, Spark 2.4.5)

Asegúrate de que la casilla de verificación: **Terminar después de 120 minutos de inactividad** esté seleccionada. Proporcione una duración (en minutos) para terminar el clúster, si el clúster no se está utilizando.

Seleccione **Crear clúster**.

La creación del clúster lleva varios minutos. Una vez que el clúster se esté ejecutando, puede adjuntar notebooks al clúster y ejecutar trabajos de Spark.

In [None]:
fig3

#### 2.3.2 Adjuntar bibliotecas al clúster de Spark

Utilizarás las API de Twitter para enviar tweets a Event Hubs y el conector Apache Spark Event Hubs para leer y escribir datos en Azure Event Hubs. 

Para usar estas API como parte de tu clúster, agrégalas dándole click a **Install New** en **Libraries** en el cluster recién creado.

Se abrirá la ventana **Install Library**, seleccionamos Maven en instalamos como coordenadas:

com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.10

org.twitter4j:twitter4j-core:4.0.7

In [None]:
fig4

### 3 Kit de herramientas de servicios cognitivos

Para calcular el sentimiento de los tweets, se requiere un acceso a los **Servicios Cognitivos** de Microsoft. Esto permitirá que Databricks llame a la API de Text Analytics en tiempo casi real directamente desde el notebook y calcule el sentimiento de un tweet dado. 

Crea un recurso como ya lo hemos hecho varias veces y busca:
Text Analytics. Dá click en **Crear**.

1 Nombre: tweetstopowerbi8

2 Suscripción: Suscripción de Azure 1

3 Ubicación: (US) Centro-Sur de EE.UU.

4 Plan de tarifa: S

5 Grupo de recursos: Crea uno nuevo: nuevogrupo2

Da click en **Crear**

Una vez implementado, haz click en ir al recurso, y bajo ADMINISTRACIÓN  DE RECURSOS, dá click en **Claves y punto de conexión** 

Toma nota de la URL del extremo y las claves. Estos valores serán necesarios para que Databricks llame con éxito a la API de Text Analytics.

NOMBRE:
tweetstopowerbi8
Extremo:
https://tweetstopowerbi8.cognitiveservices.azure.com/
Clave1:
3f6e681ce10c430fadc5fa12b5899774
Clave2:
b9eb64027de54831ae0a754d2bff93a2

### 4 Crear notebooks en Databricks

Crearemos cuatro notebooks en el workspace de Databricks con los siguientes nombres:

1 SendTweetsToEventHub (To send tweets to the event hub)
2 TweetSentiment (To calculate sentiment from stream of tweets from event hub)
3 ScheduledTableCreate (To create and continuously update the dataset)
4 DatasetValidation (To validate the dataset directly within Databricks)

En el cluster de Azure Databricks tweetstopowerbi8, en la franja negra de la izquierda anda seleccionando consecutivamente: Workspace, Shared, Shared, Create, Notebook.

Crea los cuatro notebooks en el lenguaje **Scala**.

### 6.1 Código del Notebook SendTweetsToEventHub

Hay un detalle que hay que tmar en cuanta: Tweeter aplica una restricción a la cantidad que puede ser consumida de tweets cada 15 minutos.
https://dev.twitter.com/docs/rate-limiting/1.1

In [None]:
import scala.collection.JavaConverters._
    import com.microsoft.azure.eventhubs._
    import java.util.concurrent._
    import scala.collection.immutable._
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    val namespaceName = "tweetstopowerbi8.servicebus.windows.net/"
    val eventHubName = "tweetstopowerbi8"
    val sasKeyName = "RootManageSharedAccessKey"
    val sasKey = "ZhvoV1uJtdw1d8tTKmTM5PNB55gHCjSOn24yjVXcwQg="
    val connStr = new ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey)

    val pool = Executors.newScheduledThreadPool(1)
    val eventHubClient = EventHubClient.create(connStr.toString(), pool)

    def sleep(time: Long): Unit = Thread.sleep(time)

    def sendEvent(message: String, delay: Long) = {
      sleep(delay)
      val messageData = EventData.create(message.getBytes("UTF-8"))
      eventHubClient.get().send(messageData)
      System.out.println("Sent event: " + message + "\n")
    }

    // Add your own values to the list
    val testSource = List("Azure is the greatest!", "Azure isn't working :(", "Azure is okay.")

    // Specify 'test' if you prefer to not use Twitter API and loop through a list of values you define in `testSource`
    // Otherwise specify 'twitter'

    // val dataSource = "test"
    val dataSource = "twitter"

    if (dataSource == "twitter") {

      import twitter4j._
      import twitter4j.TwitterFactory
      import twitter4j.Twitter
      import twitter4j.conf.ConfigurationBuilder

      // Twitter configuration!
      // Replace values below with you

      val twitterConsumerKey = "koO4XqTuWFr5ADGcE8kjIkVoU"
      val twitterConsumerSecret = "3F4sk9qU8zbKBROuLPUUj1uvE2YuhseXPe0ahMQoivg4icN5bL"
      val twitterOauthAccessToken = "1230251564616515586-2KqPsCG2mIJp3irRjENgHpCfQUxTUg"
      val twitterOauthTokenSecret = "6PJfMtYGY7w6csiIX9m1S5jFEKNZ3hE9PVkHKeN1S14iM"

      val cb = new ConfigurationBuilder()
        cb.setDebugEnabled(true)
        .setOAuthConsumerKey(twitterConsumerKey)
        .setOAuthConsumerSecret(twitterConsumerSecret)
        .setOAuthAccessToken(twitterOauthAccessToken)
        .setOAuthAccessTokenSecret(twitterOauthTokenSecret)

      val twitterFactory = new TwitterFactory(cb.build())
      val twitter = twitterFactory.getInstance()

      // Getting tweets with keyword "Azure" and sending them to the Event Hub in realtime!
      val query = new Query("source:twitter4j colmedchile")
      query.setCount(100)
      query.lang("en")
      var finished = false
      while (!finished) {
        val result = twitter.search(query)
        val statuses = result.getTweets()
        var lowestStatusId = Long.MaxValue
        for (status <- statuses.asScala) {
          if(!status.isRetweet()){
            sendEvent(status.getText(), 5000)
          }
          lowestStatusId = Math.min(status.getId(), lowestStatusId)
        }
        query.setMaxId(lowestStatusId - 1)
      }

    } else if (dataSource == "test") {
      // Loop through the list of test input data
      while (true) {
        testSource.foreach {
          sendEvent(_,5000)
        }
      }

    } else {
      System.out.println("Unsupported Data Source. Set 'dataSource' to \"twitter\" or \"test\"")
    }

    // Closing connection to the Event Hub
    eventHubClient.get().close()

El siguiente paso es tomar esta secuencia de tweets y aplicarle un análisis de opinión. Las siguientes líneas de código leídas desde EventHub, llaman a la API de Text Analytics y pasan el cuerpo del tweet para que se calcule el análisis. Obtén el análisis de los Tweets en el notebook TweetSentiment.

Agrega las siguientes líneas de código para llamar a la API de Text Analytics para calcular el análisis de opinión de la transmisión de Twitter.

### 6.2 Creamos el notebook TweetSentiment, que analiza los tweets desde Event Hub.

El siguiente programa está escrito en Scala:

In [None]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

import org.apache.spark.eventhubs._

// Build connection string with the above information
val connectionString = ConnectionStringBuilder("Endpoint=sb://tweetstopowerbi7.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=ZhvoV1uJtdw1d8tTKmTM5PNB55gHCjSOn24yjVXcwQg=")
  .setEventHubName("tweetstopowerbi7")
  .build

val customEventhubParameters =
  EventHubsConf(connectionString)
  .setMaxEventsPerTrigger(5)
.setStartingPosition(EventPosition.fromEndOfStream) //added this with martin from the databricks offical doc

val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()
incomingStream.printSchema
// Event Hub message format is JSON and contains "body" field
// Body is binary, so we cast it to string to see the actual content of the message
val messages =
  incomingStream
  .withColumn("Offset", $"offset".cast(LongType))
  .withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType))
  .withColumn("Timestamp", $"enqueuedTime".cast(LongType))
  .withColumn("Body", $"body".cast(StringType))
  .select("Timestamp", "Body")
.as[(String, String)] //added this with martin from the databrick official doc


// NEW CODE CELL ----------

import java.io._
import java.net._
import java.util._

class Document(var id: String, var text: String, var language: String = "", var sentiment: Double = 0.0) extends Serializable

class Documents(var documents: List[Document] = new ArrayList[Document]()) extends Serializable {

    def add(id: String, text: String, language: String = "") {
        documents.add (new Document(id, text, language))
    }
    def add(doc: Document) {
        documents.add (doc)
    }
}

// NEW CODE CELL ----------

class CC[T] extends Serializable { def unapply(a:Any):Option[T] = Some(a.asInstanceOf[T]) }
object M extends CC[scala.collection.immutable.Map[String, Any]]
object L extends CC[scala.collection.immutable.List[Any]]
object S extends CC[String]
object D extends CC[Double]

// NEW CODE CELL ----------

import javax.net.ssl.HttpsURLConnection
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import com.google.gson.JsonObject
import com.google.gson.JsonParser
import scala.util.parsing.json._

object SentimentDetector extends Serializable {

  // Cognitive Services API connection settings
  val accessKey = "ef5cfde06ebb4b3fadb16305e61bc807"
  val host = "https://tweetstopowerbi7.cognitiveservices.azure.com/"
  val languagesPath = "/text/analytics/v2.0/languages"
  val sentimentPath = "/text/analytics/v2.0/sentiment"
  val languagesUrl = new URL(host+languagesPath)
  val sentimenUrl = new URL(host+sentimentPath)

  def getConnection(path: URL): HttpsURLConnection = {
    val connection = path.openConnection().asInstanceOf[HttpsURLConnection]
    connection.setRequestMethod("POST")
    connection.setRequestProperty("Content-Type", "text/json")
    connection.setRequestProperty("Ocp-Apim-Subscription-Key", accessKey)
    connection.setDoOutput(true)
    return connection
  }

  def prettify (json_text: String): String = {
    val parser = new JsonParser()
    val json = parser.parse(json_text).getAsJsonObject()
    val gson = new GsonBuilder().setPrettyPrinting().create()
    return gson.toJson(json)
  }

  // Handles the call to Cognitive Services API.
  // Expects Documents as parameters and the address of the API to call.
  // Returns an instance of Documents in response.
  def processUsingApi(inputDocs: Documents, path: URL): String = {
    val docText = new Gson().toJson(inputDocs)
    val encoded_text = docText.getBytes("UTF-8")
    val connection = getConnection(path)
    val wr = new DataOutputStream(connection.getOutputStream())
    wr.write(encoded_text, 0, encoded_text.length)
    wr.flush()
    wr.close()

    val response = new StringBuilder()
    val in = new BufferedReader(new InputStreamReader(connection.getInputStream()))
    var line = in.readLine()
    while (line != null) {
        response.append(line)
        line = in.readLine()
    }
    in.close()
    return response.toString()
  }

  // Calls the language API for specified documents.
  // Returns a documents with language field set.
  def getLanguage (inputDocs: Documents): Documents = {
    try {
      val response = processUsingApi(inputDocs, languagesUrl)
      // In case we need to log the json response somewhere
      val niceResponse = prettify(response)
      val docs = new Documents()
      val result = for {
            // Deserializing the JSON response from the API into Scala types
            Some(M(map)) <- scala.collection.immutable.List(JSON.parseFull(niceResponse))
            L(documents) = map("documents")
            M(document) <- documents
            S(id) = document("id")
            L(detectedLanguages) = document("detectedLanguages")
            M(detectedLanguage) <- detectedLanguages
            S(language) = detectedLanguage("iso6391Name")
      } yield {
            docs.add(new Document(id = id, text = id, language = language))
      }
      return docs
    } catch {
          case e: Exception => return new Documents()
    }
  }

  // Calls the sentiment API for specified documents. Needs a language field to be set for each of them.
  // Returns documents with sentiment field set, taking a value in the range from 0 to 1.
  def getSentiment (inputDocs: Documents): Documents = {
    try {
      val response = processUsingApi(inputDocs, sentimenUrl)
      val niceResponse = prettify(response)
      val docs = new Documents()
      val result = for {
            // Deserializing the JSON response from the API into Scala types
            Some(M(map)) <- scala.collection.immutable.List(JSON.parseFull(niceResponse))
            L(documents) = map("documents")
            M(document) <- documents
            S(id) = document("id")
            D(sentiment) = document("score")
      } yield {
            docs.add(new Document(id = id, text = id, sentiment = sentiment))
      }
      return docs
    } catch {
        case e: Exception => return new Documents()
    }
  }
}

// User Defined Function for processing content of messages to return their sentiment.
val toSentiment = udf((textContent: String) => {
  val inputDocs = new Documents()
  inputDocs.add (textContent, textContent)
  val docsWithLanguage = SentimentDetector.getLanguage(inputDocs)
  val docsWithSentiment = SentimentDetector.getSentiment(docsWithLanguage)
  if (docsWithLanguage.documents.isEmpty) {
    // Placeholder value to display for no score returned by the sentiment API
    (-1).toDouble
  } else {
    docsWithSentiment.documents.get(0).sentiment.toDouble
  }
})

// NEW CODE CELL ----------

// Prepare a dataframe with Content and Sentiment columns
val streamingDataFrame = incomingStream.selectExpr("cast (body as string) AS Content").withColumn("body", toSentiment($"Content"))

// Display the streaming data with the sentiment

streamingDataFrame.writeStream.outputMode("append").format("console").option("truncate", false).start()

// NEW CODE CELL ----------

//WRITE THE STREAM TO PARQUET FORMAT/////
import org.apache.spark.sql.streaming.Trigger.ProcessingTime

val result =
  streamingDataFrame
    .writeStream
    .format("parquet")
    .option("path", "/mnt/DatabricksSentimentPowerBI")
    .option("checkpointLocation", "/mnt/sample/check2")
    .start()

### 6.3 Crear tabla de datos para que Power BI se conecte

Necesitamos escribir los datos como formato **parquet** en el almacenamiento de blobs que pasan en la ruta de nuestro almacenamiento de blobs montado.

Añadimos las siguientes líneas al final del programa TweetSentiment.

In [None]:
//WRITE THE STREAM TO PARQUET FORMAT/////  
import org.apache.spark.sql.streaming.Trigger.ProcessingTime 
val result = streamingDataFrame
.writeStream
.format("parquet")
.option("path", "/mnt/DatabricksSentimentPowerBI")
.option("checkpointLocation", "/mnt/sample/check2")
.start()

Para verificar que los datos se escriban en el almacenamiento de blobs montado directamente desde el notebook Databricks, crea un nuevo notebook **DatasetValidation** y ejecuta los siguientes comandos para mostrar el contenido de los archivos de parquet directamente dentro de Databricks. Si los datos se escriben correctamente, una salida al consultar la tabla en Databricks debería ser similar a la siguiente:

fig nnn

### 6.4 Crear el notebook ScheduledTableCreate

Ahora tenemos transmisión de datos de Twitter con el sentimiento adjunto que fluye en un almacenamiento de blobs montado. El siguiente paso es conectar Databricks (y este conjunto de datos) directamente a PowerBIPower BI para su posterior análisis y disección de datos. 

Para hacer esto, necesitamos escribir los archivos de parquet en un conjunto de datos que PowerBIPower BI podrá leer con éxito a intervalos regulares (es decir, actualizar continuamente el conjunto de datos a intervalos específicos para el flujo de datos por lotes). Para hacer esto, cree el cuaderno final ScheduledDatasetCreation y ejecute el siguiente conjunto de comandos scala como un programa para ejecutarse cada minuto. (Esto actualizará la tabla creada cada 1 minuto con el flujo de datos)

### 7 Conectar Power BI al clúster de Databricks

Para permitir que PowerBIPower BI se conecte primero a Databricks, se requiere que la información de conexión JDBC de los clústeres se proporcione como una dirección de servidor para la conexión PowerBIPower BI. 

Para obtener esto, navegue al clúster dentro de Databricks y seleccione el clúster que se va a conectar. En la página del clúster, seleccione la pestaña JDBC / ODBC (Nota: **si no creó un espacio de trabajo Premium Databricks, esta opción no estará disponible**).

Para construir la dirección del servidor, tome la URL JDBC que se muestra en el clúster y haga lo siguiente: • Reemplace jdbc: hive2 con https. • Elimine todo en la ruta entre el número de puerto y sql que retiene los componentes para que tenga una url similar a la siguiente: • https://westeurope.azuredatabricks.net:443/sql/protocolv1/o/1406775902027556/0424- 131603-inky272 

jdbc:spark://southindia.azuredatabricks.net:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/7401067854218687/0425-221454-peg462;AuthMech=3;UID=token;PWD=<personal-access-token>
    
#### Dirección el servidor:
    
https://southindia.azuredatabricks.net:443/sql/protocolv1/o/7401067854218687/0425-221454-peg462

Para generar el acceso personal a tokentoken, seleccione Configuración de usuario en el panel de control del clúster.

## Crear un clúster de Apache Spark dentro de Databricks

Para ejecutar notebooks para consumir el streming de data, primero se requiere un clúster. Para crear un clúster Apache Spark dentro de Databricks, cargue un Workspace desde el recurso Databricks se creó. Desde el portal de Databricks, seleccione Cluster.

dá click en: 'iniciar area de trabajo'

En un nuevo clúster, proporcione los siguientes valores para crear el clúster. ¡NOTA! - Para las capacidades de lectura / escritura en CosmosDB, se requiere una versión Apache Spark de 2.2.0. Al momento de escribir 2.3.0 aún no es compatible.

Adjuntar bibliotecas a Spark Cluster

Para permitir que la API de Twitter envíe tweets a Databricks y Databricks para leer y escribir datos en Event Hubs y CosmosDB, se requieren tres paquetes: 

• Conector Spark Event Hubs: com.microsoft.azure:azure-eventhubs-spark_2.11:2.3. 1 

• API de Twitter - org.twitter4j: twitter4j-core: 4.0.6 

• CosmosDB Spark Connector: http://repo1.maven.org/maven2/com/microsoft/azure/azure-cosmosdb-spark_2.2.0_2.11/ 1.1.1 / azure-cosmosdb-spark_2.2.0_2.11-1.1.1-uber.jar

Haga clic derecho en el espacio de trabajo de Databricks y seleccione Crear> Biblioteca. En la página Nueva biblioteca, seleccione Maven Coordinate e ingrese los nombres de las bibliotecas anteriores. se mantiene se encuentra aquí: https://github.com/Azure/azure-cosmosdb-spark




## Kit de herramientas de servicios cognitivos

Para calcular el sentimiento de los tweets, se requiere acceso a los Servicios Cognitivos de Microsoft. Esto permitirá que Databricks llame a la API de Text Analytics en tiempo casi real directamente desde el cuaderno y calcule el sentimiento de un tweet dado. Busque la API de Text Analytics en Azure Portal. Proporcione un nombre, ubicación y nivel de precios. (F0 será suficiente para los propósitos de esta demostración)

Una vez creado, haga clic en **Claves y puntos de conexión** y tome nota de la URL del punto final y la Clave principal que se utilizará. Estos valores serán necesarios para que Databricks llame con éxito a la API de Text Analytics.

endpoint:
https://tweetstopowerbi5.cognitiveservices.azure.com/


clave 1:

dae1210d95694ace8758d4cc0bea86d2

clave 2:

35a59dc1f1274710aa101ac432fb90d6


Crear notebooks Databricks

Para ejecutar el código, necesitaremos crear 4 cuadernos en el espacio de trabajo de Databricks creado de la siguiente manera:

* EventHubTweets (para enviar tweets al centro de eventos)
* TweetSentiment (Para calcular el sentimiento de la secuencia de tweets del centro de eventos)
* ScheduledDatasetCreation (Para crear y actualizar continuamente el conjunto de datos)
* DatasetValidation (para validar el conjunto de datos directamente dentro de Databricks)


### Crear tabla de datos para que Power BI se conecte

Primero, necesitamos escribir datos como formato **parquet** en el almacenamiento de blobs que pasan en la ruta de nuestro almacenamiento de blobs montado.

//WRITE THE STREAM TO PARQUET FORMAT/////  
import org.apache.spark.sql.streaming.Trigger.ProcessingTime 
val result = streamingDataFrame
.writeStream
.format("parquet")
.option("path", "/mnt/DatabricksSentimentPowerBI")
.option("checkpointLocation", "/mnt/sample/check2")
.start()

### Conecte Power BI al clúster de Databricks

Para permitir que PowerBIPower BI se conecte primero a Databricks, se requiere que la información de conexión JDBC de los clústeres se proporcione como una dirección de servidor para la conexión PowerBIPower BI. Para obtener esto, navegue al clúster dentro de Databricks y seleccione el clúster que se va a conectar. En la página del clúster, seleccione la pestaña JDBC / ODBC (Nota: si no creó un espacio de trabajo Premium Databricks, esta opción no estará disponible).



Para construir la dirección del servidor, tome la URL JDBC que se muestra en el clúster y haga lo siguiente: • Reemplace jdbc: hive2 con https. • Elimine todo en la ruta entre el número de puerto y sql que retiene los componentes para que tenga una url similar a la siguiente: 

https://westeurope.azuredatabricks.net:443/sql/protocolv1/o/1406775902027556/0424- 131603-inky272 


Para verificar que los datos se escriben en el almacenamiento de blobs montado directamente desde el cuaderno Databricks, cree un nuevo DatasetValidation del cuaderno y ejecute los siguientes comandos para mostrar el contenido de los archivos de parquet directamente dentro de Databricks. Si los datos se escriben correctamente, una salida al consultar la tabla en Databricks debería ser similar a la siguiente.

Ahora tenemos la transmisión de datos de Twitter con el sentimiento adjunto fluyendo hacia un almacenamiento de blobs montado. El siguiente paso es conectar Databricks (y este conjunto de datos) directamente a PowerBIPower BI para su posterior análisis y disección de datos. Para hacer esto, necesitamos escribir los archivos de parquet en un conjunto de datos que PowerBIPower BI podrá leer con éxito a intervalos regulares (es decir, actualizar continuamente el conjunto de datos a intervalos específicos para el flujo de datos por lotes). Para hacer esto, cree el cuaderno final ScheduledDatasetCreation y ejecute el siguiente conjunto de comandos scala como un programa para ejecutarse cada minuto. (Esto actualizará la tabla creada cada 1 minuto con el flujo de datos)

El paso final es conectar Databricks a PowerBIPower BI para permitir el flujo de datos por lotes y realizar análisis. Para hacer esto, abra el escritorio de PowerBIPower BI abierto y haga clic en Obtener datos. Seleccione Spark (beta) para comenzar a configurar la conexión del clúster Databricks.

https://southindia.azuredatabricks.net:443/sql/protocolv1/o/7401067854218687/0425-221454-peg462

Para generar el acceso personal a tokentoken, seleccione Configuración de usuario en el panel de control del clúster.

Token:

dapi50bc7f1e8a1b5a3b24c1a58d9696dea7