In [142]:
import pandas as pd
import numpy as np
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pandas as pd
from datetime import datetime
from google.cloud import storage
import matplotlib.pyplot as plt

In [2]:
# Classes e funções necessáriaspara o funcionamento do código:
def verificacao_texto(data_frame, coluna, tamanho_texto: None, numeros: bool):
    '''
    Função para identificar problemas em colunas que contenham apenas letras e números
    '''
    problemas = []
    for i in range(len(data_frame)):
        texto = str(data_frame.loc[i, coluna])
        if numeros == False:
            if texto.isalpha() == False:
                problemas.append(data_frame.loc[i, coluna])
            elif tamanho_texto != None:
                if len(texto) != tamanho_texto:
                    problemas.append(data_frame.loc[i, coluna])
        elif numeros == True:
            if texto.isalnum() == False:
                problemas.append(data_frame.loc[i, coluna])
            elif tamanho_texto != None:
                if len(texto) != tamanho_texto:
                    problemas.append(data_frame.loc[i, coluna])
    # Imprimindo os problemas que deverão ser corrigidos:
    df_problemas = pd.DataFrame(problemas, columns=["corrigir"])
    df_problemas = pd.DataFrame(df_problemas.corrigir.unique(), columns=["Corrigir:"])

    if len(df_problemas) > 0:
        print(df_problemas)
    else:
        print("Nenhum problema detectado nesta coluna")
    print(f"Verificação da coluna {coluna} concluída")


def verificacao_tipo(data_frame, coluna, tipo: type):
    '''
    Função para verificar problemas relacionado ao tipo de dados que a coluna deve possuir
    '''
    problemas = []
    for i in range(len(data_frame)):
        try:
            tipo(data_frame.loc[i, coluna])
        except Exception:
            problemas.append(data_frame.loc[i, coluna])
    df_problemas = pd.DataFrame(problemas, columns=["corrigir"])
    df_problemas = pd.DataFrame(df_problemas.corrigir.unique(), columns=["Corrigir: "])
    print("--------------------------------------------------------------------")
    print(f"Verificando a coluna {coluna}: ")
    if len(df_problemas) > 0:
        print(df_problemas)
    else:
        print("Nenhum problema detectado nessa coluna")
    print(f"Verificação da coluna {coluna} concluída")
    return df_problemas


def verificacao_valor_padrao(data_frame, coluna):
    '''
    Função para identificar os valores únicos presentes em uma coluna, útil para caso de colunas que possuem valores padronizados, como por exemplo 'SIM' e 'NÃO'
    '''
    unicos = data_frame[coluna].unique()
    print("---------------------------------------------------------------------")
    print(f"Verificando valores únicos da coluna {coluna}: ")
    print(unicos)
    print("Verificação concluída")


def verificacao_data(data_frame, coluna, formato: str):
    '''
    Função para verificar se todos os valores de uma coluna correspondem a data do formato especificado
    '''
    problemas = []
    # Verificando se tratam-se de datas:
    for i in range(len(data_frame)):
        try:
            datetime.now() - datetime.strptime(data_frame.loc[i, coluna], formato)
        except Exception:
            if data_frame.loc[i, coluna] != 'NULO':
                problemas.append(data_frame.loc[i, coluna])
    df_problemas = pd.DataFrame(problemas, columns=["Corrigir:"])
    print("---------------------------------------------------------------------")
    print(f"Verificando a coluna {coluna}: ")
    if len(df_problemas) > 0:
        print(df_problemas)
    else:
        print("Não há nenhum problema para corrigir")
    print(f"Verificação da coluna {coluna} concluída! ")

class Conector_mysql:
    def __init__(self, host, user, password, db):
        self.host = host
        self.user = user
        self.password = password
        self.db = db
    
    def envia_mysql(self, dfs, table):
        self.dfs = dfs
        self.table = table
        self.dfs.write.format("jdbc")\
                .option('url', f'jdbc:mysql://{self.host}/{self.db}')\
                .option('driver', 'com.mysql.cj.jdbc.Driver')\
                .option("numPartitions", "10") \
                .option("user",self.user)\
                .option("password", self.password)\
                .option("dbtable", self.db + "." + self.table)\
                .mode("append").save()
    
    def ler_mysql(self, table, spark_conection):
        self.table = table
        self.spark_conection = spark_conection
        self.df = self.spark_conection.read.format("jdbc")\
            .option('url', f'jdbc:mysql://{self.host}/{self.db}')\
            .option('driver', 'com.mysql.cj.jdbc.Driver')\
            .option("user",self.user)\
            .option("password", self.password)\
            .option("dbtable", self.db + "." + self.table).load()
        return self.df

In [4]:
'''
Conectando com a SpakSession
'''

spark = ( SparkSession.builder
                        .master("local")
                        .appName("sparksql")
                        .config("spark.ui.port", "4050")
                        .config("spark.jars", 'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar')
                        .getOrCreate()
        )

# Verificando inconsistências da DF Arrecadacao

In [5]:
# Conectando com o mySQL
conexao_mysql = Conector_mysql("34.72.50.43", "root", ">}Dzh.=}YhZ#(G>s", "original")

# Puxando os dados do MySQL
dfs_arrecadacao = conexao_mysql.ler_mysql('arrecadacao', spark)
dfs_barragens = conexao_mysql.ler_mysql('barragens', spark)
dfs_autuacao = conexao_mysql.ler_mysql('autuacao', spark)
dfs_beneficiada = conexao_mysql.ler_mysql('beneficiada', spark)
dfs_distribuicao = conexao_mysql.ler_mysql('distribuicao', spark)
dfs_municipio = conexao_mysql.ler_mysql('municipio', spark)
dfs_pib = conexao_mysql.ler_mysql('pib', spark)
dfs_dados_populacao = conexao_mysql.ler_mysql('dados_populacao', spark)

In [6]:
# Transformando DataFrame Arrecadação Spark em DataFrame Pandas
dfp_arrecadacao = dfs_arrecadacao.toPandas()

In [7]:
# Analisando dados
dfp_arrecadacao.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1657294 entries, 0 to 1657293
Data columns (total 13 columns):
 #   Column                    Non-Null Count    Dtype  
---  ------                    --------------    -----  
 0   index                     1657294 non-null  int64  
 1   Ano                       1657294 non-null  int32  
 2   Mês                       1657294 non-null  int32  
 3   Processo                  1552074 non-null  float64
 4   AnoDoProcesso             1552074 non-null  float64
 5   Tipo_PF_PJ                1657294 non-null  object 
 6   CPF_CNPJ                  1657208 non-null  object 
 7   Substância                1657294 non-null  object 
 8   UF                        1655722 non-null  object 
 9   Município                 1655722 non-null  object 
 10  QuantidadeComercializada  1642254 non-null  object 
 11  UnidadeDeMedida           1656589 non-null  object 
 12  ValorRecolhido            1657294 non-null  object 
