In [1]:
# Bibliotecas - API
import requests

# Bibliotecas - Gráficos
import numpy as np
import pandas as pd
import missingno as msno
import plotly.express as px
import plotly.offline as pyo
import plotly.graph_objs as go
import matplotlib.pyplot as plt
import plotly.figure_factory as ff

# Bibliotecas - Dash
import dash
from dash import dcc
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output

# Outras
import warnings
warnings.filterwarnings('ignore')


#_________________________Cria a url de requisição da API_Aneel______________________________#
# https://dadosabertos.aneel.gov.br/dataset/relacao-de-empreendimentos-de-geracao-distribuida



# A api não pediu autenticação, para replicações futuras observar
# API Keys
# OAuth: aplicação de terceiros sem login
# Token: autenticação de usuários
# Certificados



# definições na criação do URL
url_base = "https://dadosabertos.aneel.gov.br/api/action/datastore_search?"
resource_id="b1bd71e7-d0ad-4214-9053-cbd58e9564a7"
limit_param = 1000000
timeout_seconds = 100
# offset: É útil para obter grandes conjuntos de dados em lotes menores
# sort: especifica em que ordem os resultados devem ser retornados (ascendente/descendente)
# q: restringe a requisicap à palavras chaves
# filters: filtra valores com base em falores por multiplas colunas
# fields: especifica os campos da tabela
# groups: agrupa os resultados por campos




def criar_url(resource_id, limit):
    resource_param = "resource_id=" + resource_id
    limit_param = "limit=" + str(limit)

    url = url_base + resource_param + "&" + limit_param
    return url
url = criar_url(resource_id, limit_param)

# Obter os dados da API
def get_data(url, timeout_seconds):
    try:
        # Requisição GET com o tempo limite definido
        response = requests.get(url, timeout=timeout_seconds)

        #  Verifica erros HTTP
        response.raise_for_status()

    except requests.exceptions.RequestException as e:
        print("Erro ao obter os dados:", e)
        return None

    # Resposta HTTP=200, indicando que a requisição foi bem sucedida
    if response.status_code == 200:
        # Em caso de sucesso, obtenha os dados em formato JSON e imprima "Requisição bem sucedida"
        data = response.json()
        print("Requisição bem sucedida")

        # Verificar resposta da API
        if data["success"]:
            # Gera lista de registros e a lista de colunas
            records = data["result"]["records"]
            columns = data["result"]["fields"]

            # Alguns dados não estavam presentes na tabela, realizei um teste de captação para as colunas "Bandeira" e "Nome_agente" na lista de colunas
            columns.append({"id": "Bandeira", "type": "text"})
            columns.append({"id": "Nome_agente", "type": "text"})

            # Cria o DataFrame
            df = pd.DataFrame.from_records(records, columns=[c["id"] for c in columns])

            # Retorne o DataFrame criado
            return df
        
            # Em caso de sucesso sem terotno de dados
        else:
            # Em caso de erro, imprir uma mensagem de aviso e retorne None
            print("A API não retornou dados.")
            return None
            
            # Se o código de status da resposta HTTP for diferente de 200, trate o erro de acordo com o código
    else:
        if response.status_code == 400:
            print("Requisição mal formada.")
        elif response.status_code == 401:
            print("Não autorizado.")
        elif response.status_code == 403:
            print("Acesso proibido.")
        elif response.status_code == 404:
            print("Recurso não encontrado.") 
        elif response.status_code == 500:
            print("Erro interno do servidor.")
        else:
            print("Erro desconhecido ao obter os dados.")

        return None

# Chame a função get_data para obter um DataFrame com os dados da API
df = get_data(url, timeout_seconds)



The dash_core_components package is deprecated. Please replace
`import dash_core_components as dcc` with `from dash import dcc`



The dash_html_components package is deprecated. Please replace
`import dash_html_components as html` with `from dash import html`



Requisição bem sucedida


In [3]:
import matplotlib.pyplot as plt
import missingno as msno

# Atualizando formato de colunas
def atualiza_formato_colunas(df):
    df = df.astype({
        "NumCNPJDistribuidora": np.int64,
        "CodClasseConsumo": np.int64,
        "CodSubGrupoTarifario": np.int64,
        "codUFibge": np.float64,
        "codRegiao": np.float64,
        "CodMunicipioIbge": np.float64,
        "QtdUCRecebeCredito": np.int64,
    })
    return df
df = atualiza_formato_colunas(df)

