In [22]:
!pip install pandas openpyxl xlrd gcsfs fsspec

Defaulting to user installation because normal site-packages is not writeable


In [23]:
private_key = "ornate-spring-379820-00ec27d088ba.json"
project_id = 'ornate-spring-379820'

In [24]:
gcs_bucket = "bq-bucketgabrielcordeiro-1"
gcs_folder = "raw_data"
file_name = "01_Pib_Municipios_tabelas_completas.xlsx/01_Pib_Municipios_tabelas_completas.xlsx"
gcs_path = f"gs://{gcs_bucket}/{gcs_folder}/{file_name}"

In [25]:
from pyspark.sql import SparkSession
import pandas as pd
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-memory 2g pyspark-shell'
pd.set_option('display.max_rows', None)

spark = SparkSession.builder.appName('MyPySparkJob').getOrCreate()

In [26]:
spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile",private_key)

 ## Dataset 1

In [27]:
gcs_bucket = "bq-bucketgabrielcordeiro-1"
gcs_folder = "raw_data"
filename = "01_Pib_Municipios_tabelas_completas.xlsx/01_Pib_Municipios_tabelas_completas.xlsx"

In [28]:
df = pd.read_excel(f'gs://{gcs_bucket}/{gcs_folder}/{filename}',
                   engine="openpyxl", 
                   sheet_name="Tabela 2", 
                   storage_options={"token": private_key})

### Tabela 2 - Top 30 Maiores Municipios por PIB

#### Functions

In [29]:
# Transformar a primeira linha de cada Dataframe no Header
def firstLineToHeader(df):
    new_header = df.iloc[0]
    df = df[1:]
    df.columns = new_header
    df.columns = df.columns.str.replace('\n', ' ')
    df.columns = df.columns.str.replace('- ', '')
    df = df.reset_index(drop=True)
    return df

# Resolve o Problema de colunas com nomes duplicados
def differDuplicatedNameColumns(df):
    cols=pd.Series(df.columns)
    for dup in df.columns[df.columns.duplicated(keep=False)]: 
        cols[df.columns.get_loc(dup)] = ([dup + '.' + str(d_idx) if d_idx != 0 else dup 
                                        for d_idx in range(df.columns.get_loc(dup).sum())])
    df.columns=cols
    return df.columns

def extractAndAppendUF(df):
    df[['Municípios e respectivas Unidades da Federação', 'UF']] = df['Municípios e respectivas Unidades da Federação']\
        .str.extract('^(.*?) \((.*?)\)$', expand=True)
    return df

#### Transformation

In [30]:
# Limpar multi Index
df = df.dropna(how='any')
df = df.reset_index(drop=True)

# Dividir a tabela em Dataframes por região
tabela2_norte_maior = df.iloc[0:31].reset_index(drop=True)
tabela2_nordeste_maior = df.iloc[31:63].reset_index(drop=True)
tabela2_sudeste_maior = df.iloc[63:94].reset_index(drop=True)
tabela2_sul_maior = df.iloc[94:125].reset_index(drop=True)
tabela2_centroeste_maior = df.iloc[125:156].reset_index(drop=True)

# Atribuindo Header e Extraindo Sigla do Estado
tabela2_norte_maior = firstLineToHeader(tabela2_norte_maior)
tabela2_norte_maior = extractAndAppendUF(tabela2_norte_maior)
tabela2_norte_maior['regiao'] = 'Norte'

tabela2_nordeste_maior = firstLineToHeader(tabela2_nordeste_maior)
tabela2_nordeste_maior = tabela2_nordeste_maior.drop(0).reset_index(drop=True) # Limpando Linha Extra contendo "Nordeste"
tabela2_nordeste_maior = extractAndAppendUF(tabela2_nordeste_maior)
tabela2_nordeste_maior['regiao'] = 'Nordeste'

tabela2_sudeste_maior = firstLineToHeader(tabela2_sudeste_maior)
tabela2_sudeste_maior = extractAndAppendUF(tabela2_sudeste_maior)
tabela2_sudeste_maior['regiao'] = 'Sudeste'

tabela2_sul_maior = firstLineToHeader(tabela2_sul_maior)
tabela2_sul_maior = extractAndAppendUF(tabela2_sul_maior)
tabela2_sul_maior['regiao'] = 'Sul'

tabela2_centroeste_maior = firstLineToHeader(tabela2_centroeste_maior)
tabela2_centroeste_maior = extractAndAppendUF(tabela2_centroeste_maior)
tabela2_centroeste_maior['regiao'] = 'Centro-Oeste'

In [31]:
spark_tabela2_norte_maior = spark.createDataFrame(tabela2_norte_maior)
spark_tabela2_nordeste_maior = spark.createDataFrame(tabela2_nordeste_maior)
spark_tabela2_sudeste_maior = spark.createDataFrame(tabela2_sudeste_maior)
spark_tabela2_sul_maior = spark.createDataFrame(tabela2_sul_maior)
spark_tabela2_centroeste_maior = spark.createDataFrame(tabela2_centroeste_maior)



