# Instalando o Pyspark no Google Colab

In [1]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

# unzip the spark file to the current folder
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

# install findspark and pyspark using pip
!pip install -q findspark
!pip install pyspark

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 41.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=77c346cc59f78f2b3ac3f4635dad900aff359c0d9f46229fce4b5320cc92bbf3
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


# Chamandos as libraries

In [2]:
import pandas as pd
import pyspark
import findspark
findspark.init()

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.functions import *

#iniciando sessão Spark
spark = SparkSession.builder \
  .master('local') \
  .appName('DesafioStone') \
  .config('spark.ui.port', '4050') \
  .getOrCreate()

In [3]:
#upload do arquivo base_desafio_lucros.json
from google.colab import files
uploaded = files.upload()

Saving base_desafio_lucros.json to base_desafio_lucros.json


# Desenvolvimento

In [4]:
def valores_iniciais():

  #valor disponivel para distribuicao
  valor_total_disponibilizado = 5000000

  #efetuado a divisão do valor total pelas 3 regras existentes
  valor_divisao_cenarios = valor_total_disponibilizado / 3 

  #valor salario minimo atualmente
  salario_minimo_base = 1212.00

  #calculos utilizados para a regra de bonus por faixa salarial
  salario_minimo_base_mult_3 = salario_minimo_base * 3
  salario_minimo_base_mult_5 = salario_minimo_base * 5
  salario_minimo_base_mult_8 = salario_minimo_base * 8

  #calculos utilizados para a regra de bonus por tempo de admissao
  qtd_dias_1_ano = 365
  qtd_dias_3_anos = 365 * 3 
  qtd_dias_8_anos = 365 * 8

  #variaveis do com os valores definidos de bonus, de acordo com a area
  #maior o peso, maior o bonus com base no salario minimo atual
  percentual_peso1_area = salario_minimo_base * 1
  percentual_peso2_area = salario_minimo_base * 1.25
  percentual_peso3_area = salario_minimo_base * 1.5
  percentual_peso5_area = salario_minimo_base * 2

  #variaveis do com os valores definidos de bonus, de acordo com a faixa salarial
  #maior o peso, menor o bonus com base no salario minimo atual  
  percentual_peso5_salario = salario_minimo_base * 1
  percentual_peso3_salario = salario_minimo_base * 1.25
  percentual_peso2_salario = salario_minimo_base * 1.5
  percentual_peso1_salario = salario_minimo_base * 2  

  #variaveis do com os valores definidos de bonus, de acordo com tempo de admissao
  #maior o peso, maior o bonus com base no salario minimo atual 
  percentual_peso1_tempo_admissao = salario_minimo_base * 0.75
  percentual_peso2_tempo_admissao = salario_minimo_base * 0.95
  percentual_peso3_tempo_admissao = salario_minimo_base * 1.15
  percentual_peso5_tempo_admissao = salario_minimo_base * 1.55  

  #dict com as variaveis utilizadas no processo 
  var_regra_bonus = {'valor_total_disponibilizado':valor_total_disponibilizado
                    ,'valor_divisao_cenarios':valor_divisao_cenarios
                    ,'salario_minimo_base':salario_minimo_base
                    ,'salario_minimo_base_mult_3':salario_minimo_base_mult_3
                    ,'salario_minimo_base_mult_5':salario_minimo_base_mult_5
                    ,'salario_minimo_base_mult_8':salario_minimo_base_mult_8
                    ,'qtd_dias_1_ano':qtd_dias_1_ano
                    ,'qtd_dias_3_anos':qtd_dias_3_anos
                    ,'qtd_dias_8_anos':qtd_dias_8_anos
                    ,'area' : {'percentual_peso1_area': percentual_peso1_area
                              ,'percentual_peso2_area': percentual_peso2_area
                              ,'percentual_peso3_area': percentual_peso3_area
                              ,'percentual_peso5_area': percentual_peso5_area}
                    ,'salario' : {'percentual_peso1_salario': percentual_peso1_salario
                                  ,'percentual_peso2_salario': percentual_peso2_salario
                                  ,'percentual_peso3_salario': percentual_peso3_salario
                                  ,'percentual_peso5_salario': percentual_peso5_salario}
                    ,'tempo_admissao' : {'percentual_peso1_tempo_admissao' : percentual_peso1_tempo_admissao
                                        ,'percentual_peso2_tempo_admissao' : percentual_peso2_tempo_admissao 
                                        ,'percentual_peso3_tempo_admissao' : percentual_peso3_tempo_admissao
                                        ,'percentual_peso5_tempo_admissao' : percentual_peso5_tempo_admissao  
                                        }
                  } 

  return(var_regra_bonus)

