In [None]:
%run /Utils/Functions/core

In [None]:
import requests
import concurrent.futures
import time
import sys
import os
import re
from time import sleep, mktime
from uuid import uuid4
import json
from requests.auth import HTTPBasicAuth
from datetime import datetime, timedelta, timezone
from IPython.display import clear_output

In [None]:
## Cria as variáveis de LOG
format_log = '%Y-%m-%d %H:%M:%S'
dtInicio = datetime.today() - timedelta(hours=3)
dtInicio_format = dtInicio.strftime(format_log)
tipo_log = 'API'
camada = 'landing'
emissor = '<org>'
atividade = '<activity_desc>'
origem = 'RestService'
destino = 'AzureBlobFS'
execUrl = ' '

try:
    infos = json.loads(dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()) # captura as informações do job que executa o notebook
    orgId = infos['tags']['orgId']
    runId = infos['tags']['multitaskParentRunId']
    jobId = infos['tags']['jobId']
    if orgId == '2960871991268730': # Monta a URL caso seja o ID do ambiente de DEV
        execUrl = f'https://adb-{orgId}.10.azuredatabricks.net/?o={orgId}#job/{jobId}/run/{runId}' # cria a url de execução do 
    else: # Monta a URL caso seja o ID do ambiente de PROD
        execUrl = f'https://adb-{orgId}.15.azuredatabricks.net/?o={orgId}#job/{jobId}/run/{runId}' # cria a url de execução do 
except:
    print('Campo de URL não pode ser identificado!')

In [None]:
%sql
use catalog prod;

In [None]:
dbutils.widgets.text("dt_ingestao", "")

dt_ingestao = getArgument("dt_ingestao").upper().strip()

location_landing = spark.sql("show external locations").select("url").where("name = 'landing-area'").collect()[0][0]
location_flat_file = spark.sql("show external locations").select("url").where("name = 'flatfile-area'").collect()[0][0]

print(location_landing)
print(location_flat_file)

In [None]:
# Formata o dt_ingestao
format_timestamp = '%Y-%m-%d %H:%M:%S.%f'
dt_ingestao = datetime.now() if dt_ingestao == "" else datetime.strptime(dt_ingestao, format_timestamp)

# Pega o horário atual e tira 3 horas para converter para BRT (UTC -3)
# Tira 5 minutos por conta de um limite da API
dt_fim = dt_ingestao - timedelta(hours=3) - timedelta(minutes=5)
dt_inicio = dt_fim - timedelta(days=1)

# Força as horas, minutos e segundos em 0
dt_inicio = datetime(dt_inicio.year, dt_inicio.month, dt_inicio.day, 0, 0, 0)

# Separa as datas no formato UNIX
dt_fim_unix = int(mktime(dt_fim.timetuple()))
dt_inicio_unix = int(mktime(dt_inicio.timetuple()))

# dt_inicio_unix = 1546300800

# Transforma as datas em STRING utilizando um formato de timestamp
dt_fim = dt_fim.strftime(format_timestamp)
dt_inicio = dt_inicio.strftime(format_timestamp)

print(f"dt_inicio: {dt_inicio} | unix: {dt_inicio_unix}")
print(f"dt_fim   : {dt_fim} | unix: {dt_fim_unix}")

In [None]:
domain = "linkedin"
subdomain = "api"
threads = 10
baseUrl = f"https://{subdomain}.{domain}.com"
token = dbutils.secrets.get('scope-vault-data', 'linkedin-api-token')

headers = {'Authorization': 'Bearer ' + token, 'LinkedIn-Version': '202306'}

dir_flatfile = f'{location_flat_file}/<org>/linkedin'
dir_flatfile_ro = f'/dbfs/FileSotre/flat-files/.../<org>/linkedin'

print(baseUrl)

In [None]:
# Pega apenas cargas que possuem valor preenchido no campo "ds_Url" para não trazer tabelas que são geradas a partir de outro evento na landing.
df_param = fn_ConsultaJdbc("""
    SELECT pca.*
    FROM ctl.ADF_Parametro_Carga_API pca
    WHERE pca.fl_ativo = 1
    and nm_Sistema = 'linkedin'
    and ds_Url is not null
""")

data_param = df_param.collect()

display(df_param)

###FUNÇÕES UTILIZADAS PARA AS TABELAS AD_ANALYTICIS_BY_CAMPAIGN E AD_ANALYTICIS_BY_CREATIVE 

In [None]:
def get_key(d):
    pivot_values = d['pivotValues']
    start_date = d['dateRange']['start']
    end_date = d['dateRange']['end']
    key = (pivot_values, start_date['day'], start_date['month'], start_date['year'], end_date['day'], end_date['month'], end_date['year'])
    return key

def mergeDict(data, result_dict):
    for dados in data["elements"]:
        key = get_key(dados)
        result_dict[key] = {**result_dict.get(key, {}), **dados}