In [32]:
from pyspark.sql.functions import lit

# adiciona uma coluna 'source' para identificar a origem de cada dataframe
df1 = spark_tabela2_norte_maior.withColumn('source', lit('df1'))
df2 = spark_tabela2_nordeste_maior.withColumn('source', lit('df2'))
df3 = spark_tabela2_sudeste_maior.withColumn('source', lit('df3'))
df4 = spark_tabela2_sul_maior.withColumn('source', lit('df4'))
df5 = spark_tabela2_centroeste_maior.withColumn('source', lit('df5'))

# Unir os dataframes
tabela2 = df1.union(df2).union(df3).union(df4).union(df5).drop('source')

new_header = ['nome_municipio', 'posicao_30_maiores_municipios', 'pib_concorrente', 
              'participacao_porcentagem_municipios_grande_regiao', 
              'participacao_acumulada_municípios_grande_regiao', 'UF', 'regiao']

# Renomear colunas dataframe
for i in range(len(new_header)):
    old_name = tabela2.columns[i]
    new_name = new_header[i]
    tabela2 = tabela2.withColumnRenamed(old_name, new_name)

### Tabela 9 - Participação dos top5 municipios no PIB por estado

In [33]:
df = pd.read_excel(f'gs://{gcs_bucket}/{gcs_folder}/{filename}',
                   engine="openpyxl", 
                   sheet_name="Tabela 9", 
                   storage_options={"token": private_key})

#### functions & dict

In [34]:
def addDashInNumberHeaders(df):
    new_columns = []
    for column_name in df.columns:
        if column_name.isnumeric():
            new_columns.append('_' + column_name)
        else:
            new_columns.append(column_name)
    return new_columns

def extrair_sigla_estado(valor):
    partes = valor.split(" (")
    nome_estado = partes[0]
    return uf_dict.get(nome_estado)

def extrair_estado(valor):
    partes = valor.split(" (")
    nome_estado = partes[0]
    return nome_estado

uf_dict = {'Acre': 'AC', 'Alagoas': 'AL', 'Amapá': 'AP', 'Amazonas': 'AM', 'Bahia': 'BA',
           'Ceará': 'CE', 'Distrito Federal': 'DF', 'Espírito Santo': 'ES', 'Goiás': 'GO',
           'Maranhão': 'MA', 'Mato Grosso': 'MT', 'Mato Grosso do Sul': 'MS', 'Minas Gerais': 'MG',
           'Pará': 'PA', 'Paraíba': 'PB', 'Paraná': 'PR', 'Pernambuco': 'PE', 'Piauí': 'PI',
           'Rio de Janeiro': 'RJ', 'Rio Grande do Norte': 'RN', 'Rio Grande do Sul': 'RS',
           'Rondônia': 'RO', 'Roraima': 'RR', 'Santa Catarina': 'SC', 'São Paulo': 'SP',
           'Sergipe': 'SE', 'Tocantins': 'TO'}

#### transformation

In [35]:
new_header = df.iloc[4]
df = df[4:]
df = df.reset_index(drop=True)
df.columns = new_header
df = df[1:28]

In [36]:
# Tentar converter os headers para Inteiro e remover os "/n"
for coluna in df.columns:
    try:
        df.rename(columns={coluna: int(coluna)}, inplace=True)
    except ValueError:
        pass
    try:
        df.rename(columns={coluna: coluna.replace('\n', '')}, inplace=True)
    except:
        pass

df.columns = df.columns.fillna('x')
df = df.rename_axis(index=None, columns=None)
df = df.reset_index(drop=True)

In [37]:
df.columns = differDuplicatedNameColumns(df)

df.columns = df.columns.astype(str)
header_19 = 'Participação no número de municípios da Unidade da Federação 2019 (%)'
header_20 = 'Participação no total da população da Unidade da Federação 2019 (%)'

df = df.rename(columns={'x': 'estado', 'x.1': header_19, 'x.2': header_20, '2019(2)': '2019'})
df = df.iloc[:, :-2] # remove as duas ultimas colunas

df["UF"] = df["estado"].apply(extrair_sigla_estado) # Cria a Coluna UF
df["estado"] = df["estado"].apply(extrair_estado)
df.columns = addDashInNumberHeaders(df)
#df.columns = df.columns.astype(str)

tabela9 = df.copy()

## Dataset 2

In [38]:
filename = '02_Recursos_para_a_gestao_20210817/Tab_8.xls'

df = pd.read_excel(f'gs://{gcs_bucket}/{gcs_folder}/{filename}',
                   storage_options={"token": private_key})

### functions & dict

In [39]:
def differDuplicatedNameColumns(df):
    cols=pd.Series(df.columns)
    for dup in df.columns[df.columns.duplicated(keep=False)]: 
        cols[df.columns.get_loc(dup)] = ([dup + '.' + str(d_idx) if d_idx != 0 else dup 
                                        for d_idx in range(df.columns.get_loc(dup).sum())])
    df.columns=cols
    return df.columns