def leitura_tabela():

  #processo apenas para leitura de arquivo json
  df_tabela_origem = spark.read.json('base_desafio_lucros.json') 

  return df_tabela_origem

def tratamento_tabela_origem(df_tabela_origem
                             ,salario_minimo_base_mult_3
                             ,salario_minimo_base_mult_5
                             ,salario_minimo_base_mult_8
                             ,qtd_dias_1_ano
                             ,qtd_dias_3_anos
                             ,qtd_dias_8_anos):
  
  #processo para tratamento de alguns campos, e adicao da variaveis de peso - utilizadas em suas respectivas regras
  df_tabela_origem_tratada = df_tabela_origem \
                            .select("nome"
                                    ,"matricula"
                                    ,upper(col("cargo")).alias("cargo")
                                    ,upper(col("area")).alias("area")
                                    ,regexp_replace(col("salario_bruto"), "[\$#,]", "").cast('float').alias("salario_bruto")
                                    ,to_date("data_de_admissao").alias("data_de_admissao")
                                    ,datediff(current_date(),col("data_de_admissao")).alias("dias_admissao")
                                    ) \
                              .withColumn('peso_area_atuacao',
                                  when(col('area') == 'DIRETORIA', 1)
                                  .when(col('area').isin('CONTABILIDADE','FINANCEIRO','TECNOLOGIA'), 2)
                                  .when(col('area') == 'SERVIÇOS GERAIS', 3)
                                  .when(col('area') == 'RELACIONAMENTO COM O CLIENTE', 5)
                                  .otherwise(99)) \
                              .withColumn('peso_faixa_salarial',
                                         when(col('salario_bruto') >= salario_minimo_base_mult_8, 5)
                                         .when((col('salario_bruto') < salario_minimo_base_mult_8)
                                               & (col('salario_bruto') >= salario_minimo_base_mult_5), 3)
                                         .when((col('salario_bruto') < salario_minimo_base_mult_5)
                                               & (col('salario_bruto') >= salario_minimo_base_mult_3), 2)     
                                         .when((col('salario_bruto') < salario_minimo_base_mult_3)
                                               & (col('cargo')!='JOVEM APRENDIZ'), 1) \
                                         .otherwise(99)) \
                              .withColumn('peso_tempo_admissao',
                                          when(col('dias_admissao') <= qtd_dias_1_ano, 1)
                                          .when((col('dias_admissao') > qtd_dias_1_ano)
                                                & (col('dias_admissao') <= qtd_dias_3_anos), 2)
                                          .when((col('dias_admissao') > qtd_dias_3_anos)
                                                & (col('dias_admissao') <= qtd_dias_8_anos), 3)                                 
                                          .otherwise(5)) ##acima de 8 anos                                         
  ##Para a definicao de regra do bonus por salario, surgiu uma duvida para o peso 1, stou entendendo que o jovem aprendiz não tera direito a este bonus

  return df_tabela_origem_tratada    

