<div class="alert alert-info" role="alert">
    <h2>Importaciones</h2>
    <hr>
    <p>
        <ul>
            <li><b>Database: </b> El manejador de la base de datos utilizada
            <li><b>Pandas: </b> Pandas para manejar dataframes
            <li><b>Logs: </b> Guardar las ocurrencias en Logs
            <li><b>Utils: </b> Utilidades, para manejar temas de fechas y cosas por el estilo
        </ul>
    </p>
</div>

In [1]:
###
# Imports
###

#DB
import cx_Oracle

#Pandas
import pandas as pd

#Logs
import logging

#Utils
import os
import csv
import time
import requests
import json
from datetime import datetime

<div class="alert alert-info" role="alert">
    <h2>Métodos y Configuraciones</h2>
    <hr>
    <p>
        <ul>
            <li>Clase: Configuaración de la conexión
            <li>Métodos: Todos los bloques de código para la implementación
        </ul>
    </p>
</div>

In [2]:
class config_bd:
    """Clase para la configuración de la conexión con nuestra base de datos
    @Param: None
    @Return: None"""
    username = 'user_name'
    password = 'password$'
    host = 'host' #10.136.51.71
    port = 123984128734781234
    dbname = 'base_name'
    sdi = 'sdi'
    encoding = 'UTF-8'

In [3]:
def bd_connect(logger):
    """Método que genera la conexión con la base de datos, hacemos uso de los atributos de nuestra clase creada, se encierra en un ciclo While
    hasta que pueda generarse una conexión a la BD exitosamente, retornando la conexión
    @Param: None
    @Return: cx_Oracle.Connection()"""
    dns = cx_Oracle.makedsn(config_bd.host, config_bd.port, config_bd.sdi)
    while True:
        try:
            con = cx_Oracle.Connection(config_bd.username,config_bd.password,dns)
            now = datetime.now()
            dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
            logger.info(f"Conexión exitosa, versión: {con.version}, fecha: {dt_string}")
            break
        except Exception as e:
            now = datetime.now()
            dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
            logger.error(f"No se puede conectar a la base de datos, fecha: {dt_string}", exc_info=True)
    return con

def bd_query(query,con):
    """Método para guardar la extracción de la información con Pandas DF
    @Param:
        - query: Sentencia SQL
        - con: cx_Oracle.Connection()
    @Return: -pandas.DataFrame"""
    df = pd.read_sql(sql=query,con=con)
    return df

def save_file(df,name):
    """Método hecho para guardar la informaicón del SQL consultado
    @Param:
        - df: pandas.DataFrame
        - name: str
    @Return:
        - name: str"""
    #now = datetime.now()
    #dt_string = now.strftime("%d_%m_%Y-%H_%M_%S")
    #name = "TriggreredFile_"+dt_string+".csv"
    df.to_csv(name,quoting=csv.QUOTE_ALL,index=False,encoding="utf-8-sig")
    return name

def bd_close(con):
    """Método hecho para cerrar la conexión a la base de datos
    @Param: 
        -con: cx_Oracle.Connection()
    @Return:
        -None"""
    con.close()
    return None

def get_df(logger,con,df_r):
    """Método hecho para crear el dataframe
    @Param:
        -con: cx_Oracle.Connection()
        -df_r: pandas.DataFrame
    @Return:
        - df_to_save: pandas.DataFrame"""
    query = """query"""
    
    df_query = bd_query(query,con)
    logger.info('Exito al bajar el dataframe de la base de datos')
    df_query = df_query.rename(
        columns={
            'columnA':'ColumnaA'
        }
    )
    df_to_save = df_r.append(df_query)
    df_to_save.drop_duplicates(subset='SusbcriberKey',keep='last',inplace=True)
    logger.info('Exito al bajar el eliminar duplicados')
    name = save_file(df_to_save,'202006_VIAJACERTIFICADOS.csv')
    logger.info('Exito al guardar el DF')
    return df_to_save

In [4]:
def get_Marketing_Cloud_Token(logger):
    """Método hecho para obtener el Token a traés de un API en Marketing Cloud de Salse Force
    @Param: None
    @Return: 
        - if-try: token
        - if-except: None"""
    url = 'https://mcdwhf4n-2nwwf522xrd26m85dr4.auth.marketingcloudapis.com/v2/token' #Url MKTCloud
    data = {
        "grant_type": "client_credentials",
        "client_id":"id",
        "client_secret":"secret"
    } # Json data para enviar por Post
    while True:
        try:
            rq = requests.post(url=url,data=data).json() #Obtenemos la consulta del POST en formato JSON
            token = rq['access_token'] #Accedemos al parámetro "access_token"
            logger.info(f'{token} exito al obtener el token') #Insertamos en el archivo Log el exito en la conexión
            break
        except Exception as e:
            logger.error("Exception occurred", exc_info=True) #Agregamos la excepción al archivo log
            time.sleep(600)
    return rq,token #Retornmanos el Token

