# funcion para obtener informacion sin limite de datos

In [74]:
import requests
import json
from requests.auth import HTTPBasicAuth
import time
import pandas as pd
import re

def elasticScroll(elasticParameters, query, pages):
    # parametros de salida
    # parametros del indice
    elasticURL = elasticParameters["elasticURL"]
    elasticIndex = elasticParameters["elasticIndex"]
    elasticUser = elasticParameters["elasticUser"]
    elasticPassword = elasticParameters["elasticPassword"]
    
    if(len(elasticURL)==0) or (len(elasticIndex)==0) or (len(elasticUser)==0) or (len(elasticPassword)==0):
        raise Exception("Revisa los parametros")
    # se define la url que apunta al indice de elastic
    url_search = f"{elasticURL}/{elasticIndex}/_search?scroll=1m"
    # se ejecuta la query
    response = requests.get(url_search, json=query, auth=HTTPBasicAuth(elasticUser, elasticPassword))
    # retorna una lista con el resultado de la query
    search = json.loads(response.text)
    # guardamos el scroll id correspondiente a la query
    scroll_id = search["_scroll_id"]
    # url scroll
    url_scroll = f"{elasticURL}/_search/scroll"
    scroll_query = {
                "scroll": "1m",
                "scroll_id": f"{scroll_id}"
            }

    # condiciones iniciales
    scroll_search = {"hits":{"hits":1}}
    if pages:
        # hay paginacion
        # condiciones iniciales
        from_ = pages["from"]
        size_ = pages["size"]
        count = len(search["hits"]["hits"])

        while scroll_search["hits"]["hits"] and count < from_ + size_:
            scroll_response = requests.get(url_scroll, json=scroll_query, auth=HTTPBasicAuth(elasticUser, elasticPassword))
            scroll_search = json.loads(scroll_response.text)
            if not scroll_search["hits"]["hits"]:
                continue
            else:
                search["hits"]["hits"].extend(scroll_search["hits"]["hits"])
            count += len(scroll_search["hits"]["hits"])

        search["hits"]["hits"] = search["hits"]["hits"][from_:from_+size_+1]

    else:
        # Se devuelven todos los resultados
        while scroll_search["hits"]["hits"]:
            scroll_response = requests.get(url_scroll, json=scroll_query,
                                           auth=HTTPBasicAuth(elasticUser, elasticPassword))
            scroll_search = json.loads(scroll_response.text)
            if not scroll_search["hits"]["hits"]:
                continue
            else:
                search["hits"]["hits"].extend(scroll_search["hits"]["hits"])
    # Se elina el campo scroll del scroll_body
    del scroll_query["scroll"]
    # Se elimina el scroll de elasticsearch para liberar memoria
    delete = requests.delete(url_scroll, json=scroll_query, auth=HTTPBasicAuth(elasticUser, elasticPassword))
    return search

In [58]:
# definiendo parametros de entrada

In [59]:
# datos de conexion a elastic
elasticParameters = {"elasticURL": "https://es-dev.e-contact.cl"
                    , "elasticIndex": "lea_metadata-events-banco_de_chile" 
                    , "elasticUser": "jcalderon"
                    , "elasticPassword": "jcalderon123"
                    }

# query custom para obtener fechas
query = {
  "query": {
    "bool": {
      "must": [],
      "filter": [
        {
          "match_all": {}
        },
        {
            
          "range": {
            "interactionData.dateTimeUTC": {
              "format": "strict_date_optional_time",
              "gte": "2022-05-26T19:53:53.044Z",
              "lte": "2022-05-31T22:33:03.831Z"
            }
          }
        }
      ],
      "should": [],
      "must_not": []
    }
  }
}

# total de paginas
pages =  {
    "from": 0,
    "size": 10000
}

# consulta a indice

In [60]:
response = elasticScroll(elasticParameters, query, pages)

# transformar response (dict) a dataframe

In [61]:
from pandas import DataFrame, json_normalize
df = json_normalize(json.loads(json.dumps(response))["hits"]["hits"])
# df.to_excel("D:\\proyectos\\jupyter\\proyectos\\lea\\reportsV1\\index-cencosud.xlsx")

In [76]:
  # convertir fecha en formato "YYYY/MM/DD hh:mm:ss" a "YYYY/MM/DD"
df['interactionDataDateUTC'] = pd.to_datetime(df['_source.interactionData.dateTimeUTC']).dt.date

