In [4]:
import json
import pandas as pd
from datetime import datetime, timedelta
import dateutil.relativedelta
import sys
from elasticsearch import Elasticsearch

'''
Función que genera un dataFrame a partir de loa registros almacenados en elastic para cada test entre las fecha actual y el mes anterior.
Este json debe tener el formato tal cual lo genera una query a elastic, es decir, con los camos obligatorios hits.hits.
Pasos:
    - Cargar fichero
    - Comprobar que existe el campo hits.hits
    - Recorrer el array hits.hits
    - Filtrar los registros que no sean de tipo HAR
    - Almacenar en un diccionario el tipo de registro y el resto de campos del resultado
    - Transformar la fecha al formato por defecto "%Y-%m-%d %H:%M:%S"
    - Convertir a dataFrame
'''


'\nFunción que genera un dataFrame a partir de loa registros almacenados en elastic para cada test entre las fecha actual y el mes anterior.\nEste json debe tener el formato tal cual lo genera una query a elastic, es decir, con los camos obligatorios hits.hits.\nPasos:\n    - Cargar fichero\n    - Comprobar que existe el campo hits.hits\n    - Recorrer el array hits.hits\n    - Filtrar los registros que no sean de tipo HAR\n    - Almacenar en un diccionario el tipo de registro y el resto de campos del resultado\n    - Transformar la fecha al formato por defecto "%Y-%m-%d %H:%M:%S"\n    - Convertir a dataFrame\n'

In [5]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from elasticsearch.helpers import bulk
from elasticsearch.helpers import BulkIndexError
import logging

def connect_to_elastic():
    elastic_url = 'https://10.210.4.118:9200'
    elastic_user = 'elastic'
    elastic_password = 'eilv2Dh04gbCBYAouc1o'

    es = Elasticsearch(
        elastic_url,
        #ca_certs= elastic_crt,
        basic_auth=(elastic_user, elastic_password),
        timeout=300,
        verify_certs=False
    )
    return es

def run_elastic_query(elastic_connection, query, index_name, page_size=10000, scroll="6m"):
    # Disable Elasticsearch scroll logs
    logging.getLogger("elasticsearch").setLevel(logging.WARNING)

    
    page_size = page_size
    scroll_id = None

    response = scan(
            elastic_connection,
            query=query,
            index=index_name,
            scroll=scroll,
            size=page_size
    )

    results = []
    for hit in response:
        results.append(hit["_source"])

    return results


def insert_df_to_elastic(elastic_connection, df, index_name):
    # Disable Elasticsearch scroll logs
    logging.getLogger("elasticsearch").setLevel(logging.WARNING)
    
    data_json = df.to_dict(orient='records')
    actions = [
        {
            "_index": index_name,
            "_source":  {key: value for key, value in record.items() if key != "id"},
            "_id": record["id"]
        }
        for record in data_json
    ]

    try:
        bulk(elastic_connection, actions, refresh=True)
        
    except BulkIndexError as e:
        for error in e.errors:
            print("Indexing failed for document:", error['index'])
            print("Reason:", error['error']['reason'])

In [6]:
# Instantiate Elasticsearch with the trusted SSL context


logging.getLogger("elasticsearch").setLevel(logging.WARNING)
import warnings

# Suppress the specific warning related to the InsecureRequestWarning
warnings.filterwarnings("ignore")
# Crear conexion a elastic
index_name = "raw-asm"
es = connect_to_elastic()

In [7]:
# Definición de variables
datetime_ini = datetime.now() - timedelta(days=2)
datetime_ini = str(datetime_ini) 
datetime_ini = datetime.strptime(datetime_ini, "%Y-%m-%d %H:%M:%S.%f")
datetime_ini_str = datetime_ini.strftime("%Y-%m-%d %H:%M:%S")
print("datetime_ini", datetime_ini_str)

datetime_fi = datetime.now() - timedelta(days=1)
datetime_fi = str(datetime_fi)
datetime_fi = datetime.strptime(datetime_fi,"%Y-%m-%d %H:%M:%S.%f")
datetime_fi_str = datetime_fi.strftime("%Y-%m-%d %H:%M:%S")
print("datetime_fi", datetime_fi_str)

