## Importar datos de Salesforce


In [None]:
%run DEV/Utils/LakehouseFunctions  

In [None]:
def get_access_token():
    """
    Genera un access token para conectarse a la plataforma de Veeva

    Devuelve:
    response (json): JSON con la respuesta a la llamada de la API. Si la llamada es satisfactoria, dentro del JSON se encuentra el access token en la clave "access_token" 
    """
    # Ref: https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/intro_understanding_oauth_endpoints.htm
    # Auth URL
    df_veeva = pd.read_sql("SELECT * FROM Origins WHERE Name = 'Veeva'", engine_conn)
    endpoint = df_veeva["Endpoint"][0]
    auth_url = f"{endpoint}/services/oauth2/token"

    consumer_key     = dbutils.secrets.get(keyvault_name, df_veeva["Username"][0]) 
    consumer_secret  = dbutils.secrets.get(keyvault_name, df_veeva["Secret"][0])

    # POST request for access token
    response = requests.post(auth_url, data = {
                        'grant_type': 'client_credentials',
                        'client_id':consumer_key,
                        'client_secret':consumer_secret
                        })

    return response

In [None]:
def build_select_all_query(response, table_source, filter_date = True, n_days=7, filter_where="", column_name = "LastModifiedDate", version='v48.0'):
    """
    Construye la query para Salesforce según su lógica. Dependiendo de los parámetros que se pasan a la función esta se construye de diferente manera.

    Parametros:
    response (json): JSON de la respuesta de la función "get_access_token". Contiene el token de acceso, entre otros.
    table_source (string): Contiene el nombre de la tabla origen de Salesforce de donde se van a obtener los datos.
    filter_date (bool): Valor por defecto -> True. Condición que hace que la query tenga en cuenta los últimos ndays.
    n_days (int): Valor por defecto -> 7. Número de días a traer en la query. Necesita que filter_date esté en True.
    filter_where (string): Valor por defecto -> "". Su función es especificar la clausula WHERE.
    column_name (string): Valor por defecto -> "LastModifiedDate". Permite escoger sobre que columna se compara la fecha.
    version (string): Valor por defecto -> "v48.0". Especifica la versión que aplicar en la query.

    Devuelve:
    query (string): String construida segun todo lo que se ha pasado por parametro.
    """
    
    #Read access token
    json_res = response.json()
    access_token = json_res['access_token']
    auth = {'Authorization':'Bearer ' + access_token}
    instance_url = json_res['instance_url']

    query = '/services/data/'+version+'/sobjects/'+table_source+'/describe/'
    url = instance_url + query
    req = requests.get(url, headers=auth).json()
    query = '/services/data/'+version+'/query/?q=SELECT+'
    i = 0
    for f in req["fields"]:
        query = query+f["name"]+',+'
        i+=1
    query = query[:-2]+"+FROM+"+table_source

    if filter_date and filter_where:  query = query+"+WHERE+(+"+column_name+"+=+LAST_N_DAYS:"+str(n_days)+"+)"+"+AND+(+"+filter_where+"+)"
    elif filter_date and not filter_where: query = query+"+WHERE+"+column_name+"+=+LAST_N_DAYS:"+str(n_days)
    elif not filter_date and filter_where: query = query+"+WHERE+"+filter_where

    return query

In [None]:
def get_data_from_veeva(response, query, table_destination, columns=[]):
    """
    Obtiene los datos de Veeva usando el access_token y la query construida en "build_select_all_query".

    Parametros:
    response (json): JSON de la respuesta de la función "get_access_token". Contiene el token de acceso, entre otros.
    query (string): String construida segun todo lo que se ha pasado por parametro en la función "build_select_all_query".
    table_destination (string): Nombre de la tabla donde se van a guardar los datos importados de Veeva.
    columns (lista de strings): Valor por defecto -> Vacía. Define las columnas que se quieren obtener del dataframe. Si no se pasa el parametro, se obtienen todas las columnas.

    Devuelve:
    Nada
    """
    
    access_token = response.json()['access_token']
    auth = {'Authorization':'Bearer ' + access_token}
    instance_url = response.json()['instance_url']
    
    begin_time=time.time()
    url = instance_url + query #Create and execute the query
    req = requests.get(url, headers=auth).json()
    df = pd.DataFrame(req["records"])
    if 'attributes' in df: df=df.drop("attributes", axis=1)
    if columns: df=df[columns]
    contador=len(df.index)
    if not df.empty:
        df = df.applymap(str).replace(to_replace='None', value= '')
        write_raw_data(spark.createDataFrame(df), table_destination)
        #df=df.replace(to_replace='None', value= None, regex=True)
        #df=df.replace(to_replace='null', value= '')
        #df = df.replace('', pd.NA, regex=True)
        #sparkDF=sparkDF.replace('None',lit(None).cast(StringType()))
        #display(sparkDF)
        #df.to_sql(table_destination, conn, if_exists="append", index=False)
        print("\n "+table_destination+": Insertando "+str(contador)+" registros... (TOTAL: "+str(contador)+" registros | TIEMPO: "+str(format(time.time()-begin_time, ".2f"))+"s)")

    while "nextRecordsUrl" in req:
        nru = instance_url + req["nextRecordsUrl"]
        req = requests.get(nru, headers=auth).json()
        df = pd.DataFrame(req["records"])
        if 'attributes' in df: df=df.drop("attributes", axis=1)
        if columns: df=df[columns]
        n_reg=len(df.index)
        if not df.empty:
            df = df.applymap(str).replace(to_replace='None', value= '')
            write_raw_data(spark.createDataFrame(df), table_destination)
            contador+=n_reg
            if contador % 1000 == 0:
                print("\n "+table_destination+": Insertando "+str(n_reg)+" registros... (TOTAL: "+str(contador)+" registros | TIEMPO: "+str(format(time.time()-begin_time, ".2f"))+"s)")
                
    print("\n Tabla "+table_destination+" insertada correctamente ("+str(contador)+" registros)")

In [None]:
# Query a BBDD Catálogo para obtener los datasets y origenes necesarios
df_datasets = pd.read_sql("SELECT Datasets.Id as DatasetId, Datasets.Name as DatasetName, Query, Keys, ChangeTrackingVersion, TableName FROM Origins LEFT JOIN Datasets on Origins.Id = Datasets.OriginId WHERE Origins.Name = 'Veeva' and IsActive = 1", engine_conn)

# Llamada a la función para obtener el access_token
response = get_access_token()

# Definición de variables para la función que crea la query (Carga incremental y datos de los últimos 20 días)
Incremental=True
n_days=20

# Se obtienen datos de cada uno de los datasets obtenidos en catálogo
for _, table in df_datasets.iterrows():
    try:
        # Se borran los datos de raw para tener una subida limpia (Incremental)
        delete_raw_data(table['TableName'])

        # Se construye la query con la información del dataset y los parametros definidos antes
        query =  build_select_all_query(response,table['DatasetName'], Incremental, n_days)
        
        # Se obtienen los datos usando la query construida
        get_data_from_veeva(response, query, table['TableName'])

        # Se guardan los datos importados en raw en la tabla final de forma incremental
        write_standardized_data(table['TableName'], Incremental, keys=table['Keys'])
        print("Tabla cargada correctamente: ", table['TableName'])
    except Exception as e:
        print("Error writing table: "+table['TableName'], str(e))