dtypes: float64(2), int32(2), in

In [8]:
dfp_arrecadacao.isna().sum()

index                            0
Ano                              0
Mês                              0
Processo                    105220
AnoDoProcesso               105220
Tipo_PF_PJ                       0
CPF_CNPJ                        86
Substância                       0
UF                            1572
Município                     1572
QuantidadeComercializada     15040
UnidadeDeMedida                705
ValorRecolhido                   0
dtype: int64

In [9]:
dfp_arrecadacao.duplicated().sum()

0

In [10]:
dfp_arrecadacao.Ano.unique()


array([2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012,
       2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022],
      dtype=int32)

In [11]:
dfp_arrecadacao.Mês.unique()

array([ 8,  5,  6,  7,  9, 10, 11, 12,  1,  2,  3,  4], dtype=int32)

In [12]:
verificacao_tipo(dfp_arrecadacao, 'Processo', int)
verificacao_texto(dfp_arrecadacao, 'Tipo_PF_PJ', None, False)
verificacao_valor_padrao(dfp_arrecadacao, 'Substância')
verificacao_valor_padrao(dfp_arrecadacao, 'UF')
verificacao_valor_padrao(dfp_arrecadacao, 'Município')
verificacao_valor_padrao(dfp_arrecadacao, 'QuantidadeComercializada')
verificacao_valor_padrao(dfp_arrecadacao, 'QuantidadeComercializada')
verificacao_valor_padrao(dfp_arrecadacao, 'UnidadeDeMedida')
verificacao_valor_padrao(dfp_arrecadacao, 'ValorRecolhido')

--------------------------------------------------------------------
Verificando a coluna Processo: 
   Corrigir: 
0         NaN
Verificação da coluna Processo concluída
  Corrigir:
