In [1]:
import numpy as np
import pandas as pd
import json
import os
from dotenv import load_dotenv
import pymysql.cursors
import datetime
import telegram

In [2]:
# Load the variables of a local .env file into the process environment so the
# os.getenv() lookups in the following cells can read them. The trailing ';'
# keeps the notebook from echoing the call's return value.
load_dotenv();

In [3]:

    
# TLS options handed to pymysql through the `ssl` connection argument.
# PyMySQL reads the CA bundle from the "ca" key of this dict; any other key
# name (such as "ssl_ca") is ignored and leaves the connection encrypted but
# with no certificate verification. When CA-CERTIFICATE is unset the value is
# None and PyMySQL keeps that unverified behaviour.
ssl_config = {
    "ca": os.getenv('CA-CERTIFICATE'),
}

# Keyword arguments for pymysql.connect(); every value comes from the environment.
db_config = {
    "user": os.getenv('USER_MYSQL'),
    "password": os.getenv('PASSWORD'),
    "host": os.getenv('HOST'),
    # Fall back to the standard MySQL port instead of failing with
    # int(None) when PORT is missing or empty.
    "port": int(os.getenv('PORT') or 3306),
    "database": os.getenv('DATABASE'),
    "ssl": ssl_config,
}

def connectToDatabase():
    '''
    Opens a MySQL connection with the module-level db_config settings.
    Returns:
       The object returned by pymysql.connect on success. On failure the caught
       pymysql.Error instance is returned (not raised), so the result is not
       guaranteed to be a usable connection.
    '''
    try:
        return pymysql.connect(**db_config)
    except pymysql.Error as connection_error:
        return connection_error

def connectHealth():
    '''
    Probes the database by listing its tables.
    Returns:
       str: "Connected to database: [...]" with the rows returned by SHOW TABLES.
       AttributeError: Returned (not raised) when an attribute lookup fails, which is
           what happens when connectToDatabase() handed back an error object instead
           of a connection and `.cursor()` is called on it.
    '''
    try:
        conn = connectToDatabase()
        cursor = conn.cursor()
        try:
            cursor.execute('SHOW TABLES;')
            result = cursor.fetchall()
        finally:
            # Release the cursor and connection even when the query fails.
            disconnectToDatabase(conn, cursor)
        return f"Connected to database: {list(result)}"
    except AttributeError as atr_err:
        return atr_err
    

def disconnectToDatabase(conn, cursor):
    '''
    Closes the cursor and then the connection.
    Args:
       conn: Object exposing close(); the database connection.
       cursor: Object exposing close(); the cursor opened on that connection.
    Raises:
       Whatever cursor.close() or conn.close() raises. The connection is closed
       even when closing the cursor fails.
    '''
    try:
        cursor.close()
    finally:
        conn.close()

def getTheLastDate(view: str):
    '''
    Fetches the row of `view` with the greatest responseTime.
    Args:
       view (str): Name of the table/view to read. It is interpolated into the SQL text
           because identifiers cannot be bound as parameters, so it must come from
           trusted configuration.
    Returns:
       The first row as returned by cursor.fetchone() (None when the view is empty), or
       the AttributeError raised when connectToDatabase() did not return a connection.
    '''
    # LIMIT 1: only one row is consumed, so do not make the server ship the whole view.
    query = f"SELECT * FROM {view} ORDER BY responseTime DESC LIMIT 1;"
    try:
        conn = connectToDatabase()
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            return cursor.fetchone()
        finally:
            # Runs before the return above completes, and also when the query fails.
            disconnectToDatabase(conn, cursor)
    except AttributeError as atr_err:
        return atr_err

