# Crear tabla storage de PARCE

Creación de la **tabla** de métricas relacionadas a las entradas o mensajes de usuarios, almacenados en nuestro contenedor de **Cosmo DB**.

**Celda #1**: Importar los datos del contenedor de almacenamiento en *Cosmos DB*.

* En *linea 6* se especifica el contenedor del cual se importan los datos.

In [None]:
# Write a Spark DataFrame into a Cosmos DB container

df = spark.read\
.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "CosmosDb1")\
.option("spark.cosmos.container", "bot-storage-FNA-v1")\
.load()

**Celda #2**: Seleccionar items del contenedor de Cosmos DB.

In [None]:
# Seleccionar campos de interés

Data = df.select("document","id")

**Celda #3**: Construir la tabla principal del bot.

El elemento **document** que seleccionamos en la celda anterior es una lista anidada, lurgo, se deben desplegar los elementos de dicha lista.

Los elementos anidados en **document** son:

* Entrada del usuario
* Respuesta del bot
* Fecha y hora
* Indicador de entendimiento del bot (**S** = Bot respondió - **N** = Bot no entendió la pregunta)

In [None]:
# 'Aplanar' columna principal

Data1 = Data.rdd.map(lambda x: (x.id,x.document["turn_question"],x.document["turn_answer"],x.document["turn_date"],x.document["turn_time"],x.document["turn_understood"]))\
.toDF(["id","turn_question","turn_answer","turn_date","turn_time","turn_understood"])

Data1 = Data1.withColumnRenamed("turn_question","usuario")
Data1 = Data1.withColumnRenamed("turn_answer","Respuesta")

**Celda #4**: Importar el *diccionario* de servicio y etiquetas de las bases de conocimiento.

In [None]:
# Importar .csv para asociar Etiquetas y Servicios.

Dicci = spark.read.load('abfss://bases@dlsfnaparce.dfs.core.windows.net/05_Diccionario.csv', format='csv'
, header=True
, delimiter='|'
, encoding="ISO-8859-1"
)

#--------------------
from pyspark.sql.functions import regexp_replace
Dicci = Dicci.withColumn('Respuesta', regexp_replace('Respuesta', "_u_",'\n'))
Dicci = Dicci.withColumn('Respuesta', regexp_replace('Respuesta', "_", ' '))
#--------------------

#print(Dicci.collect()[3]['Respuesta'])

**Celda #5**: Adicionar métricas de *etiqueta* y *servicio* a la **tabla principal**.

In [None]:
# Tabla final

Final = (Data1.join(Dicci, on = ['Respuesta'], how = 'left') )
Final = Final[['id','usuario','Respuesta','turn_date','turn_time','turn_understood','Etiqueta','Servicio']]

**Celda #6**: Renombrar indicador de entendimiento.

In [None]:
# Configurar indicador understood y Etiqueta

from pyspark.sql.functions import *

Final = Final.withColumn('Etiqueta', when(Final.turn_understood == 'N', 'Default Bot').otherwise(Final.Etiqueta))
Final = Final.withColumn('Servicio', when(Final.Etiqueta == 'Default Bot', 'Default Bot').otherwise(Final.Servicio))

**Celda #7**: Renombrar intenciones de *Saludo* y acciones del *Chit-Chat*.

In [None]:
Final = Final.withColumn('Etiqueta', when(Final.Etiqueta.isNull(), 'Saludo o ChitChat').otherwise(Final.Etiqueta))
Final = Final.withColumn('Servicio', when(Final.Etiqueta == 'Saludo o ChitChat', 'Saludo o ChitChat').otherwise(Final.Servicio))

**Celda #8**: Agregar *row_number*, para ejecución del *join* de Text Analytics.

In [None]:
from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window

w = Window().orderBy(lit('A'))
Final = Final.withColumn("row", row_number().over(w)-1)

Final = Final[['row','id','usuario','Respuesta','turn_date','turn_time','turn_understood','Etiqueta','Servicio']]

**Celda #9**: Visualización de la tabla, parcial.

In [None]:
# Final.printSchema()
display(Final)

# Final.show()

# Análisis de texto con *Text Analytics*

En esta sección, se usa el servicio de **Text Analytics** de Azure, para hacer análisis de texto de la entrada del usuario.

## Analísis de texto

