In [1]:
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import regexp_replace, col, to_date

load_dotenv()

print(">> Process search")
print("Start Spark")

# Create session Spark
spark = SparkSession.builder \
    .appName("ProcessCNPJ") \
    .config("spark.sql.decimalOperations.showTrailingZeroes", "false") \
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:8.13.4") \
    .config("spark.jars", os.getenv("SPARK_JARS")) \
    .getOrCreate()

>> Process search
Start Spark


24/05/24 22:15:12 WARN Utils: Your hostname, vmi1385443.contaboserver.net resolves to a loopback address: 127.0.1.1; using 154.53.35.160 instead (on interface eth0)
24/05/24 22:15:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.elasticsearch#elasticsearch-spark-30_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-77744c15-662e-4f82-985b-b912d23a90ef;1.0
	confs: [default]
	found org.elasticsearch#elasticsearch-spark-30_2.12;8.13.4 in central
	found org.scala-lang#scala-reflect;2.12.8 in central
	found org.slf4j#slf4j-api;1.7.6 in central
	found commons-logging#commons-logging;1.1.1 in central
	found javax.xml.bind#jaxb-api;2.3.1 in central
	found com.google.protobuf#protobuf-java;2.5.0 in central
	found org.apache.spark#spark-yarn_2.12;3.3.3 in central
:: resolution report :: resolve 361ms :: artifacts dl 11ms
	:: modules in use:
	com.google.protobuf#protobuf-java;2.5.0 from central in [default]
	commons-logging#commons-logging;1.1.1 from central in [default]
	javax.xml.bind#jaxb-api;2.3.1 from central in [default]
	org.apache.spark#spark-yarn_2.12;3.3.3 from central 

In [7]:
# spark.stop()

In [2]:
# Define the schema based on the dictionary
schema = StructType([
    StructField("cnpj_basico", StringType(), True),
    StructField("razao_social", StringType(), True),
    StructField("natureza_juridica", StringType(), True),
    StructField("qualificacao_responsavel", StringType(), True),
    StructField("capital_social", StringType(), True),
    StructField("porte_empresa", IntegerType(), True),
    StructField("ente_federativo_responsavel", StringType(), True)
])

empresas_df = spark.read.option("header", "false").option("encoding", "ISO-8859-1").option("delimiter", ";").option("quote", '"').option("escape", "").schema(schema).csv("../downloads/empresas")

empresas_df = empresas_df.withColumn("capital_social", regexp_replace(col("capital_social"), ",", "."))
empresas_df = empresas_df.withColumn("capital_social", col("capital_social").cast("double"))

schema = StructType([
    StructField("cnpj_basico", StringType(), True),
    StructField("cnpj_ordem", StringType(), True),
    StructField("cnpj_dv", StringType(), True),
    StructField("identificador_matriz_filial", IntegerType(), True),
    StructField("nome_fantasia", StringType(), True),
    StructField("situacao_cadastral", IntegerType(), True),
    StructField("data_situacao_cadastral", StringType(), True),
    StructField("motivo_situacao_cadastral", StringType(), True),
    StructField("nome_cidade_exterior", StringType(), True),
    StructField("pais", StringType(), True),
    StructField("data_inicio_atividade", StringType(), True),
    StructField("cnae_fiscal_principal", StringType(), True),
    StructField("cnae_fiscal_secundaria", StringType(), True),
    StructField("tipo_logradouro", StringType(), True),
    StructField("logradouro", StringType(), True),
    StructField("numero", StringType(), True),
    StructField("complemento", StringType(), True),
    StructField("bairro", StringType(), True),
    StructField("cep", StringType(), True),
    StructField("uf", StringType(), True),
    StructField("municipio", StringType(), True),
    StructField("ddd_1", StringType(), True),
    StructField("telefone_1", StringType(), True),
    StructField("ddd_2", StringType(), True),
    StructField("telefone_2", StringType(), True),
    StructField("ddd_fax", StringType(), True),
    StructField("fax", StringType(), True),
    StructField("correio_eletronico", StringType(), True),
    StructField("situacao_especial", StringType(), True),
    StructField("data_situacao_especial", StringType(), True)
])

estabelecimentos_df = spark.read.option("header", "false").option("encoding", "ISO-8859-1").option("delimiter", ";").option("quote", '"').option("escape", "").schema(schema).csv("../downloads/estabelecimentos")

empresas_df.createOrReplaceTempView("emp")
estabelecimentos_df.createOrReplaceTempView("est")

24/05/24 22:15:18 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [3]:
#  mostrar as primeiras 5 linhas de cada dataframe
print("Empresas")
empresas_df.show(5)
print("Estabelecimentos")
estabelecimentos_df.show(5)


Empresas
+-----------+--------------------+-----------------+------------------------+--------------+-------------+---------------------------+
|cnpj_basico|        razao_social|natureza_juridica|qualificacao_responsavel|capital_social|porte_empresa|ente_federativo_responsavel|
+-----------+--------------------+-----------------+------------------------+--------------+-------------+---------------------------+
|   22967214|NILSON DA SILVA S...|             2135|                      50|       24000.0|            1|                          "|
|   22967215|VIVIAN CRISTINA F...|             2135|                      50|        5000.0|            1|                          "|
|   22967216|PALOMA GOMES DE O...|             2135|                      50|           1.0|            1|                          "|
|   22967217|TEREZA DIAS COSTA...|             2135|                      50|       26400.0|            1|                          "|
|   22967218|LUIS CARLOS CASTA...|            

In [4]:
# Define a consulta SQL
sql_query = """
    SELECT 
        CONCAT(est.cnpj_basico, est.cnpj_ordem, est.cnpj_dv) AS cnpj,
        emp.razao_social,
        emp.capital_social,
        est.nome_fantasia,
        est.situacao_cadastral,
        est.cnae_fiscal_principal,
        est.uf,
        est.municipio
    FROM est
    INNER JOIN emp
        ON est.cnpj_basico = emp.cnpj_basico
"""

# Executa a consulta e mostra o resultado
result = spark.sql(sql_query)

result.write.format("org.elasticsearch.spark.sql") \
    .option("es.resource", "opencnpj/_doc") \
    .option("es.nodes", "0.0.0.0:9200") \
    .save()

# result.show(5)
# result.write.option("header", "true").csv("../data/search")

24/05/24 22:15:26 WARN RestClient: Could not verify server is Elasticsearch! ES-Hadoop will require server validation when connecting to an Elasticsearch cluster if that Elasticsearch cluster is v7.14 and up.
24/05/24 22:15:26 WARN Resource: Detected type name in resource [opencnpj/_doc]. Type names are deprecated and will be removed in a later release.
24/05/24 22:15:26 WARN RestClient: Could not verify server is Elasticsearch! ES-Hadoop will require server validation when connecting to an Elasticsearch cluster if that Elasticsearch cluster is v7.14 and up.
24/05/24 22:17:34 WARN RestClient: Could not verify server is Elasticsearch! ES-Hadoop will require server validation when connecting to an Elasticsearch cluster if that Elasticsearch cluster is v7.14 and up.
24/05/24 22:17:34 WARN RestClient: Could not verify server is Elasticsearch! ES-Hadoop will require server validation when connecting to an Elasticsearch cluster if that Elasticsearch cluster is v7.14 and up.
24/05/24 22:17:34