In [0]:
# spark.conf.set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
# spark.conf.set("spark.hadoop.fs.gs.auth.service.account.email", "bucket-bigquery@leafy-environs-409823.iam.gserviceaccount.com")
# spark.conf.set("spark.hadoop.fs.gs.project.id", "leafy-environs-409823")
# spark.conf.set("spark.hadoop.fs.gs.auth.service.account.private.key", dbutils.secrets.get(scope='gcp-bucket', key='databricks-bucket-key'))
# spark.conf.set("spark.hadoop.fs.gs.auth.service.account.private.key.id", dbutils.secrets.get(scope='gcp-bucket', key='databricks-bucket-key_id'))

In [0]:
%pip install fuzzywuzzy
%pip install python-Levenshtein
dbutils.library.restartPython()

In [0]:
from datetime import datetime
import re
from pathlib import Path
from pyspark.sql.functions import col, when, regexp_replace, lit, levenshtein, rank, row_number, pandas_udf, length
from pyspark.sql.types import FloatType, StringType, IntegerType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.window import Window
from re import sub
from fuzzywuzzy import fuzz, process

In [0]:
# GCP secret -> steps to install
# databricks configure --token
# databricks secrets create-scope gcp-bucket --initial-manage-principal users
# databricks secrets put-secret gcp-bucket databricks-bucket-key
# databricks secrets put-secret gcp-bucket databricks-bucket-key_id
# Add config on Cluster Spark config https://docs.gcp.databricks.com/en/connect/storage/gcs.html

print(dbutils.secrets.listScopes())
print(dbutils.secrets.list('gcp-bucket'))

dbutils.fs.ls("gs://bossa-bucket-coutj/")


In [0]:

#READING datasus raw data parquet

dbutils.widgets.text("file_name", "", "Enter file_name")
file_name = dbutils.widgets.get('file_name') or '2016/folha_2016_1.parquet'
# print(file_name)
df = spark.read.parquet(f"gs://bossa-bucket-coutj/raw/{file_name}")

In [0]:
# Remove "-"

for column in df.columns:
    # print(column)
    df = df.withColumn(column, when( col(column)=='-', None).otherwise(col(column)))
    
# display(df)

In [0]:
# Fix currency format and cast string values to float
numeric_columns = [col for col in df.columns if col not in ['nome', 'cargo', 'funcao']]

for column in numeric_columns:
    df = df.withColumn(
                column, regexp_replace(col(column), "\.", "")
                ).withColumn(column, regexp_replace(column, ",", ".").cast("float"))


In [0]:
# Define data date

date_str = re.search(r"(?<=folha_)(.*)(?=\.parquet)",file_name).group(0).split('_')
date = datetime.strptime(f"{date_str[0]}/{date_str[1]}/1","%Y/%m/%d")
table_name = f"{date_str[0]}_{date_str[1]}"
# Add date column
df = df.withColumn('date',lit(date))

In [0]:
out_table_schema = {
    "nome": [
        "nome"
    ],
    "cargo": [
        "cargo"
    ],
    "funcao": [
        "funcao"
    ],
    "rendimento_do_funcionario": [
        "rendfunc",
        "rendfuncionario",
        "rendimentos_do_funcionario",
        "rendimento_func",
        "rendimento_funcionario",
        "vencimento",
        "rendimento_do_funcionario"
    ],
    "comissao": [
        "comissao"
    ],
    "representacao_grat_seguranca": [
        "representacao_grat_seguranca",
        "representacao_grat_segur",
        "representacao_grat_segur_qualificacao",
        "represent_grat_segur",
        "representacao_grat_segur_grat_qualificacao_sfam",
        "representacao_grat",
        "representacao_qualificacao_grat_seguranca_s_fam",
        "representacao_grat_seguranca_grat_qualificacao_s_fam",
        "representacao_grat_segur_grat_qualificacao_s_fam",
        "represent_gratseg"
    ],
    "incorporado": [
        "incorporado",
        "eignucrorporado"
    ],
    "trienio": [
        "trienio"
    ],
    "bolsa_reforco_escolar": [
        "abono_de_permanencia",
        "bolsa_reforco_escolar_abono_permanencia",
        "bolsa_reforco_escolar"
    ],
    "ferias": [
        "ferias"
    ],
    "redutor": [
        "redutor"
    ],
    "ipalerj_mensalidade": [
        "ipalerj_mensalidade",
        "ipalerj_mensalida_de",
        "ipalerj_mens"
    ],
    "pensao_alimenticia": [
        "pensao_alimenticia"
    ],
    "previdencia_inss": [
        "previdencia_inss",
        "previdencia"
    ],
    "imposto_de_renda": [
        "ir",
        "imposto_de_renda",
        "imp_de_renda"
    ],
    "indenizatoria": [
        "indenizatoria"
    ],
    "rendimento_liquido": [
        "total_liquido",
        "rendimento_liquido"
    ],
    "mes_referencia": [
        "date"
    ]
}