Con el servicio de **Text Analytics** de *Azure* se pretenden enriquecer la tabla de información del chatbot, esta herramienta permite obtener métricas de **sentimientos** de la entrada del usuario.

**Celda #1**: Conectarse a **text_analytics_client** con las credenciales correspondentes del servicio desplegado.

In [None]:
from azure.core.credentials import AzureKeyCredential
from azure.ai.textanalytics import (
    TextAnalyticsClient,
    RecognizeEntitiesAction,
    AnalyzeSentimentAction,
)

credential = AzureKeyCredential("505c3ba3a57a4b749a5e5a96a589d39a")
endpoint="https://eastus2.api.cognitive.microsoft.com/"

text_analytics_client = TextAnalyticsClient(endpoint, credential)

**Celda #2**: La columna de interés es **Final.usuario** que corresponde a la entrada del usuario, luego, se procede como sigue:

1. Convertir **Final.usuario** en lista.
2. Pasar la lista anterior a la herramienta de *Text Analytics*, con: 1. Análisis de sentimientos y 2. Frases claves.

In [None]:
#--- Convertir df.usuario a lista
documents = list(Final.select('usuario').toPandas()['usuario'])

**Celda #3**: Crear una lista de n-tuplas, donde cada tupla relaciona el id posicional de la lista de entradas con sus respectivas métricas de sentimiento, y luego, reestructurarlo en un dataframe.

In [None]:
#------- Análisis de sentimientos
response = text_analytics_client.analyze_sentiment(documents, language="es")
result = [doc for doc in response if not doc.is_error]

n = list()

for doc in result:
    k = (doc.id
         ,doc.sentiment
         ,doc.confidence_scores.positive
         ,doc.confidence_scores.neutral
         ,doc.confidence_scores.negative)
    n.append(k)

#print(n)

#-- Crear DF
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType, FloatType

# Create a schema for the dataframe
schema = StructType([
    StructField('Id', StringType(), True),
    StructField('sentimiento', StringType(), True),
    StructField('score_positivo', FloatType(), True),
    StructField('score_neutral', FloatType(), True),
    StructField('score_negativo', FloatType(), True)
])

# Convert list to RDD
rdd = spark.sparkContext.parallelize(n)

# Create data frame
df01 = spark.createDataFrame(rdd,schema)
# print(df.schema)
df01 = df01.withColumnRenamed("Id","row")
df01.show()

In [None]:
#------- Detector de frases clave
response = text_analytics_client.extract_key_phrases(documents, language="es")
result = [doc for doc in response if not doc.is_error]

n = list()

for doc in result:
    k = (doc.id
         ,doc.key_phrases)
    n.append(k)

#print(n)

#-- Crear DF
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType, FloatType

# Create a schema for the dataframe
schema = StructType([
    StructField('Id', StringType(), True),
    StructField('frase_clave', StringType(), True)
])

# Convert list to RDD
rdd = spark.sparkContext.parallelize(n)

# Create data frame
df02 = spark.createDataFrame(rdd,schema)
# print(df.schema)
df02 = df02.withColumnRenamed("Id","row")
df02.show()

**Celda #4**: Join de los dataframe de nuevas métricas.

In [None]:
# Join de métricas nuevas

inter = (df01.join(df02, on = ['row'], how = 'left') )
#inter.show()

**Celda #5**: Relacionar con *join* el df Final con el df de métricas de sentimientos.

In [None]:
# Tabla parcial de sentimientos

View = (Final.join(inter, on = ['row'], how = 'inner') )
display(View)

In [None]:
View.printSchema()

# Predicción de intenciones con LUIS

En esta sección, con la app **LUIS** de Azure en SDK, se crean 2 nuevas columnas:
* **appLUIS**: Prediccion de intención de LUIS de la columna *usuario*.
* **scoreLUIS**: Score de la predicción de LUIS.

**Celda #1**: Importación de librerias.

In [None]:
from azure.cognitiveservices.language.luis.runtime import LUISRuntimeClient
from msrest.authentication import CognitiveServicesCredentials
from azure.cognitiveservices.language.luis.authoring import LUISAuthoringClient
from azure.cognitiveservices.language.luis.authoring.models import ApplicationCreateObject
import requests
import json
from types import SimpleNamespace
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType, FloatType

**Celda #2**: Configuración de credenciales.