def getTheDataframe(view: str, lastTimeSub:int, lastTime: str):
    '''
    Fetches every row of `view` whose responseTime lies between two bounds, newest first.
    Args:
       view (str): Name of the table/view to read. Interpolated into the SQL text
           (identifiers cannot be bound), so it must come from trusted configuration.
       lastTimeSub: Lower bound of the window. Its text form is sent as a quoted value.
           NOTE(review): annotated as int, but it is compared against responseTime like
           a timestamp; confirm the intended type.
       lastTime (str): Upper bound of the window.
    Returns:
       The rows as returned by cursor.fetchall(), or the AttributeError raised when
       connectToDatabase() did not return a connection.
    '''
    # The bounds are bound as parameters so the driver does the quoting/escaping.
    # str() keeps them quoted text, exactly as the previous string formatting sent them.
    query = f"SELECT * FROM {view} WHERE responseTime BETWEEN %s AND %s ORDER BY responseTime DESC;"
    try:
        conn = connectToDatabase()
        cursor = conn.cursor()
        try:
            cursor.execute(query, (str(lastTimeSub), str(lastTime)))
            return cursor.fetchall()
        finally:
            # Runs before the return above completes, and also when the query fails.
            disconnectToDatabase(conn, cursor)
    except AttributeError as atr_err:
        return atr_err

In [15]:

# Telegram client shared by send_telegram_message(); the token is read from the environment.
bot = telegram.Bot(token=os.getenv('BOT_TOKEN'))
# Destination chats, one per severity. In the alert coroutines of this notebook the
# "green" chat receives the no-alert messages, "yellow" the 'Alerta media' messages and
# "red" the 'Alerta alta' messages. os.getenv() yields None for any variable that is unset.
chat_id_green = os.getenv('CHAT_ID_GREEN')
chat_id_yellow = os.getenv('CHAT_ID_YELLOW')
chat_id_red = os.getenv('CHAT_ID_RED')
 
#Private
def getRepeatedValuesInAColumn(df, columnName):
    '''
    Identifies the repeated values of a column in a dataframe
    Args:
       df (DataFrame) : Dataframe which contains the column to search
       columnName (string) : Name of the column to search
    Returns:
       pandas Series: The `columnName` values of every row whose value appears more than
       once in that column (all occurrences are kept, with the original index).
    '''
    duplicates = df[df.duplicated(subset=[columnName], keep=False)]
    # Return the searched column itself rather than a hard-coded 'VIN'.
    return duplicates[columnName]

#Private
def searchValuesInADataframe(df, values:list, columnA: str, columnB: str)-> dict:
    '''
    Looks up rows by their columnA value and collects the matching columnB entries.
    Args:
       df (DataFrame): The dataframe to search in.
       values (list): The values to look for in columnA.
       columnA (str): The name of the column to match against.
       columnB (str): The name of the column whose entries are collected.
    Returns:
       dict: Maps each matched columnA value to the list of columnB entries of its rows.
    '''
    matches_mask = df[columnA].isin(values)
    matching_rows = df[matches_mask]
    entries_per_key = matching_rows.groupby(columnA)[columnB].apply(list)
    return entries_per_key.to_dict()


def verifyInfoRepeatedVins(df):
    '''
    Verifies every repeated VIN has the same information in every record (due to the short period time reviewed)

    Each 'responseBody' of a repeated VIN is JSON-decoded and reduced to a summary dict:
    the vehicle fields plus 'codes', the sorted and de-duplicated 'codigo' values found
    in its 'mensajes' list. Summaries are then compared per VIN.
    Args:
       df (DataFrame): The dataframe to search; needs the 'VIN' and 'responseBody'
           columns, with 'responseBody' holding JSON text.
    Returns:
       dict: {'uniqueResponses': {vin: [summary, ...]}, 'differentResponses': {vin: summary}}.
           'uniqueResponses' lists the distinct summaries seen for each repeated VIN.
           'differentResponses' holds, for every VIN with more than one distinct summary,
           the last summary that differed from the ones seen before it.
    Raises:
       KeyError: If a response contains 'anioModelo' but lacks another expected field.
       json.JSONDecodeError: If a 'responseBody' is not valid JSON.
    '''
    baseFields = ('anioModelo', 'marca', 'modelo', 'robo', 'roboFecha')
    originFields = ('anioModelo', 'fabricante', 'marca', 'modelo', 'paisOrigen', 'robo', 'roboFecha')

    uniqueResponses = {}
    differentResponses = {}
    vinValues = getRepeatedValuesInAColumn(df, 'VIN').unique().tolist()
    repeatedValues = searchValuesInADataframe(df, vinValues, 'VIN', 'responseBody')

    for vin, responseStrings in repeatedValues.items():
        seenSummaries = uniqueResponses.setdefault(vin, [])
        for responseString in responseStrings:
            responseJson = json.loads(responseString)

            info = {}
            if 'anioModelo' in responseJson:
                hasOrigin = 'fabricante' in responseJson and 'paisOrigen' in responseJson
                fields = originFields if hasOrigin else baseFields
                info = {field: responseJson[field] for field in fields}
                info['codes'] = []

            mensajes = responseJson.get('mensajes')
            # isinstance, not a comparison of type() against the string 'list',
            # which is never true and silently disabled this branch.
            if isinstance(mensajes, list):
                # The guard tests the same key that is read ('codigo').
                codes = {
                    message['codigo']
                    for message in mensajes
                    if isinstance(message, dict) and 'codigo' in message
                }
                if codes:
                    # sorted() returns the list; list.sort() returns None. A fixed
                    # order keeps equal code sets comparing equal.
                    info['codes'] = sorted(codes)

            if info not in seenSummaries:
                # Only a summary that differs from an earlier one is a discrepancy.
                if seenSummaries:
                    differentResponses[vin] = info
                seenSummaries.append(info)

    return {'uniqueResponses': uniqueResponses, 'differentResponses': differentResponses}