# Fix column names

for column in df.columns:
    col_orig_name = column
    column = sub("(_{2,})", "_", column)
    found = False
    for col_ref_name in out_table_schema:
        if column in out_table_schema[col_ref_name]:
            found = True
            df = df.withColumnRenamed(col_orig_name, col_ref_name)
            break
        
    if not found:
        raise Exception(f"Found columns not mapped on the schema: {column}")

# Adding empty non existing columns 
for schema_column in out_table_schema:
    if schema_column not in df.columns:
        df = df.withColumn(schema_column, lit(None))
        df = df.withColumn(schema_column, col(schema_column).cast(FloatType()))

In [0]:
# Remove rows with empty names

df = df.na.drop(subset=['nome'])


In [0]:
# Standardize role names (Teste fuzzywuzzy)


positions = [
    "ASS. ESP. DE TECN. PARLAMENTAR",
    "ASSESSOR",
    "ASSESSOR ADJUNTO",
    "ASSESSOR ASSISTENTE",
    "ASSESSOR DA COORDENADORIA",
    "ASSESSOR DE DIRETOR",
    "ASSESSOR ESPECIAL",
    "ASSESSOR ESPECIAL ADJUNTO",
    "ASSESSOR ESPECIAL ASSISTENTE",
    "ASSESSOR GABINETE",
    "ASSESSOR PARLAMENTAR I",
    "ASSESSOR PARLAMENTAR II",
    "ASSESSOR PARLAMENTAR III",
    "ASSESSOR PARLAMENTAR IV",
    "ASSESSOR PARLAMENTAR IX",
    "ASSESSOR PARLAMENTAR V",
    "ASSESSOR PARLAMENTAR VI",
    "ASSESSOR PARLAMENTAR VII",
    "ASSESSOR PARLAMENTAR VIII",
    "ASSESSOR TEC. PARLAMENTAR",
    "ASSIST. DE DIRETOR DE DIVISAO",
    "ASSISTENTE ADJUNTO",
    "ASSISTENTE DE COORDENADORIA",
    "ASSISTENTE DE DIRETOR DE DEPTO",
    "ASSISTENTE I",
    "ASSISTENTE II",
    "ASSISTENTE III",
    "ASSISTENTE IV",
    "ASSISTENTE IX",
    "ASSISTENTE PRESIDENTE COMISSAO",
    "ASSISTENTE SUB-DIRETOR GERAL",
    "ASSISTENTE V",
    "ASSISTENTE VI",
    "ASSISTENTE VII",
    "ASSISTENTE VIII",
    "AUXILIAR ADMINISTRATIVO",
    "AUXILIAR DE GABINETE",
    "AUXILIAR ESPECIAL",
    "AUXILIAR I",
    "AUXILIAR II",
    "AUXILIAR III",
    "AUXILIAR IV",
    "AUXILIAR LEGISLATIVO",
    "AUXILIAR V",
    "CHEFE DE GABINETE DIRETOR GERAL",
    "CHEFE DE GABINETE",
    "CHEFE DE GABINETE PARLAMENTAR",
    "CHEFE DE GABINETE PRES.",
    "CHEFE GABINETE LIDERANCA",
    "CHEFE GABINETE SUPL. LIDER.",
    "CHEFE GABINETE PRIMEIRO SECRETARIO",
    "CHEFE GABINETE SECRETARIAS",
    "CHEFE GABINETE VICE PRES.",
    "CHEFE GABINETE VOGAL",
    "CONSULTOR ESP. P/ ASSUNT. PARLA.",
    "CONTROLADOR",
    "COORD P/ ASSUNTOS MILITARES",
    "COORD. DE ALMOXARIFADO",
    "COORD. DE BENS PATRIMONIAS",
    "COORD. DE COMUNICACOES",
    "COORD. DE ODONTOLOGIA",
    "COORD. DE PORTARIA",
    "COORDENADOR DE DIVISAO",
    "COORDENADOR DE ENFERMAGEM",
    "COORDENADOR DE OFICINA",
    "DIRETOR DE DEPARTAMENTO",
    "DIRETOR GERAL ALERJ",
    "ENCARREGADO DE SETOR",
    "MEMBRO DE COMISSAO",
    "PRESIDENTE COMIS. LICITACAO",
    "PRESIDENTE DA C.P.P.A",
    "PRESIDENTE DE COMISSAO",
    "PROCURADOR GERAL",
    "REDATOR DE ASSUNTOS CONSTITUICAO",
    "SECRETARIO DE COMISSAO",
    "SECRETARIO GERAL",
    "SECRETARIO GERAL MS DIRETORA",
    "SUBCHEFE DE GABINETE",
    "SUBDIRETOR GERAL ADMINISTRACAO",
    "SUBDIRETOR GERAL ALERJ",
    "SUBDIRETOR GERAL ASSUNTOS LEGISLATIVOS",
    "SUBDIRETOR GERAL CERIMONIAL",
    "SUBDIRETOR GERAL COM SOCIAL",
    "SUBDIRETOR GERAL DE CONTROLE INTERNO",
    "SUBDIRETOR GERAL ENG ARQUIT",
    "SUBDIRETOR GERAL ESCOLA DO LEGISLATIVO",
    "SUBDIRETOR GERAL FINANCAS",
    "SUBDIRETOR GERAL FORUM PERM",
    "SUBDIRETOR GERAL INFORMATICA",
    "SUBDIRETOR GERAL RECURSOS HUMANOS",
    "SUBDIRETOR GERAL SEGURANCA",
    "SUBDIRETOR GERAL TV ALERJ",
    "SUBPROCURADOR GERAL",
    "SUBPROCURADOR GERAL ADJUNTO",
    "SUPERINTENDENTE II",
    "SUPERINTENDENTE IV",
    "VOGAL"
]

