In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, mean, stddev

In [3]:
MAX_MEMORY = "5g"

spark = SparkSession \
    .builder \
    .appName("Projeto 1 BigData") \
    .config("spark.executor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY) \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/26 23:22:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
raw_df = spark.read.csv("./dados_caged_2022.csv", header=True, sep=",")

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [5]:
raw_df.show(5)

+----+---+---+------------+------------+----------------+------------------+--------+---------+-----------------+-----+-----------------+--------+----+---------------+--------------------+-----------------+----------------+-------------------------------+--------------------------+--------------+-------------------------------+------------------+-----------------+--------------------+
| ano|mes| uf|id_municipio|cnae_2_secao|cnae_2_subclasse|saldo_movimentacao|cbo_2002|categoria|grau_de_instrucao|idade|horas_contratuais|raca_cor|sexo|tipo_empregador|tipo_estabelecimento|tipo_movimentacao|tipo_deficiencia|indicador_trabalho_intermitente|indicador_trabalho_parcial|salario_mensal|tamanho_estabelecimento_janeiro|indicador_aprendiz|origem_informacao|indicador_fora_prazo|
+----+---+---+------------+------------+----------------+------------------+--------+---------+-----------------+-----+-----------------+--------+----+---------------+--------------------+-----------------+----------------+-

# Parsing Data

In [6]:
# Definindo os mapas de parsing
sexo_parse_map = {'1': 'HOMEM', '3': 'MULHER', '9': 'NAO_IDENTIFICADO'}
tipo_empregador_parse_map = {'0': 'CNPJ', '2': 'CPF', '9': 'NAO_IDENTIFICADO'}
saldo_mov_parse_map = {'1': 'ADMITIDO', '-1': 'DEMITIDO'}

# Função para aplicar parsing nos DataFrames
def parse_dataframe(df):
    df = df.drop('id_municipio', 'cbo_2002', 'categoria', 'raca_cor', 'tipo_movimentacao', 
                 'tipo_deficiencia', 'indicador_trabalho_intermitente', 'indicador_trabalho_parcial', 
                 'tamanho_estabelecimento_janeiro', 'indicador_aprendiz', 'origem_informacao', 
                 'indicador_fora_prazo', 'tipo_estabelecimento')

    df = df.withColumn('sexo', when(col('sexo') == '1', 'HOMEM')
                                .when(col('sexo') == '3', 'MULHER')
                                .when(col('sexo') == '9', 'NAO_IDENTIFICADO'))
    
    df = df.withColumn('tipo_empregador', when(col('tipo_empregador') == '0', 'CNPJ')
                                         .when(col('tipo_empregador') == '2', 'CPF')
                                         .when(col('tipo_empregador') == '9', 'NAO_IDENTIFICADO'))

    df = df.withColumn('saldo_movimentacao', when(col('saldo_movimentacao') == '1', 'ADMITIDO')
                                             .when(col('saldo_movimentacao') == '-1', 'DEMITIDO'))

    return df

# Definindo o arquivo de entrada e saída
output_filename = './dados_caged_2022_parsed.csv'

# Aplicando a função de parsing
parsed_df = parse_dataframe(raw_df)

In [7]:
parsed_df.show(5)

+----+---+---+------------+----------------+------------------+-----------------+-----+-----------------+-----+---------------+--------------+
| ano|mes| uf|cnae_2_secao|cnae_2_subclasse|saldo_movimentacao|grau_de_instrucao|idade|horas_contratuais| sexo|tipo_empregador|salario_mensal|
+----+---+---+------------+----------------+------------------+-----------------+-----+-----------------+-----+---------------+--------------+
|2023|  4| SP|           H|         4930202|          DEMITIDO|                6|   37|             44.0|HOMEM|           CNPJ|       2060.89|
|2023|  4| SP|           H|         4930202|          DEMITIDO|                7|   28|             44.0|HOMEM|           CNPJ|       2428.16|
|2023|  4| SP|           H|         4930202|          DEMITIDO|                7|   24|             44.0|HOMEM|           CNPJ|        1898.0|
|2023|  4| SP|           H|         4930202|          DEMITIDO|                7|   27|             44.0|HOMEM|           CNPJ|       2361.87|

# Removing Outliers

In [8]:
# Função para calcular Z-score e remover outliers
def remove_outliers(df, column):
    # Calculando a média e desvio padrão
    stats = df.select(mean(col(column)).alias('mean'), stddev(col(column)).alias('stddev')).collect()[0]
    mean_value = stats['mean']
    stddev_value = stats['stddev']

    # Adicionando a coluna z_score
    df = df.withColumn('z_' + column, (col(column) - mean_value) / stddev_value)

    # Filtrando os outliers
    df = df.filter((col('z_' + column) <= 3) | col(column).isNull())

    return df

parsed_df = remove_outliers(parsed_df, 'salario_mensal')

                                                                                

In [9]:
parsed_df.drop('z_salario_mensal')

# Salvando o DataFrame transformado em um novo CSV
parsed_df.write.csv(output_filename, header=True, mode='overwrite')

                                                                                

In [10]:
cb_df = spark.read.csv('./cesta_basica_2022.csv', header=True, sep=',') 

cb_df.show(5)

+-----+-------+------------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+----+
|Sigla| Código|   Município|2022.01|2022.02|2022.03|2022.04|2022.05|2022.06|2022.07|2022.08|2022.09|2022.10|2022.11|2022.12|_c15|
+-----+-------+------------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+----+
|   AC|1200013|  Acrelândia|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|NULL|
|   AC|1200054|Assis Brasil|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|NULL|
|   AC|1200104|   Brasiléia|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|NULL|
|   AC|1200138|      Bujari|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|NULL|
|   AC|1200179|    Capixaba|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NULL|   NUL

24/05/26 23:25:16 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Sigla, Código, Município, 2022.01, 2022.02, 2022.03, 2022.04, 2022.05, 2022.06, 2022.07, 2022.08, 2022.09, 2022.10, 2022.11, 2022.12, 
 Schema: Sigla, Código, Município, 2022.01, 2022.02, 2022.03, 2022.04, 2022.05, 2022.06, 2022.07, 2022.08, 2022.09, 2022.10, 2022.11, 2022.12, _c15
Expected: _c15 but found: 
CSV file: file:///home/hellhat/Documents/facul/BigData/cesta_basica_2022.csv


In [11]:
cb_df = cb_df.drop('_c15')
cb_df = cb_df.filter(cb_df["`2022.01`"].isNotNull())

In [12]:
monthly_columns = [f"`2022.{str(i).zfill(2)}`" for i in range(1, 13)]
monthly_columns_no_parsing = [f"2022.{str(i).zfill(2)}" for i in range(1, 13)]


# Calculate the mean of the monthly columns
cb_df = cb_df.withColumn("media_cesta_basica", sum(col(c) for c in monthly_columns) / len(monthly_columns))

# Drop the original monthly columns
cb_df = cb_df.drop(*monthly_columns_no_parsing)

In [13]:
cb_df = cb_df.drop('Código', 'Município')
cb_df.show(30)
cb_df.write.csv('parsed_cesta_basica.csv', header=True, mode='overwrite')

+-----+------------------+
|Sigla|media_cesta_basica|
+-----+------------------+
|   BA| 566.3316666666667|
|   CE|             631.8|
|   DF| 699.4575000000001|
|   ES| 701.6608333333334|
|   GO|          668.6175|
|   MG|          661.0075|
|   MS| 713.1366666666667|
|   PA| 613.7741666666667|
|   PB|            563.41|
|   PE| 576.2008333333334|
|   PR| 691.9408333333332|
|   RJ|             730.0|
|   RN| 579.3108333333332|
|   RS|          747.3175|
|   SC| 751.2283333333334|
|   SE| 529.0558333333335|
|   SP| 762.2308333333331|
+-----+------------------+



In [15]:
full_df = parsed_df.join(cb_df, on=parsed_df['uf'] == cb_df['Sigla'], how='left')
full_df.show(5)

+----+---+---+------------+----------------+------------------+-----------------+-----+-----------------+-----+---------------+--------------+--------------------+-----+------------------+
| ano|mes| uf|cnae_2_secao|cnae_2_subclasse|saldo_movimentacao|grau_de_instrucao|idade|horas_contratuais| sexo|tipo_empregador|salario_mensal|    z_salario_mensal|Sigla|media_cesta_basica|
+----+---+---+------------+----------------+------------------+-----------------+-----+-----------------+-----+---------------+--------------+--------------------+-----+------------------+
|2023|  4| SP|           H|         4930202|          DEMITIDO|                6|   37|             44.0|HOMEM|           CNPJ|       2060.89|-4.35810612401561...|   SP| 762.2308333333331|
|2023|  4| SP|           H|         4930202|          DEMITIDO|                7|   28|             44.0|HOMEM|           CNPJ|       2428.16|-3.97319863508181...|   SP| 762.2308333333331|
|2023|  4| SP|           H|         4930202|          D

In [17]:
full_df = full_df.drop('Sigla')

# Playing with the dataset

In [22]:
from pyspark.sql.functions import avg


# Calcular a média salarial por estado (UF)
media_salarial_por_estado = full_df.groupBy("uf") \
    .agg(avg("salario_mensal").alias("media_salarial")) \
    .orderBy("media_salarial", ascending=False)

# Mostrar os resultados
media_salarial_por_estado.show(30)



+---+------------------+
| uf|    media_salarial|
+---+------------------+
| SP|2923.3711841226173|
| PI|2807.2746384990223|
| AL|2641.3364385080627|
| CE|2515.6744243826147|
| MA|2373.9459337012436|
| RJ|2317.5292203898243|
| AM|2293.5330043869867|
| RN|2291.5329806441164|
| SE|2224.3690922307865|
| DF|2068.0411921843065|
| SC| 2055.297253757904|
| RS|2046.8056555505402|
| MT| 2029.660055532179|
| PB|2019.5937423265405|
| PA|1989.9101474046513|
| MG|1974.3258626969548|
| PR| 1963.952018026921|
| PE|1950.8765826423116|
| ES|1871.8810979755474|
| TO|1835.5229983862853|
| BA|1832.1725148991773|
| MS|1818.1947309079478|
| RR|1774.1887298324473|
| GO|1771.6566836370903|
| RO| 1744.896178846981|
| AP|1711.5313078735744|
| AC|1642.9652361871943|
+---+------------------+



                                                                                

In [24]:
# Calcular a média salarial por seção CNAE
media_salarial_por_cnae = full_df.groupBy("cnae_2_secao").agg(avg("salario_mensal").alias("media_salarial"))

# Ordenar pela média salarial de forma decrescente e pegar a seção CNAE com maior salário
maior_salario_secao_cnae = media_salarial_por_cnae.orderBy("media_salarial", ascending=False)

maior_salario_secao_cnae.show(20)

# Mostrar o resultado
print(f"Seção CNAE com maior salário: {maior_salario_secao_cnae['cnae_2_secao']} com média salarial de {maior_salario_secao_cnae['media_salarial']}")



+------------+------------------+
|cnae_2_secao|    media_salarial|
+------------+------------------+
|           U| 6447.229938009049|
|           D|  4621.93107799412|
|           K| 4191.072622292891|
|           O| 3864.180997402517|
|           J|  3657.15807580429|
|           B| 3532.379130789599|
|           P| 2981.962419645588|
|           R| 2937.194098428671|
|           C| 2853.465672203824|
|           M| 2693.738254587626|
|           F|2649.9833073296613|
|           A|2335.2600468079977|
|           Q| 2308.726861584374|
|           E|2238.4984828087922|
|           S| 2211.294496907875|
|           H|2193.0501905286446|
|           Z|2171.2659045454548|
|           L| 2106.570432684665|
|           N|1900.1737681577804|
|           G|1829.1474373848505|
+------------+------------------+
only showing top 20 rows

Seção CNAE com maior salário: Column<'cnae_2_secao'> com média salarial de Column<'media_salarial'>


