# Script de Firebird

Este script se ejecuta diariamente para procesar todas las campañas de márketing activas.

En primer lugar descarga de Firestore la lista de campañas activas. Para cada una de ellas:

1. Obtiene una muestra de tweets relevantes para la campaña.
2. Analiza los tweets mediante un modelo PLN.
3. Procesa los resultados para obtener conclusiones sobre el total de los datos, y agrupadas por provincias.
4. Almacena los resultados en Firestore.

In [1]:
# -----------------------------------------------------------------------------
# Importar librerías y credenciales
# -----------------------------------------------------------------------------

try:
    import tweepy
    from firebase_admin import credentials, firestore, initialize_app
    from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
except ImportError:
    raise ImportError("La aplicación requiere las librerías tweepy, firebase_admin y transformers.")    

try:
    import keys
except ImportError:
    raise ImportError("No se encuentra el archivo 'keys.py' con las credenciales de la API de Twitter.")

import logging
logging.getLogger().setLevel(logging.INFO)

import datetime, os

import pandas as pd



  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# -----------------------------------------------------------------------------
# Descargar modelo (sólo si no se ha descargado previamente)
# -----------------------------------------------------------------------------

model_path = './models/transformers/' 

if not os.path.exists('./models/transformers'):
    
    model_name = "nlptown/bert-base-multilingual-uncased-sentiment"

    model = AutoModelForSequenceClassification.from_pretrained(model_name)
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    classifier = pipeline('sentiment-analysis', model=model, tokenizer=tokenizer)
    classifier.save_pretrained(model_path)

    logging.info("Modelo descargado.")


In [3]:
# -----------------------------------------------------------------------------
# Inicializar servicios
# -----------------------------------------------------------------------------

logging.info("Inicializando servicios...")

# Firestore
cred = credentials.Certificate('service-acc-key.json')
initialize_app(cred)
db = firestore.client()
logging.info("Firestore inicializado.")

# Tweepy
client = tweepy.Client(bearer_token=keys.bearer, access_token=keys.access_token, access_token_secret=keys.access_token_secret)
logging.info("Tweepy inicializado.")

# Clasificador
model = AutoModelForSequenceClassification.from_pretrained(model_path, local_files_only=True)
tokenizer = AutoTokenizer.from_pretrained(model_path, local_files_only=True)
classifier = pipeline('sentiment-analysis', model=model, tokenizer=tokenizer)
logging.info("Modelo NLP inicializado.")


INFO:root:Inicializando servicios...
INFO:root:Firestore inicializado.
INFO:root:Tweepy inicializado.
INFO:root:Modelo NLP inicializado.


In [4]:
# -----------------------------------------------------------------------------
# Inicializar fechas
# -----------------------------------------------------------------------------

currentTimezone = datetime.datetime.now().astimezone().tzinfo
currentTime = datetime.datetime.now(tz=currentTimezone)
currentDay = datetime.datetime(currentTime.year, currentTime.month, currentTime.day, tzinfo=currentTimezone)
targetDay = currentDay - datetime.timedelta(days=1)

# Fechas en formato ISO (YYYY-MM-DD), que utilizo como identificadores en Firestore
dateCode = targetDay.date().isoformat()

listaProvincias = ["A Coruña", "Álava", "Albacete", "Alicante", "Almería", "Asturias", "Ávila", "Badajoz", "Baleares", "Barcelona", "Burgos", "Cáceres", "Cádiz", "Cantabria", "Castellón", "Ciudad Real", "Córdoba", "Cuenca", "Girona", "Granada", "Guadalajara", "Gipuzkoa", "Huelva", "Huesca", "Jaén", "La Rioja", "Las Palmas", "León", "Lérida", "Lugo", "Madrid", "Málaga", "Murcia", "Navarra", "Ourense", "Palencia", "Pontevedra", "Salamanca", "Segovia", "Sevilla", "Soria", "Tarragona", "Santa Cruz de Tenerife", "Teruel", "Toledo", "Valencia", "Valladolid", "Vizcaya", "Zamora", "Zaragoza"]   


In [5]:
# -----------------------------------------------------------------------------
# Recopilar tweets
# -----------------------------------------------------------------------------
import math