df_positions = spark.createDataFrame([ [pos] for pos in positions], StructType([StructField('funcoes', StringType(), True)]))

df_funcoes_orig = df.select(col('funcao').alias("funcao_orig")).distinct()

# display(df_funcoes_orig)

df_cross_join_funcao = df_funcoes_orig.crossJoin(df_positions)

@udf(IntegerType())
def matchstring(s1, s2):
    return fuzz.token_sort_ratio(s1, s2)

df_cross_join_funcao = df_cross_join_funcao.withColumn('distancia', matchstring(col('funcao_orig'), col('funcoes')))

df_cross_join_funcao = (
    df_cross_join_funcao
    .filter(col('funcao_orig').isNotNull())
    .withColumn('rank', row_number().over(Window.partitionBy('funcao_orig').orderBy(col("distancia").desc())))
    .groupBy('funcao_orig', 'funcoes', 'distancia').min('rank').filter(col('min(rank)') == 1)
    )

df_cross_join_funcao = df_cross_join_funcao.withColumn('result', when(col('distancia') > 80, col('funcoes')).otherwise(col('funcao_orig')))
df_cross_join_funcao = df_cross_join_funcao[['funcao_orig', 'result']]

df = df.join(df_cross_join_funcao, df.funcao == df_cross_join_funcao.funcao_orig, how='left')
df = df.withColumn('funcao', col('result'))
df = df.drop(*['funcao_orig', 'result'])
        

In [0]:
# Standardize cargo

df = (df.withColumn('cargo', when(length('cargo') <= 3, None)
        .when(col('cargo').rlike("^[^A-Za-z].*$"), None)
        .otherwise(col('cargo')))
    )

