In [24]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Joins") \
    .getOrCreate()

spark

In [25]:
# imports and path
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as func
from pyspark.sql import DataFrame

pathEmpresas = "../datasets/alura/01/empresas/"
pathSocios = "../datasets/alura/01/socios/"
pathEstabs = "../datasets/alura/01/estabelecimentos/"

In [26]:
empresas = spark.read.csv(pathEmpresas, sep=';', inferSchema=True)
empresas.show(5)



+----+--------------------+----+---+-------+---+----+
| _c0|                 _c1| _c2|_c3|    _c4|_c5| _c6|
+----+--------------------+----+---+-------+---+----+
| 306|FRANCAMAR REFRIGE...|2240| 49|   0,00|  1|null|
|1355|BRASILEIRO & OLIV...|2062| 49|   0,00|  5|null|
|4820|REGISTRO DE IMOVE...|3034| 32|   0,00|  5|null|
|5347|ROSELY APARECIDA ...|2135| 50|   0,00|  5|null|
|6846|BADU E FILHOS TEC...|2062| 49|4000,00|  1|null|
+----+--------------------+----+---+-------+---+----+
only showing top 5 rows



                                                                                

In [27]:
socios = spark.read.csv(pathSocios, sep=';', inferSchema=True)
empresas.show(5)



+----+--------------------+----+---+-------+---+----+
| _c0|                 _c1| _c2|_c3|    _c4|_c5| _c6|
+----+--------------------+----+---+-------+---+----+
| 306|FRANCAMAR REFRIGE...|2240| 49|   0,00|  1|null|
|1355|BRASILEIRO & OLIV...|2062| 49|   0,00|  5|null|
|4820|REGISTRO DE IMOVE...|3034| 32|   0,00|  5|null|
|5347|ROSELY APARECIDA ...|2135| 50|   0,00|  5|null|
|6846|BADU E FILHOS TEC...|2062| 49|4000,00|  1|null|
+----+--------------------+----+---+-------+---+----+
only showing top 5 rows



                                                                                

In [28]:

estabs = spark.read.csv(pathEstabs, sep=';', inferSchema=True)
estabs.show(5)



+----+---+---+---+-----------------+---+--------+---+----+----+--------+-------+----+----+--------------------+----+-------+------------------+-------+----+----+----+----+----+----+----+----+----+----+----+
| _c0|_c1|_c2|_c3|              _c4|_c5|     _c6|_c7| _c8| _c9|    _c10|   _c11|_c12|_c13|                _c14|_c15|   _c16|              _c17|   _c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|_c28|_c29|
+----+---+---+---+-----------------+---+--------+---+----+----+--------+-------+----+----+--------------------+----+-------+------------------+-------+----+----+----+----+----+----+----+----+----+----+----+
|1879|  1| 96|  1|   PIRAMIDE M. C.|  8|20011029|  1|null|null|19940509|1412602|null| RUA|     JOSE FIGLIOLINI| 608|   null|         VILA NILO|2278020|  SP|7107|null|null|null|null|null|null|null|null|null|
|2818|  1| 43|  1|             null|  8|20081231| 71|null|null|19940512|4671100|null| RUA|              BAQUIA| 416|   null|VL NOVA MANCHESTER|3443000|  SP|7107|null|null|n

                                                                                

In [29]:
def rename(df: DataFrame, names=[]) -> DataFrame:
    for idx, name in enumerate(names):
        df = df.withColumnRenamed(f"_c{idx}", name)
    # display(res)
    return df

In [30]:
empresColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']


In [31]:
empresas = rename(empresas, empresColNames)
empresas = empresas.withColumn('capital_social_da_empresa', func.regexp_replace('capital_social_da_empresa', ',', '.'))
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [32]:
# for idx, name in enumerate(estabsColNames):
#         estabs = estabs.withColumnRenamed(f"_c{idx}", name)

estabs = rename(estabs, estabsColNames)