# análise de vazios
def analyze_dataframe(df):
    # Análise de colunas duplicadas
    duplicated_cols = df.columns[df.columns.duplicated(keep=False)]
    df_duplicated_col = df[duplicated_cols].sum()
    print('Colunas duplicadas: ', df_duplicated_col.tolist())

    # Análise de linhas duplicadas
    duplicated_rows = df.duplicated(keep=False)
    df_duplicated_line = duplicated_rows.sum()
    print('Linhas duplicadas: ', df_duplicated_line.tolist())

    # Análise de valores nulos
    na_tot = df.isna().sum().sort_values(ascending=False)
    na_perc = (df.isna().sum() / df.shape[0] * 100).round(2).sort_values(ascending=False)
    na = pd.concat([na_tot, na_perc], axis=1, keys=['+', '%'])
    print(na.head(10))
analyze_dataframe(df)

def check_duplicates(df):
    # Verifica colunas duplicadas
    duplicated_cols = df.columns[df.columns.duplicated(keep=False)]
    num_duplicated_cols = len(duplicated_cols)
    
    # Verifica linhas duplicadas
    duplicated_rows = df.duplicated(keep=False)
    num_duplicated_rows = duplicated_rows.sum()
    
    # Retorna o resultado
    return num_duplicated_cols, num_duplicated_rows
check_duplicates(df)


def visualizar_nulos(df):
    sorted_df = df.sort_values(by='NumCoordEEmpreendimento')
    fig, ax = plt.subplots(figsize=(10, 6))

    # Cria a matriz de visualização de nulos com o eixo x em rotação vertical e tamanho de fonte menor
    matriz_nulos = msno.matrix(sorted_df, ax=ax)
    ax.set_xticklabels(ax.get_xticklabels(), rotation=90, fontsize=8)
    plt.show()
#visualizar_nulos(df)


# Adicionando as colunas Ano, Mes e Ano_Mes
def adiciona_colunas(df):
    df['DatetimeIndex'] = pd.DatetimeIndex(df['DthAtualizaCadastralEmpreend'])
    df['Mes'] = pd.DatetimeIndex(df['DthAtualizaCadastralEmpreend']).month
    df['Ano'] = pd.DatetimeIndex(df['DthAtualizaCadastralEmpreend']).year
    df['Ano_Mes'] = pd.to_datetime(df['DthAtualizaCadastralEmpreend']).dt.strftime('%Y-%m')
    return df
adiciona_colunas(df)

df_clean = df.dropna(subset=['NumCPFCNPJ']).copy()


Colunas duplicadas:  []
Linhas duplicadas:  0
                             +       %
Nome_agente              32000  100.00
Bandeira                 32000  100.00
NumCoordNSub             31965   99.89
NumCoordESub             31965   99.89
NomSubEstacao            31965   99.89
NumCoordNEmpreendimento  27814   86.92
NumCoordEEmpreendimento  27814   86.92
codRegiao                   49    0.15
codUFibge                   49    0.15
CodEmpreendimento            2    0.01


In [28]:
# Adicionando as colunas Ano, Mes e Ano_Mes
def adiciona_colunas(df):
    df['DatetimeIndex'] = pd.DatetimeIndex(df['DthAtualizaCadastralEmpreend'])
    df['Mes'] = pd.DatetimeIndex(df['DthAtualizaCadastralEmpreend']).month
    df['Ano'] = pd.DatetimeIndex(df['DthAtualizaCadastralEmpreend']).year
    df['Ano_Mes'] = pd.to_datetime(df['DthAtualizaCadastralEmpreend']).dt.strftime('%Y-%m')
    return df
adiciona_colunas(df)


df_clean = df.dropna(subset=['NumCPFCNPJ']).copy()

# estudos da serie temporal
ts = df_clean.groupby(['Ano_Mes', 'SigUF','SigTipoConsumidor', 'DscClasseConsumo']).agg({'NumCPFCNPJ': 'nunique'}).reset_index()
ts = ts.sort_values('Ano_Mes')
ts = ts.set_index('Ano_Mes')
ts_clean = ts.drop(['SigUF', 'SigTipoConsumidor','DscClasseConsumo'], axis=1)
ts_clean = ts_clean.groupby('Ano_Mes').sum()



#______________cria parametros
import statsmodels.api
from statsmodels.tsa.seasonal import seasonal_decompose
resultado = seasonal_decompose(ts_clean['NumCPFCNPJ'], period = 1)
tendencia = resultado.trend
sazonalidade = resultado.seasonal
residuo = resultado.resid

ts_clean.index
ts_clean.to_excel("ts_clean.xlsx")

In [25]:
#______________tendencia
fig_tendencia = go.Figure()
# Adicionar linha da tendência
fig_tendencia.add_trace(go.Scatter(x=ts_clean.index, y=tendencia, mode='lines', name='Tendência'))
# Configurar o layout
fig_tendencia.update_layout(
    xaxis_title='Ano_Mes',
    yaxis_title='NumCPFCNPJ',
    title='Tendência'
)