datetime_ini 2023-11-30 12:33:15
datetime_fi 2023-12-01 12:33:15


In [8]:
aggs = {
    "unique_idTest": {
      "terms": {
        "field": "idTest.keyword",
        "size": 10000
          }
        }
     }
result = es.search(index=index_name, aggs=aggs)


In [8]:
for hit in result["aggregations"]["unique_idTest"]['buckets']:
        idTest = hit['key']
        query ={
            "bool": {
              "must": [
                {
                  "match": {
                    "idTest": str(idTest)
                  }
                },
                {
                  "range": {
                    "fechaProcesado": {
                      "gte": previous_month,
                      "lte": current_date 
                    }
                  }
                }
              ]
            }
        }
        print(query)

{'bool': {'must': [{'match': {'idTest': '263'}}, {'range': {'fechaProcesado': {'gte': '2023-10-03 17:16:00', 'lte': '2023-11-03 17:16:00'}}}]}}
{'bool': {'must': [{'match': {'idTest': '125'}}, {'range': {'fechaProcesado': {'gte': '2023-10-03 17:16:00', 'lte': '2023-11-03 17:16:00'}}}]}}
{'bool': {'must': [{'match': {'idTest': '400'}}, {'range': {'fechaProcesado': {'gte': '2023-10-03 17:16:00', 'lte': '2023-11-03 17:16:00'}}}]}}
{'bool': {'must': [{'match': {'idTest': '112'}}, {'range': {'fechaProcesado': {'gte': '2023-10-03 17:16:00', 'lte': '2023-11-03 17:16:00'}}}]}}
{'bool': {'must': [{'match': {'idTest': '714'}}, {'range': {'fechaProcesado': {'gte': '2023-10-03 17:16:00', 'lte': '2023-11-03 17:16:00'}}}]}}
{'bool': {'must': [{'match': {'idTest': '221'}}, {'range': {'fechaProcesado': {'gte': '2023-10-03 17:16:00', 'lte': '2023-11-03 17:16:00'}}}]}}
{'bool': {'must': [{'match': {'idTest': '121'}}, {'range': {'fechaProcesado': {'gte': '2023-10-03 17:16:00', 'lte': '2023-11-03 17:16:00

In [114]:
data_df['idError'] =  data_df['idError'].fillna(0)
data_df['pathError'] = data_df['pathError'].fillna("")
data_df['errorMsg'] = data_df['errorMsg'].fillna("")
data_df['urlFailed'] = data_df['urlFailed'].fillna("")
data_df['errorSel'] = data_df['errorSel'].fillna("")
data_df['elementError'] = data_df['elementError'].fillna("")

data_df

Unnamed: 0,tipo,nombrePaso,idRobot,nombreRobot,idGrupTest,fecha,idPaso,harFile,fechaProcesado,idTest,tiempoPaso,nombreTest,id,idError,pathError,errorMsg,urlFailed,errorSel,elementError
0,Result,RobotSeleniumPlantillaNuevaCaixa,9993,RobotSeleniumPlantillaNuevaCaixa,2,2023-10-25 13:00:06,1,20231025130113_99802.har,2023-10-25 13:00:06,99802,4.804,LANPROVA_TEST_HBPOR-20_Atencion_Cliente_Reclam...,20231025130006_1_9993_99802,0,,,,,
1,GroupedResult,RobotSeleniumPlantillaNuevaCaixa,9993,RobotSeleniumPlantillaNuevaCaixa,2,2023-10-25 13:00:06,9999,,2023-10-25 13:00:06,99802,4.804,LANPROVA_TEST_HBPOR-20_Atencion_Cliente_Reclam...,20231025130006_9999_9993_99802,0,,,,,
2,Result,RobotSeleniumPlantillaNuevaCaixa,9993,RobotSeleniumPlantillaNuevaCaixa,2,2023-10-25 13:06:26,1,20231025130733_99802.har,2023-10-25 13:06:26,99802,2.667,LANPROVA_TEST_HBPOR-20_Atencion_Cliente_Reclam...,20231025130626_1_9993_99802,0,,,,,
3,GroupedResult,RobotSeleniumPlantillaNuevaCaixa,9993,RobotSeleniumPlantillaNuevaCaixa,2,2023-10-25 13:06:26,9999,,2023-10-25 13:06:26,99802,2.667,LANPROVA_TEST_HBPOR-20_Atencion_Cliente_Reclam...,20231025130626_9999_9993_99802,0,,,,,
4,GroupedResult,RobotSeleniumPlantillaNuevaCaixa,9993,RobotSeleniumPlantillaNuevaCaixa,2,2023-10-25 13:57:03,9999,,2023-10-25 13:57:03,99802,2.429,LANPROVA_TEST_HBPOR-20_Atencion_Cliente_Reclam...,20231025135703_9999_9993_99802,0,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5060,Result,RobotSeleniumPlantillaNuevaCaixa,9993,RobotSeleniumPlantillaNuevaCaixa,228,2023-11-03 17:57:41,1,,2023-11-03 17:57:41,99802,3.060,LANPROVA_TEST_HBPOR-20_Atencion_Cliente_Reclam...,20231103175741_1_9993_99802,0,,,,,
5061,Result,RobotSeleniumPlantillaNuevaCaixa,9993,RobotSeleniumPlantillaNuevaCaixa,228,2023-11-03 17:58:01,5,,2023-11-03 17:58:01,99802,0.601,LANPROVA_TEST_HBPOR-20_Atencion_Cliente_Reclam...,20231103175801_5_9993_99802,0,,,,,
5062,Result,RobotSeleniumPlantillaNuevaCaixa,9993,RobotSeleniumPlantillaNuevaCaixa,235,2023-11-03 18:12:00,4,,2023-11-03 18:12:00,99802,3.778,LANPROVA_TEST_HBPOR-20_Atencion_Cliente_Reclam...,20231103181200_4_9993_99802,0,,,,,
5063,GroupedResult,RobotSeleniumPlantillaNuevaCaixa,9993,RobotSeleniumPlantillaNuevaCaixa,235,2023-11-03 18:12:03,9999,,2023-11-03 18:12:03,99802,12.918,LANPROVA_TEST_HBPOR-20_Atencion_Cliente_Reclam...,20231103181203_9999_9993_99802,1,,,,,


In [119]:
for hit in result["aggregations"]["unique_idTest"]['buckets'][0:1]:
        idTest = hit['key']
        query = {
            "query": {
                "bool": {
                  "must": [
                    {
                      "match": {
                        "idTest": str(idTest)
                      }
                    },
                    {
                      "range": {
                        "fechaProcesado": {
                          "gte": previous_month,
                          "lte": current_date 
                        }
                      }
                    }
                  ]
                }
            }
        }
        data = run_elastic_query(es, query, index_name)
        print(len(data))
        data_df = pd.DataFrame(data)

61958


In [121]:
len(data_df)

61958

In [122]:
for hit in result["aggregations"]["unique_idTest"]['buckets'][0:1]:
        idTest = hit['key']
        query = {
            "query": {
                "bool": {
                  "must": [
                    {
                      "match": {
                        "idTest": str(idTest)
                      }
                    },
                    {
                      "range": {
                        "fechaProcesado": {
                          "gte": previous_month,
                          "lte": current_date 
                        }
                      }
                    }
                  ]
                }
            }
        }
        print(query)
        data = run_elastic_query(es, query, index_name)
        
        if(len(data) > 0):
            data_df = pd.DataFrame(data)
            data_df['fecha'] = data_df['fecha'].apply(lambda x: datetime.strptime(x, '%Y%m%d%H%M%S'))
            data_df['tiempoPaso'] = data_df['tiempoPaso'].astype(float)
            
            data_df['idError'] =  data_df['idError'].fillna(0)
            data_df['pathError'] = data_df['pathError'].fillna("")
            data_df['errorMsg'] = data_df['errorMsg'].fillna("")
            data_df['urlFailed'] = data_df['urlFailed'].fillna("")
            data_df['errorSel'] = data_df['errorSel'].fillna("")
            data_df['elementError'] = data_df['elementError'].fillna("")
            data_df['idError'] = data_df['tiempoPaso'].astype(int)
            data_df = data_df.rename(columns={'type': 'tipo'})

            '''for record in data['hits']['hits']:
                if record['_source']['type'].lower() != "har":
                    new_record = {'tipo': record['_source']['type']}
                    new_record.update(record['_source'])
                    
                    #if (not check_required_files(new_record)):
                    #    print("Required files not found")
                    #    sys.exit()
                  
                    new_record['fecha'] = datetime.strptime(new_record['fecha'], '%Y%m%d%H%M%S')
        

                    data_df.append(new_record)
            
            data_df = pd.DataFrame(data_df)
            '''
        else:
            print("Data (hits.hits) not found")
            sys.exit()
        test_results =  data_df[data_df['tipo'] == "Result"]
        test_last_grouped_result = data_df[data_df['tipo'] == "GroupedResult"].sort_values('fecha', ascending=False)
        test_last_grouped_result.reset_index(drop=True,inplace=True)
        if (test_last_grouped_result.shape[0]==0):
            aggregation_dict = {'idTest':idTest, 'activo': False}
        else:
            last_execution_date = test_last_grouped_result['fecha'][0]
            last_execution_duration = round(test_last_grouped_result['tiempoPaso'][0],2)
            aggregation_dict = get_aggregated_data_for_test(test_results)
            aggregation_dict['fechaUltimaEjecucion'] = str(last_execution_date).replace(" ", "T")
            aggregation_dict['duracionUltimaEjecucion'] = str(last_execution_duration)
        #res = es.index(index='aggregations-asm', body=aggregation_dict, id=idTest)
        print("res",aggregation_dict)

{'query': {'bool': {'must': [{'match': {'idTest': '263'}}, {'range': {'fechaProcesado': {'gte': '2023-10-03 18:55:00', 'lte': '2023-11-03 18:55:00'}}}]}}}
res {'idTest': '263', 'calculoHora': '92.98%', 'calculoDia': '96.31%', 'calculoMes': '96.89%', 'totalCorrectoHora': 53, 'totalCorrectoDia': 3316, 'totalCorrectoMes': 56995, 'totalErrorHora': 4, 'totalErrorDia': 127, 'totalErrorMes': 1830, 'totalHora': 57, 'totalDia': 3443, 'totalMes': 58825, 'fechaCalculo': '2023-11-03T18:55:00', 'activo': True, 'fechaUltimaEjecucion': '2023-11-03T18:15:55', 'duracionUltimaEjecucion': '12.5'}


In [123]:
filtered_by_date = test_results[(test_results['fecha'] <= current_date) & (test_results['fecha'] >= previous_hour)]
filtered_by_date

Unnamed: 0,tipo,fecha,nombrePaso,tiempoPaso,idPaso,idTest,nombreTest,fechaProcesado,idRobot,idGrupTest,nombreRobot,id,harFile,errorMsg,pathError,urlFailed,errorSel,elementError,idError
61837,Result,2023-11-03 17:58:43,RobotSeleniumSilkBPI,0.102,1,263,BETA_HBLO-02_Fitxers_Firefox,2023-11-03 17:58:43,22,3411,RobotSeleniumSilkBPI,20231103175843_1_22_263,,,,,,,0
61838,Result,2023-11-03 17:58:53,RobotSeleniumSilkBPI,0.865,3,263,BETA_HBLO-02_Fitxers_Firefox,2023-11-03 17:58:53,22,3411,RobotSeleniumSilkBPI,20231103175853_3_22_263,,,,,,,0
61841,Result,2023-11-03 17:59:19,RobotSeleniumSilkBPI,0.489,12,263,BETA_HBLO-02_Fitxers_Firefox,2023-11-03 17:59:19,22,3411,RobotSeleniumSilkBPI,20231103175919_12_22_263,,,,,,,0
61843,Result,2023-11-03 17:59:31,RobotSeleniumSilkBPI,0.547,19,263,BETA_HBLO-02_Fitxers_Firefox,2023-11-03 17:59:31,22,3411,RobotSeleniumSilkBPI,20231103175931_19_22_263,,,,,,,0
61845,Result,2023-11-03 17:58:56,RobotSeleniumSilkBPI,0.484,5,263,BETA_HBLO-02_Fitxers_Firefox,2023-11-03 17:58:56,22,3411,RobotSeleniumSilkBPI,20231103175856_5_22_263,,,,,,,0
61847,Result,2023-11-03 17:58:59,RobotSeleniumSilkBPI,0.534,6,263,BETA_HBLO-02_Fitxers_Firefox,2023-11-03 17:58:59,22,3411,RobotSeleniumSilkBPI,20231103175859_6_22_263,,,,,,,0
61859,Result,2023-11-03 18:07:51,RobotSeleniumSilkBPI,0.547,16,263,BETA_HBLO-02_Fitxers_Firefox,2023-11-03 18:07:51,22,3417,RobotSeleniumSilkBPI,20231103180751_16_22_263,,,,,,,0
61860,Result,2023-11-03 18:07:54,RobotSeleniumSilkBPI,0.498,18,263,BETA_HBLO-02_Fitxers_Firefox,2023-11-03 18:07:54,22,3417,RobotSeleniumSilkBPI,20231103180754_18_22_263,,,,,,,0
61861,Result,2023-11-03 18:07:07,RobotSeleniumSilkBPI,0.974,2,263,BETA_HBLO-02_Fitxers_Firefox,2023-11-03 18:07:07,22,3417,RobotSeleniumSilkBPI,20231103180707_2_22_263,,,,,,,0
61862,Result,2023-11-03 18:07:36,RobotSeleniumSilkBPI,0.549,10,263,BETA_HBLO-02_Fitxers_Firefox,2023-11-03 18:07:36,22,3417,RobotSeleniumSilkBPI,20231103180736_10_22_263,,,,,,,0


In [76]:
def check_required_files (dictionary:dict):
    required_files = ["tipo", "fecha"]
    if all(key in dictionary for key in dictionary):
        return True
    else:
        return False
    
def get_unique_idTest_from_elastic(index_name):
    aggs = {
    "unique_idTest": {
      "terms": {
        "field": "idTest.keyword"
          }
        }
     }
    result = es.search(index=index_name, size = 0, aggs=aggs)
    return result

def compute_aggregation(test_results, previuos_date):
    filtered_by_date = test_results[(test_results['fecha'] <= current_date) & (test_results['fecha'] >= previuos_date)]
    if not 'idError' in filtered_by_date.columns:
        filtered_by_date['idError'] = 0
        
    total_success_filtered_by_date = len(filtered_by_date[filtered_by_date['idError'] == 0])
    #total_success_filtered_by_date = len(filtered_by_date[filtered_by_date['errorMsg'] == ""])

    total_number_test_by_date = len(filtered_by_date)

    if(total_success_filtered_by_date == 0 | total_number_test_by_date == 0):
        division_success_total = 0
        total_error_filtered_by_date = 0
    else:
        division_success_total = (total_success_filtered_by_date / total_number_test_by_date) * 100
        total_error_filtered_by_date = total_number_test_by_date - total_success_filtered_by_date
        if division_success_total.is_integer():
            division_success_total = int(division_success_total)
        else:
            division_success_total = round(division_success_total, 2)
    
    percent_success = f'{division_success_total}%'
    
    result = {"porcentaje_correcto":percent_success, 
              "correcto": total_success_filtered_by_date, 
              "erroneo": total_error_filtered_by_date, 
              "total" : total_number_test_by_date }
    
    return result

def get_aggregated_data_for_test(test_results:str):
    aggregated_by_hour = compute_aggregation(test_results, previous_hour)
    aggregated_by_day = compute_aggregation(test_results, previous_day)
    aggregated_by_month = compute_aggregation(test_results, previous_month)
        
    aggregation_dict = {'idTest': str(test_results['idTest'].iloc[0]),
                        'calculoHora': aggregated_by_hour['porcentaje_correcto'],
                        'calculoDia': aggregated_by_day['porcentaje_correcto'],
                        'calculoMes': aggregated_by_month['porcentaje_correcto'],
                        'totalCorrectoHora': aggregated_by_hour['correcto'],
                        'totalCorrectoDia': aggregated_by_day['correcto'],
                        'totalCorrectoMes': aggregated_by_month['correcto'],
                        'totalErrorHora': aggregated_by_hour['erroneo'],
                        'totalErrorDia': aggregated_by_day['erroneo'],
                        'totalErrorMes': aggregated_by_month['erroneo'],
                        'totalHora': aggregated_by_hour['total'],
                        'totalDia': aggregated_by_day['total'],
                        'totalMes': aggregated_by_month['total'],
                        'fechaCalculo': str(current_date).replace(" ", 'T')
                       }
    if aggregation_dict['totalMes'] == "0":
        aggregation_dict['activo'] = False
    else:
        aggregation_dict['activo'] = True
    return aggregation_dict

def compute_aggregations_to_elastic(unique_idTest_list, index_name):
    for hit in unique_idTest_list["aggregations"]["unique_idTest"]['buckets']:
        idTest = hit['key']
        query ={
            "bool": {
              "must": [
                {
                  "match": {
                    "idTest": str(idTest)
                  }
                },
                {
                  "range": {
                    "fechaProcesado": {
                      "gte": previous_month,
                      "lte": current_date 
                    }
                  }
                }
              ]
            }
        }
        data = es.search(index=index_name, query=query)
        
        if(data['hits']['hits']):
            data_df = pd.DataFrame([d['_source'] for d in data['hits']['hits']])
            data_df['fecha'] = data_df['fecha'].apply(lambda x: datetime.strptime(x, '%Y%m%d%H%M%S'))
            data_df['tiempoPaso'] = data_df['tiempoPaso'].astype(float)
            data_df = data_df.rename(columns={'type': 'tipo'})

        else:
            print("Data (hits.hits) not found")
            continue
        test_results =  data_df[data_df['tipo'] == "Result"]
        test_last_grouped_result = data_df[data_df['tipo'] == "GroupedResult"].sort_values('fecha', ascending=False)
        test_last_grouped_result.reset_index(drop=True,inplace=True)

        if (test_last_grouped_result.shape[0]==0):
            aggregation_dict = {'idTest':idTest, 'activo': False}
        else:
            
            last_execution_date = test_last_grouped_result['fecha'][0]
            last_execution_duration = round(test_last_grouped_result['tiempoPaso'][0],2)
            aggregation_dict = get_aggregated_data_for_test(test_results)
            aggregation_dict['fechaUltimaEjecucion'] = str(last_execution_date).replace(" ", "T")
            aggregation_dict['duracionUltimaEjecucion'] = str(last_execution_duration)
        print("aggregation_dict",aggregation_dict)
        res = es.index(index='aggregations-asm', body=aggregation_dict, id=idTest)
        print("res",res)


In [81]:

#date_format = "%Y%m%d%H%M00" # Sin precision de segundos
date_format="%Y-%m-%d %H:%M:00"
current_date = datetime.now().strftime(date_format)
current_date = datetime.strptime(current_date, date_format)

previous_hour = (current_date + dateutil.relativedelta.relativedelta(hours=-1)).strftime(date_format)
previous_day = (current_date + dateutil.relativedelta.relativedelta(days=-1)).strftime(date_format)
previous_month = (current_date + dateutil.relativedelta.relativedelta(months=-1)).strftime(date_format)
current_date = current_date.strftime(date_format)
index_name = "raw-asm"

#unique_idTest_list = get_unique_idTest_from_elastic(index_name)

#compute_aggregations_to_elastic(unique_idTest_list, index_name)


