# Teste Unitário - Amostras

**Objetivo**: Gerar as amostras de dados de escolas, alunas, professores e turmas

---

## Cabeçalho 

### Imports 

In [None]:
import gc
import os
import pyunpack
import rarfile
import shutil
import zipfile
import numpy as np
import pandas as pd

from pathlib import Path
from tqdm import tqdm

### Configurações 

In [None]:
%config Completer.use_jedi = False

### Caminhos 

In [None]:
try:
    PASTA_NOTEBOOK / ""
except:
    PASTA_NOTEBOOK = Path(os.getcwd())
    
PASTA_PROJETO = PASTA_NOTEBOOK.parent
PASTA_DADOS = PASTA_PROJETO / "dados"
PASTA_SAIDA = PASTA_PROJETO / "saida"
PASTA_TESTE = PASTA_PROJETO / "src/tests"
PASTA_TESTE_DADOS = PASTA_TESTE / "dados"
PASTA_TESTE_SAIDA = PASTA_TESTE / "saida"
PASTA_TESTE_CODIGO = PASTA_TESTE / "codigos"

os.chdir(PASTA_PROJETO)

### Funções 

In [None]:
def carrega_dados(caminho: Path, tabela: str, cols: list, filtros: dict = None) -> dict:
    usecols = lambda x: (
        x in [
            "PK_COD_ENTIDADE",
            "CO_ENTIDADE",
            "PK_COD_TURMA", 
            "ID_TURMA",
            "FK_COD_DOCENTE",
            "CO_PESSOA_FISICA",
            "ID_DOCENTE",
            "PK_COD_MATRICULA",
            "ID_MATRICULA",
            "FK_COD_ALUNO",
            "ID_ALUNO",
            "ANO_CENSO",
            "NU_ANO_CENSO",
        ]
    )
    
    # inicializa os dados de entrada como um dicionário vazio
    dados = dict()

    # para cada arquivo do censo demográfico
    for censo in tqdm(os.listdir(caminho)):
        # abre o arquivo zip com o conteúdo do censo
        with zipfile.ZipFile(caminho / f"{censo}") as z:
            # lista os conteúdos dos arquivos zip que contém o nome tabela
            arqs = [f for f in z.namelist() if f"{tabela}." in f.lower()]

            # se houver algum arquivo deste tipo dentro do zip
            if len(arqs) == 1:
                arq = arqs[0]

                # e este arquivo for um CSV
                if ".csv" in arq.lower():
                    # le os conteúdos do arquivo por meio do buffer do zip
                    dados[censo] = pd.read_csv(
                        z.open(arq), encoding="latin-1", sep="|", usecols=usecols
                    )

                # caso seja outro arquivo zip
                elif ".zip" in arq.lower():
                    # cria um novo zipfile e le o arquivo deste novo zip
                    with zipfile.ZipFile(z.open(arq)) as z2:
                        arq = z2.namelist()[0]
                        dados[censo] = pd.read_csv(
                            z2.open(arq), encoding="latin-1", sep="|", usecols=usecols
                        )

                # caso seja um arquivo winrar
                elif ".rar" in arq.lower():
                    # extraí o conteúdo do arquivo
                    z.extract(arq, path=caminho)
                    pyunpack.Archive(caminho / f"{arq}").extractall(caminho)

                    # lê os dados do disco
                    csv = [
                        f
                        for f in os.listdir(caminho)
                        if f"{tabela}." in f.lower()
                    ][0]
                    dados[censo] = pd.read_csv(
                        caminho / f"{csv}", encoding="latin-1", sep="|", usecols=usecols
                    )

                    # excluí os conteúdos extraídos
                    shutil.rmtree(caminho / f"{arq.split('/')[0]}")
                    os.remove(caminho / f"{csv}")
                
                # renomeia as colunas
                dados[censo].rename(
                    columns={
                        "PK_COD_ENTIDADE": "CO_ENTIDADE",
                        "PK_COD_TURMA": "ID_TURMA",
                        "FK_COD_DOCENTE": "ID_DOCENTE",
                        "CO_PESSOA_FISICA": "ID_DOCENTE",
                        "PK_COD_MATRICULA": "ID_MATRICULA",
                        "FK_COD_ALUNO": "ID_ALUNO",
                        "ANO_CENSO": "NU_ANO_CENSO",
                    }, 
                    inplace=True
                )
                                
                # aplica os filtros sobre o arquivo
                if filtros is not None:
                    for c, l in filtros.items():
                        dados[censo] = dados[censo].loc[lambda f: f[c].isin(l)]
                
                # seleciona as colunas
                dados[censo] = dados[censo].reindex(columns=cols).drop_duplicates()

    return dados