In [40]:
uf_dict = {'Acre': 'AC', 'Alagoas': 'AL', 'Amapá': 'AP', 'Amazonas': 'AM', 'Bahia': 'BA',
           'Ceará': 'CE', 'Distrito Federal': 'DF', 'Espírito Santo': 'ES', 'Goiás': 'GO',
           'Maranhão': 'MA', 'Mato Grosso': 'MT', 'Mato Grosso do Sul': 'MS', 'Minas Gerais': 'MG',
           'Pará': 'PA', 'Paraíba': 'PB', 'Paraná': 'PR', 'Pernambuco': 'PE', 'Piauí': 'PI',
           'Rio de Janeiro': 'RJ', 'Rio Grande do Norte': 'RN', 'Rio Grande do Sul': 'RS',
           'Rondônia': 'RO', 'Roraima': 'RR', 'Santa Catarina': 'SC', 'São Paulo': 'SP',
           'Sergipe': 'SE', 'Tocantins': 'TO'}

new_header = ['nome_estado', 'total_municipios_pagam_tributacao', 'tributo_iluminacao_publica', 
              'tributo_coleta_de_lixo', 'tributo_incendio', 'tributo_limpeza_publica', 
              'poder_da_policia', 'outros_tributos', 'nao_cobra_tributo']

### transformation

In [41]:
df = df[6:-2] # Seleciona area util do Dataframe
df.columns = differDuplicatedNameColumns(df)
df = df.rename(columns=dict(zip(df.columns, new_header))) # Renomeia o Header com new_header

In [42]:
mask = df['nome_estado'].isin(['Norte', 'Nordeste', 'Sudeste', 'Sul', 'Centro-Oeste'])
df = df[~mask]

df = df.reset_index(drop=True)
df = df.dropna()

df['UF'] = df['nome_estado'].map(uf_dict)

tabela8 = df.copy()

## Dataset 3

In [43]:
filename = '03_Cadastro_de_Empresas/tabelas_xlsx/Unidades_Locais/Tabela 12.xlsx'

df = pd.read_excel(f'gs://{gcs_bucket}/{gcs_folder}/{filename}',
                   storage_options={"token": private_key})

### functions & dict

In [44]:
def extrair_sigla_estado(valor):
    # Divide o valor em duas partes, separadas por " - "
    partes = valor.split(" - ")
    # A segunda parte contém a sigla do estado
    sigla_estado = partes[1]
    # Retorna a sigla do estado
    return sigla_estado

def extrair_municipio(valor):
    partes = valor.split(" - ")
    municipio = partes[0]
    return municipio

new_header = ['nome_municipio', 'classificacao_atividade', 'numero_unidades_locais', 'pessoal_ocupado_total',
              'pessoal_ocupado_assalariado','pessoal_assalariado_medio', 'salarios_e_outras_remuneracoes', 
              'Salario_medio_mensal_minimos', 'salario_medio_mensal_valor']

### transformation

In [45]:
df = df[5:-7] # Selecionar area util do dataframe
df = df.replace({'X': 0, 'x': 0, '-': 0}) # Limpar valores invalidos
df = df.rename(columns=dict(zip(df.columns, new_header))) # Renomeia o Header com new_header
df = df.dropna()

In [46]:
df['UF'] = df['nome_municipio'].apply(extrair_sigla_estado) # Cria a Coluna UF
df['nome_municipio'] = df['nome_municipio'].apply(extrair_municipio) # Permanece o municipio
df = df.reset_index(drop=True)

tabela12 = df.copy()

## Convert to Parquet

In [47]:
tabela8_spark = spark.createDataFrame(tabela8)
tabela12_spark = spark.createDataFrame(tabela12)



In [48]:
gcs_folder = 'transformed_data'
gcs_bucket = "bq-bucketgabrielcordeiro-1"

tabela2.write.format("parquet").mode("overwrite").save(f'gs://{gcs_bucket}/{gcs_folder}/01_tabela2.parquet')
tabela8_spark.write.mode('overwrite').parquet(f'gs://{gcs_bucket}/{gcs_folder}/02_tabela8.parquet')
tabela12_spark.write.mode('overwrite').parquet(f'gs://{gcs_bucket}/{gcs_folder}/03_tabela12.parquet')

                                                                                

In [49]:
tabela2.show()

+-----------------+-----------------------------+--------------------+-------------------------------------------------+-----------------------------------------------+---+------+
|   nome_municipio|posicao_30_maiores_municipios|     pib_concorrente|participacao_porcentagem_municipios_grande_regiao|participacao_acumulada_municípios_grande_regiao| UF|regiao|
+-----------------+-----------------------------+--------------------+-------------------------------------------------+-----------------------------------------------+---+------+
|           Manaus|                           1º| 8.486742337470977E7|                               20.186138650270156|                             20.186138650270156| AM| Norte|
|            Belém|                           2º|3.2405322901894543E7|                                7.707767186665282|                              27.89390583693544| PA| Norte|
|      Parauapebas|                           3º| 2.303584688047918E7|                              