def collectTweets(query, numTweets):    

    tweetIDs = []
    tweetTexts = []
    tweetLocations = []

    listaUsuarios = []

    if numTweets > 1000:
        raise Exception("Para evitar gastos, la aplicación no admite analizar más de 1000 tweets por campaña y día.")

    # max_results es el número de resultados por página (limitado a 100 por la API de Twitter),
    # y limit es el número de páginas
    for response in tweepy.Paginator(client.search_recent_tweets, query=query, max_results=100, 
    start_time=targetDay, end_time=currentDay, expansions= 'author_id', user_fields=['location'], 
    limit=math.ceil(numTweets/100)):

        # Sólo quiero analizar un máximo de un tweet por persona
        # Si un usuario ha escrito varios tweets, aparece varias veces en la lista de tweets pero sólo una
        # en la lista de usuarios, por lo que necesito un contador separado para esta lista
        contadorUsuarios = 0

        for i in range(response.meta['result_count']):

            autor = response.data[i]['author_id']
            if autor in listaUsuarios:            
                continue
            listaUsuarios.append(autor)

            tweetIDs.append(response.data[i].id)
            tweetTexts.append(response.data[i].text)
            tweetLocations.append(response.includes['users'][contadorUsuarios].location)

            contadorUsuarios += 1

    # Obtenemos también el número total de tweets, por si resultara ser superior al límite de tweets
    totalTweets = client.get_recent_tweets_count(query=query, start_time = targetDay, end_time=currentDay, 
    granularity='day').meta['total_tweet_count']

    return (tweetIDs, tweetTexts, tweetLocations, totalTweets)

In [6]:
# -----------------------------------------------------------------------------
# Realizar análisis
# -----------------------------------------------------------------------------
def analyzeText(texts):

    def getClassification(result):
        # La label es un string (p.ej. '5 stars'), hago [0] para seleccionar sólo el número
        label = int(result['label'][0])
        score = result['score']

        # El tweet es "neutral" si su puntuación es de 3 estrellas, o bien si el modelo
        # no nos da suficiente precisión para considerarlo positivo o negativo
        if label == 3 or score < 0.3:
            return 0
        elif label > 3:
            return 1
        else:
            return -1

    return [getClassification(result) for result in classifier(texts)]

In [7]:
# -----------------------------------------------------------------------------
# Obtener las ciudades a partir de la información de localización
# -----------------------------------------------------------------------------

def getCities(tweetLocations):

    def normalizeCityName(s):
        return s.lower().replace("á", "a").replace("é", "e").replace("í", "i").replace("ó", "o").replace("ú", "u")

    listaProvincias = ["A Coruña", "Álava", "Albacete", "Alicante", "Almería", "Asturias", "Ávila", "Badajoz", "Baleares", "Barcelona", "Burgos", "Cáceres", "Cádiz", "Cantabria", "Castellón", "Ciudad Real", "Córdoba", "Cuenca", "Girona", "Granada", "Guadalajara", "Gipuzkoa", "Huelva", "Huesca", "Jaén", "La Rioja", "Las Palmas", "León", "Lérida", "Lugo", "Madrid", "Málaga", "Murcia", "Navarra", "Ourense", "Palencia", "Pontevedra", "Salamanca", "Segovia", "Sevilla", "Soria", "Tarragona", "Santa Cruz de Tenerife", "Teruel", "Toledo", "Valencia", "Valladolid", "Vizcaya", "Zamora", "Zaragoza"]
    listaProvinciasN = [normalizeCityName(provincia) for provincia in listaProvincias]

    def findLocationInCityList(location):
        if location:
            location = normalizeCityName(location)
            for provincia in listaProvinciasN:
                if provincia in location:
                    return provincia
        return None

    return [findLocationInCityList(location) for location in tweetLocations]

In [8]:
# -----------------------------------------------------------------------------
# Generar estadísticas
# -----------------------------------------------------------------------------

