In [1]:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType
from graphframes import *

In [2]:
# Dados de carga
csv_pj = 'dataset/empresas_estabelecimentos_1m.csv'
csv_socios = 'dataset/socios_1m.csv'
maior_cnpj = '04258527'
menor_cnpj = '03230095'
parquet_nodes = 'dataset/grapho/nodes_1m'
parquet_edges = 'dataset/grapho/edges_1m'

In [3]:
spark = SparkSession.builder \
        .config("spark.driver.memory", "128g") \
        .appName("gera_arvore1") \
        .getOrCreate()

23/12/09 09:00:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Schema para Json Empresa
json_schema_em = StructType([
    StructField("porteEmpresa", StringType(), True),
    StructField("capitalSocial", StringType(), True),
    StructField("cpfResponsavel", StringType(), True),
    StructField("nomeEmpresarial", StringType(), True),
    StructField("naturezaJuridica", StringType(), True),
    StructField("qualificacaoResponsavel", StringType(), True)
])

# Schema para Json Estabelecimento
json_schema_es = StructType([
    StructField("uf", StringType(), True),
    StructField("cep", StringType(), True),
    StructField("ddd1", StringType(), True),
    StructField("ddd2", StringType(), True),
    StructField("pais", StringType(), True),
    StructField("email", StringType(), True),
    StructField("bairro", StringType(), True),
    StructField("numero", StringType(), True),
    StructField("municipio", StringType(), True),
    StructField("telefone1", StringType(), True),
    StructField("telefone2", StringType(), True),
    StructField("cnaeFiscal", StringType(), True),
    StructField("logradouro", StringType(), True),
    StructField("complemento", StringType(), True),
    StructField("dataCadastro", StringType(), True),
    StructField("nomeFantasia", StringType(), True),
    StructField("cidadeExterior", StringType(), True),
    StructField("tipoLogradouro", StringType(), True),
    StructField("cnaesSecundarias", StringType(), True),
    StructField("situacaoEspecial", StringType(), True),
    StructField("situacaoCadastral", StringType(), True),
    StructField("dataSituacaoEspecial", StringType(), True),
    StructField("dataSituacaoCadastral", StringType(), True),
    StructField("motivoSituacaoCadastral", StringType(), True),
    StructField("identificadorMatrizFilial", StringType(), True)
])

# Definindo o esquema para leitura do CSV
schema = StructType([
    StructField("nu_cnpj_raiz", StringType(), True),
    StructField("te_dados_em", StringType(), True),
    StructField("id_estabelecimento", StringType(), True),
    StructField("te_dados_es", StringType(), True)
])

# Lendo o CSV com o esquema definido
pj = spark.read \
    .option("delimiter", ",") \
    .option("multiline", "true") \
    .option("escape", "\"") \
    .csv(csv_pj, schema=schema, header=True)

pj = pj.withColumn("te_dados_em", from_json(col("te_dados_em"), json_schema_em)) \
      .withColumn("te_dados_es", from_json(col("te_dados_es"), json_schema_es)) \
      .filter(substring(col("id_estabelecimento"), 0, 4) == '0001') \
      .withColumnRenamed("nu_cnpj_raiz", "id")

pj = pj.orderBy(col("id").asc()).dropDuplicates(['id'])

# verificando unidade de chaves:
# count_df = pj.groupBy('id').count()
# repeated_cnpj = count_df.filter(col('count') > 1)
# repeated_cnpj.show()

In [5]:
# Schema para Json Socios
json_schema_sc = StructType([
    StructField("pais", StringType(), True),
    StructField("entradaSociedade", StringType(), True),
    StructField("socioEstrangeiro", StringType(), True),
    StructField("qualificacaoSocio", StringType(), True),
    StructField("identificadorSocio", StringType(), True),
    StructField("cpfRepresentanteLegal", StringType(), True),
    StructField("qualificacaoRepresentanteLegal", StringType(), True)
])

# Definindo o esquema para leitura do CSV
schema = StructType([
    StructField("nu_cnpj_raiz", StringType(), True),
    StructField("id_socio", StringType(), True),
    StructField("te_dados_sc", StringType(), True)
])

# Lendo o CSV com o esquema definido
socio = spark.read \
        .option("delimiter", ",") \
        .option("multiline", "true") \
        .option("escape", "\"") \
        .csv(csv_socios, schema=schema, header=True)

# tratando o df socios
# Tratamento 1: Expandir o JSON
socio = socio.withColumn("te_dados_sc", from_json(col("te_dados_sc"), json_schema_sc))

# converte cnpj para nu_cnpj_raiz
socio = socio.withColumn("id_socio", when(socio["te_dados_sc"]["identificadorSocio"] == '1',
                                    socio["id_socio"].substr(1, 8))
                                .otherwise(socio["id_socio"]))
# Remomeia fonte e destino
socio = socio.withColumnRenamed("nu_cnpj_raiz", "src").withColumnRenamed("id_socio", "dst")
# Ordena por src, dst e remove arestas dubplicadas
socio = socio.orderBy(col("src").asc(), col("dst").asc()).dropDuplicates(['src', 'dst'])

In [6]:
# Cria df para vértices do tipo Pessoa Física:
pf = socio.filter(col("te_dados_sc.identificadorSocio") == '2').select(col("dst").alias("id")).distinct()

# Cria df para vértices de PJs que não foram exportados, mas estão no dst dos sócios
filtro = (col("te_dados_sc.identificadorSocio") == '1') & ((col("dst") > maior_cnpj) | (col("dst") < menor_cnpj))
pj_extra = socio.filter(filtro).select(col("dst").alias("id")).distinct()

cols_pj = [col_name for col_name in pj.columns if col_name != 'id']
for col_name in cols_pj:
    pf = pf.withColumn(col_name, lit(None))
    pj_extra = pj_extra.withColumn(col_name, lit(None))

# cria tipo de pessoa nos vértices
pj = pj.withColumn("id_tipoPessoa", lit(1))
pj_extra = pj_extra.withColumn("id_tipoPessoa", lit(1))
pf = pf.withColumn("id_tipoPessoa", lit(2))

# Cria tipo de relacionamento no socio
socio = socio.withColumn("id_tipoRel", lit(1))

pj = pj.union(pj_extra)
vertex = pj.union(pf).orderBy("id")

# verificando unidade de chaves:
# count_df = vertex.groupBy('id').count()
# repeated_vertex = count_df.filter(col('count') > 1)
# repeated_vertex.show()

In [7]:
# gera arquivo parquet de socio
socio.write.mode('overwrite').parquet(parquet_edges)

                                                                                

In [8]:
# gera arquivo parquet de pessoas
pj.write.mode('overwrite').parquet(parquet_nodes)

23/12/09 09:01:04 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [9]:
spark.stop()