def push_Marketing_Cloud_Data(logger,token,df):
    """Método hecho para enviar los datos de la consulta de SQL a el repositorio de MKTCloud
    @Param:
        -token: str
        -df: pandas.DataFrame
    @Return: None"""
    url = 'https://mcdwhf4n-2nwwf522xrd26m85dr4.rest.marketingcloudapis.com/data/v1/async/dataextensions/key:key/rows?Content-Type=application/json'
    #Url
    header = {'Authorization': 'Bearer ' + token} #Header - Bearer + Token
    data = json.loads(df.to_json(orient='records')) #Obtenemos los json de los datos
    data_j = {
        "items":data
    } #Agregamos el json de los datos a este otro json
    try:
        rq = requests.post(url=url,json=data_j,headers=header) #Enviamos la información
        logger.info(f'{rq.status_code} exito al enviar los datos') #Gardamos el exito Login
        return rq
    except Exception as e:
        logger.error("Exception occurred", exc_info=True) #Agregamos el error

<div class="alert alert-info" role="alert">
    <h2>Implementación</h2>
    <hr>
    <p>
        <ul>
            <li> Ejecución del problema
        </ul>
    </p>
</div>

In [None]:
if __name__ == "__main__":
    logging.basicConfig(filename="MarketingCloud.log",
                            filemode='a',
                            format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                            datefmt='%H:%M:%S',
                            level=logging.DEBUG)
    logging.info('Arranca el archivo') #Primer agregado al login con un info Arranca el archivo""
    while True: #Ciclo infinito para agregar la informaicon de la consulta periodicamente
        if os.path.isfile('./202006_VIAJACERTIFICADOS.csv'): #Existe el archivo? 
            df = pd.read_csv('202006_VIAJACERTIFICADOS.csv',low_memory=False) #Leelo
            df = df.sort_values(by='FECHA').reset_index(drop=True)
            last_index = df.shape[0]
            con = bd_connect(logging) #Conexión a la BD
            if con != None:
                df = get_df(logging,con,df) #Obtenemos la consulta
                df = df.sort_values(by='FECHA').reset_index(drop=True)
                for splits in range(last_index,df.shape[0],5000):
                    logging.info("Conexión existosa")
                    logging.info(f"Último índice {last_index} VS nuevo último índice {df.shape[0]}")
                    aux = df.iloc[splits:splits+5000]
                    logging.info(f"Tamaño del envio: {aux.shape[0]}")
                    _,token = get_Marketing_Cloud_Token(logging) #Obtenemos el token
                    rq = push_Marketing_Cloud_Data(logging,token,aux) #Enviamos los datos
                    logging.info(rq.status_code)
                    time.sleep(60)
                bd_close(con) #Cerramos la conexión
                logging.info('Proceso completado con exito, arranca espera de 10 minutos')
                now = datetime.now()
                fecha = now.strftime("%d/%m/%Y %H:%M:%S")
                logging.info(f"Informaión enviada {fecha}")
                print(f"Informaión enviada {fecha}")
                time.sleep(600) #Espera de 10 minutos
            else: continue
        else: #Si no hay archivo
            df_skeleton = pd.DataFrame(
                {
                    'SusbcriberKey':[],
                    'MEMBER_NUMBER':[],
                    'CLIENT TYPE':[],
                    'Nombre':[],
                    'Apellido_Paterno':[],
                    'Email':[],
                    'PHONE':[],
                    'PROMOTION ID':[],
                    'Hotel':[],
                    'RATE_TYPE':[], 
                    'FECHA':[],
                    'STATUS_CODE':[]
                }
            )
            save_file(df_skeleton,name='202006_VIAJACERTIFICADOS.csv') #Crea un archivo esqueleto y guaradalo

Informaión enviada 03/08/2020 13:54:05
Informaión enviada 03/08/2020 14:22:33
Informaión enviada 03/08/2020 14:33:41
Informaión enviada 03/08/2020 14:44:48
Informaión enviada 03/08/2020 14:55:56
Informaión enviada 03/08/2020 15:07:04
Informaión enviada 03/08/2020 15:19:18
Informaión enviada 03/08/2020 15:30:26
Informaión enviada 03/08/2020 15:44:43
Informaión enviada 03/08/2020 16:39:28
Informaión enviada 03/08/2020 17:15:23
Informaión enviada 03/08/2020 17:26:31
Informaión enviada 03/08/2020 17:51:01
Informaión enviada 03/08/2020 18:02:09
Informaión enviada 03/08/2020 18:15:02
Informaión enviada 03/08/2020 18:26:52
Informaión enviada 03/08/2020 18:37:59
Informaión enviada 03/08/2020 18:56:30
Informaión enviada 03/08/2020 19:12:42
Informaión enviada 03/08/2020 20:28:34
Informaión enviada 03/08/2020 20:41:27
Informaión enviada 03/08/2020 21:11:17
Informaión enviada 03/08/2020 21:52:17
Informaión enviada 03/08/2020 22:12:07
Informaión enviada 03/08/2020 22:23:14
Informaión enviada 03/08/

In [None]:
df_1 = pd.read_csv('202006_VIAJACERTIFICADOS.csv')
for splits in range(0,df_1.shape[0],5000):
    aux_1 = df_1.iloc[splits:splits+5000]
    print(f"Tamaño del envio: {aux_1.shape[0]}")
    _,token = get_Marketing_Cloud_Token() #Obtenemos el token
    rq = push_Marketing_Cloud_Data(token,aux_1) #Enviamos los datos
    print(rq.status_code)
    time.sleep(60)