### ¿Que hace este script?
* Calcula para cada site id y con frecuencia diaria: trafico.anchoBandaDescarga, trafico.anchoBandaCarga, usuarios.anchoBandaDescarga, usuarios.anchoBandaCarga
* En este se escribe para la jerarquía de usuarios también

In [73]:
from elasticsearch import Elasticsearch, helpers
from ssl import create_default_context
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import parametros
import re

## Conectando a ElasticSearch

La ultima línea se utiliza para garantizar la ejecución de la consulta
* timeout es el tiempo para cada ejecución
* max_retries el número de intentos si la conexión falla
* retry_on_timeout para activar los reitentos

In [74]:
context = create_default_context(cafile=parametros.cafile)
es = Elasticsearch(
    parametros.servidor,
    http_auth=(parametros.usuario_EC, parametros.password_EC),
    scheme="https",
    port=parametros.puerto,
    ssl_context=context,
    timeout=60, max_retries=3, retry_on_timeout=True
) 

### Calculando fechas para la ejecución

* Se calculan las fechas para asociar al nombre del indice
* fecha_hoy es usada para concatenar al nombre del indice principal previa inserción

In [75]:
now = datetime.now()
fecha_hoy = str(now.strftime("%Y.%m.%d"))

### Definiendo indice principal con fecha de hoy

In [76]:
indice = parametros.usuarios_mintic_concat_index
indice_control = parametros.usuarios_mintic_control

### Función para generar JSON compatible con ES

In [77]:
def filterKeys(document):
    return {key: document[key] for key in use_these_keys }

### leyendo indice semilla-inventario

En el script que ingesta semilla, trae la información de los centros de conexión administrados. Para el indice principal se requiere:

* site_id como llave del centro de conexión.
* Datos geográficos (Departamento, municipio, centro poblado, sede, energía, latitud, longitud, COD_ISO, entre otros).

In [78]:
total_docs = 10000
try:
    response = es.search(
        index= parametros.semilla_inventario_index,
        body={
               "_source": ['site_id','nombre_municipio', 'nombre_departamento', 'nombre_centro_pob', 'nombreSede' 
                           , 'energiadesc', 'latitud', 'longitud','COD_ISO','id_Beneficiario']
        },
        size=total_docs
    )
    #print(es.info())
    elastic_docs = response["hits"]["hits"]
    fields = {}
    for num, doc in enumerate(elastic_docs):
        source_data = doc["_source"]
        for key, val in source_data.items():
            try:
                fields[key] = np.append(fields[key], val)
            except KeyError:
                fields[key] = np.array([val])

    datos_semilla = pd.DataFrame(dict([ (k,pd.Series(v)) for k,v in fields.items() ])) #pd.DataFrame(fields)
except:
    print("fecha:",now,"- Error en lectura de datos semilla")
    #exit()
    

* se valida latitud longitud
* Se crea campo location
* se renombran campos de semilla

In [79]:
def get_location(x):
    patron = re.compile('^(\-?\d+(\.\d+)?),\s*(\-?\d+(\.\d+)?)$') #patrón que debe cumplir
    if (not patron.match(x) is None):
        return x.replace(',','.')
    else:
        #Código a ejecutar si las coordenadas no son válidas
        return 'a'
datos_semilla['latitud'] = datos_semilla['latitud'].apply(get_location)
datos_semilla['longitud'] = datos_semilla['longitud'].apply(get_location)
datos_semilla['trafico.location'] = datos_semilla['latitud'] + ',' + datos_semilla['longitud']
datos_semilla['trafico.location']=datos_semilla['trafico.location'].str.replace('a,a','')
datos_semilla.drop(columns=['latitud','longitud'],inplace=True)

datos_semilla = datos_semilla.rename(columns={'nombre_municipio': 'trafico.nombreMunicipio'
                                              , 'nombre_departamento' : 'trafico.nombreDepartamento'
                                              , 'nombre_centro_pob': 'trafico.localidad'
                                              , 'nombreSede' : 'trafico.nomCentroDigital'
                                              , 'energiadesc' : 'trafico.sistemaEnergia'
                                              , 'COD_ISO' : 'trafico.codISO'
                                              , 'id_Beneficiario' : 'trafico.idBeneficiario'})