async def distinctValueVin(regularizationFrame, chat_id_red, chat_id_green, chat_id_yellow):
    '''
    Reports through Telegram whether repeated VINs returned differing information.
    Args:
       regularizationFrame (DataFrame): Frame passed on to verifyInfoRepeatedVins.
       chat_id_red: Chat that receives the high alert (more than one VIN differs).
       chat_id_green: Chat that receives the no-alert message.
       chat_id_yellow: Chat that receives the medium alert (exactly one VIN differs).
    '''
    differentResponses = verifyInfoRepeatedVins(regularizationFrame)['differentResponses']
    # A dict view cannot be indexed; materialise the keys once as a list.
    responsesKeys = list(differentResponses)

    if len(responsesKeys) == 1:
        vin = responsesKeys[0]
        message = f'Alerta media: Existe VIN con distintos valores para una consulta {vin}:{differentResponses[vin]}'
        await send_telegram_message(message, chat_id_yellow)
    elif len(responsesKeys)>1:
        message = f'Alerta alta: Existen VINs con distintos valores para una consulta {responsesKeys}'
        await send_telegram_message(message, chat_id_red)
    else:
        message = 'Sin alerta de incongruencias entre VINs repetidos'
        await send_telegram_message(message, chat_id_green)

def _hasNoAlerts(alertas):
    '''Returns True when `alertas` is an empty list, given either as a list or as JSON text.'''
    if isinstance(alertas, str):
        try:
            alertas = json.loads(alertas)
        except ValueError:
            return False
    return isinstance(alertas, list) and len(alertas) == 0


def proveErrorAlert(df):
    '''
    Counts the records whose carfaxUsaData reports an error while their alert list is empty
    Args:
       df (DataFrame) : Dataframe with the 'carfaxUsaData' column (JSON text) and the
           'alertas' column (a list, or JSON text of a list)
    Returns:
       int: Number of rows whose decoded carfaxUsaData has an 'error' key and whose
       'alertas' value is an empty list
    '''
    conErrorSinAlerta = 0
    for position, carfaxRaw in enumerate(df['carfaxUsaData'].values.tolist()):
        carfaxDict = json.loads(carfaxRaw)
        if not (isinstance(carfaxDict, dict) and 'error' in carfaxDict):
            continue
        # 'alertas' is only read for rows that actually carry an error.
        if _hasNoAlerts(df['alertas'].iloc[position]):
            conErrorSinAlerta += 1
    return conErrorSinAlerta