def calculo_bonus(df_funcionarios
                  ,variavel_peso
                  ,calculo_peso1
                  ,calculo_peso2
                  ,calculo_peso3
                  ,calculo_peso5
                  ,valor_divisao_cenarios):

  #efetuando o calculo das regras, de acordo com os dados de entrada
  df_calculo_bonus = df_funcionarios \
                   .withColumn('bonus_calculo',
                                 when(col(variavel_peso) == 1, calculo_peso1)
                                .when(col(variavel_peso) == 2, calculo_peso2)
                                .when(col(variavel_peso) == 3, calculo_peso3)
                                .when(col(variavel_peso) == 5, calculo_peso5)
                                .otherwise(0)) \
                    .select(col('matricula')
                            ,col(variavel_peso)
                            ,round(col('bonus_calculo'),2).alias('bonus_calculo')
                            )

  df_valida_divisao_regra = df_calculo_bonus.select(df_calculo_bonus['bonus_calculo']) \
                        .groupBy() \
                        .sum() \
                        .collect()[0][0]  

  print('Soma dos bonus por ', variavel_peso,' : ', df_valida_divisao_regra)                       

  #Validacao da soma dos bonus por regra - precisa ser menor que 1/3 do total : valor_divisao_cenarios
  if df_valida_divisao_regra <= valor_divisao_cenarios:
    print('Dentro da fatia disponivel para o bonus de: ', variavel_peso)
  else:
    print('Valor ultrapassou a fatia disponivel para o bonus de: ', variavel_peso)
    print('Necessários ajustar os valores iniciais')

  return df_calculo_bonus

def join_dataframes(df_funcionarios
                   ,df_calculo_regra_area
                   ,df_calculo_regra_salario
                   ,df_calculo_regra_tempo):
  
  #Dataframe para juntar todos as dfs criados anteriormente
  #definindo um unico df com todos os valores de bonus
  df_join_dataframes = df_funcionarios \
                       .join(df_calculo_regra_area
                             ,"matricula"
                             ,"inner") \
                       .join(df_calculo_regra_salario
                             ,"matricula"
                             ,"inner") \
                       .join(df_calculo_regra_tempo
                             ,"matricula"
                             ,"inner") \
                       .select(
                           df_funcionarios["matricula"]
                           ,df_funcionarios["nome"]
                           ,df_funcionarios["cargo"]
                           ,df_funcionarios["area"]
                           ,df_funcionarios["salario_bruto"]
                           ,df_funcionarios["peso_area_atuacao"]
                           ,df_funcionarios["peso_faixa_salarial"]
                           ,df_funcionarios["peso_tempo_admissao"]
                           ,df_calculo_regra_area["bonus_calculo"].alias("bonus_valor_area")
                           ,df_calculo_regra_salario["bonus_calculo"].alias("bonus_valor_salario")
                           ,df_calculo_regra_tempo["bonus_calculo"].alias("bonus_valor_tempo")
                           ,(df_calculo_regra_area["bonus_calculo"] + df_calculo_regra_salario["bonus_calculo"]
                           + df_calculo_regra_tempo["bonus_calculo"]).alias("bonus_valor_total")
                       )
 
  return(df_join_dataframes)
  
def valor_total_pago(df_join_dataframes_func):

  #calculo do total distribuido pela empresa
  #somando todos os bonus acumulados de cada funcionario
  df_valor_total_pago = df_join_dataframes_func \
                        .select(df_join_dataframes_func['bonus_valor_total']) \
                        .groupBy() \
                        .sum() \
                        .collect()[0][0]            
                      
  return(df_valor_total_pago)

def agrupamento_funcionarios(df_join_dataframes_func
                             ,variavel_agrupamento):
  
  #processo para calcular o total distrubido agrupando pelo campo parametrizado 
  df_agrupamento_retorno = df_join_dataframes_func \
                           .groupBy(col(variavel_agrupamento)) \
                           .agg(sum(col('bonus_valor_total')).alias('soma_bonus_valor_total')
                               ,count(col('matricula')).alias('numero_funcionarios')) \
                           .orderBy(col(variavel_agrupamento).desc())

  return(df_agrupamento_retorno)