positions = [
    "ASSESSOR PARLAMENTAR I",
    "ASSESSOR PARLAMENTAR II",
    "ASSESSOR PARLAMENTAR III",
    "ASSESSOR PARLAMENTAR IV",
    "ASSESSOR PARLAMENTAR V",
    "ASSESSOR PARLAMENTAR VI",
    "ASSESSOR PARLAMENTAR VII",
    "ASSESSOR PARLAMENTAR VIII",
    "ASSESSOR PARLAMENTAR IX",
    "ASSISTENTE I",
    "ASSISTENTE II",
    "ASSISTENTE III",
    "ASSISTENTE IV",
    "ASSISTENTE V",
    "ASSISTENTE VI",
    "ASSISTENTE VII",
    "ASSISTENTE VIII",
    "ASSISTENTE IX",
    "ASSISTENTE X",
    "AUXILIAR I",
    "AUXILIAR II",
    "AUXILIAR III",
    "AUXILIAR IV",
    "AUXILIAR V",
    "AUXILIAR VI",
    "AUXILIAR VII",
    "AUXILIAR VIII",
    "AUXILIAR IX",
    "AUXILIAR X",
    "AUXILIAR ADMINISTRATIVO",
    "AUXILIAR ESPECIAL",
    "AUXILIAR LEGISLATIVO",
    "CONSULTOR TECNICO",
    "DEPUTADO ESTADUAL",
    "ENCARREGADO DE SETOR",
    "ESPEC LEG NIV – 1",
    "ESPEC LEG NIV – 2",
    "ESPEC LEG NIV – 3",
    "ESPEC LEG NIV – 4",
    "ESPEC LEG NIV – 5",
    "ESPEC LEG NIV – 6",
    "ESPEC LEG NIV – 7",
    "ESPEC LEG NIV – 8",
    "ESPEC LEG NIV – 9",
    "ESPEC LEG NIV – 10"
]

df_positions = spark.createDataFrame([ [pos] for pos in positions], StructType([StructField('cargos', StringType(), True)]))

df_cargos_orig = df.select(col('cargo').alias("cargo_orig")).distinct()

df_cross_join_cargo = df_cargos_orig.crossJoin(df_positions)

@udf(IntegerType())
def matchstring(s1, s2):
    return fuzz.token_set_ratio(s1, s2)

df_cross_join_cargo = df_cross_join_cargo.withColumn('distancia', matchstring(col('cargo_orig'), col('cargos')))

df_cross_join_cargo = (
    df_cross_join_cargo
    .filter(col('cargo_orig').isNotNull())
    .withColumn('rank', row_number().over(Window.partitionBy('cargo_orig').orderBy(col("distancia").desc())))
    .groupBy('cargo_orig', 'cargos', 'distancia').min('rank').filter(col('min(rank)') == 1)
    )

df_cross_join_cargo = df_cross_join_cargo.withColumn('result', when(col('distancia') > 80, col('cargos')).otherwise(col('cargo_orig')))
df_cross_join_cargo = df_cross_join_cargo[['cargo_orig', 'result']]

df = df.join(df_cross_join_cargo, df.cargo == df_cross_join_cargo.cargo_orig, how='left')
df = df.withColumn('cargo', col('result'))
df = df.drop(*['cargo_orig', 'result'])


In [0]:

try:
    bq_lines = (spark.read.format('bigquery')
        .option('table', f"leafy-environs-409823.alerj_ds.alerj_payslip")
        .option("parentProject", 'leafy-environs-409823')
        .load())
    bq_lines.createOrReplaceTempView('bq_table_view')

    sql_query = f"""
        SELECT * FROM bq_table_view where 
            nome="{df.first()["nome"]}" AND
            mes_referencia="{df.first()["mes_referencia"]}" AND
            rendimento_liquido="{df.first()["rendimento_liquido"]}"
        """
    existing_data = spark.sql(sql_query)
except Exception as e:
    if "alerj_ds.alerj_payslip" in str(e.java_exception):
        empty_schema = StructType([])
        existing_data = spark.createDataFrame([], schema=empty_schema)
    else:
        raise Exception(f"Error Reading From BigQuery: {e}")


In [0]:
# Write dataframe to Bigquery:
if not len(existing_data.take(1)):
    (df.write.format("bigquery")
        .mode("append")
        .option("project", 'leafy-environs-409823')
        .option("parentProject", 'leafy-environs-409823')
        .option("temporaryGcsBucket","bossa-bucket-coutj")
        .option("table",f"leafy-environs-409823.alerj_ds.alerj_payslip")
        .save())
    
    df.write.mode('overwrite').parquet(f'gs://bossa-bucket-coutj/trusted/{file_name}')
else:
    print("Data Already Exists")