async def carfaxAlertsRelation(regularizationFrame, maxCarfaxAlerts, chat_id_red, chat_id_green, chat_id_yellow):
    '''
    Sends a Telegram message about records that report an error but carry no alert.
    Args:
       regularizationFrame (DataFrame): Frame passed on to proveErrorAlert.
       maxCarfaxAlerts (int): Reference count; a count equal to it is a medium alert,
           a greater count is a high alert, anything else reports no errors.
       chat_id_red: Chat that receives the high alert.
       chat_id_green: Chat that receives the no-error message.
       chat_id_yellow: Chat that receives the medium alert.
    '''
    unalertedErrors = proveErrorAlert(regularizationFrame)

    # Pick the text and destination first, then send once.
    if unalertedErrors == maxCarfaxAlerts:
        text = f'Alerta media: Existen {unalertedErrors} registros con error y sin alerta'
        destination = chat_id_yellow
    elif unalertedErrors > maxCarfaxAlerts:
        text = f'Alerta alta: Existen {unalertedErrors} registros con error y sin alerta'
        destination = chat_id_red
    else:
        text = 'Sin errores entre mensajes carfaxUsaData y Alertas'
        destination = chat_id_green

    await send_telegram_message(text, destination)
        
async def httpCodesAlerts(regularizationFrame, maxCodeAlerts, errorCodes, chat_id_red, chat_id_green, chat_id_yellow):
    '''
    Sends Telegram alerts for HTTP response codes at or above `errorCodes`.

    Side effect: the 'responseCode' column of `regularizationFrame` is converted to int
    in place. The messages read the global `nombreMicroservicio`.
    Args:
       regularizationFrame (DataFrame): Frame with a 'responseCode' column castable to int.
       maxCodeAlerts: Currently unused.
           NOTE(review): the thresholds below are the literal 2; confirm whether this
           parameter was meant to replace it.
       errorCodes (int): Codes greater than or equal to this value count as errors.
       chat_id_red: Chat for codes seen more than twice (high alert).
       chat_id_green: Chat for the message sent when no error code is present.
       chat_id_yellow: Chat for codes seen exactly twice (medium alert).
    '''
    regularizationFrame['responseCode'] = regularizationFrame['responseCode'].astype(int)
    bad_codes = regularizationFrame[regularizationFrame['responseCode'] >= errorCodes]

    bad_code_counts = bad_codes['responseCode'].value_counts()

    # This check must live outside the loop: with no error codes the loop body
    # never runs, so a check placed inside it can never fire.
    if bad_code_counts.empty:
        message = 'Sin errores de código HTTP'
        await send_telegram_message(message, chat_id_green)
        return

    for code, count in bad_code_counts.items():
        if count == 2:
            message = f'Alerta media de errores HTTP con código {code} del microservicio {nombreMicroservicio}'
            await send_telegram_message(message, chat_id_yellow)
        elif count > 2:
            message = f'Alerta alta de errores HTTP con código {code} del microservicio {nombreMicroservicio}'
            await send_telegram_message(message, chat_id_red)

async def recordsAlerts(regularizationFrame, meanStdRecords, lastTimeSub, days_of_week, chat_id_red, chat_id_green, chat_id_yellow):
    '''
    Compares the number of records in the frame with the expected band for the time slot.

    The band is mean/6 plus or minus a fraction of std (1/3 when std/mean > 0.6, else 1/4),
    with the lower bound clamped at 0. Crossing a bound by less than a factor of two is a
    medium alert, by a factor of two or more a high alert.
    Args:
       regularizationFrame (DataFrame): Frame whose row count is evaluated.
       meanStdRecords (dict): Maps '<day name>_<hour>' to a dict with 'mean' and 'std'.
       lastTimeSub: Object exposing weekday() and hour, used to pick the time slot
           (presumably a datetime; confirm against the caller).
       days_of_week (list): Day names indexed by lastTimeSub.weekday().
       chat_id_red: Chat for high alerts.
       chat_id_green: Chat for the message sent when the lower bound is respected.
       chat_id_yellow: Chat for medium alerts.
    Raises:
       KeyError: If the time slot is missing from meanStdRecords.
    '''
    key = f'{days_of_week[lastTimeSub.weekday()]}_{lastTimeSub.hour}'
    meanStd = meanStdRecords[key]

    std = meanStd['std']
    mean = meanStd['mean']

    # A zero mean has no defined std/mean ratio; it uses the narrow band instead of
    # dividing by zero.
    if mean and std/mean > 0.6:
        maxRecords = mean/6 + (1/3)*std
        minRecords = mean/6 - (1/3)*std
    else:
        maxRecords = mean/6 + (1/4)*std
        minRecords = mean/6 - (1/4)*std

    if minRecords < 0:
        minRecords = 0

    recordsNumber = regularizationFrame.shape[0]

    # The factor-of-two tests are written as products so that maxRecords == 0 or
    # recordsNumber == 0 cannot raise ZeroDivisionError; both cases count as high alerts.
    if recordsNumber > maxRecords:
        if recordsNumber < 2*maxRecords:
            message = f'Alerta media de cantidad de registros. Se registran más de {maxRecords} para la hora: {recordsNumber}'
            await send_telegram_message(message, chat_id_yellow)
        else:
            message = f'Alerta alta de cantidad de registros. Se registran más de {maxRecords} para la hora: {recordsNumber}'
            await send_telegram_message(message, chat_id_red)
    else:
        print('Sin alerta. El tráfico no rebasa la cota superior.')

    if recordsNumber < minRecords:
        if minRecords < 2*recordsNumber:
            message = f'Alerta media de cantidad de registros. Se registran menos de {minRecords} para la hora: {recordsNumber}'
            await send_telegram_message(message, chat_id_yellow)
        else:
            message = f'Alerta alta de cantidad de registros. Se registran menos de {minRecords} para la hora: {recordsNumber}'
            await send_telegram_message(message, chat_id_red)
    else:
        message = 'Sin alerta. El tráfico no rebasa la cota inferior.'
        await send_telegram_message(message, chat_id_green)