# convertir fecha en formato "YYYY/MM/DD hh:mm:ss" a "YYYY/MM/DD hh:00:00"
df['interactionDataDateHourUTC'] = pd.to_datetime(df['_source.interactionData.dateTimeUTC'].apply(str).str.slice(start = 0, stop = 13))

# Obtener nombre de columnas
lstColsNames2Norm = df.columns.values

    # Elimina _source. en el nombre de los campos
lstColsNames2Norm = [w.replace('_source.', '') for w in lstColsNames2Norm]

   # Crea lista con nombre de campos con metadata de la llamada
lstCallData = [x for x in lstColsNames2Norm if x.startswith('interactionData.')]
lstCallData = [w.replace('interactionData.', '') for w in lstCallData]

  # Crea lista con nombre de campos con metadata de cliente
lstClientData = [x for x in lstColsNames2Norm if x.startswith('clientData.')]
lstClientData = [w.replace('clientData.', '') for w in lstClientData]

     # Crea lista con nombre de campos con resultado de la llamada
lstResultData = [x for x in lstColsNames2Norm if x.startswith('resultData.')]
lstResultData = [w.replace('resultData.', '') for w in lstResultData]

# Crea lista con nombre de campos con resultado de la llamada
lstResultData = [x for x in lstColsNames2Norm if x.startswith('summary.')]
lstResultData = [re.sub("([.])\s*([a-zA-Z])", lambda p: p.group(0).upper(), w.replace('summary.', '')).replace('.', '') for w in lstResultData]
  
    
# Elimina clientData., callData., y summary.
lstColsNames2Norm = [w.replace('clientData.', '') for w in lstColsNames2Norm]
lstColsNames2Norm = [w.replace('callData.', '') for w in lstColsNames2Norm]
lstColsNames2Norm = [w.replace('interactionData.', '') for w in lstColsNames2Norm]
lstColsNames2Norm = [w.replace('outboundInteractionData.', '') for w in lstColsNames2Norm]
lstColsNames2Norm = [re.sub("([.])\s*([a-zA-Z])", lambda p: p.group(0).upper(), w.replace('summary.', '')).replace('.', '') for w in lstColsNames2Norm] 

df.columns = lstColsNames2Norm

In [77]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 44 columns):
 #   Column                                         Non-Null Count  Dtype         
---  ------                                         --------------  -----         
 0   _index                                         10000 non-null  object        
 1   _type                                          10000 non-null  object        
 2   _id                                            10000 non-null  object        
 3   _score                                         10000 non-null  float64       
 4   interactionId                                  10000 non-null  object        
 5   dateTimeUTC                                    10000 non-null  object        
 6   channelType                                    10000 non-null  object        
 7   serviceName                                    10000 non-null  object        
 8   direction                                      10000 non-

# Primer formato
Estructura de agendas_Derivacion

> **NOMBRE COLUMNA**  <br />
call center <br />
campaña <br />
rut cliente <br />
Fecha de gestion <br />
Agendas <br />
Sucursal <br />
Hora de agenda <br />
Usuario agente  <br />


In [6]:
df1 = df[['_source.Datos_Cencosud.CALLCENTER'
          , '_source.Datos_Servicios.Nombre_Campana'
          , '_source.Datos_Demograficos.Rut_Cliente'
          , '_source.Datos_Cencosud.RUT'
          , '_source.Datos_Resultado.Fecha_Contacto'
          , '_source.Datos_Resultado.Agendas'
          ,'_source.Datos_Resultado.Espacio_Financiero'
          , '_source.Datos_Resultado.Hora_Agendada',
          '_source.Datos_Resultado.Usuario_Agente'
         ]]
df1.columns = ['Datos_Cencosud.CALLCENTER'
          , 'Datos_Servicios.Nombre_Campana'
          , 'Datos_Demograficos.Rut_Cliente'
          , 'Datos_Cencosud.RUT'
          , 'Datos_Resultado.Fecha_Contacto'
          , 'Datos_Resultado.Agendas'
          , 'Datos_Resultado.Espacio_Financiero'
          , 'Datos_Resultado.Hora_Agendada'
          , 'Datos_Resultado.Usuario_Agente']

# print(df1.head(n = 2))