In [None]:
#----------- Credenciales
authoringKey = 'ad5103875f974213af5db4ca3d997460'
authoringEndpoint = 'https://cog-luis-fna-parce-authoring.cognitiveservices.azure.com/'

app_id = "7e20d5a3-2949-449d-b550-0f2e66ddaa40"
predictionKey = 'bc07e0dc5d25489091aad03387caf06a'
predictionEndpoint = 'https://EastUS2.api.cognitive.microsoft.com/'

#----------- Client
client = LUISAuthoringClient(authoringEndpoint, CognitiveServicesCredentials(authoringKey))

#----------- Client runtime
runtimeCredentials = CognitiveServicesCredentials(predictionKey)
clientRuntime = LUISRuntimeClient(endpoint=predictionEndpoint, credentials=runtimeCredentials)

**Celda #3**: Creación de función de consulta a la appLUIS.

In [None]:
def LUIS(x):
    x = str(x)

    #----------- Configuración del REST CALL
    headers = {}
    params ={'query': x, 'subscription-key': predictionKey}

    # Make the REST call.
    response = requests.get(f'{predictionEndpoint}luis/prediction/v3.0/apps/{app_id}/slots/production/predict'
                            , headers=headers
                            , params=params)
    
    from types import SimpleNamespace    
    a = response.json()
    m = SimpleNamespace(**a)

    b = m.prediction
    n = SimpleNamespace(**b)

    c = n.intents
    o = SimpleNamespace(**c)

    k = list(c.values())
    p = k[0]
    q = SimpleNamespace(**p)
    
    #, q.score
    return str(n.topIntent)+'-'+str(q.score)

**Celda #4**: Convertir la función anterior a **UDF**.

In [None]:
# Convert a Python function to PySpark UDF

from pyspark.sql.functions import udf, col

""" Converting function to UDF """
LUIS_UDF = udf(lambda z: LUIS(z),StringType())

**Celda #5**: Aplicar la función UDF a la columna View.usuario.

In [None]:
# Aplicar la funcion UDF de LUIS a la columna de usuario
View = Final.withColumn("LUIS", LUIS_UDF(col("usuario")))

**Celda #6**: Split de la nueva columna *LUIS*.

In [None]:
import pyspark

# Split de la columna LUIS
split_cols = pyspark.sql.functions.split(View['LUIS'], '-')
  
# Aplicar split con '-'
View = View.withColumn('appLUIS', split_cols.getItem(0)).withColumn('LUIS_score', split_cols.getItem(1))

# Eliminar col LUIS
View = View.drop('LUIS')

# Ajustar tipo de col en score
View = View.withColumn("LUIS_score", View["LUIS_score"].cast(FloatType()))

Visualizar

In [None]:
display(View)
View.printSchema()

# Guardar la tabla
En esta sección se ejecutan los pasos para **guardar** la tabla creada en las secciones anteriores.

**Celda #1**: Crear la tabla *Final*. Se ejecutan una única vez (*Crear el if condicional para que no haya error al volver a ejecutar*).

In [None]:
# 
View.createOrReplaceTempView("Final")

In [None]:
%%spark
val scala_df = spark.sqlContext.sql("SELECT * FROM Final")
scala_df.write.synapsesql("syndwfnaparce.dbo.Final", Constants.INTERNAL)

**Celda #2**: Conexión con el servicio de datalake.

In [None]:
spark.conf.set(
  "fs.azure.account.key.dlsfnaparce.blob.core.windows.net",
  "1y5feFvWNy6C4YGZlGlR/9wEwLeG51EDyGqsgkOiOLW2AndQmnj4PE7Ri+TgRJIC0Mghu5Z4KgqwjdQHZEhE0w==")

**Celda #3**: Relacionamos un **contenedor** (*test*) para crear la carpeta *'storage_bot'*, donde se guarda la información con la extención **.parquet**.

In [None]:
target_folder_path = 'abfss://test@dlsfnaparce.dfs.core.windows.net/storage_bot/'

View.write.format("parquet").mode("overwrite").save(target_folder_path)

**Celda #4**: Ingresar a la ruta definida anteriormente, y visualizar lo que se ha guardado.

In [None]:
df_datos = spark.read.load('abfss://test@dlsfnaparce.dfs.core.windows.net/storage_bot/*.parquet', format='parquet')
display(df_datos)