def calculateDeltaTime(df, columnA, columnB):
    '''
    Converts columnA and columnB to datetime and subtracts columnB from columnA
    Args:
       df (DataFrame) : Dataframe which contains the columns to subtract
       columnA (string) : Minuend column, formatted as 'YYYY-MM-DD HH:MM:SS'
       columnB (string): Subtrahend column, formatted as 'YYYY-MM-DD HH:MM:SS'
    Returns:
       DataFrame: Single column 'secondsDifference' holding columnA - columnB in seconds,
       indexed like df
    '''
    timestampFormat = '%Y-%m-%d %H:%M:%S'
    minuend = pd.to_datetime(df[columnA], format=timestampFormat)
    subtrahend = pd.to_datetime(df[columnB], format=timestampFormat)
    # Name the column explicitly instead of relying on the default label of an
    # unnamed Series, which is not 0 when both inputs share a name.
    secondsDifference = (minuend - subtrahend).dt.total_seconds()
    return secondsDifference.to_frame(name='secondsDifference')

async def delayAlerts(regularizationFrame, maxCountDelay, chat_id_red, chat_id_green, chat_id_yellow):
    '''
    Sends Telegram alerts about responses slower than the global `timeMaximum` seconds.

    Rows whose 'responseTime' or 'requestTime' equals the placeholder 'xD' are left out of
    the timing. When at least one response is too slow, a 'secondsDifference' column is
    written onto `regularizationFrame` and the slow rows are counted: exactly two is a
    medium alert, more than two a high alert, and a single one sends nothing.
    The messages read the global `nombreMicroservicio`.
    Args:
       regularizationFrame (DataFrame): Frame with 'responseTime' and 'requestTime'.
       maxCountDelay: Not used by this function.
           NOTE(review): the count thresholds are the literal 2; confirm the intent.
       chat_id_red: Chat for the high alert.
       chat_id_green: Chat for the message sent when nothing exceeds the limit.
       chat_id_yellow: Chat for the medium alert.
    '''
    hasResponseTime = regularizationFrame['responseTime'] != 'xD'
    hasRequestTime = regularizationFrame['requestTime'] != 'xD'
    timedRows = regularizationFrame[hasResponseTime & hasRequestTime]
    elapsedSeconds = calculateDeltaTime(timedRows, 'responseTime', 'requestTime')['secondsDifference']

    if not (elapsedSeconds.max() > timeMaximum):
        await send_telegram_message(f'Sin reporte de lentitud del microservicio {nombreMicroservicio}', chat_id_green)
        return

    regularizationFrame['secondsDifference'] = elapsedSeconds
    slowCount = int((regularizationFrame['secondsDifference'] > timeMaximum).sum())

    if slowCount == 2:
        await send_telegram_message(f'Alerta media de lentitud del microservicio {nombreMicroservicio}', chat_id_yellow)
    elif slowCount > 2:
        await send_telegram_message(f'Alerta alta de lentitud del microservicio {nombreMicroservicio}', chat_id_red)