In [None]:
def get_tables(param, baseUrl, dt_inicio_unix, dtIngestao, location_landing, headers):
    end_point_page = None
    try:
        format_timestamp = '%Y-%m-%d %H:%M:%S.%f'
        dtIngestao = str(dtIngestao)
        if param.vl_Ultimo_Incremento is not None:
            vlUltimoIncremento = datetime.strptime(param.vl_Ultimo_Incremento, format_timestamp)
            vlUltimoIncremento = int(mktime(vlUltimoIncremento.timetuple()))
        else:
            vlUltimoIncremento = dt_inicio_unix

        vlUltimoIncremento = str(vlUltimoIncremento)

        # Landing
        if param.vl_Schedule_Carga != 0:
            dir_landing = f"{location_landing}/{param.ds_Diretorio_Landing}/{param.ds_Nome_Arquivo_Landing}/{dtIngestao[0:4]}/{dtIngestao[5:7]}/{dtIngestao[8:10]}/{dtIngestao[11:13]}".lower()
        else:
            dir_landing = f"{location_landing}/{param.ds_Diretorio_Landing}/{param.ds_Nome_Arquivo_Landing}/{dtIngestao[0:4]}/{dtIngestao[5:7]}/{dtIngestao[8:10]}".lower()

        # URL para o request
        if param.nm_Item_Origem not in ('ad_analytics_by_campaign', 'ad_analytics_by_creative'):
            organization_id = '10481512' 
            start_time = '1675468800000' #DEPOIS CONFERIR
            url = baseUrl + param.ds_Url.replace('@organization_id', organization_id).replace('@start_time', start_time)

            response = requests.get(url=url, headers=headers)

        # Cria uma lista vazia para armazenar as respostas de cada request
        result_data = []

        # limite de registros por resposta da api (valor fixo)
        # esse valor deve ser definido de acordo com a documentação da API
        api_limit = 1000

        # número máximo de erros permitodos antes de parar a execução (tentativas)
        max_errors = 10

        # tempo para esperar para tentar novamente em caso de erro (em segundos)
        error_sleep = 30

        # Faz o primeiro Request na API
        # response = requests.get(url=url, headers=headers)

        # Verifica o status do request
        if param.nm_Item_Origem not in ('ad_analytics_by_campaign', 'ad_analytics_by_creative', 'campaigns_associated_with_conversion'):
            if response.status_code == 200:
                print(f"Primeiro request feito com sucesso.")
                data = response.json()
                if param.nm_Definition is not None:
                    campo_data = param.nm_Definition.split(',')
                    if len(campo_data) == 2:

                        for dados in data[campo_data[0]]:
                            result_data.extend(dados[campo_data[1]])
                    else:
                        result_data.extend(data[campo_data[0]])
                
                elif param.nm_Item_Origem == 'organization':
                    result_data.append(data)
        
        elif param.nm_Item_Origem in ('campaigns_associated_with_conversion'):
            if response.status_code == 200:
                print(f"Primeiro request feito com sucesso.")
                data = response.json()

                for dados in data['elements']:
                    for chave, valor in dados.items():
                        if isinstance(valor, list) and chave in 'associatedCampaigns':
                            for items in valor:
                                result_data.append(items)

        # Daqui em diante, esse elif foi escrito porque existe uma limitação de colunas que podem ser passadas no endpoint dessas duas tabelas. Nesse processo são feitas as duas requisições com os campos divididos em duas variáveis. Em seguida os dados são salvos em variáveis que serão iteradas e mergeadas utilizando o campo pivotValues (Campo onde está o ID) salvando as tabelas em uma lista

        elif param.nm_Item_Origem in ('ad_analytics_by_campaign', 'ad_analytics_by_creative'):
            print("ENTRANDO AQUI LN 75")
            result_dict = {}

            headers = {'Authorization': 'Bearer ' + token, 'LinkedIn-Version': '202306', "X-Restli-Protocol-Version": "2.0.0"}

            fields = ["actionClicks,adUnitClicks,cardClicks,cardImpressions,clicks,comments,companyPageClicks,conversionValueInLocalCurrency,costInLocalCurrency,costInUsd,dateRange,externalWebsitePostClickConversions,externalWebsitePostViewConversions,follows,fullScreenPlays,impressions,landingPageClicks,leadGenerationMailContactInfoShares,pivotValues","likes,oneClickLeads,pivotValues,dateRange,reactions,sends,shares,textUrlClicks,totalEngagements,videoCompletions,videoFirstQuartileCompletions,videoMidpointCompletions,videoStarts,videoThirdQuartileCompletions,videoViews,viralClicks,viralCommentLikes,viralComments,viralCompanyPageClicks","viralExternalWebsiteConversions,viralExternalWebsitePostClickConversions,viralExternalWebsitePostViewConversions,viralFollows,viralFullScreenPlays,viralImpressions,viralLandingPageClicks,viralLikes,viralOneClickLeadFormOpens,viralOneClickLeads,viralOtherEngagements,viralReactions,viralShares,viralTotalEngagements,viralVideoCompletions,viralVideoFirstQuartileCompletions,viralVideoMidpointCompletions,viralVideoStarts,pivotValues,dateRange","viralVideoThirdQuartileCompletions,viralVideoViews,pivotValues,dateRange,companyPageClicks"]

            for campos in fields:

                url = f"{baseUrl}{param.ds_Url}&fields={campos}"
                response = requests.get(url, headers=headers)
                data = response.json()

                for dados in data['elements']:
                    for chave, valor in dados.items():
                        if isinstance(valor, list):
                            string_tratada = valor[0].split(':')
                            dados[chave] = string_tratada[3]

                mergeDict(data, result_dict)
                for chaves, valores in result_dict.items():
                    result_data.append(valores)
            
        if len(result_data) == 0: # caso a quantidade de registros seja 0 (vazio), encerra a execução para evitar erros no LOG
            return

        #Nesse ponto, é criado uma lógica para fatiar uma específica string, para alguns campos e algumas tabelas (ambos definidos na linha 64)
        if param.nm_Item_Origem not in ('ad_analytics_by_campaign', 'ad_analytics_by_creative'):
            

            nm_tabelas = ("campaign","campaign_group","campaigns_associated_with_conversion","conversion","creative","follower_stats","page_stats","share_stats","video_ad")

            nm_campos = ("account","campaign","conversion","organizationalEntity", "campaignGroup", "organization","contentReference", "owner")

            for dados in result_data:
                strings = ("urn:li", "urn:lla")
                for chave, valor in dados.items():
                    if isinstance(valor, str) and valor.startswith(strings):
                        if param.nm_Item_Origem in nm_tabelas and chave in nm_campos: #CAMPOS e TABELAS definidas onde a string será fatiada
                            padrao = re.compile(r'\b(\w+)\b')
                            palavras = re.findall(padrao, valor)

                            item_fatiado = palavras[3]
                            dados[chave] = item_fatiado

        
        fn_SaveJson(result_data, dir_landing, str(param.ds_Nome_Arquivo_Landing).lower())
        
        fn_AtualizaUltimoIncremento_API(param.id_Parametro_Carga_API, dtIngestao)
    
        ## LOG SUCESSO
        dtFim = datetime.today() - timedelta(hours=3)
        dtFim_format = dtFim.strftime(format_log)
        duracao = int((dtFim-dtInicio).total_seconds()) #captura a duração subtraindo o dtFim pelo dtInicio
        dsParametro = str(param.asDict()) #captura todos os parâmetros
        notebook = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
        
        try:
            pipeline = param.nm_Pipeline
        except:
            pipeline = os.path.basename(notebook)

        if end_point_page is not None:
            query = end_point_page
        else:
            query = ' '
            
        parametros = {"tipo_log": tipo_log,"id_parametro": param.id_Parametro_Carga_API, "camada": camada, "dtInicio": dtInicio_format, "dtFim": dtFim_format, "pipeline": pipeline, "atividade": atividade, "notebook": notebook, "origem": origem, "destino": destino, "sistema": param.nm_Sistema, "emissor": emissor, "duracao": duracao, "query": query, "dsParametro": dsParametro, "execUrl": execUrl}

        fn_LogSucceeded(parametros, dt_ingestao.strftime(format_log))

        return f"Carga da tabela {param.ds_Nome_Arquivo_Landing} finalizada com sucesso."

    except Exception as error:
        ## LOG ERRO
        dtFim = datetime.today() - timedelta(hours=3)
        dtFim_format = dtFim.strftime(format_log)
        duracao = int((dtFim-dtInicio).total_seconds()) #captura a duração subtraindo o dtFim pelo dtInicio
        dsParametro = str(param.asDict()) #captura todos os parâmetros
        notebook = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
        try:
            pipeline = param.nm_Pipeline
        except:
            pipeline = os.path.basename(notebook)

        if end_point_page is not None:
            query = end_point_page
        else:
            query = ' '

        if hasattr(error, 'code'): # captura o código do erro, caso possua o atributo 'code'
            error_code = error.code
        else:
            error_code = 'NULL'
            
        parametros = {"tipo_log": tipo_log,"id_parametro": param.id_Parametro_Carga_API, "camada": camada, "dtInicio": dtInicio_format, "dtFim": dtFim_format, "pipeline": pipeline, "atividade": atividade, "notebook": notebook, "origem": origem, "destino": destino, "sistema": param.nm_Sistema, "emissor": emissor, "duracao": duracao, "query": query, "dsParametro": dsParametro, "cd_erro": error_code, "erro": str(error), "execUrl": execUrl}

        fn_LogFailed(parametros, dt_ingestao.strftime(format_log))

In [None]:
# Cria uma lista para armazenar todas as tarefas
tasks = []

# Cria uma instância do ThreadPoolExecutor com threads definidas (max_workers)
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
    # Percorre todas as pastas do diretório de origem
    for row in data_param:
        # Executa a função fn_StreamFromFolder_csv em uma thread do ThreadPoolExecutor
        task = executor.submit(get_tables, *(row, baseUrl, dt_inicio_unix, dt_fim, location_landing, headers))
                               
        # Adiciona a tarefa à lista de tarefas
        tasks.append(task)

# Aguarda a conclusão de todas as tarefas
_ = concurrent.futures.wait(tasks, return_when='ALL_COMPLETED')

In [None]:
for task in tasks:
    try:
        print(task.result(),'\n')
    except Exception as error:
        print(error)
        pass