0         -
Verificação da coluna Tipo_PF_PJ concluída
---------------------------------------------------------------------
Verificando valores únicos da coluna Substância: 
['BASALTO' 'AREIA' 'MINÉRIO DE FERRO' 'ARGILA P/CER. VERMELH'
 'ÁGUA MINERAL' 'AREIA FINA' 'GRANITO P/ BRITA' 'FOLHELHO ARGILOSO'
 'CALCÁRIO' 'CASCALHO' 'AREIA FLUVIAL' 'AREIA COMUM' 'BASALTO P/ BRITA'
 'ÁGUA MINERAL ALC. BIC' 'ARGILA' 'QUARTZITO FRIÁVEL' 'MÁRMORE'
 'ARGILA COMUM' 'GRANITO' 'FILITO' 'SAIBRO' 'BRITA DE GRANITO' 'NÍQUEL'
 'AREIA LAVADA' 'CHARNOQUITO' 'ARGILA VERMELHA' 'BASALTO P/ REVESTIMENTO'
 'FOSFATO' 'GIPSITA' 'QUARTZITO' 'ARGILA REFRATÁRIA' 'ARGILA CAULÍNICA'
 'ARENITO' 'CALCÁRIO CALCÍTICO' 'GNAISSE P/ BRITA' 'MINÉRIO DE OURO'
 'CAULIM' 'FELDSPATO' 'FONÓLITO' 'CALCÁRIO DOLOMÍTICO' 'PEDRA CALCÁRIA'
 'PEDRA SÃO TOMÉ' 

In [None]:
# Tratamentos:
dfp_arrecadacao.drop_duplicates()
dfp_arrecadacao.replace(to_replace='ARGILA P/CER. VERMELH', value='ARGILA P/CER. VERMELHA', inplace=True)
dfp_arrecadacao.replace(to_replace='-', value=np.nan, inplace=True)
dfp_arrecadacao.fillna(np.nan)
dfp_arrecadacao.replace(to_replace='None', value=np.nan, inplace=True)
dfp_arrecadacao.drop(['CPF_CNPJ', 'index'], axis=1, inplace=True)
dfp_arrecadacao['QuantidadeComercializada'] = dfp_arrecadacao['QuantidadeComercializada'].str.replace(',', '.')
dfp_arrecadacao['ValorRecolhido'] = dfp_arrecadacao['ValorRecolhido'].str.replace(',', '.')
dfp_arrecadacao['QuantidadeComercializada'] = dfp_arrecadacao['QuantidadeComercializada'].astype(float)
dfp_arrecadacao['ValorRecolhido'] = dfp_arrecadacao['ValorRecolhido'].astype(float)
dfp_arrecadacao['UnidadeDeMedida'].replace(to_replace='m3', value='Metros Cubicos', inplace=True)
dfp_arrecadacao['UnidadeDeMedida'].replace(to_replace='t', value='Toneladas', inplace=True)
dfp_arrecadacao['UnidadeDeMedida'].replace(to_replace='l', value='Litros', inplace=True)
dfp_arrecadacao['UnidadeDeMedida'].replace(to_replace='g', value='Gramas', inplace=True)
dfp_arrecadacao['UnidadeDeMedida'].replace(to_replace='m2', value='Metros Quadrados', inplace=True)
dfp_arrecadacao['UnidadeDeMedida'].replace(to_replace='ct', value='Quilates', inplace=True)
dfp_arrecadacao.replace(to_replace='OURO', value='MINÉRIO DE OURO', inplace=True)
dfp_arrecadacao.replace(to_replace='FERRO', value='MINÉRIO DE FERRO', inplace=True)
dfp_arrecadacao.replace(to_replace='COBRE', value='MINÉRIO DE COBRE', inplace=True)

# Verificando inconsistências na dataframe df_pib

In [13]:
# Transformando DataFrame PIB Spark em DataFrame Pandas
dfp_pib = dfs_pib.toPandas()


In [14]:
dfp_pib.describe()

Unnamed: 0,index,ano,id_municipio,pib,impostos_liquidos,va,va_agropecuaria,va_industria,va_servicos,va_adespss
count,94616.0,94616.0,94616.0,94616.0,94616.0,94616.0,94616.0,94616.0,94616.0,94616.0
mean,47307.5,2010.003002,3253181.0,728734300.0,105231500.0,623502800.0,33135670.0,153183600.0,332972300.0,104211300.0
std,27313.430872,4.898931,984455.8,8059832000.0,1489081000.0,6596790000.0,63566960.0,1094854000.0,4885391000.0,1058767000.0
min,0.0,2002.0,1100015.0,-19046430.0,-15088400.0,-510593100.0,-2298910.0,-2897193000.0,372278.0,1446664.0
25%,23653.75,2006.0,2512101.0,39057840.0,1478442.0,37299480.0,6121093.0,1904200.0,8765398.0,12865550.0
50%,47307.5,2010.0,3146255.0,89715930.0,4477020.0,84678480.0,15196490.0,6322138.0,23626570.0,25689120.0
75%,70961.25,2014.0,4119103.0,255503200.0,18117850.0,235662500.0,35326770.0,35201650.0,80641280.0,59089900.0
max,94615.0,2018.0,5300108.0,714683400000.0,127154300000.0,587529100000.0,2482540000.0,66893050000.0,485428800000.0,101792800000.0


In [15]:
print(dfp_pib.isna().sum())

index                0
ano                  0
id_municipio         0
pib                  0
impostos_liquidos    0
va                   0
va_agropecuaria      0
va_industria         0
va_servicos          0
va_adespss           0
dtype: int64


In [16]:
dfp_pib.duplicated().sum()


0

In [17]:
print(dfp_pib.dtypes)


index                int64
ano                  int32
id_municipio         int32
pib                  int64
impostos_liquidos    int64
va                   int64
va_agropecuaria      int64
va_industria         int64
va_servicos          int64
va_adespss           int64
dtype: object


# Verificando inconsistências na DF Barragens

In [18]:
# Transformando Dataframe Barragens Spark em Dataframe Pandas
dfp_barragens = dfs_barragens.toPandas()

In [19]:
# Analisando dados
dfp_barragens.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 906 entries, 0 to 905
Data columns (total 95 columns):
 #   Column                                                            Non-Null Count  Dtype 
---  ------                                                            --------------  ----- 
 0   index                                                             906 non-null    int64 
 1   ID                                                                906 non-null    int32 
 2   Nome                                                              906 non-null    object
 3   Empreendedor                                                      906 non-null    object
 4   CPF_CNPJ                                                          906 non-null    object
 5   UF                                                                906 non-null    object
 6   Município                                                         906 non-null    object
 7   Latitude                                    

In [20]:
dfp_barragens.isna().sum()
print(dfp_barragens.dtypes)

index                          int64
ID                             int32
Nome                          object
Empreendedor                  object
CPF_CNPJ                      object
                               ...  
Impacto sócio-econômico       object
Data da Finalização da DCE    object
Motivo de Envio               object
RT/Declaração                 object
RT/Empreendimento             object
Length: 95, dtype: object


In [21]:
verificacao_tipo(dfp_barragens, 'N pessoas afetadas a jusante em caso de rompimento da barragem', float)
verificacao_valor_padrao(dfp_barragens, 'Empreendedor')
verificacao_valor_padrao(dfp_barragens, 'UF')
verificacao_valor_padrao(dfp_barragens, 'Município')
verificacao_valor_padrao(dfp_barragens, 'Categoria de Risco - CRI')
verificacao_valor_padrao(dfp_barragens, 'Nível de Emergência')
verificacao_valor_padrao(dfp_barragens, 'Tipo de Barragem de Mineração')
verificacao_valor_padrao(dfp_barragens, 'Vida útil prevista da Barragem (anos)')
verificacao_valor_padrao(dfp_barragens, 'Estrutura com o Objetivo de Contenção')
verificacao_valor_padrao(dfp_barragens, 'Minério principal presente no reservatório')
verificacao_valor_padrao(dfp_barragens, 'N pessoas afetadas a jusante em caso de rompimento da barragem')

--------------------------------------------------------------------
Verificando a coluna N pessoas afetadas a jusante em caso de rompimento da barragem: 
                                 Corrigir: 
0                                     1-100
1                                         -
2                                    875,00
3                                  1.100,00
4                                  1.125,00
..                                      ...
163                              729.000,00
164                            5.648.000,00
165                          154.000.000,00
166                           12.200.000,00
167  Projeto executivo ou "como construído"

[168 rows x 1 columns]
Verificação da coluna N pessoas afetadas a jusante em caso de rompimento da barragem concluída
---------------------------------------------------------------------
Verificando valores únicos da coluna Empreendedor: 
['MOSAIC FERTILIZANTES P&K LTDA.' 'MINERACAO SAO FRANCISCO DE ASSIS LTDA'
 '

# Verificando inconsistencias no dataframe dados_populacao

In [22]:
# Transformando DataFrame Dados População Spark em DataFrame Pandas
dfp_dados_populacao = dfs_dados_populacao.toPandas()

In [23]:
# Analisando dados
print(dfp_dados_populacao.dtypes)

index                                              int64
Ano                                                int64
Esperança de Vida ao Nascer                      float64
Esperança de Vida ao Nascer - Homens             float64
Esperança de Vida ao Nascer - Mulheres           float64
Homens                                             int64
Mulheres                                           int64
Nascimentos                                        int64
População total                                    int64
Razão de Dependência                             float64
Razão de Dependência - Idosos 65 ou mais anos    float64
Razão de Dependência - Jovens 0 a 14 anos        float64
Taxa Bruta de Mortalidade                        float64
Taxa Bruta de Natalidade                         float64
Taxa de Crescimento Geométrico                    object
Taxa de Fecundidade Total                        float64
Taxa de Mortalidade Infantil                     float64
Taxa de Mortalidade Infantil - 

In [25]:
dfp_dados_populacao.head()

Unnamed: 0,index,Ano,Esperança de Vida ao Nascer,Esperança de Vida ao Nascer - Homens,Esperança de Vida ao Nascer - Mulheres,Homens,Mulheres,Nascimentos,População total,Razão de Dependência,...,Taxa Bruta de Mortalidade,Taxa Bruta de Natalidade,Taxa de Crescimento Geométrico,Taxa de Fecundidade Total,Taxa de Mortalidade Infantil,Taxa de Mortalidade Infantil - Homens,Taxa de Mortalidade Infantil - Mulheres,uf,Índice de Envelhecimento,Óbitos
0,0,2010,71.68,68.46,75.4,384406,380919,16489,765325,66.22,...,5.09,21.55,x,2.45,22.08,24.17,19.88,AC,11.8,3899


In [24]:
verificacao_valor_padrao(dfp_dados_populacao, 'Ano')
verificacao_valor_padrao(dfp_dados_populacao, 'Esperança de Vida ao Nascer')
verificacao_valor_padrao(dfp_dados_populacao, 'Esperança de Vida ao Nascer - Homens')
verificacao_valor_padrao(dfp_dados_populacao, 'Esperança de Vida ao Nascer - Mulheres')
verificacao_valor_padrao(dfp_dados_populacao, 'Homens')
verificacao_valor_padrao(dfp_dados_populacao, 'Mulheres')
verificacao_valor_padrao(dfp_dados_populacao, 'Nascimentos')
verificacao_valor_padrao(dfp_dados_populacao, 'População total')
verificacao_valor_padrao(dfp_dados_populacao, 'Razão de Dependência')
verificacao_valor_padrao(dfp_dados_populacao, 'Razão de Dependência - Idosos 65 ou mais anos')
verificacao_valor_padrao(dfp_dados_populacao, 'Razão de Dependência - Jovens 0 a 14 anos')
verificacao_valor_padrao(dfp_dados_populacao, 'Taxa Bruta de Mortalidade')
verificacao_valor_padrao(dfp_dados_populacao, 'Taxa Bruta de Natalidade')
verificacao_valor_padrao(dfp_dados_populacao, 'Taxa de Crescimento Geométrico')
verificacao_valor_padrao(dfp_dados_populacao, 'Taxa de Fecundidade Total')
verificacao_valor_padrao(dfp_dados_populacao, 'Taxa de Mortalidade Infantil')
verificacao_valor_padrao(dfp_dados_populacao, 'Taxa de Mortalidade Infantil - Homens')
verificacao_valor_padrao(dfp_dados_populacao, 'Taxa de Mortalidade Infantil - Mulheres')
verificacao_valor_padrao(dfp_dados_populacao, 'Índice de Envelhecimento')
verificacao_valor_padrao(dfp_dados_populacao, 'Óbitos')
verificacao_valor_padrao(dfp_dados_populacao, 'uf')

---------------------------------------------------------------------
Verificando valores únicos da coluna Ano: 
[2010]
Verificação concluída
---------------------------------------------------------------------
Verificando valores únicos da coluna Esperança de Vida ao Nascer: 
[71.68]
Verificação concluída
---------------------------------------------------------------------
Verificando valores únicos da coluna Esperança de Vida ao Nascer - Homens: 
[68.46]
Verificação concluída
---------------------------------------------------------------------
Verificando valores únicos da coluna Esperança de Vida ao Nascer - Mulheres: 
[75.4]
Verificação concluída
---------------------------------------------------------------------
Verificando valores únicos da coluna Homens: 
[384406]
Verificação concluída
---------------------------------------------------------------------
Verificando valores únicos da coluna Mulheres: 
[380919]
Verificação concluída
------------------------------------------

# Verificando inconsistências na DF municipio

In [32]:
# Transformando o dataframe municipio spark em Dataframe Pandas
dfp_municipio = dfs_municipio.toPandas()

In [33]:
# Verificando as colunas:
verificacao_valor_padrao(dfp_municipio,  "UF")
verificacao_tipo(dfp_municipio, "COD. UF", int)
verificacao_texto(dfp_municipio, "COD. UF", 2, True)
verificacao_tipo(dfp_municipio, "COD. MUNIC", int)
verificacao_texto(dfp_municipio, "NOME DO MUNICÍPIO", None, False)
verificacao_tipo(dfp_municipio, "POPULAÇÃO ESTIMADA", float)

---------------------------------------------------------------------
Verificando valores únicos da coluna UF: 
['RO' 'AC' 'AM' 'RR' 'PA' 'AP' 'TO' 'MA' 'PI' 'CE' 'RN' 'PB' 'PE' 'AL'
 'SE' 'BA' 'MG' 'ES' 'RJ' 'SP' 'PR' 'SC' 'RS' 'MS' 'MT' 'GO' 'DF']
Verificação concluída
--------------------------------------------------------------------
Verificando a coluna COD. UF: 
Nenhum problema detectado nessa coluna
Verificação da coluna COD. UF concluída
Nenhum problema detectado nesta coluna
Verificação da coluna COD. UF concluída
--------------------------------------------------------------------
Verificando a coluna COD. MUNIC: 
Nenhum problema detectado nessa coluna
Verificação da coluna COD. MUNIC concluída
                  Corrigir:
0     Alta Floresta D'Oeste
1         Colorado do Oeste
2             Costa Marques
3           Espigão D'Oeste
4             Guajará-Mirim
...                     ...
2536   Terezópolis de Goiás
2537           Três Ranchos
2538    Valparaíso de Goiás
2539 

Unnamed: 0,Corrigir:
0,548.952(1)
1,44.873(2)
2,13.482(3)
3,17.193(4)
4,13.462(5)
5,47.685(6)
6,33.981(7)
7,2.255.903
8,116.439(8)
9,26.566(9)


In [34]:
dfp_municipio.describe()

Unnamed: 0,index,COD. UF,COD. MUNIC
count,5570.0,5570.0,5570.0
mean,2784.5,32.377738,15816.982585
std,1608.064831,9.833862,15997.29978
min,0.0,11.0,13.0
25%,1392.25,25.0,4507.25
50%,2784.5,31.0,10400.5
75%,4176.75,41.0,20853.0
max,5569.0,53.0,72202.0


In [35]:
dfp_municipio.duplicated().sum()

0

In [36]:
dfp_municipio.isna().sum()

index                 0
UF                    0
COD. UF               0
COD. MUNIC            0
NOME DO MUNICÍPIO     0
POPULAÇÃO ESTIMADA    0
dtype: int64

## Plotagens Preliminares

In [None]:
# Arrecadacao
plt.figure(figsize=(10,10))
plt.title("Total Arrecadado por ano")
plt.plot(dfp_arrecadacao["Ano"], dfp_arrecadacao["ValorRecolhido"])
plt.xlabel("Ano")
plt.ylabel("Número de Ocorrências")
plt.show()

# Analisando os dados depois de Tratados

## Spark.sql

### Arrecadacao

In [45]:
'''
Puxando o o data frame tratado do bucket
'''
df_arrecadacao = spark.read.csv("gs://soulcode-mineracao/tratados/arrecadacao.csv", sep=',', encoding='utf-8', header=True)
df_arrecadacao = df_arrecadacao.withColumnRenamed("Substância", "Substancia").withColumnRenamed("Mês", "Mes").withColumnRenamed("Município", "Municipio")

In [46]:
df_arrecadacao.columns

['Ano',
 'Mes',
 'Processo',
 'AnoDoProcesso',
 'Tipo_PF_PJ',
 'Substancia',
 'UF',
 'Municipio',
 'QuantidadeComercializada',
 'UnidadeDeMedida',
 'ValorRecolhido']

In [47]:
'''
Cria ou substitui uma visão temporária para o DataFrame

O tempo de vida dessa tabela temporária está vinculado à 
SparkSession que foi usada para criar o DataFrame.

'''

df_arrecadacao.createOrReplaceTempView('arrecadacao')

In [48]:
'''
> COUNT
Contando a quantidade de linhas do Dataframe
'''
spark.sql('''SELECT COUNT (*)
          FROM arrecadacao
          ''').show()

+--------+
|count(1)|
+--------+
| 1657294|
+--------+



In [49]:
'''
> DISTINCT
Consultando apenas os valores distintos
Aplicando, como exemplo, a coluna 'Ano'.
'''

spark.sql('''SELECT DISTINCT Ano
          FROM arrecadacao
          ORDER BY Ano
          ''').show(30)

+----+
| Ano|
+----+
|2002|
|2003|
|2004|
|2005|
|2006|
|2007|
|2008|
|2009|
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|
|2017|
|2018|
|2019|
|2020|
|2021|
|2022|
+----+



In [50]:
'''
> SOMA
Consulta para a soma da quantidade comercializada por estado da view 'teste',
agrupadas por estado e ordenadas do maior ao menor valor da soma da
quantidade comercializada em ordem decrescente
'''
spark.sql('''SELECT UF, cast(SUM(QuantidadeComercializada) as DECIMAL(15,2)) as QuantidadeComercializada
             FROM arrecadacao 
             GROUP BY UF
             ORDER BY QuantidadeComercializada DESC
             ''').show(30)

+----+------------------------+
|  UF|QuantidadeComercializada|
+----+------------------------+
|  GO|        9183313766101.26|
|  MG|        8768631954947.87|
|  SP|        6364884182577.36|
|  PB|         720245324293.12|
|  AP|         464456059814.76|
|  RS|         189459061922.14|
|  DF|         122644869084.48|
|  RJ|         120209532126.80|
|  MA|          93153116399.30|
|  SC|          71489575584.17|
|  BA|          47709341949.63|
|  MS|          30415871547.41|
|  PA|          24468919173.55|
|  PR|          16775349626.24|
|  AM|          15155884589.54|
|  PE|          10203767508.34|
|  RN|          10196596252.01|
|  SE|           9416274536.25|
|  AL|           8536400669.33|
|  CE|           7068395217.58|
|  TO|           6334523495.39|
|  ES|           3103633530.43|
|  PI|           2902046048.37|
|  RO|           1635857770.09|
|  RR|            331384919.76|
|  AC|            216741034.70|
|null|             54910567.93|
|  MT|                    null|
+----+--

In [51]:
'''
> MÉDIA
Consulta para a média da quantidade comercializada por estado da view 'teste',
agrupadas por estado e ordenadas do maior ao menor valor da média da
quantidade comercializada em ordem decrescente
'''

spark.sql('''SELECT UF, cast(AVG(QuantidadeComercializada) as DECIMAL(15,2)) as QuantidadeComercializada
             FROM arrecadacao 
             GROUP BY UF
             ORDER BY QuantidadeComercializada DESC
             ''').show(30)

+----+------------------------+
|  UF|QuantidadeComercializada|
+----+------------------------+
|  MT|            179977695.06|
|  GO|            123844451.48|
|  AP|             87139973.70|
|  MG|             34020702.54|
|  PB|             32344410.11|
|  DF|             24937956.30|
|  SP|             23575740.74|
|  MA|              4868205.72|
|  AM|              1597374.01|
|  RJ|              1552573.19|
|  RS|              1138042.64|
|  AL|              1108335.58|
|  MS|               867587.19|
|  RN|               854988.79|
|  BA|               850615.85|
|  SE|               713030.03|
|  TO|               510313.66|
|  PA|               485436.64|
|  SC|               443637.84|
|  PE|               388922.38|
|  CE|               215335.73|
|  PI|               154594.40|
|  PR|               127531.38|
|  RR|                94304.19|
|  AC|                81328.72|
|  RO|                58234.23|
|  ES|                43304.50|
|null|                35563.84|
+----+--

In [None]:
'''
> SOMA
Consulta para a soma do valor recolhido da view 'teste',
agrupadas por ano e uf e ordenadas por ano e soma do
valor recolhido em ordem decrescente
'''

spark.sql('''SELECT Ano, UF, cast(SUM(ValorRecolhido) as DECIMAL(15,2)) as ValorRecolhido
             FROM arrecadacao 
             GROUP BY Ano, UF
             ORDER BY Ano, ValorRecolhido DESC
             ''').show(200)

In [None]:
'''
> MÉDIA
Consulta para a média do valor recolhido da view 'teste',
agrupadas por ano e uf e ordenadas por ano e média do
valor recolhido em ordem decrescente
'''

spark.sql('''SELECT Ano, UF, cast(AVG(ValorRecolhido) as DECIMAL(15,2)) as ValorRecolhido
             FROM arrecadacao 
             GROUP BY Ano, UF
             ORDER BY Ano, ValorRecolhido DESC
             ''').show(200)

In [None]:
spark.sql("SELECT DISTINCT unidadedemedida from arrecadacao").show()

In [4]:
'''
Consulta dos 10 minérios mais comercializados, por unidade de medida
'''

spark.sql("SELECT Substancia, Cast(SUM(QUANTIDADECOMERCIALIZADA) as DECIMAL(25,3)) as SomaQuantidadeComercializada_m3 "
          "FROM arrecadacao WHERE UnidadeDeMedida = 'Metros Cubicos' GROUP BY(Substancia) ORDER BY SUM(QUANTIDADECOMERCIALIZADA) DESC").show(10)

spark.sql("SELECT Substancia, Cast(SUM(QUANTIDADECOMERCIALIZADA) as DECIMAL(25,3)) as SomaQuantidadeComercializada_m2 "
          "FROM arrecadacao WHERE UnidadeDeMedida = 'Metros Quadrados' GROUP BY(Substancia) ORDER BY SUM(QUANTIDADECOMERCIALIZADA) DESC").show(20)

spark.sql("SELECT Substancia, Cast(SUM(QUANTIDADECOMERCIALIZADA) as DECIMAL(25,3)) as SomaQuantidadeComercializada_null "
          "FROM arrecadacao WHERE UnidadeDeMedida is NULL GROUP BY(Substancia) ORDER BY SUM(QUANTIDADECOMERCIALIZADA) DESC").show(20)

spark.sql("SELECT Substancia, Cast(SUM(QUANTIDADECOMERCIALIZADA) as DECIMAL(25,3)) as SomaQuantidadeComercializada_T "
          "FROM arrecadacao WHERE UnidadeDeMedida = 'Toneladas' GROUP BY(Substancia) ORDER BY SUM(QUANTIDADECOMERCIALIZADA) DESC").show(10)

spark.sql("SELECT Substancia, Cast(SUM(QUANTIDADECOMERCIALIZADA) as DECIMAL(25,3)) as SomaQuantidadeComercializada_L "
          "FROM arrecadacao WHERE UnidadeDeMedida = 'Litros' GROUP BY(Substancia) ORDER BY SUM(QUANTIDADECOMERCIALIZADA) DESC").show(10)

spark.sql("SELECT Substancia, Cast(SUM(QUANTIDADECOMERCIALIZADA) as DECIMAL(25,3)) as SomaQuantidadeComercializada_g "
          "FROM arrecadacao WHERE UnidadeDeMedida = 'Gramas' GROUP BY(Substancia) ORDER BY SUM(QUANTIDADECOMERCIALIZADA) DESC").show(10)

spark.sql("SELECT Substancia, Cast(SUM(QUANTIDADECOMERCIALIZADA) as DECIMAL(25,3)) as SomaQuantidadeComercializada_kg "
          "FROM arrecadacao WHERE UnidadeDeMedida = 'Kg' GROUP BY(Substancia) ORDER BY SUM(QUANTIDADECOMERCIALIZADA) DESC").show(10)

spark.sql("SELECT Substancia, Cast(SUM(QUANTIDADECOMERCIALIZADA) as DECIMAL(25,3)) as SomaQuantidadeComercializada_K "
          "FROM arrecadacao WHERE UnidadeDeMedida = 'Quilates' GROUP BY(Substancia) ORDER BY SUM(QUANTIDADECOMERCIALIZADA) DESC").show(10)

AnalysisException: 'Table or view not found: arrecadacao; line 1 pos 112'

In [5]:
'''
Investigando os minérios em que a quantidade de medida é kg
'''
spark.sql("SELECT * FROM arrecadacao WHERE UnidadeDeMedida like '%Quilo%' ").show()

AnalysisException: 'Table or view not found: arrecadacao; line 1 pos 14'

In [6]:
'''
Investigando os minérios que possuem mais imposto
'''
spark.sql("SELECT Substancia, Cast(SUM(QUANTIDADECOMERCIALIZADA) as DECIMAL(25,3)) as SomaQuantidadeComercializada, "
          "CAST(SUM(ValorRecolhido) as DECIMAL(25,3)) as mediaValorRecolhido, CAST(SUM(ValorRecolhido)/SUM(QuantidadeComercializada) as DECIMAL(10,2)) as impostoPorTonelada "
          "FROM arrecadacao WHERE UnidadeDeMedida = 'Toneladas' GROUP BY(Substancia) ORDER BY impostoPorTonelada DESC").show(10)

spark.sql("SELECT Substancia, Cast(SUM(QUANTIDADECOMERCIALIZADA) as DECIMAL(25,3)) as SomaQuantidadeComercializada, "
          "CAST(SUM(ValorRecolhido) as DECIMAL(25,3)) as mediaValorRecolhido, CAST(SUM(ValorRecolhido)/SUM(QuantidadeComercializada) as DECIMAL(10,2)) as impostoPorQuilate "
          "FROM arrecadacao WHERE UnidadeDeMedida = 'Quilates' GROUP BY(Substancia) ORDER BY impostoPorQuilate DESC").show(10)

spark.sql("SELECT Substancia, Cast(SUM(QUANTIDADECOMERCIALIZADA) as DECIMAL(25,3)) as SomaQuantidadeComercializada, "
          "CAST(SUM(ValorRecolhido) as DECIMAL(25,3)) as mediaValorRecolhido, CAST(SUM(ValorRecolhido)/SUM(QuantidadeComercializada) as DECIMAL(10,2)) as impostoPorMetroQuad "
          "FROM arrecadacao WHERE UnidadeDeMedida = 'Metros Quadrados' GROUP BY(Substancia) ORDER BY impostoPorMetroQuad DESC").show(10)

spark.sql("SELECT Substancia, Cast(SUM(QUANTIDADECOMERCIALIZADA) as DECIMAL(25,3)) as SomaQuantidadeComercializada, "
          "CAST(SUM(ValorRecolhido) as DECIMAL(25,3)) as mediaValorRecolhido, CAST(SUM(ValorRecolhido)/SUM(QuantidadeComercializada) as DECIMAL(10,2)) as impostoPorMetroCub "
          "FROM arrecadacao WHERE UnidadeDeMedida = 'Metros Cubicos' GROUP BY(Substancia) ORDER BY impostoPorMetroCub DESC").show(10)

spark.sql("SELECT Substancia, Cast(SUM(QUANTIDADECOMERCIALIZADA) as DECIMAL(25,3)) as SomaQuantidadeComercializada, "
          "CAST(SUM(ValorRecolhido) as DECIMAL(25,3)) as mediaValorRecolhido, CAST(SUM(ValorRecolhido)/SUM(QuantidadeComercializada) as DECIMAL(10,2)) as impostoPorLitro "
          "FROM arrecadacao WHERE UnidadeDeMedida = 'Litros' GROUP BY(Substancia) ORDER BY impostoPorLitro DESC").show(10)

spark.sql("SELECT Substancia, Cast(SUM(QUANTIDADECOMERCIALIZADA) as DECIMAL(25,3)) as SomaQuantidadeComercializada, "
          "CAST(SUM(ValorRecolhido) as DECIMAL(25,3)) as mediaValorRecolhido, CAST(SUM(ValorRecolhido)/SUM(QuantidadeComercializada) as DECIMAL(10,2)) as impostoPorGramas "
          "FROM arrecadacao WHERE UnidadeDeMedida = 'Gramas' GROUP BY(Substancia) ORDER BY impostoPorGramas DESC").show(10)

AnalysisException: 'Table or view not found: arrecadacao; line 1 pos 272'

### PIB

In [52]:
df_pib = spark.read.csv('gs://soulcode-mineracao/tratados/pib.csv', inferSchema=True, header=True, encoding='utf-8')

In [53]:
df_pib.createOrReplaceTempView('pib')

In [54]:
df_pib.columns

['ano',
 'id_municipio',
 'pib',
 'impostos_liquidos',
 'va',
 'va_agropecuaria',
 'va_industria',
 'va_servicos',
 'va_adespss']

In [55]:
spark.sql("SELECT ano, SUM(pib), SUM(impostos_liquidos), sum(va), sum(va_agropecuaria), sum(va_industria), sum(va_servicos), sum(va_adespss) FROM pib GROUP BY ano ORDER BY ano").show()

+----+-------------+----------------------+-------------+--------------------+-----------------+----------------+---------------+
| ano|     sum(pib)|sum(impostos_liquidos)|      sum(va)|sum(va_agropecuaria)|sum(va_industria)|sum(va_servicos)|sum(va_adespss)|
+----+-------------+----------------------+-------------+--------------------+-----------------+----------------+---------------+
|2002|1488787276039|          218572609012|1270214667015|         81515198966|     334907570007|    644403214045|   209388684020|
|2003|1717950386056|          247233152001|1470717233978|        105949164973|     396568543017|    732543684039|   235655841993|
|2004|1957751224028|          295769100994|1661982123059|        110912703012|     475863218000|    815527583034|   259678618984|
|2005|2170584502984|          327768754273|1842818402007|        100957547036|     524686244007|    922044704993|   295129905954|
|2006|2409449916019|          360159937992|2049289978009|        105294010981|     5672814

In [56]:
spark.sql("SELECT id_municipio, SUM(pib), SUM(impostos_liquidos), sum(va), sum(va_agropecuaria), sum(va_industria), sum(va_servicos), sum(va_adespss) FROM pib GROUP BY id_municipio ORDER BY SUM(impostos_liquidos) DESC").show()

+------------+-------------+----------------------+-------------+--------------------+-----------------+----------------+---------------+
|id_municipio|     sum(pib)|sum(impostos_liquidos)|      sum(va)|sum(va_agropecuaria)|sum(va_industria)|sum(va_servicos)|sum(va_adespss)|
+------------+-------------+----------------------+-------------+--------------------+-----------------+----------------+---------------+
|     3550308|7665059415725|         1393893909426|6271165506297|           358199113|     847621578137|   4952149715780|   471036013269|
|     3304557|3688917912360|          830073114061|2858844798301|           795436389|     455037435209|   1899649121127|   503362805574|
|     5300108|2454731764824|          330618608760|2124113156063|          8060086541|     127995157170|   1034273221201|   953784691149|
|     4106902| 976174788039|          181891684423| 794283103619|           113572013|     172746483400|    537040891916|    84382156290|
|     1302603| 824412695626|      

In [57]:
df_pib.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- id_municipio: integer (nullable = true)
 |-- pib: long (nullable = true)
 |-- impostos_liquidos: long (nullable = true)
 |-- va: long (nullable = true)
 |-- va_agropecuaria: long (nullable = true)
 |-- va_industria: long (nullable = true)
 |-- va_servicos: long (nullable = true)
 |-- va_adespss: long (nullable = true)



### Barragens

In [83]:
df_barragens = spark.read.csv('gs://soulcode-mineracao/tratados/barragens.csv', inferSchema=True, header=True, encoding='utf-8')
dfp_barragens = df_barragens.toPandas()

In [84]:
dfp_barragens.head()

Unnamed: 0,Empreendedor,UF,Município,Categoria de Risco - CRI,Nível de Emergência,Tipo de Barragem de Mineração,Vida útil prevista da Barragem (anos),Estrutura com o Objetivo de Contenção,Minério principal presente no reservatório,N pessoas afetadas a jusante em caso de rompimento da barragem
0,MOSAIC FERTILIZANTES P&K LTDA.,SE,ROSÁRIO DO CATETE,Não se aplica,Sem emergência,Barragem/Barramento/Dique,34.0,Rejeitos,Sais,100.0
1,MINERACAO SAO FRANCISCO DE ASSIS LTDA,PA,SÃO FÉLIX DO XINGU,Não se aplica,Sem emergência,Barragem/Barramento/Dique,,Rejeitos,Areia,
2,MAGNESITA MINERACAO S.A. Filial: MAGNESITA MIN...,BA,SANTALUZ,Não se aplica,Sem emergência,Barragem/Barramento/Dique,14.0,Rejeitos,Cromita,875.0
3,MAGNESITA MINERACAO S.A. Filial: MAGNESITA MIN...,BA,SANTALUZ,Não se aplica,Sem emergência,Barragem/Barramento/Dique,14.0,Rejeitos,Cromita,1100.0
4,MAGNESITA MINERACAO S.A. Filial: MAGNESITA MIN...,BA,SANTALUZ,Não se aplica,Sem emergência,Barragem/Barramento/Dique,14.0,Rejeitos,Cromita,1125.0


In [127]:
df_barragens = df_barragens.withColumnRenamed('N pessoas afetadas a jusante em caso de rompimento da barragem', 'nAfetadosRompimento')\
                           .withColumnRenamed('Estrutura com o Objetivo de Contenção', 'estruturaContencao')\
                           .withColumnRenamed('Vida útil prevista da Barragem (anos)', 'vidaUtilAnos')\
                           .withColumnRenamed('Nível de Emergência', 'nivelEmergencia')\
                           .withColumnRenamed('Categoria de Risco - CRI', 'categoriaRiscoCRI')\
                           .withColumnRenamed('Município', 'municipio')

In [128]:
df_barragens.createOrReplaceTempView('barragens')

In [129]:
df_barragens.printSchema()

root
 |-- Empreendedor: string (nullable = true)
 |-- UF: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- categoriaRiscoCRI: string (nullable = true)
 |-- nivelEmergencia: string (nullable = true)
 |-- Tipo de Barragem de Mineração: string (nullable = true)
 |-- vidaUtilAnos: double (nullable = true)
 |-- estruturaContencao: string (nullable = true)
 |-- Minério principal presente no reservatório: string (nullable = true)
 |-- nAfetadosRompimento: double (nullable = true)



In [105]:
'''
Os 10 estados com maior número de habitantes com risco de serem afetados por rompimento de barragens
'''

spark.sql("SELECT UF, CAST(SUM(nAfetadosRompimento) as DECIMAL(30,2)) as somaAfetadosRompimento FROM barragens GROUP BY UF ORDER BY somaAfetadosRompimento DESC ").show(10)

+---+----------------------+
| UF|somaAfetadosRompimento|
+---+----------------------+
| MG|         1637078647.27|
| PA|          208944531.94|
| GO|           82920100.00|
| BA|           44544955.00|
| RO|           35757479.56|
| SP|            9736054.56|
| MT|            8293207.49|
| AP|            1870800.00|
| MS|            1333226.40|
| TO|              39000.00|
+---+----------------------+
only showing top 10 rows



In [90]:
'''
Verificando a vida média da vida útil das barragens por estado
'''

spark.sql("SELECT UF, CAST(AVG(vidaUtilAnos) as decimal(20,2)) as mediaVidaUtilAnos FROM barragens GROUP BY UF ORDER BY mediaVidaUtilAnos").show()

+---+-----------------+
| UF|mediaVidaUtilAnos|
+---+-----------------+
| PI|             null|
| PB|             null|
| RO|             4.57|
| MA|             5.33|
| BA|             5.96|
| RS|             6.60|
| SC|            12.46|
| AM|            12.67|
| SP|            13.04|
| AL|            14.00|
| AP|            14.31|
| MT|            14.98|
| GO|            15.43|
| MS|            17.56|
| PA|            18.40|
| TO|            22.17|
| MG|            22.67|
| PR|            24.67|
| SE|            32.33|
| RJ|            34.00|
+---+-----------------+



In [121]:
spark.sql("SELECT DISTINCT nivelEmergencia FROM barragens").show()

spark.sql("SELECT DISTINCT categoriaRiscoCRI FROM barragens").show()


+------------------+
|   nivelEmergencia|
+------------------+
|    Sem emergência|
|Emergência Nivel 3|
|Emergência Nivel 2|
|Emergência Nivel 1|
+------------------+

+-----------------+
|categoriaRiscoCRI|
+-----------------+
|             Alta|
|            Média|
|    Não se aplica|
|            Baixa|
+-----------------+



In [107]:
spark.sql("SELECT UF, COUNT(nivelEmergencia) as numBarragensSemEmergencia, CAST(SUM(nAfetadosRompimento) as DECIMAL(30,2)) FROM barragens WHERE nivelEmergencia = 'Sem emergência' GROUP BY UF ORDER BY numBarragensSemEmergencia DESC ").show()

spark.sql("SELECT UF, COUNT(nivelEmergencia) as numBarragensEmergencia3, CAST(SUM(nAfetadosRompimento) as DECIMAL(30,2)) FROM barragens WHERE nivelEmergencia = 'Emergência Nivel 3' GROUP BY UF ORDER BY numBarragensEmergencia3 DESC ").show()

spark.sql("SELECT UF, COUNT(nivelEmergencia) as numBarragensEmergencia2, CAST(SUM(nAfetadosRompimento) as DECIMAL(30,2)) FROM barragens WHERE nivelEmergencia = 'Emergência Nivel 2' GROUP BY UF ORDER BY numBarragensEmergencia2 DESC ").show()

spark.sql("SELECT UF, COUNT(nivelEmergencia) as numBarragensEmergencia1, CAST(SUM(nAfetadosRompimento) as DECIMAL(30,2)) FROM barragens WHERE nivelEmergencia = 'Emergência Nivel 1' GROUP BY UF ORDER BY numBarragensEmergencia1 DESC ").show()

+---+-------------------------+-----------------------------------------------+
| UF|numBarragensSemEmergencia|CAST(sum(nAfetadosRompimento) AS DECIMAL(30,2))|
+---+-------------------------+-----------------------------------------------+
| MG|                      314|                                  1530238706.12|
| MT|                      141|                                     6193207.49|
| PA|                      113|                                   208876031.94|
| BA|                       81|                                    44544955.00|
| SP|                       67|                                     9736054.56|
| RO|                       36|                                    35757479.56|
| GO|                       22|                                    82920100.00|
| MS|                       18|                                     1333226.40|
| AP|                       17|                                     1870800.00|
| AM|                       15|         

In [134]:
spark.sql("SELECT UF, municipio, Empreendedor, categoriaRiscoCRI, nivelEmergencia FROM barragens WHERE nivelEmergencia <> 'Sem emergência' or (categoriaRiscoCRI = 'Alta')").show(100)

+---+--------------------+--------------------+-----------------+------------------+
| UF|           municipio|        Empreendedor|categoriaRiscoCRI|   nivelEmergencia|
+---+--------------------+--------------------+-----------------+------------------+
| MG|           RIO ACIMA|Massa Falida de M...|             Alta|Emergência Nivel 1|
| MG|           RIO ACIMA|Massa Falida de M...|             Alta|Emergência Nivel 1|
| AP|PEDRA BRANCA DO A...|DEV MINERACAO S.A...|             Alta|Emergência Nivel 1|
| MT|              POCONÉ|   NORMA ARGES OLIVA|             Alta|Emergência Nivel 1|
| MT|NOSSA SENHORA DO ...|     SERGIO DA SILVA|             Alta|Emergência Nivel 1|
| MT|              POCONÉ|João de Pinho Nov...|             Alta|    Sem emergência|
| MT|              POCONÉ|Ulisses José Dorileo|             Alta|    Sem emergência|
| MG|           NOVA LIMA|           VALE S.A.|             Alta|Emergência Nivel 1|
| MG|     BARÃO DE COCAIS|           VALE S.A.|             Alta|

In [140]:
'''
Número de barragens em risco e de afetados pelo rompimento por município
'''

spark.sql("SELECT UF, municipio, COUNT(municipio) as numBarragensRisco, CAST(SUM(nAfetadosRompimento) as DECIMAL(30,2)) as somaAfetadosRompimento FROM barragens WHERE nivelEmergencia <> 'Sem emergência' or categoriaRiscoCRI = 'Alta' GROUP BY uf, municipio ORDER BY somaAfetadosRompimento DESC").show(100)

+---+--------------------+-----------------+----------------------+
| UF|           municipio|numBarragensRisco|somaAfetadosRompimento|
+---+--------------------+-----------------+----------------------+
| MG|          OURO PRETO|                8|           61805641.44|
| MG|             MARIANA|                3|           23501100.00|
| MG|           NOVA LIMA|                9|           13620258.00|
| MG|     BARÃO DE COCAIS|                3|            7178241.71|
| MT|              POCONÉ|                3|            2383500.00|
| MT|NOSSA SENHORA DO ...|                2|            2100000.00|
| MG|          BRUMADINHO|                2|             729000.00|
| PA|              MARABÁ|                1|              68500.00|
| MG|             ITABIRA|                3|               5100.00|
| MG|           BELO VALE|                1|                500.00|
| MG|         CATAS ALTAS|                1|                100.00|
| MG|           ITABIRITO|                1|    

### Dados_populacao

In [141]:
df_dados_populacao = spark.read.csv('gs://soulcode-mineracao/tratados/dados_populacao.csv', inferSchema=True, header=True, encoding='utf-8')
dfp_dados_populacao = df_dados_populacao.toPandas()

In [None]:
df_dados_populacao.createOrReplaceTempView('dados_populacao')