async def ipAlerts(regularizationFrame, ipMaximum, chat_id_red, chat_id_green, chat_id_yellow):
    '''
    Alerts when a single IP queried more than `ipMaximum` distinct VINs.

    The messages read the global `nombreMicroservicio`.
    Args:
       regularizationFrame (DataFrame): Frame with the 'ip' and 'VIN' columns.
       ipMaximum (int): Highest number of distinct VINs per IP that raises no alert.
       chat_id_red: Not used by this function.
       chat_id_green: Chat for the no-alert message.
       chat_id_yellow: Chat for the medium alert.
    '''
    vinsPerIp = regularizationFrame.groupby('ip')['VIN'].nunique()

    if vinsPerIp.empty:
        # No rows: max() would be NaN and int(NaN) raises, so report a zero maximum.
        ipsMax = 0
        suspiciousIps = []
    else:
        ipsMax = int(vinsPerIp.max())
        suspiciousIps = vinsPerIp[vinsPerIp == ipsMax].index.tolist()

    if ipsMax > ipMaximum:
        # Quote the threshold actually applied instead of a fixed number.
        message = f'Alerta media de ips que checan más de {ipMaximum} VINs distintos {suspiciousIps} con cantidad: {ipsMax} del microservicio {nombreMicroservicio}'
        await send_telegram_message(message, chat_id_yellow)
    else:
        message = f'Sin alertas para IPs con máximos: {suspiciousIps} con cantidad: {ipsMax} del microservicio {nombreMicroservicio}'
        await send_telegram_message(message, chat_id_green)

async def connectionHealth(connection, chat_id_red, chat_id_green):
    '''
    Reports the outcome of a database health probe through Telegram.
    Args:
       connection: Result of connectHealth(): a status string on success or the exception
           instance it returned on failure.
       chat_id_red: Chat that receives the failure message.
       chat_id_green: Chat that receives the success message.
    '''
    print(connection)
    # Any returned exception instance is a failure, not only an exact AttributeError.
    if isinstance(connection, Exception):
        message = f'No ha sido posible conectarse a la base de datos: {connection}'
        await send_telegram_message(message, chat_id_red)
    else:
        message = 'Conectado a la base de datos'
        await send_telegram_message(message, chat_id_green)
        

async def send_telegram_message(message, chat_id_semaphore):
    """
    Sends an alert message to a Telegram chat (asynchronous).

    Uses the module-level `bot`. Failures are not propagated: any exception raised
    while sending is caught and printed.

    Args:
        message (str): The text to send.
        chat_id_semaphore: Identifier of the destination chat.
    """
    try:
        await bot.send_message(text=message, chat_id=chat_id_semaphore)
        print(f"Mensaje enviado a Telegram: {message}")
    except Exception as send_error:
        print(f"Error al enviar mensaje a Telegram: {send_error}")

In [13]:
# Runtime configuration for the analysis, partly read from the environment.
# NOTE(review): int(os.getenv(...)) raises TypeError when the variable is unset;
# confirm that DIFFERENCE_MINUTES and IPS_MAXIMUM are always defined.
# Presumably the view name passed to getTheLastDate / getTheDataframe; verify against the caller.
view = os.getenv('VIEW_NAME')
n_minutes = int(os.getenv('DIFFERENCE_MINUTES'))
ipMaximum = int(os.getenv('IPS_MAXIMUM'))
# Service name interpolated into the Telegram alert texts (read as a global by the alert coroutines).
nombreMicroservicio = 'Regularización'
# Presumably passed as the `errorCodes` argument of httpCodesAlerts, which treats
# response codes >= that value as errors; verify against the caller.
errorCodes = 500
# Limit, in seconds, that delayAlerts reads as a global to decide a response was slow.
timeMaximum = 6
# Day names in datetime.weekday() order (Monday is 0).
days_of_week = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']

In [17]:
connectionHealth(connectHealth(), chat_id_red, chat_id_green)

<coroutine object connectionHealth at 0x0000014BBF29EA40>

In [None]:
def regularizationAnalysis():
    connectionToMySQL = ConnectionMySQL()