datos_semilla.fillna('', inplace=True)

### Trae la ultima fecha para control de ejecución

Cuando en el rango de tiempo de la ejecución, no se insertan nuevos valores, las fecha maxima en indice mintic no aumenta, por tanto se usa esta fecha de control para garantizar que incremente el bucle de ejecución

In [80]:
total_docs = 1
try:
    response = es.search(
        index= indice_control,
        body={
               "_source": ["usuarios.fechaControl"],
              "query": {
                "bool": {
                  "filter": [
                  {
                    "exists": {
                      "field":"jerarquia_usuarios_velocidad"
                    }
                  }
                  ]
                }
              }
        },
        size=total_docs
    )
    #print(es.info())
    elastic_docs = response["hits"]["hits"]
    fields = {}
    for num, doc in enumerate(elastic_docs):
        fecha_ejecucion = doc["_source"]['usuarios.fechaControl']
except:
    fecha_ejecucion = '2021-05-01 00:00:00'
if response["hits"]["hits"] == []:
    fecha_ejecucion = '2021-05-01 00:00:00'
print("ultima fecha para control de ejecucion:",fecha_ejecucion)

ultima fecha para control de ejecucion: 2021-05-13 00:00:00


### Asociando datos de Speed test

Se tiene una lectura diara de velocidad para cada centro. Por tanto se debe cruzar con el fjulo principal, haciendo uso solo del año, mes día, sin incluir la hora.

In [81]:
def traeVelocidad(fecha_max_mintic,fecha_tope_mintic):
    total_docs = 10000
    response = es.search(
        index= parametros.speed_index+'*',
        body={
                "_source": ["beneficiary_code","locationid", "result_start_date"
                            , "result_download_mbps", "result_upload_mbps"],
                "query": {
                    "range": {
                        "result_start_date": {
                            "gte": fecha_max_mintic.split(' ')[0]+'T00:00:00',
                            "lt": fecha_tope_mintic.split(' ')[0]+'T23:59:59'
                        }
                    }
                }

        },
        size=total_docs
    )
    #print(es.info())
    elastic_docs = response["hits"]["hits"]
    fields = {}
    for num, doc in enumerate(elastic_docs):
        source_data = doc["_source"]
        for key, val in source_data.items():
            try:
                fields[key] = np.append(fields[key], val)
            except KeyError:
                fields[key] = np.array([val])

    return pd.DataFrame(dict([ (k,pd.Series(v)) for k,v in fields.items() ]))

* Se genera fecha en yyyy-mm-dd, y cada campo por separado
* La hora y minuto se toma aparte

Valores que se convierten a cero si son nulos
* trafico.anchoBandaDescarga
* trafico.anchoBandaCarga

In [82]:
fecha_max_mintic = fecha_ejecucion
fecha_tope_mintic = (datetime.strptime(fecha_max_mintic, '%Y-%m-%d %H:%M:%S')+timedelta(days=1)-timedelta(seconds=1)).strftime("%Y-%m-%d %H:%M:%S")
datos_speed = traeVelocidad(fecha_max_mintic,fecha_tope_mintic)

