In [0]:
import pyspark
from pyspark.sql.functions import *  
from pyspark.sql.types import IntegerType, FloatType, StringType,BooleanType,DateType, DoubleType

In [0]:
# Lendo os dados em csv e atribuindo-os a um dataframe
df = spark.read.options(header=True, delimiter=';').csv("/FileStore/tables/base_mensalizada_de_funcionarios.csv")

In [0]:
# Analise inicial
df.show()

In [0]:
# visualizando uma quantidade especifica de linhas
df.show(5)

In [0]:
# visualizando o nome das colunas
df.columns

In [0]:
# visualizando o Schema do dataframe
df.printSchema()

In [0]:
# exibindo estatísticas básicas
df.describe().show()

In [0]:
# exibindo o total de linhas do dataframe
df.count()  

In [0]:
# Filtrando linhas com valores nulos em várias colunas no dataframe
df.filter(df.id_funcionario.isNull() | df.data_de_nascimento.isNull() | df.data_de_admissao.isNull() | df.data_de_demissao.isNull() | df.grau_de_instrucao.isNull() | df.cargo.isNull() | df.salario.isNull()).display()

In [0]:
# Completude: as colunas não podem conter valores nulos, então substitui por " ------ "
df2 =df.na.fill("  -------  ")

In [0]:
# Unicidade: colunas que são chaves não podem se repetir dentro de um mês referência, o que quer dizer que os dados de um funcionário não podem aparecer mais de uma vez dentro de um mês.
# Excluindo linhas duplicadas levando em consideração o mês de referencia
df3 = df2.dropDuplicates()

In [0]:
# Total de linhas no novo dataframe após excluir as linhas duplicadas
df3.count()

In [0]:
# Integridade: temos que garantir que em determinada coluna haja valores condizentes ao tipo e à finalidade dela. Por exemplo: se a coluna é do tipo texto, não pode haver valores incompatíveis como, por exemplo, números. Garanta a integridade dos dados em cada coluna.
# substitui o valor de salario na lina 89 por não informado
df4 = df3.withColumn('salario', regexp_replace('salario', 'O líder de tecnologia é uma pessoa maravilhosa, sempre muito atencioso com a equipe e claro em suas expectativas.', '  ------  '))

In [0]:
# Valores do tipo texto devem ter a primeira letra de uma palavra no modo maiúsculo;
df5 = df4.withColumn("grau_de_instrucao", initcap(col('grau_de_instrucao'))).withColumn("cargo", initcap(col('cargo')))

In [0]:
# Remova espaços antes e depois de todas as colunas do tipo string;
df6 = df5.withColumn('grau_de_instrucao', regexp_replace(col("grau_de_instrucao"), " ", "")).withColumn('cargo', regexp_replace(col("cargo"), " ", ""))

In [0]:
# Colunas de data devem estar no formato date;
df7 = df6.withColumn('mes_referencia', to_date(df6.mes_referencia, 'dd/MM/yyyy')).withColumn('data_de_nascimento', to_date(df6.data_de_nascimento, 'dd/MM/yyyy')).withColumn('data_de_admissao', to_date(df6.data_de_admissao, 'dd/MM/yyyy')).withColumn('data_de_demissao', to_date(df6.data_de_demissao, 'dd/MM/yyyy'))

In [0]:
df7.printSchema()

In [0]:
# funcao para substituir valores nulos em colunas de data
def replace(column, value):
    return when(column == value, column).otherwise(lit("____-__-__"))

# Substituindo valores nulos nas colunas com datas
df8 = df7.withColumn("data_de_demissao", replace(col("data_de_demissao"), None))

In [0]:
# Salário deve estar no formato double ou float.
df9 = df8.withColumn("salario",round(df7.salario.cast(DoubleType()),2))

In [0]:
# Admitido no mês: essa coluna deverá ser do tipo numérico, respeitando a seguinte condição: se a data de admissão do registro for igual ao mês de referência analisado, admitido no mês recebe valor é igual a 1, senão recebe valor igual a 0.
df10 = df9.withColumn("admitido_no_mes",when(df9['data_de_admissao']==df9['mes_referencia'],1).otherwise(0))

In [0]:
# Demitido no mês: essa coluna deverá ser do tipo numérico, respeitando a seguinte condição: se a data de demissão do registro for igual ao mês de referência analisado, demitido no mês recebe valor é igual a 1, senão recebe valor igual a 0.
df11 = df10.withColumn("demitido_no_mes",when(df10['data_de_demissao']==df10['mes_referencia'],1).otherwise(0))

In [0]:
# Idade: calcule a idade em anos no mês de referência em relação à data de nascimento.
df12 = df11.withColumn("idade", floor(months_between(col("mes_referencia"),col("data_de_nascimento"))/lit(12)))

In [0]:
df12.show()

In [0]:
# Escolaridade categorizada: com base na coluna grau de instrução, categorize a escolaridade do colaborador para conter apenas as seguintes categorias: ensino fundamental incompleto, ensino fundamental completo, ensino médio incompleto, ensino médio completo, ensino superior incompleto, ensino superior completo, pós graduação, mestrado, doutorado.
df13 = df12.withColumn("grau_de_instrucao", when(df12.grau_de_instrucao == "4ªSérieCompleto", "EnsinoFundamentalIncompleto") \
                       .when(df12.grau_de_instrucao == "Graduação","EnsinoSuperiorCompleto") \
                       .when(df12.grau_de_instrucao == "EnsinoMédioComp","EnsinoMédioCompleto") \
                       .otherwise(df12.grau_de_instrucao))


In [0]:
df13.show()

In [0]:
#Tempo de empresa: com base no mês referência e a data de admissão, obtenha o tempo de empresa do colaborador em meses.
df14 = df13.withColumn("tempo_de_empresa(meses)", floor(months_between(col("mes_referencia"),col("data_de_admissao"))))

In [0]:
df14.show()

In [0]:
df14.select('id_funcionario').distinct().show()

In [0]:
df14.orderBy(col("id_funcionario"),col("mes_referencia")).display()

In [0]:
df14.filter("mes_referencia == '2020-12-01' and id_funcionario == 46457").show()