In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Define uma variável chamada 'path' que contém o caminho para uma pasta específica vinda da rawzone
# A pasta é localizada dentro de 'PASTA_INTERNA_DA_EXTRACAO' e se chama 'Central de Projetos'
path = 'PASTA_INTERNA_DA_EXTRACAO/Central de Projetos'


In [0]:
# %run é uma "magic command" que executa um script no ambiente atual
# Chamada do caderno "handler" e suas funções
%run ../../handler

In [0]:
# %run é uma "magic command" que executa um script no ambiente atual
# Chamada do caderno sharepointAPI_CP
%run ../../sharepointAPI_CP

In [0]:
# Conexão com a API do Sharepoint_CP pegando subpáginas direto da página "Ações" dos projetos
cp_shareAPI = SharepointAPI_CP()
subpages = cp_shareAPI.get_subpages('CentraldeProjetos')

In [0]:
# Conexão e leitura do arquivo "Projetos" dentro de: yourdatalake\rawzone\DIRETORIO_DA_EXTRACAO\EXTRACAO\PASTA_INTERNA_DA_EXTRACAO/Project
project = Handler('EXTRACAO', 'PASTA_INTERNA_DA_EXTRACAO/Project')
project.read_from('rawzone')
project_df = project.df.withColumnRenamed("ID_Projeto", 'IddeProjeto')

#display(project_df)

In [0]:
# Obtém dados de ações de uma API, cria um DataFrame e define o esquema para os dados.

# Chama o método `get_info_by_list_new` da API `cp_shareAPI` para recuperar informações sobre ações.
# Parâmetros:
# - 'Ações': especifica que o tipo de dado a ser retornado é relacionado a ações.
# - subpages: uma variável contendo uma lista ou subpáginas adicionais para filtrar a busca.
# - ProjUID=True: indica que a busca deve incluir um identificador único (UID) do projeto.
acoes_data = cp_shareAPI.get_info_by_list_new('Ações', subpages, ProjUID=True)

# Cria uma lista de chaves/colunas com base nos dados retornados pela API.
# Considera-se que `acoes_data` é uma lista de dicionários, e acessa-se o primeiro item [0] 
# para obter suas chaves (nomes de colunas).
rows = list(acoes_data[0].keys())

# Define um esquema para um DataFrame do Spark usando `StructType` e `StructField` com base nas colunas obtidas.
# Cada campo é do tipo `StringType`, e o terceiro argumento `True` indica que os campos podem ter valores nulos.
schema = StructType([StructField(i, StringType(), True) for i in rows])

# Cria um DataFrame usando a API `cp_shareAPI.create_df`.
# Parâmetros:
# - acoes_data: os dados de ações obtidos da API.
# - rows: as colunas extraídas dos dados.
# - schema: o esquema definido para os dados, determinando a estrutura e tipos das colunas.
acoes_df = cp_shareAPI.create_df(acoes_data, rows, schema)

In [0]:
# Integra identificadores de projeto ao DataFrame de ações. A função `get_proj_ids` adiciona ou mapeia
# os identificadores de projeto (IDs) ao `acoes_df` a partir de outro DataFrame, `project_df`.
# Parâmetros:
# - acoes_df: o DataFrame de ações criado anteriormente.
# - project_df: um DataFrame contendo informações dos projetos, incluindo seus identificadores (IDs).
acoes_df = get_proj_ids(acoes_df, project_df)

In [0]:
# Cria uma nova coluna 'New_IddeProjeto' com base em uma condição
acoes_df = acoes_df.withColumn('New_IddeProjeto',
when(col('ID_Projeto') == '', col('IddeProjeto')) # Verifica se a coluna 'ID_Projeto' está vazia
.otherwise(col('ID_Projeto'))) # Se 'ID_Projeto' não estiver vazia, mantém o valor original

# Remove as colunas originais 'ID_Projeto' e 'IddeProjeto'
acoes_df = acoes_df.drop("ID_Projeto", "IddeProjeto") \
.withColumnRenamed("New_IddeProjeto", 'ID_Projeto') # Renomeia 'New_IddeProjeto' para 'ID_Projeto', substituindo a coluna original

In [0]:
# Faz um distinct count dos registros no DataFrame
acoes_df.distinct().count()

In [0]:
# Escreve a extração em um arquivo "Ações" na rawzone no caminho: yourdatalake\silverzone\Central_Projetos
v_acoes = Handler('Ações', path)
v_acoes.df = acoes_df
v_acoes.write_on('rawzone')