In [None]:
from pyspark.sql.session import SparkSession
import pyspark.sql.functions as f

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
emp = (
    spark.read.csv("EMPRESAS_CNPJ.csv", sep=';')
    .toDF(
        'cnpj_basico',
        'razao_social',
        'natureza_juridica',
        'qualificacao_responsavel',
        'capital_social',
        'porte',
        'ente_responsavel')
)

est = (
    spark.read.csv("ESTABELE_CNPJ.csv", sep=';')
    .toDF(
        'cnpj_basico',
        'cnpj_ordem',
        'cnpj_dv',
        'id_matriz',
        'nome_fantasia',
        'situacao',
        'situacao_data',
        'situacao_motivo',
        'cidade_exterior',
        "pais_codigo",
        'inicio_data',
        'cnae_principal',
        'cnae_secundario',
        'logradouro_tipo',
        'logradouro',
        'lograoduro_numero',
        'logradouro_complemento',
        'bairro',
        'cep',
        'uf',
        'municipio',
        'ddd1',
        'telefone1',
        'ddd2',
        'telefone2',
        'ddd_fax',
        'fax',
        'email',
        'situacao_especial',
        'situacao_especial_data')
)

In [None]:
est = est.filter(f.col('municipio') == 5413)

In [None]:
emp = (
    emp
    .select(
        'cnpj_basico',
        'natureza_juridica',
        'capital_social',
        'porte',
    )
)

est = (
    est
    .select(
        'cnpj_basico',
        'cnpj_ordem',
        'cnpj_dv',
        'id_matriz',
        'situacao',
        'situacao_data',
        'inicio_data',
        'cnae_principal',
        'cep',
    )
)

In [None]:
est = (
    est
    .join(emp, on='cnpj_basico', how='inner')
)

In [None]:
est.columns

### Identificador matriz

In [None]:
est = (
    est
    .withColumn('id_matriz', f.when(f.col('id_matriz') == 1, 1)
                 .otherwise(0))
)

### Situação

In [None]:
est.groupBy("situacao").count().show(50, False)

In [None]:
est = (
    est
    .withColumn('flag_ativa', f.when(f.col('situacao') == '02', 1)
                 .otherwise(0))
)

### Formatando as datas

In [None]:
est = (
    est
    .withColumn('situacao_data', f.to_date(f.col('situacao_data').cast('string'), "yyyyMMdd"))
    .withColumn('inicio_data', f.to_date(f.col('inicio_data').cast('string'), "yyyyMMdd"))
)

In [None]:
est = (
    est
    .withColumn('cnpj', f.concat_ws("",
                f.col('cnpj_basico'), f.col('cnpj_ordem'), f.col('cnpj_dv')
                ))
    .drop('cnpj_basico', 'cnpj_dv', 'cnpj_ordem')
)

In [None]:
est.groupBy('natureza_juridica').count().sort("count", ascending=False).show(500)

In [None]:
est = (
    est
    .withColumn('natureza_juridica', f.when(f.col('natureza_juridica') == 2135, "empresario_individual")
                 .when(f.col('natureza_juridica') == 2062, 'sociedade_empresaria_ltda')
                 .otherwise('outros'))
)

In [None]:
est = (
    est
    .withColumn("capital_social", f.regexp_replace('capital_social', ",", "."))
    .withColumn("capital_social", f.col('capital_social').cast('float'))
)

In [None]:
est.select("capital_social").toPandas().describe()

In [None]:
est.write.parquet("./Dados_tratados")