---

## Escolas 

In [None]:
np.random.seed(42)

dados = carrega_dados(PASTA_DADOS / "censo_escolar", "escolas", ["CO_ENTIDADE"])

escolas = (
    pd.concat([d["CO_ENTIDADE"] for d in dados.values()])
    .drop_duplicates()
    .sample(1000)
)

escolas.to_pickle(PASTA_TESTE_CODIGO / "escolas.pkl")

del dados
while gc.collect():
    continue

## Gestores 

In [None]:
escolas = pd.read_pickle(PASTA_TESTE_CODIGO / "escolas.pkl")

dados = carrega_dados(
    PASTA_DADOS / "censo_escolar", 
    "gestor", 
    ["ID_GESTOR"],
    filtros={"CO_ENTIDADE": escolas.values}
)

gestores = (
    pd.concat([d["ID_GESTOR"] for d in dados.values()])
    .drop_duplicates()
)

gestores.to_pickle(PASTA_TESTE_CODIGO / "gestores.pkl")

del dados
while gc.collect():
    continue

## Turmas 

In [None]:
np.random.seed(42)

escolas = pd.read_pickle(PASTA_TESTE_CODIGO / "escolas.pkl")

dados = carrega_dados(
    PASTA_DADOS / "censo_escolar", 
    "turmas", 
    ["ID_TURMA", "NU_ANO_CENSO"],
    filtros={"CO_ENTIDADE": escolas.values}
)

turmas = (
    pd.concat([d["ID_TURMA"].sample(1000) for d in dados.values()])
    .drop_duplicates()
)

turmas.to_pickle(PASTA_TESTE_CODIGO / "turmas.pkl")

del dados
while gc.collect():
    continue

## Docentes 

In [None]:
np.random.seed(42)

turmas = pd.read_pickle(PASTA_TESTE_CODIGO / "turmas.pkl")

docentes = pd.DataFrame()

for reg in ["co", "nordeste", "norte", "sudeste", "sul"]:
    dados = carrega_dados(
        PASTA_DADOS / "censo_escolar", 
        f"docentes_{reg}", 
        ["ID_DOCENTE", "NU_ANO_CENSO"],
        filtros={"ID_TURMA": turmas.values}
    )
    
    docentes = docentes.append(pd.concat(list(dados.values())))

docentes = pd.concat([
    docentes.loc[lambda f: f["NU_ANO_CENSO"] == a, "ID_DOCENTE"].sample(1000) 
    for a in docentes["NU_ANO_CENSO"].unique()
]).drop_duplicates()
docentes.to_pickle(PASTA_TESTE_CODIGO / "docentes.pkl")

del dados
while gc.collect():
    continue

## Matriculas 

In [None]:
np.random.seed(42)

turmas = pd.read_pickle(PASTA_TESTE_CODIGO / "turmas.pkl")

matriculas = pd.DataFrame()

for reg in ["co", "nordeste", "norte", "sudeste", "sul"]:
    dados = carrega_dados(
        PASTA_DADOS / "censo_escolar", 
        f"matricula_{reg}", 
        ["ID_MATRICULA", "ID_ALUNO", "NU_ANO_CENSO"],
        filtros={"ID_TURMA": turmas.values}
    )
    
    matriculas = matriculas.append(pd.concat(list(dados.values())))

    
matriculas = pd.concat([
    matriculas.loc[lambda f: f["NU_ANO_CENSO"] == a, "ID_MATRICULA"].sample(1000) 
    for a in matriculas["NU_ANO_CENSO"].unique()
]).drop_duplicates()
matriculas.to_pickle(PASTA_TESTE_CODIGO / "matriculas.pkl")

del dados
while gc.collect():
    continue

---