In [43]:
# openpyxl is required by pd.read_excel(..., engine="openpyxl") in the raw-to-stage load.
# %pip installs into the running kernel's environment (!pip may target a different Python);
# the version is pinned to the one this notebook was run with (3.0.7, per the install log).
%pip install openpyxl==3.0.7

Collecting openpyxl
  Downloading https://files.pythonhosted.org/packages/39/08/595298c9b7ced75e7d23be3e7596459980d63bc35112ca765ceccafbe9a4/openpyxl-3.0.7-py2.py3-none-any.whl (243kB)
[K    100% |████████████████████████████████| 245kB 1.5MB/s ta 0:00:01
[?25hCollecting et-xmlfile (from openpyxl)
  Downloading https://files.pythonhosted.org/packages/96/c2/3dd434b0108730014f1b96fd286040dc3bcb70066346f7e01ec2ac95865f/et_xmlfile-1.1.0-py3-none-any.whl
Installing collected packages: et-xmlfile, openpyxl
Successfully installed et-xmlfile-1.1.0 openpyxl-3.0.7


# raw-to-stage

In [49]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd

# Obtain the Spark session for this notebook (getOrCreate returns an existing session if one is active).
spark = (
    SparkSession.builder
    .appName("Test")
    .getOrCreate()
)

In [66]:
# read parameters
#
# Parameters for the raw-to-stage load; each one is assigned exactly once.
#   source_path - file to load. mode == "xlsx" reads it with pandas/openpyxl; any other
#                 mode reads it as a pipe-delimited CSV with Spark.
#   target_path - parquet destination (written with mode "overwrite").
#   mode        - "xlsx" or anything else (CSV).
#   sheet_name  - worksheet to read when mode == "xlsx".
#   skiprows    - number of rows skipped above the worksheet's header row.
#
# Example CSV run (previously assigned here and then immediately overwritten):
#   source_path = "raw/microdados_educacao_basica_2020/DADOS/matricula_nordeste.CSV"
#   mode = "csv"  # any value other than "xlsx" selects the CSV reader

# NOTE(review): the "а" in "Dicionаrio" looks like a mis-decoded non-ASCII character,
# presumably inherited from the extracted archive's folder name. It must match the
# on-disk path byte-for-byte, so do not "correct" it unless the folder is renamed too.
source_path = "raw/microdados_educacao_basica_2020/ANEXOS/ANEXO I - Dicionаrio de Dados e Tabelas Auxiliares/Tabelas Auxiliares.xlsx"
target_path = "stage/censo_educacao/states_and_cities"
mode = "xlsx"
sheet_name = "Anexo10 - UFS e Municípios"
skiprows = 3


In [67]:
def pandas_to_spark(spark, pandas_df):
    """Convert a pandas DataFrame into a Spark DataFrame whose columns are all StringType.

    Column names are sanitised before conversion: surrounding whitespace is stripped,
    inner spaces become underscores, and every remaining character outside
    ``[A-Za-z0-9_]`` is removed. Accented letters are therefore dropped rather than
    transliterated (e.g. "Código" -> "Cdigo"). The stage-to-curated cells select columns
    by these sanitised names, so the rule must stay as it is.

    Args:
        spark: the SparkSession used to build the result.
        pandas_df: source frame. It is not modified; the sanitised names are applied
            to a shallow copy.

    Returns:
        A Spark DataFrame with the sanitised column names and an all-StringType schema.
    """
    cleaned_names = (
        # astype(str) so non-string labels (e.g. integer positions) do not break .str
        pandas_df.columns.astype(str)
        .str.strip()
        .str.replace(" ", "_", regex=False)
        # regex=True must be explicit: pandas >= 2.0 defaults to a literal match, which
        # would silently leave accents and punctuation in the column names.
        .str.replace(r"[^a-zA-Z\d\_]+", "", regex=True)
    )

    # Rename on a shallow copy instead of mutating the caller's frame.
    renamed = pandas_df.copy(deep=False)
    renamed.columns = cleaned_names

    schema = StructType([StructField(name, StringType()) for name in cleaned_names])
    return spark.createDataFrame(renamed, schema)

In [68]:
# Load the raw source into a Spark DataFrame, then persist it as parquet in the stage layer.
if mode != "xlsx":
    # Any non-Excel source is read as a pipe-delimited CSV with a header row.
    df = (
        spark.read
        .option("header", True)
        .options(delimiter="|")
        .csv(source_path)
    )
else:
    # skiprows skips rows above the header; skipfooter=1 drops the sheet's last row
    # (presumably a footnote/source line — confirm against the workbook).
    pdf = pd.read_excel(
        source_path,
        sheet_name=sheet_name,
        skiprows=skiprows,
        skipfooter=1,
        engine="openpyxl",
    )
    df = pandas_to_spark(spark, pdf)

df.write.mode("overwrite").parquet(target_path)

# stage-to-curated

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

In [2]:
# Obtain the Spark session for this notebook (getOrCreate returns an existing session if one is active).
spark = (
    SparkSession.builder
    .appName("Test")
    .getOrCreate()
)

In [None]:
# NOTE(review): scratch cell, never executed (In [None]). It only echoed the path
# "/lake/stage/censo_educacao/enrollment_stages". That path is absolute (leading "/"),
# whereas the read below uses the relative "lake/stage/censo_educacao/enrollment_stages".






In [6]:
# Stage-layer enrollment-stage lookup (code/name columns), stored as parquet.
enrollment_stages_path = "lake/stage/censo_educacao/enrollment_stages"
enrollment_stages = spark.read.parquet(enrollment_stages_path)

In [10]:
# Preview the stage codes and names under their curated column names (at most 1000 rows).
stage_preview_columns = [
    col("Cdigo").alias("enrollment_stage_id"),
    col("Nome_Etapa").alias("enrollment_stage_name"),
]
enrollment_stages.select(*stage_preview_columns).show(1000)

+--------------------+---------------------+
| enrollment_stage_id|enrollment_stage_name|
+--------------------+---------------------+
|                  69| EJA - Ensino Fund...|
|                  70| EJA - Ensino Fund...|
|                  71|   EJA - Ensino Médio|
|                  72| EJA - Ensino Fund...|
|                  73| Curso FIC integra...|
|                  74| Curso Técnico Int...|
|Etapas de Ensino ...|                  NaN|
|                   4| Ensino Fundamenta...|
|                   5| Ensino Fundamenta...|
|                   6| Ensino Fundamenta...|
|                   7| Ensino Fundamenta...|
|                   8| Ensino Fundamenta...|
|                   9| Ensino Fundamenta...|
|                  10| Ensino Fundamenta...|
|                  11| Ensino Fundamenta...|
|                  12| Ensino Fundamenta...|
|                  13| Ensino Fundamenta...|
|                  24| Ensino Fundamenta...|
|                  40| Curso Técnico  - ...|
|         

In [14]:
# read parameters

# Stage-layer root. Table names are appended to it directly, so it must end with "/".
source_path = "lake/stage/censo_educacao/"
target_path = "lake/stage/censo_educacao/fact_table"

# Enrollment tables to curate, one per region suffix.
tables = ["matricula_" + suffix for suffix in ("co", "norte", "nordeste", "sudeste", "sul")]

# Lookup tables used to build the dimensions.
aux_tables = ["states_and_cities", "enrollment_stages"]

In [29]:
# Read every enrollment table and every lookup table from the stage layer, keyed by table name.
# Names are concatenated directly onto source_path, which is expected to end with "/".
dfs = {name: spark.read.parquet(source_path + name) for name in tables}
aux_dfs = {name: spark.read.parquet(source_path + name) for name in aux_tables}

In [30]:
def build_dimension(source_df, column_aliases, key_column, nan_columns=None, drop_code_rows=True):
    """Select and rename lookup columns, then drop duplicate and placeholder rows.

    Args:
        source_df: stage-layer lookup frame.
        column_aliases: ordered (source column, curated alias) pairs to select.
        key_column: curated name of the id column used by the "CO_%" filter.
        nan_columns: curated columns whose rows equal the literal string "NaN" are
            dropped; defaults to ``[key_column]``.
        drop_code_rows: when True, also drop rows whose key matches "CO_%"
            (presumably header rows repeating the micro-data column codes — confirm).

    Returns:
        The de-duplicated, filtered dimension frame.
    """
    if nan_columns is None:
        nan_columns = [key_column]
    dimension = source_df.select(
        *[col(source).alias(alias) for source, alias in column_aliases]
    ).dropDuplicates()
    if drop_code_rows:
        dimension = dimension.filter(~col(key_column).like("CO_%"))
    for name in nan_columns:
        dimension = dimension.filter(~col(name).like("NaN"))
    return dimension


aux_dfs["region"] = build_dimension(
    aux_dfs["states_and_cities"],
    [("Cdigo_da_Regio", "region_id"), ("Nome_da_Regio", "region_name")],
    key_column="region_id",
)

aux_dfs["state"] = build_dimension(
    aux_dfs["states_and_cities"],
    [("Cdigo_da_UF", "state_id"), ("Nome_da_UF", "state_name"), ("Sigla_da_UF", "state_abrv")],
    key_column="state_id",
)

aux_dfs["cities"] = build_dimension(
    aux_dfs["states_and_cities"],
    [("Cdigo_do_Municpio", "city_id"), ("Nome_do_Municpio", "city_name")],
    key_column="city_id",
)

# Keep the raw stage frame under its own key. aux_dfs["enrollment_stages"] is replaced
# by its curated version below, which no longer has "Cdigo"/"Nome_Etapa". Without this,
# re-running the cell would select missing columns and fail.
raw_enrollment_stages = aux_dfs.setdefault("enrollment_stages_raw", aux_dfs["enrollment_stages"])
aux_dfs["enrollment_stages"] = build_dimension(
    raw_enrollment_stages,
    [("Cdigo", "enrollment_stage_id"), ("Nome_Etapa", "enrollment_stage_name")],
    key_column="enrollment_stage_id",
    nan_columns=["enrollment_stage_id", "enrollment_stage_name"],
    drop_code_rows=False,
)

In [31]:
# Codebook for the decoded categorical fields:
#   TP_SEXO:     1 - male, 2 - female
#   TP_COR_RACA: 0 - not declared, 1 - white (Branca), 2 - black (Preta), 3 - mixed (Parda),
#                4 - Asian (Amarela), 5 - Indigenous (Indígena)
# Any code missing from these mappings (including nulls) gets the default label.
GENDER_LABELS = {"1": "Male", "2": "Female"}
ETHNICITY_LABELS = {
    "1": "Branca",
    "2": "Preta",
    "3": "Parda",
    "4": "Amarela",
    "5": "Indígena",
}

# (dimension key in aux_dfs, enrollment column, dimension id column) for the left joins.
DIMENSION_JOINS = [
    ("region", "CO_REGIAO", "region_id"),
    ("state", "CO_UF", "state_id"),
    ("cities", "CO_MUNICIPIO", "city_id"),
]

# Output columns of the fact table, in order: (input column, alias or None to keep the name).
FACT_COLUMNS = [
    ("ID_ALUNO", "student_id"),
    ("ID_MATRICULA", "enrollment_id"),
    ("NU_IDADE", "student_age"),
    ("student_gender", None),
    ("student_ethnicity", None),
    ("IN_NECESSIDADE_ESPECIAL", "has_special_needs"),
    ("IN_BAIXA_VISAO", "is_vision_impaired"),
    ("IN_CEGUEIRA", "is_blind"),
    ("IN_DEF_AUDITIVA", "is_hearing_impaired"),
    ("IN_DEF_FISICA", "is_physically_handicapped"),
    ("IN_DEF_INTELECTUAL", "is_intellectual_disabled"),
    ("IN_SURDEZ", "is_deaf"),
    ("IN_SURDOCEGUEIRA", "is_deaf_blind"),
    ("IN_DEF_MULTIPLA", "is_multiple_impaired"),
    ("IN_AUTISMO", "is_autist"),
    ("IN_SUPERDOTACAO", "is_gifted"),
    ("TP_ETAPA_ENSINO", "enrollment_stage_id"),
    ("region_id", None),
    ("region_name", None),
    ("state_id", None),
    ("state_name", None),
    ("state_abrv", None),
    ("city_id", None),
    ("city_name", None),
]


def _label_codes(column, labels, default):
    """Build a chained when(...) expression mapping each code in `labels` to its label."""
    expression = None
    for code, label in labels.items():
        if expression is None:
            expression = when(column == code, label)
        else:
            expression = expression.when(column == code, label)
    return expression.otherwise(default)


def _curate_enrollments(frame):
    """Join region/state/city names, decode gender and ethnicity, and select the fact columns."""
    for dimension, enrollment_key, dimension_key in DIMENSION_JOINS:
        lookup = aux_dfs[dimension]
        frame = frame.join(lookup, frame[enrollment_key] == lookup[dimension_key], "left")

    frame = frame.withColumn("student_gender", _label_codes(frame["TP_SEXO"], GENDER_LABELS, "Other"))
    frame = frame.withColumn(
        "student_ethnicity",
        _label_codes(frame["TP_COR_RACA"], ETHNICITY_LABELS, "Não declarada"),
    )

    return frame.select(*[
        col(source) if alias is None else col(source).alias(alias)
        for source, alias in FACT_COLUMNS
    ])


for table in tables:
    dfs[table] = _curate_enrollments(dfs[table])




In [32]:
from functools import reduce

# Stack the curated frames in the order listed in `tables` instead of a hard-coded list,
# so the union always matches the configured tables. `union` matches columns by
# position, which is safe here because every frame went through the same select.
if not tables:
    raise ValueError("`tables` is empty: there are no enrollment frames to union")
union_df = reduce(lambda stacked, frame: stacked.union(frame), [dfs[name] for name in tables])

In [131]:
# Persist the curated fact table. mode("overwrite") matches the raw-to-stage writer and
# keeps the notebook re-runnable; the default mode ("errorifexists") fails once target_path exists.
union_df.write.mode("overwrite").parquet(target_path)

In [33]:
# Preview the first 20 rows of the unioned fact table (long values truncated).
union_df.show(n=20, truncate=True)

+--------------------+-------------+-----------+--------------+-----------------+-----------------+------------------+--------+-------------------+-------------------------+------------------------+-------+-------------+--------------------+---------+---------+-------------------+---------+------------+--------+------------------+----------+-------+-------------------+
|          student_id|enrollment_id|student_age|student_gender|student_ethnicity|has_special_needs|is_vision_impaired|is_blind|is_hearing_impaired|is_physically_handicapped|is_intellectual_disabled|is_deaf|is_deaf_blind|is_multiple_impaired|is_autist|is_gifted|enrollment_stage_id|region_id| region_name|state_id|        state_name|state_abrv|city_id|          city_name|
+--------------------+-------------+-----------+--------------+-----------------+-----------------+------------------+--------+-------------------+-------------------------+------------------------+-------+-------------+--------------------+---------+-----

In [133]:
import json

In [137]:
# Pipeline parameters as a JSON document, decoded with json.loads in the next cell.
# aux_tables lists both lookup tables the curation cells read. "enrollment_stages" was
# missing; if these parameters drive the loading loop, aux_dfs["enrollment_stages"]
# would raise KeyError.
# NOTE(review): these paths omit the "lake/" prefix used by the cells above — confirm
# which root is correct for the runtime environment.
parameters = """
{
    "source_path": "stage/censo_educacao/",
    "target_path": "curated/censo_educacao/enrollments",
    "tables": [
        "matricula_co",
        "matricula_norte",
        "matricula_nordeste",
        "matricula_sudeste",
        "matricula_sul"
    ],
    "aux_tables": [
        "states_and_cities",
        "enrollment_stages"
    ]

}

"""

In [138]:
# Decode the JSON parameter document (a top-level object) into a dict.
parameters_json = json.loads(s=parameters)

In [140]:
# Sanity check: the loading cells iterate over "tables", so it must decode as a JSON array.
assert isinstance(parameters_json["tables"], list), "parameter 'tables' must be a JSON array"
type(parameters_json["tables"])

list

# curated-to-dw