if datos_speed is None or datos_speed.empty:
    while (datos_speed is None or datos_speed.empty) and ((datetime.strptime(fecha_max_mintic[0:10], '%Y-%m-%d').strftime("%Y-%m-%d %H:%M:%S")) < str(now.strftime("%Y-%m-%d %H:%M:%S"))):
        fecha_max_mintic = (datetime.strptime(fecha_max_mintic, '%Y-%m-%d %H:%M:%S')+timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S")
        fecha_tope_mintic = (datetime.strptime(fecha_tope_mintic, '%Y-%m-%d %H:%M:%S')+timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S")
        datos_speed = traeVelocidad(fecha_max_mintic,fecha_tope_mintic)
else:
    pass

# 7. Speed test a indice

In [83]:
use_these_keys = ['trafico.nomCentroDigital', 
                  'trafico.localidad',
                  'trafico.siteID',
                  'trafico.nombreDepartamento', 
                  'trafico.codISO', 
                  'trafico.sistemaEnergia',
                  'trafico.nombreMunicipio', 
                  'trafico.idBeneficiario',
                  'trafico.location', 
                  'trafico.anchoBandaDescarga',
                  'trafico.anchoBandaCarga',
                  'trafico.totales.fecha',
                  'trafico.totales.anyo',
                  'trafico.totales.mes',
                  'trafico.totales.dia',
                  'nombreDepartamento',
                    'nombreMunicipio',
                    'idBeneficiario',
                    'fecha',
                    'anyo',
                    'mes',
                    'dia',
                  '@timestamp']

#def doc_generator(df):
#    df_iter = df.iterrows()
#    for index, document in df_iter:
#        yield {
#                "_index": indice, 
#                "_id": f"{ 'Velocidad-' + document['trafico.siteID'] + '-' + document['trafico.totales.fecha']}",
#                "_source": filterKeys(document),
#            }

In [84]:
try:
    datos_speed = datos_speed.drop(datos_speed[(datos_speed["result_download_mbps"]<0) | (datos_speed["result_upload_mbps"]<0)].index)
    datos_speed['trafico.totales.fecha'] = datos_speed['result_start_date'].str.split("T", n = 1, expand = True)[0]
    datos_speed['result_download_mbps'] = datos_speed['result_download_mbps'] * 1000
    datos_speed['result_upload_mbps'] = datos_speed['result_upload_mbps'] * 1000
    datos_speed = datos_speed[['beneficiary_code','trafico.totales.fecha','result_download_mbps','result_upload_mbps']].groupby(['beneficiary_code','trafico.totales.fecha']).agg(['max']).reset_index()
    datos_speed.columns = datos_speed.columns.droplevel(1)
    datos_speed.rename(columns={'result_download_mbps': 'trafico.anchoBandaDescarga'
                             ,'result_upload_mbps' :  'trafico.anchoBandaCarga'
                             , 'beneficiary_code' : 'site_id'
                             }, inplace=True)

    datos_speed["trafico.totales.anyo"] = datos_speed["trafico.totales.fecha"].str[0:4]
    datos_speed["trafico.totales.mes"] = datos_speed["trafico.totales.fecha"].str[5:7]
    datos_speed["trafico.totales.dia"] = datos_speed["trafico.totales.fecha"].str[8:10]
    datos_speed = pd.merge(datos_speed,  datos_semilla, on='site_id', how='inner')
    datos_speed = datos_speed.rename(columns={'site_id' : 'trafico.siteID'})
    datos_speed.dropna(subset=['trafico.anchoBandaDescarga','trafico.anchoBandaCarga'])
    datos_speed.fillna({'trafico.anchoBandaDescarga':0
                      , 'trafico.anchoBandaCarga':0
                       },inplace=True)
    datos_speed.fillna('', inplace=True)
    datos_speed['nombreDepartamento'] = datos_speed['trafico.nombreDepartamento']
    datos_speed['nombreMunicipio'] = datos_speed['trafico.nombreMunicipio']
    datos_speed['idBeneficiario'] = datos_speed['trafico.idBeneficiario']
    datos_speed['fecha'] = datos_speed['trafico.totales.fecha']
    datos_speed['anyo'] = datos_speed['trafico.totales.anyo']
    datos_speed['mes'] = datos_speed['trafico.totales.mes']
    datos_speed['dia'] = datos_speed['trafico.totales.dia']
    datos_speed['@timestamp'] = now.isoformat()

#    salida = helpers.bulk(es, doc_generator(datos_speed))
#    print("Fecha: ", now,"- Datos Velocidad carga y descarga (Trafico) en indice principal:",salida[0])
except:
    print("Fecha: ", now,"- No hay Datos Velocidad carga y descarga(Trafico) para insertar en indice principal")

Fecha:  2021-06-07 20:31:40.796321 - Datos Velocidad carga y descarga (Trafico) en indice principal: 1


In [85]:
try:
    datos_speed.rename(columns={
        'trafico.siteID':'usuarios.siteID',
        'trafico.nombreDepartamento':'usuarios.nombreDepartamento',
        'trafico.codISO':'usuarios.codISO',
        'trafico.sistemaEnergia':'usuarios.sistemaEnergia',
        'trafico.nombreMunicipio':'usuarios.nombreMunicipio',
        'trafico.localidad' : 'usuarios.localidad',
        'trafico.nomCentroDigital' : 'usuarios.nomCentroDigital',
        'trafico.idBeneficiario':'usuarios.idBeneficiario',
        'trafico.location':'usuarios.location',
        'trafico.anchoBandaDescarga':'usuarios.anchoBandaDescarga',
        'trafico.anchoBandaCarga':'usuarios.anchoBandaCarga',
        'trafico.totales.fecha':'usuarios.fecha',
        'trafico.totales.anyo':'usuarios.anyo',
        'trafico.totales.mes':'usuarios.mes',
        'trafico.totales.dia':'usuarios.dia' }, inplace=True)
    use_these_keys = ['usuarios.nomCentroDigital'
                  , 'usuarios.codISO'
                  , 'usuarios.idBeneficiario'
                  , 'usuarios.localidad'
                  , 'usuarios.siteID'
                  , 'usuarios.nombreDepartamento'
                  , 'usuarios.sistemaEnergia'
                  , 'usuarios.nombreMunicipio'
                  , 'usuarios.localidad'    
                  , 'usuarios.nomCentroDigital'    
                  , 'usuarios.location'
                  , 'usuarios.fecha'
                  , 'usuarios.anchoBandaDescarga'
                  , 'usuarios.anchoBandaCarga'
                  , 'usuarios.anyo'
                  , 'usuarios.mes'
                  , 'usuarios.dia'
                  , 'nombreDepartamento'
                  , 'nombreMunicipio'
                  , 'idBeneficiario'
                  , 'fecha'
                  , 'anyo'
                  , 'mes'
                  , 'dia'    
                  , '@timestamp']

    def doc_generator_u(df):
        df_iter = df.iterrows()
        for index, document in df_iter:
            yield {
                    "_index": indice, 
                    "_id": f"{'VelUsu-'+ str(document['usuarios.siteID']) + '-' + str(document['usuarios.fecha'])}",
                    "_source": filterKeys(document)
                }
    salida = helpers.bulk(es, doc_generator_u(datos_speed))
    print("Fecha: ", now,"- Datos Velocidad carga y descarga(Usuario) en indice principal:",salida[0])        
except:
    print("Fecha: ", now,"- No hay Datos Velocidad carga y descarga(Usuario) para insertar en indice principal")    

Fecha:  2021-06-07 20:31:40.796321 - Datos Velocidad carga y descarga(Usuario) en indice principal: 1


### Actualizando fecha control

* Se actualiza la fecha de control. Si el calculo supera la fecha hora actual, se asocia esta ultima.

In [86]:
fecha_ejecucion = (datetime.strptime(fecha_max_mintic, '%Y-%m-%d %H:%M:%S')+timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S")#[0:15] + '0:00'    

if fecha_ejecucion > str(now.strftime('%Y-%m-%d %H:%M:%S'))[0:10] + ' 00:00:00':
    fecha_ejecucion = str(now.strftime('%Y-%m-%d %H:%M:%S'))[0:10] + ' 00:00:00'
response = es.index(
        index = indice_control,
        id = 'jerarquia_usuarios_velocidad',
        body = { 'jerarquia_usuarios_velocidad': 'usuarios_velocidad','usuarios.fechaControl' : fecha_ejecucion}
)
print("actualizada fecha control de ejecucion:",fecha_ejecucion)

actualizada fecha control de ejecucion: 2021-05-14 00:00:00