In [33]:
estabs = estabs.withColumn(
        'data_situacao_cadastral', func.to_date(estabs.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
    ).withColumn(
        'data_de_inicio_atividade', func.to_date(estabs.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
    ).withColumn(
        'data_da_situacao_especial', func.to_date(estabs.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
    )

In [34]:
sel = estabs.select(
    'cnpj_basico',
    func.year('data_situacao_cadastral').alias('cadastral'),
    func.year('data_de_inicio_atividade').alias('inicio'),
    func.year('data_da_situacao_especial').alias('epecial')
)

sel.limit(50).show()

+-----------+---------+------+-------+
|cnpj_basico|cadastral|inicio|epecial|
+-----------+---------+------+-------+
|       1879|     2001|  1994|   null|
|       2818|     2008|  1994|   null|
|       3110|     1997|  1994|   null|
|       3733|     2008|  1994|   null|
|       4628|     1998|  1995|   null|
|       4628|     2004|  1999|   null|
|       4628|     1996|  1995|   null|
|       4628|     2005|  1994|   null|
|      10804|     2017|  1994|   null|
|      11448|     2008|  1994|   null|
|      13566|     2000|  1994|   null|
|      15324|     2008|  1994|   null|
|      17853|     2007|  1994|   null|
|      18103|     2008|  1994|   null|
|      22504|     2015|  1994|   null|
|      23494|     2015|  1994|   null|
|      23494|     2015|  2004|   null|
|      24117|     1985|  1968|   null|
|      24242|     2008|  1994|   null|
|      24323|     2008|  1982|   null|
+-----------+---------+------+-------+
only showing top 20 rows



In [35]:
socios = rename(socios, sociosColNames)
socios = socios\
    .withColumn(
        "data_de_entrada_sociedade", 
        func.to_date(socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd')
    )
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [36]:
empresas_join = estabs.join(empresas, 'cnpj_basico', 'inner')

In [37]:
empresas_join.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [38]:
empresas_join.select(
    'cnpj_basico',
    # func.isnull('data_de_inicio_atividade').alias('data_de_inicio_atividade'),
    func.year('data_de_inicio_atividade').alias('data_de_inicio')
).show(10, False)



+-----------+--------------+
|cnpj_basico|data_de_inicio|
+-----------+--------------+
|243        |1994          |
|336        |1994          |
|362        |1994          |
|451        |1994          |
|458        |1994          |
|481        |1994          |
|513        |1981          |
|633        |1994          |
|633        |2000          |
|642        |1994          |
+-----------+--------------+
only showing top 10 rows



                                                                                

In [39]:
freq = empresas_join.select(
    'cnpj_basico',
    func.year('data_de_inicio_atividade').alias('data_de_inicio')
)\
.where('data_de_inicio >= 2010')\
.groupBy('data_de_inicio')\
.agg(func.count('cnpj_basico').alias('frequencia'))\
.orderBy('data_de_inicio', ascending=True)

In [40]:
freq.toPandas()

22/04/24 21:52:14 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:14 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:14 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:14 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:14 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:14 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:14 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:14 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                

Unnamed: 0,data_de_inicio,frequencia
0,2010,154159
1,2011,172677
2,2012,232480
3,2013,198424
4,2014,202276
5,2015,212523
6,2016,265417
7,2017,237292
8,2018,275435
9,2019,325922


In [41]:
freq.union(
    freq.select(
        func.lit('Total').alias('data_de_inicio'),
        func.sum(freq.frequencia).alias('frequencia')
    )
).show()

22/04/24 21:52:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:29 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:29 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+--------------+----------+
|data_de_inicio|frequencia|
+--------------+----------+
|          2010|    154159|
|          2011|    172677|
|          2012|    232480|
|          2013|    198424|
|          2014|    202276|
|          2015|    212523|
|          2016|    265417|
|          2017|    237292|
|          2018|    275435|
|          2019|    325922|
|          2020|    400654|
|          2021|    153275|
|         Total|   2830534|
+--------------+----------+



                                                                                

In [42]:
empresas_join.createOrReplaceTempView("empresasJoinView")

In [43]:
res = spark.sql("""
            SELECT 
                YEAR(data_de_inicio_atividade) AS data_de_inicio,
                COUNT(cnpj_basico) AS count
            FROM empresasJoinView 
            WHERE YEAR(data_de_inicio_atividade) >= 2010
            GROUP BY data_de_inicio
            ORDER BY data_de_inicio
                """)
res.toPandas()

22/04/24 21:52:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                

Unnamed: 0,data_de_inicio,count
0,2010,154159
1,2011,172677
2,2012,232480
3,2013,198424
4,2014,202276
5,2015,212523
6,2016,265417
7,2017,237292
8,2018,275435
9,2019,325922


In [44]:
res.createOrReplaceTempView("freqView")

In [45]:
res = spark.sql("""
            SELECT * FROM freqView 
            UNION ALL SELECT 'Total' AS data_de_inicio, SUM(count) as count FROM freqView
            """)
res.toPandas()

22/04/24 21:52:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/24 21:52:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                

Unnamed: 0,data_de_inicio,count
0,2010,154159
1,2011,172677
2,2012,232480
3,2013,198424
4,2014,202276
5,2015,212523
6,2016,265417
7,2017,237292
8,2018,275435
9,2019,325922