def generateStats(tweetIDs, tweetSentiments, tweetCities, totalTweets):
    df = pd.DataFrame({"id": tweetIDs, "sentiment": tweetSentiments, "city": tweetCities})

    # Para asegurar la trazabilidad del dato, estas DataFrames se almacenan como backup.
    # Para ahorrar en espacio, no almacenamos el texto, ya que se puede obtener
    # a partir del tweetID en caso de ser necesario.
    df.to_pickle(f"backups/{dateCode}-{userID}.bk")

    results = {}
    multiplier = 1

    if totalTweets > limit:
        analyzedTweets = len(df)
        multiplier = totalTweets / analyzedTweets

    def convertFormat(pdSeries):
        d = dict(pdSeries)
        return (int(d[1]*multiplier) if 1 in d else 0, 
        int(d[0]*multiplier) if 0 in d else 0, 
        int(d[-1]*multiplier) if -1 in d else 0)

    results['total'] = convertFormat(df['sentiment'].value_counts())

    df_cities = df.groupby('city')
    vc_cities = df_cities['sentiment'].value_counts()

    for city in list(vc_cities.index.get_level_values('city')):
        results[city] = convertFormat(vc_cities[city])

    return results

In [9]:
# -----------------------------------------------------------------------------
# Subir la información a Firestore
# -----------------------------------------------------------------------------
def uploadFirestore(stats):
    for city, results in stats.items():
        # Firestore no permite documentos vacíos, por lo que añado una variable dummy
        # al documento correspondiente a cada día
        db.collection('data').document(userID).collection("days").document(dateCode).set({'dummy': True})
        db.collection('data').document(userID).collection("days").document(dateCode).collection("cities").document(city).set({
            'pos': results[0],
            'neut': results[1],
            'neg': results[2]
        })


In [10]:
# -----------------------------------------------------------------------------
# Procesar las campañas
# -----------------------------------------------------------------------------
campaigns = db.collection('data').stream()

logging.info('Cargando lista de campañas.')

for doc in campaigns:

    campaign = doc.to_dict()
    # Ejemplo de formato:
    # {'tweetLimit': 1000, 
    # 'isActive': True, 
    # 'duration': 100, 
    # 'userID': 'pruebaEspaña', 
    # 'query': '#españa', 
    # 'start': DatetimeWithNanoseconds(2022, 6, 4, 21, 12, 57, 524937, tzinfo=datetime.timezone.utc)}

    if campaign['isActive']:

        query = campaign['query']
        limit = campaign['tweetLimit']
        userID = campaign['userID']
        lastUpdate = campaign['lastUpdate']

        # Comprobar que la campaña sigue estando activa
        # En caso contrario, marcarla como inactiva y pasar a la siguiente
        if campaign['start'] + datetime.timedelta(days = campaign['duration']) < datetime.datetime.now(tz=datetime.timezone.utc):
            doc.reference.update({'isActive': False})
            logging.info(f'La campaña {userID} ha caducado; se marca como inactiva.')
            continue

        # Si ya hemos analizado la campaña hoy, pasar a la siguiente
        if lastUpdate == dateCode:
            logging.info(f'La campaña {userID} ya ha sido analizada en la fecha {dateCode}.')
            continue

        logging.info(f'Procesando campaña activa: {userID}.')

        tweetIDs, tweetTexts, tweetLocations, totalTweets = collectTweets(query, limit)
        
        if totalTweets == 0:
            logging.info(f'No se han encontrado tweets, pasamos a la siguiente campaña.')
            continue

        logging.info(f'Recolectada una muestra de {min(limit, totalTweets)} tweets, de un total de {totalTweets}.')

        tweetSentiments = analyzeText(tweetTexts)  
        tweetCities = getCities(tweetLocations)
        logging.info(f'{min(limit, totalTweets)} tweets analizados.')

        stats = generateStats(tweetIDs, tweetSentiments, tweetCities, totalTweets)
        logging.info(f'Estadísticas calculadas.')

        uploadFirestore(stats)
        doc.reference.update({'lastUpdate': dateCode})
        logging.info(f'Resultados subidos a Firestore.')
        logging.info(f'Campaña {userID} analizada.')

logging.info(f'Ejecución completada.')

INFO:root:Cargando lista de campañas.
INFO:root:La campaña Mercadona ya ha sido analizada en la fecha 2022-06-04.
INFO:root:Procesando campaña activa: atleti.
INFO:root:Recolectada una muestra de 1000 tweets, de un total de 4115.
INFO:root:1000 tweets analizados.
INFO:root:Estadísticas calculadas.
INFO:root:Resultados subidos a Firestore.
INFO:root:Campaña atleti analizada.
INFO:root:La campaña total ya ha sido analizada en la fecha 2022-06-04.
INFO:root:Ejecución completada.


In [None]:
# -----------------------------------------------------------------------------
# Obtener trending topics
# -----------------------------------------------------------------------------