# segundo formato
## Formato Archivo de Agendas Web
> **NOMBRE COLUMNA**  <br />
NRO  <br />
COD_CALLCENTER <br />
FEC_CONTACTO <br />
HORA_CONTACTO <br />
COD_CAMPAÑA <br />
CAMPAÑA <br />
CODIGO_EJECUTIVO <br />
NOMBRE_PUNTO <br />
COD_PUNTO <br />
RUT <br />
NOMBRE_CLIE <br />
DIRECCION <br />
FONOS_UNO <br />
FONOS_DOS <br />
~~ESTADO_CALL~~ <br />
~~OFERTA_VIG~~ <br />
MONTO_OFERTA <br />
HORA_VISITA <br />
FECHA_VISITA <br />
Glosa <br />

**Nota:** el texto tachado (~~tachado~~) esta pendiente

In [7]:
df2 = df[['_source.Datos_Resultado.Codigo_CallCenter'
          , '_source.Datos_Resultado.Fecha_Contacto'
          , '_source.Datos_Resultado.Hora_Contacto'
          , '_source.Datos_Servicios.Nombre_Campana'
          , '_source.Datos_Servicios.Nombre_Campana'
          , '_source.Datos_Resultado.Codigo_Ejecutivo'
          , '_source.Datos_Resultado.Espacio_Financiero'
          , '_source.Datos_Resultado.Codigo_Espacio_Financiero'
          , '_source.Datos_Demograficos.Rut_Cliente'
          , '_source.Datos_Cencosud.RUT'
          , '_source.Datos_Demograficos.Nombre_Cliente'
          , '_source.Datos_Demograficos.Nombre_Cliente2'
          , '_source.Datos_Demograficos.Apellido_Paterno'
          , '_source.Datos_Demograficos.Apellido_Materno'
          , '_source.Datos_Cencosud.DIRECCION_PAR'
          , '_source.Datos_Demograficos.Telefono_Cliente'
          , '_source.Datos_Llamada.Telefono_Cliente'
          , '_source.Datos_Cencosud.T_OFERTA_SAE'
          , '_source.Datos_Resultado.Fecha_Agendada'
          , '_source.Datos_Resultado.Hora_Agendada'
          , '_source.Datos_Resultado.Glosa'
         ]]

df2.columns = ['Datos_Resultado.Codigo_CallCenter'
               , 'Datos_Resultado.Fecha_Contacto'
               , 'Datos_Resultado.Hora_Contacto'
               , 'Datos_Servicios.Codigo_Campana'
               , 'Datos_Servicios.Nombre_Campana'
               , 'Datos_Resultado.Codigo_Ejecutivo'
               , 'Datos_Resultado.Espacio_Financiero'
               , 'Datos_Resultado.Codigo_Espacio_Financiero'
               , 'Datos_Demograficos.Rut_Cliente'
               , 'Datos_Cencosud.RUT'
               , 'Datos_Demograficos.Nombre_Cliente'
               , 'Datos_Demograficos.Nombre_Cliente2'
               , 'Datos_Demograficos.Apellido_Paterno'
               , 'Datos_Demograficos.Apellido_Materno'
               , 'Datos_Cencosud.DIRECCION_PAR'
               , 'Datos_Demograficos.Telefono_Cliente'
               , 'Datos_Llamada.Telefono_Cliente'
               , 'Datos_Cencosud.T_OFERTA_SAE'
               , 'Datos_Resultado.Fecha_Agendada'
               , 'Datos_Resultado.Hora_Agendada'
               , 'Datos_Resultado.Glosa'
         ]
# print(df2.head(n = 2))

# Pendientes
1. Incorporar funcion para pasar por parametro la fecha para un día - 1
2. Generar 3° formato ITF_EJEMPLO
4. Upload a SFTP de archivos.
5. Manejo de errores custom.

In [43]:
import paramiko
def WFOfunctionSftp(remote_server, ssh_user, ssh_password, local_filepath, remote_path, port_number=1022):
    sftp = None
    transport = None

    try:
        # Create transport instance and setup SFTP connection
        transport = paramiko.Transport((remote_server, port_number))
        transport.connect(None, ssh_user, ssh_password)
        sftp = paramiko.SFTPClient.from_transport(transport)

        # Upload file to remote destination
        sftp.put(local_filepath, remote_path)
    except Exception as e:
        print(f'Failed to transfer files: {e}')
        if sftp:
            sftp.close()
        if transport:
            transport.close()




# sftp_upload_file ()

In [44]:
WFOfunctionSftp("sftp.a365.com.pe", "usr_entel_reports", "eR231fBt2g3ir34fgt$QW", "D:/nifi-app.log", "/Datos/nifi-app.log")

In [18]:

#Compara archivos generados

import filecmp, shutil, os

