# Importação de bibliotecas

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Criando SparkSession

In [6]:
spark = (
    SparkSession.builder
      .master('local')
      .appName('ES')
      .config('spark.ui.port','4050')
      .getOrCreate()
)

In [7]:
spark

21/11/23 18:22:27 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources


# Extração dos dados do Espírito Santo

In [16]:
df_ES = spark.read.parquet('gs://projeto_final_2021/pandas_to_parquet/ES_parquet/ES_completo.snappy')

In [17]:
#imprimindo o  esquema
df_ES.printSchema()

root
 |-- Matricula: string (nullable = true)
 |-- ServidorNome: string (nullable = true)
 |-- ServidorCpf: string (nullable = true)
 |-- QuadroNome: string (nullable = true)
 |-- LotacaoNome: string (nullable = true)
 |-- DataAdmissao: timestamp (nullable = true)
 |-- CargaHoraria: short (nullable = true)
 |-- PlanoCargos: string (nullable = true)
 |-- Mes: byte (nullable = true)
 |-- Ano: short (nullable = true)
 |-- Rubrica: string (nullable = true)
 |-- Valor: float (nullable = true)
 |-- Tipo: string (nullable = true)
 |-- Estado: string (nullable = true)



In [18]:
df_ES.show(4)

+-----------+--------------------+--------------+----------------+--------------------+-------------------+------------+-----------+---+----+--------------------+------+----+------+
|  Matricula|        ServidorNome|   ServidorCpf|      QuadroNome|         LotacaoNome|       DataAdmissao|CargaHoraria|PlanoCargos|Mes| Ano|             Rubrica| Valor|Tipo|Estado|
+-----------+--------------------+--------------+----------------+--------------------+-------------------+------------+-----------+---+----+--------------------+------+----+------+
|95893/61781|Adelimar C. Caeta...|***.*81.927-**|ACT - Temporário|Secretaria M. de ...|2019-05-07 00:00:00|         125|     PROF V|  1|2020|                INSS| 252.0|   D|    ES|
|95893/61781|Adelimar C. Caeta...|***.*81.927-**|ACT - Temporário|Secretaria M. de ...|2019-05-07 00:00:00|         125|     PROF V|  1|2020|  AUXÍLIO-TRANSPORTE| 64.04|   V|    ES|
|95893/61781|Adelimar C. Caeta...|***.*81.927-**|ACT - Temporário|Secretaria M. de ...|201

# Alteração de colunas e ordenação

In [19]:
df_ES.schema.names

['Matricula',
 'ServidorNome',
 'ServidorCpf',
 'QuadroNome',
 'LotacaoNome',
 'DataAdmissao',
 'CargaHoraria',
 'PlanoCargos',
 'Mes',
 'Ano',
 'Rubrica',
 'Valor',
 'Tipo',
 'Estado']

In [20]:
#Padronização dos dados para futura concatenação
df_ES = (df_ES.withColumn("QuadroNome",
                             when(col("QuadroNome") == "Aposentadoria", "INATIVO")
                             .when(col("QuadroNome") == "Inat/Pens Moléstia Grave", "INATIVO")
                             .when(col("QuadroNome") == "Pensao", "INATIVO")
                             .otherwise('ATIVO'))
        )

In [21]:
df_ES.filter(col('QuadroNome')== 'ATIVO').show()

+------------+--------------------+--------------+----------+--------------------+-------------------+------------+--------------------+---+----+--------------------+-------+----+------+
|   Matricula|        ServidorNome|   ServidorCpf|QuadroNome|         LotacaoNome|       DataAdmissao|CargaHoraria|         PlanoCargos|Mes| Ano|             Rubrica|  Valor|Tipo|Estado|
+------------+--------------------+--------------+----------+--------------------+-------------------+------------+--------------------+---+----+--------------------+-------+----+------+
| 95893/61781|Adelimar C. Caeta...|***.*81.927-**|     ATIVO|Secretaria M. de ...|2019-05-07 00:00:00|         125|              PROF V|  1|2020|                INSS|  252.0|   D|    ES|
| 95893/61781|Adelimar C. Caeta...|***.*81.927-**|     ATIVO|Secretaria M. de ...|2019-05-07 00:00:00|         125|              PROF V|  1|2020|  AUXÍLIO-TRANSPORTE|  64.04|   V|    ES|
| 95893/61781|Adelimar C. Caeta...|***.*81.927-**|     ATIVO|Secr

