#### Configuración de token SAS para la conexión con Azure

In [None]:
spark.conf.set("fs.azure.account.auth.type.tweetsfiles.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.tweetsfiles.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.tweetsfiles.dfs.core.windows.net", "sv=2022-11-02&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2024-07-07T22:27:21Z&st=2024-05-04T14:27:21Z&spr=https&sig=hNwdq5gK3xFjA6yJworsK95Kg13Piu1wunXuPB6fnJY%3D")

#### Instalación de dependencias necesarias
* Libreria de sentimiento utilizada: https://github.com/sentiment-analysis-spanish/sentiment-spanish/blob/master/README.md

In [None]:
#%pip install transformers
#%pip install torch
#dbutils.library.restartPython()

#### Imports

In [None]:
from transformers import pipeline
from pyspark import SparkContext
import pyspark.sql.functions as fn
import pyspark.sql.types as tipos
sc = SparkContext.getOrCreate()
#variables con los valores de Azure
filename = dbutils.widgets.get('file_name_origin')
#routename = dbutils.widgets.get('save_route_databricks')

#### Selección de datos para sacar la columna del sentimiento y la URL de cada tweet

In [None]:
esquema = tipos.StructType([
    tipos.StructField("user_id",tipos.LongType(),True),
    tipos.StructField("username",tipos.StringType(),True),
    tipos.StructField("tweet_id",tipos.LongType(), True),
    tipos.StructField("created_at",tipos.StringType(),True),
    tipos.StructField("text",tipos.StringType(),True),
    tipos.StructField("language",tipos.StringType(),True),
    tipos.StructField("reply_count",tipos.IntegerType(),True),
    tipos.StructField("retweet_count",tipos.IntegerType(),True),
    tipos.StructField("likes",tipos.IntegerType(),True),
    tipos.StructField("view_count",tipos.FloatType(),True),
    tipos.StructField("location",tipos.StringType(),True),
    tipos.StructField("hashtags",tipos.StringType(),True),
    tipos.StructField("in_reply_to",tipos.StringType(),True),
    tipos.StructField("quote",tipos.StringType(),True),
    tipos.StructField("quote_count",tipos.IntegerType(),True),
    tipos.StructField("possibly_sensetive",tipos.BooleanType(),True),
    tipos.StructField("latitud",tipos.FloatType(),True),
    tipos.StructField("longitud",tipos.FloatType(),True),
    tipos.StructField("temas",tipos.StringType(),True),
    tipos.StructField("primaryID",tipos.IntegerType(),False)
])


#Indicamos el archivo a procesar
file_location = f'abfss://rawdata@tweetsfiles.dfs.core.windows.net/processed_dataflow_csv/{filename}'
raw_tweets = spark.read.schema(esquema).format("csv").option("delimiter", "*").option("header", "true").load(file_location).distinct()

sentiment_classifier = pipeline(
    model="lxyuan/distilbert-base-multilingual-cased-sentiments-student", 
    top_k=1
)

# Definimos la UDF para obtener el sentimiento => return esperado: 'positivo, 0.42542972'
def apply_sentiment(text):
    texto = str(text)
    sentiment = sentiment_classifier(texto)
    score_sentiment = sentiment[0][0]['score']
    label_sentiment = sentiment[0][0]['label']

    if label_sentiment == 'positive' and score_sentiment <= 0.61:
        label_sentiment = 'neutral'
    return f"{label_sentiment}, {score_sentiment}"
sentiment_udf = udf(apply_sentiment)


# Definimos la UDF para sacar modificar los valores nulos
def apply_clear_null(text):
    if text is None:
        text = "No especificado"
    return text
clear_null_udf = udf(apply_clear_null)

tweets = (raw_tweets
             .select("primaryID","user_id","username", "tweet_id", "created_at", "retweet_count", "likes", "view_count", "hashtags", "text", "location", "temas", "latitud", "longitud")
             # Sacamos la URL y la eliminamos del texto principal. Ademas, normalizamos el texto eliminando las tildes
             .withColumn("URL", fn.regexp_extract("text", r'(http)\S+', 0))
             .withColumn("text", fn.regexp_replace(fn.translate("text", "áéíóú", "aeiou"), r'(http)\S+', ""))
             # Creamos la columna de sentimientos y de ahí, sacamos 2 columnas [score y el valor]
             .withColumn("Sentimientos", sentiment_udf('text'))
             .withColumn("Score", fn.substring(fn.split("Sentimientos",r',\s')[1], 0, 4))
             .withColumn("Sentimientos", fn.split("Sentimientos", r',\s')[0])
             # Normalizamos los valores nulos
             .withColumn("location", clear_null_udf("location"))
             .withColumn("hashtags", clear_null_udf("hashtags"))
             )





#### Guardar los datos en el DataLake

In [None]:
#ruta = f'abfss://outputdata@tweetsfiles.dfs.core.windows.net/{routename}'
ruta = f'abfss://outputdata@tweetsfiles.dfs.core.windows.net/lib/{filename}'
ruta_temporal = f'abfss://outputdata@tweetsfiles.dfs.core.windows.net/tmp'
tweets.repartition(1).write.format("CSV").option("header", "true").option("delimiter", "*").mode("overwrite").save(ruta_temporal)

contenedor = dbutils.fs.ls(ruta_temporal)
for fichero in contenedor:
  if '.csv' in fichero.name:
    file = fichero.name

dbutils.fs.cp(f'{ruta_temporal}/{file}', ruta)
dbutils.fs.rm(ruta_temporal, True)