def desafio_stone():

  #processo principal

  #chamando variaveis iniciais
  var_valores_iniciais = valores_iniciais()

  #logs de validacao
  print("valor_total_disponibilizado = ", var_valores_iniciais['valor_total_disponibilizado']
        ,"valor_divisao_cenarios = ",  var_valores_iniciais['valor_divisao_cenarios']
        ,"salario_minimo_base = ",  var_valores_iniciais['salario_minimo_base']     
        )
  
  print("salario_minimo_base_multiplicado_por_3 = ", var_valores_iniciais['salario_minimo_base_mult_3']
        ,"salario_minimo_base_multiplicado_por_5 = ", var_valores_iniciais['salario_minimo_base_mult_5']
        ,"salario_minimo_base_multiplicado_por_8 = ", var_valores_iniciais['salario_minimo_base_mult_8'])
  
  print("qtd_dias_1_ano = ", var_valores_iniciais['qtd_dias_1_ano']
        ,"qtd_dias_3_anos = ", var_valores_iniciais['qtd_dias_3_anos']
        ,"qtd_dias_8_anos = ", var_valores_iniciais['qtd_dias_8_anos'])

  print("percentual_peso1_area  = ", var_valores_iniciais['area']['percentual_peso1_area'])
  print("percentual_peso2_area  = ", var_valores_iniciais['area']['percentual_peso2_area'])
  print("percentual_peso3_area  = ", var_valores_iniciais['area']['percentual_peso3_area'])
  print("percentual_peso5_area  = ", var_valores_iniciais['area']['percentual_peso5_area'])

  print("percentual_peso1_salario  = ", var_valores_iniciais['salario']['percentual_peso5_salario'])
  print("percentual_peso2_salario  = ", var_valores_iniciais['salario']['percentual_peso2_salario'])
  print("percentual_peso3_salario  = ", var_valores_iniciais['salario']['percentual_peso2_salario'])
  print("percentual_peso5_salario  = ", var_valores_iniciais['salario']['percentual_peso1_salario'])

  print("percentual_peso1_tempo_admissao  = ", var_valores_iniciais['tempo_admissao']['percentual_peso1_tempo_admissao'])
  print("percentual_peso2_tempo_admissao  = ", var_valores_iniciais['tempo_admissao']['percentual_peso2_tempo_admissao'])
  print("percentual_peso3_tempo_admissao  = ", var_valores_iniciais['tempo_admissao']['percentual_peso3_tempo_admissao'])
  print("percentual_peso5_tempo_admissao  = ", var_valores_iniciais['tempo_admissao']['percentual_peso5_tempo_admissao'])

  df_tabela_origem = leitura_tabela()
  df_tabela_origem.show(5)

  #chamando def tratamento_tabela_origem para receber a tabel de funcionarios tratada e com os pesos de cada regra
  df_funcionarios = tratamento_tabela_origem(df_tabela_origem
                                             ,var_valores_iniciais['salario_minimo_base_mult_3']
                                             ,var_valores_iniciais['salario_minimo_base_mult_5']
                                             ,var_valores_iniciais['salario_minimo_base_mult_8']
                                             ,var_valores_iniciais['qtd_dias_1_ano']
                                             ,var_valores_iniciais['qtd_dias_3_anos']
                                             ,var_valores_iniciais['qtd_dias_8_anos'])
  
  df_funcionarios.show(5)

  #chamando def calculo_bonus por area
  df_calculo_regra_area = calculo_bonus(df_funcionarios
                                       ,'peso_area_atuacao'
                                       ,var_valores_iniciais['area']['percentual_peso1_area']
                                       ,var_valores_iniciais['area']['percentual_peso2_area']
                                       ,var_valores_iniciais['area']['percentual_peso3_area']
                                       ,var_valores_iniciais['area']['percentual_peso5_area']
                                       ,var_valores_iniciais['valor_divisao_cenarios'])
  df_calculo_regra_area.show(5)

  #chamando def calculo_bonus por salario
  df_calculo_regra_salario = calculo_bonus(df_funcionarios
                                          ,'peso_faixa_salarial'
                                          ,var_valores_iniciais['salario']['percentual_peso1_salario']
                                          ,var_valores_iniciais['salario']['percentual_peso2_salario']
                                          ,var_valores_iniciais['salario']['percentual_peso3_salario']
                                          ,var_valores_iniciais['salario']['percentual_peso5_salario']
                                          ,var_valores_iniciais['valor_divisao_cenarios'])
  df_calculo_regra_salario.show(5)

  #chamando def calculo_bonus por tempo
  df_calculo_regra_tempo = calculo_bonus(df_funcionarios
                                  ,'peso_tempo_admissao'
                                  ,var_valores_iniciais['tempo_admissao']['percentual_peso1_tempo_admissao']
                                  ,var_valores_iniciais['tempo_admissao']['percentual_peso2_tempo_admissao']
                                  ,var_valores_iniciais['tempo_admissao']['percentual_peso3_tempo_admissao']
                                  ,var_valores_iniciais['tempo_admissao']['percentual_peso5_tempo_admissao']
                                  ,var_valores_iniciais['valor_divisao_cenarios'])
  df_calculo_regra_tempo.show(5)
  
  #chamando def join_dataframes para realizar o join dos dataframes, e com a soma dos bonus de cada funcionario
  df_join_dataframes_func = join_dataframes(df_funcionarios
                                           ,df_calculo_regra_area
                                           ,df_calculo_regra_salario
                                           ,df_calculo_regra_tempo)
  df_join_dataframes_func.show(5)     

  var_valor_total_pago = valor_total_pago(df_join_dataframes_func)
  print("Total distribuído = ", var_valor_total_pago)

  print("Total disponibilizado = ", var_valores_iniciais['valor_total_disponibilizado'])

  print("Total distribuído e número de funcionários por área:")
  df_agrupamento_area = agrupamento_funcionarios(df_join_dataframes_func,'area')
  df_agrupamento_area.show(1000, truncate = False)

  print("Total distribuído e número de funcionários por faixa salarial:")
  df_agrupamento_salario = agrupamento_funcionarios(df_join_dataframes_func,'peso_faixa_salarial')
  df_agrupamento_salario.show(1000, truncate = False)

  print("Total distribuído e número de funcionários por tempo de admissão:")
  df_agrupamento_tempo = agrupamento_funcionarios(df_join_dataframes_func,'peso_tempo_admissao')
  df_agrupamento_tempo.show(1000, truncate = False)

  return None

# entry point for PySpark ETL application
if __name__ == '__main__':
  desafio_stone()

valor_total_disponibilizado =  5000000 valor_divisao_cenarios =  1666666.6666666667 salario_minimo_base =  1212.0
salario_minimo_base_multiplicado_por_3 =  3636.0 salario_minimo_base_multiplicado_por_5 =  6060.0 salario_minimo_base_multiplicado_por_8 =  9696.0
qtd_dias_1_ano =  365 qtd_dias_3_anos =  1095 qtd_dias_8_anos =  2920
percentual_peso1_area  =  1212.0
percentual_peso2_area  =  1515.0
percentual_peso3_area  =  1818.0
percentual_peso5_area  =  2424.0
percentual_peso1_salario  =  1212.0
percentual_peso2_salario  =  1818.0
percentual_peso3_salario  =  1818.0
percentual_peso5_salario  =  2424.0
percentual_peso1_tempo_admissao  =  909.0
percentual_peso2_tempo_admissao  =  1151.3999999999999
percentual_peso3_tempo_admissao  =  1393.8
percentual_peso5_tempo_admissao  =  1878.6000000000001
+--------------------+--------------------+----------------+---------+--------------------+-------------+
|                area|               cargo|data_de_admissao|matricula|                nome|s