In [22]:
#filtrando os valores pelo salário Liquido
df_ES = (df_ES.where(col("Rubrica") == 'TOTAL LIQUIDO')
         .where(col("Tipo")== 'L')
        
        )

In [23]:
#Vizualização de modificação
df_ES.filter(col('Tipo') == 'L' ).show()

+--------------+--------------------+--------------+----------+--------------------+-------------------+------------+--------------------+---+----+-------------+-------+----+------+
|     Matricula|        ServidorNome|   ServidorCpf|QuadroNome|         LotacaoNome|       DataAdmissao|CargaHoraria|         PlanoCargos|Mes| Ano|      Rubrica|  Valor|Tipo|Estado|
+--------------+--------------------+--------------+----------+--------------------+-------------------+------------+--------------------+---+----+-------------+-------+----+------+
|   95893/61781|Adelimar C. Caeta...|***.*81.927-**|     ATIVO|Secretaria M. de ...|2019-05-07 00:00:00|         125|              PROF V|  1|2020|TOTAL LIQUIDO|2575.09|   L|    ES|
|  825778/23188|Adelina de Jesus ...|***.*25.387-**|     ATIVO|Secretaria M. de ...|2016-03-05 00:00:00|         220| CONSELHEIRO TUTELAR|  1|2020|TOTAL LIQUIDO|6401.63|   L|    ES|
|    71056/8738|Ademir Nogueira Lyra|***.*70.717-**|     ATIVO|Secretaria Munici...|1992-1

In [24]:
#Exclusão de colunas que não serão utilizadas
cols_to_keep = {'Mes','Ano', 'ServidorNome','PlanoCargos', 'QuadroNome', 'Valor', 'Estado'}
cols_to_drop = list(set(df_ES.schema.names).difference(cols_to_keep))
   
df_ES = df_ES.drop(*cols_to_drop)

In [25]:
#Rename de colunas para padronização pensando na futura concatenação
df_ES = (df_ES.withColumnRenamed('Mês', 'mes')
  .withColumnRenamed('Ano', 'ano')
  .withColumnRenamed('PlanoCargos', 'cargo')
  .withColumnRenamed('ServidorNome', 'nome')
  .withColumnRenamed('Valor', 'salarioLiquido')
  .withColumnRenamed('Estado', 'estado')
  .withColumnRenamed('QuadroNome', 'situacao')
         
  
)
df_ES.show()

+--------------------+--------+--------------------+---+----+--------------+------+
|                nome|situacao|               cargo|Mes| ano|salarioLiquido|estado|
+--------------------+--------+--------------------+---+----+--------------+------+
|Adelimar C. Caeta...|   ATIVO|              PROF V|  1|2020|       2575.09|    ES|
|Adelina de Jesus ...|   ATIVO| CONSELHEIRO TUTELAR|  1|2020|       6401.63|    ES|
|Ademir Nogueira Lyra|   ATIVO|GR 1002 REF I - S...|  1|2020|        860.45|    ES|
|Adenice Sanson de...|   ATIVO|              PROF V|  1|2020|       2938.73|    ES|
|Adriana de F. R. ...|   ATIVO|              PROF V|  1|2020|       3319.12|    ES|
|Adriana de Medeir...|   ATIVO|              PROF V|  1|2020|       5664.27|    ES|
|Adriana Garcia M....|   ATIVO|              PROF V|  1|2020|       3666.43|    ES|
|Adriana J. Ferrei...|   ATIVO|GR 1010 REF I - S...|  1|2020|       1315.93|    ES|
|ADRIANA TEIXEIRA ...|   ATIVO|GR 1092 REF I - S...|  1|2020|        1314.1|

In [26]:
#Ordenação de colunas
coletar_colunas = ('mes','ano','nome','cargo','situacao','salarioLiquido', 'estado')
df_ES.select(*coletar_colunas).show(2)

+---+----+--------------------+-------------------+--------+--------------+------+
|mes| ano|                nome|              cargo|situacao|salarioLiquido|estado|
+---+----+--------------------+-------------------+--------+--------------+------+
|  1|2020|Adelimar C. Caeta...|             PROF V|   ATIVO|       2575.09|    ES|
|  1|2020|Adelina de Jesus ...|CONSELHEIRO TUTELAR|   ATIVO|       6401.63|    ES|
+---+----+--------------------+-------------------+--------+--------------+------+
only showing top 2 rows



# Salvar Arquivo 

In [28]:
(df_ES.write.format("parquet")
.option("header", "true")
.option("inferschema", "true")
.save("gs://projeto_final_2021/parquet_to_bq/ES/")
)

                                                                                