#obtiene la información de estado 
tup_est1 = filecmp._sig(os.stat("D:/Archivo_Disco_D.txt"))
tup_est2 = filecmp._sig(os.stat("C:/Users/jcalderon/Documents/Archivo_Disco_C.txt"))

#Imprime la información de estado 
print("Archivo_Disco_D,txt:", tup_est1)
print("Archivo_Disco_C,txt:", tup_est2)

# A continuación, se realizan dos comparaciones superficiales
# y otras dos examinando el contenido; y se muestra el resultado

print(filecmp.cmp("D:/Archivo_Disco_D.txt", "C:/Users/jcalderon/Documents/Archivo_Disco_C.txt", shallow=True))  # False
print(filecmp.cmp("D:/Archivo_Disco_D.txt", "C:/Users/jcalderon/Documents/Archivo_Disco_C.txt", shallow=False))  # False



Archivo_Disco_D,txt: (32768, 6, 1647950939.5051517)
Archivo_Disco_C,txt: (32768, 12, 1647951830.7059765)
False
False


In [56]:
import paramiko,filecmp, shutil, os
def WFOfunctionComparativa(remote_server, ssh_user, ssh_password, local_filepath, remote_path, port_number=1022):
    sftp = None
    transport = None

    try:
        # Create transport instance and setup SFTP connection
        transport = paramiko.Transport((remote_server, port_number))
        transport.connect(None, ssh_user, ssh_password)
        sftp = paramiko.SFTPClient.from_transport(transport)
        
        tup_est1 = filecmp._sig(os.stat(remote_path))
        #tup_est1 = str(sftp) + tup_est0
        tup_est2 = filecmp._sig(os.stat(local_filepath))
        tup_est3 = sftp + tup_est1
        
        #Imprime la información de estado 
        #print("Archivo Remoto:", tup_est1)
        #print("Archivo Local:", tup_est3)

        # A continuación, se realizan dos comparaciones superficiales
        # y otras dos examinando el contenido; y se muestra el resultado

       # print(filecmp.cmp(remote_path, local_filepath, shallow=True))  # False
       # print(filecmp.cmp(remote_path, local_filepath, shallow=False))  # Fals

        # Upload file to remote destination
        var = sftp.open(tup_est1)
        print(var)
    except Exception as e:
        print(f'Fallo la comparativa: {e}')
        if sftp:
            sftp.close()
        if transport:
            transport.close()


In [66]:
WFOfunctionComparativa("sftp.a365.com.pe", "usr_entel_reports", "eR231fBt2g3ir34fgt$QW", r"D:/Archivo_Disco_D.txt", r"/Datos/Archivo_Disco_C.txt")

Fallo la comparativa: [WinError 123] El nombre de archivo, el nombre de directorio o la sintaxis de la etiqueta del volumen no son correctos: '<paramiko.sftp_client.SFTPClient object at 0x000001A6B1522C10>'


In [65]:
import paramiko
def WFOfunctionComparativa(remote_server, ssh_user, ssh_password, local_filepath, remote_path, port_number=1022):
    sftp = None
    transport = None

    try:
        # Create transport instance and setup SFTP connection
        transport = paramiko.Transport((remote_server, port_number))
        transport.connect(None, ssh_user, ssh_password)
        sftp = paramiko.SFTPClient.from_transport(transport)

        # Upload file to remote destination
        filecmp.cmp(local_filepath, remote_path, shallow=True)
        sftp.put(local_filepath, remote_path)
    except Exception as e:
        print(f'Failed to transfer files: {e}')
        if sftp:
            sftp.close()
        if transport:
            transport.close()

In [88]:
import paramiko

# Abrir el transporte

host = "sftp.a365.com.pe"
port = 1022
transport = paramiko.Transport((host, port))

# Autenticarse en el servidor
# SOLO COMO DEMO, NUNCA SE DEBEN PONER CLAVES EN EL CÓDIGO

username = "usr_entel_reports"
password = "eR231fBt2g3ir34fgt$QW"
transport.connect(username = username, password = password)

# Creación del canal sftp

sftp = paramiko.SFTPClient.from_transport(transport)

# Recorrer los ficheros existentes en el home del usuario
for name in sftp.listdir("/Datos/"):
    if name.endswith(".*.csv"):
        # Si termina en .c, lo leemos
        with sftp.file(name) as f:
            data = f.read()
            # Mostramos cuántos bytes hemos leido
            print(f"El fichero {name} tiene {len(data)} bytes")

# Cerrar el canal y después el transporte
sftp.close()
transport.close()