In [1]:
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [3]:
# Mount Google Drive; the input archives/CSVs and all outputs of this notebook
# live under /content/drive/MyDrive/mock-spark.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
import os

# Point Spark (and findspark) at the JDK and Spark distribution installed above.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.1.2-bin-hadoop2.7",
)

In [5]:
# With no arguments, findspark.init() uses the SPARK_HOME environment variable
# (set in the previous cell) to make the bundled pyspark importable.
import findspark
findspark.init()

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import DoubleType, StringType
import zipfile  # only referenced by the archive-extraction cell

# Local Spark session using all available cores; getOrCreate() reuses an
# existing session if this cell is re-run.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("Iniciando com Spark")
    .getOrCreate()
)

In [8]:
# Extract each raw archive into mock-spark/, but only when its extracted
# counterpart (the path the next cell reads) is missing. Re-runs are then
# no-ops, and a fresh environment still works end to end.
# NOTE(review): assumes each <name>.zip yields mock-spark/<name> when
# extracted into mock-spark/ — confirm against the archive contents.
MOCK_SPARK_DIR = "/content/drive/MyDrive/mock-spark"

for dataset in ("companies", "partners", "establishments"):
  target_path = os.path.join(MOCK_SPARK_DIR, dataset)
  if not os.path.exists(target_path):
    # `with` closes the archive handle (the previous version never closed it).
    with zipfile.ZipFile(f"{target_path}.zip", "r") as archive:
      archive.extractall(MOCK_SPARK_DIR)


In [9]:
# Load the three header-less, semicolon-separated datasets; columns come back
# as _c0, _c1, ... and inferSchema makes Spark guess each column's type.
companies, partners, establishments = (
    spark.read.csv(f"/content/drive/MyDrive/mock-spark/{dataset}", sep=";", inferSchema=True)
    for dataset in ("companies", "partners", "establishments")
)


In [10]:
# Peek at the raw rows; columns still carry Spark's default names _c0.._c6.
companies.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [11]:
# Positional names for the header-less CSVs: entry i replaces Spark's default
# column name _c{i} (applied by renameColumns).
companiesColNames = [
    'cnpj_base',
    'companies_name',
    'legal_nature',
    'qualification_responsible',
    'companies_share_capital',
    'size_companies',
    'federal_responsible',
]

partnersColNames = [
    'cnpj_base',
    'partners_identifier',
    'partners_name_or_social_reason',
    'cnpj_or_partners_cpf',
    'partners_qualification',
    'date_entry_society',
    'country',
    'legal_representative',
    'representative_name',
    'qualification_legal_representative',
    'age_group',
]

establishmentsColNames = [
    'cnpj_base',
    'cnpj_origin',
    'cnpj_dv',
    'identifier_matriz_filial',
    'fantasy_name',
    'situation_cadastral',
    'date',
    'reason_cadastral_status',
    'overseas_city_name',
    'country',
    'date_start_activity',
    'cnae_main_tax',
    'cnae_fiscal_secondary',
    'street_type',
    'street',
    'number',
    'complement',
    'neighborhood',
    'zip_code',
    'uf',
    'city',
    'ddd_1',
    'phone_1',
    'ddd_2',
    'phone_2',
    'ddd_fax',
    'fax',
    'email',
    'special_situation',
    'date_special_situation',
]

def renameColumns(df, newColumns):
  """Rename Spark's default positional columns to descriptive names.

  The i-th entry of `newColumns` replaces the column named `_c{i}` (the
  names Spark assigns when a CSV is read without a header). Renames are
  applied one after another with `withColumnRenamed`, which leaves the frame
  unchanged when `_c{i}` is not present; columns beyond the end of
  `newColumns` keep their original names.

  Args:
    df: DataFrame whose columns are named `_c0`, `_c1`, ...
    newColumns: iterable of replacement names, in column order.

  Returns:
    The renamed DataFrame (`df` itself when `newColumns` is empty).
  """
  renamePairs = [(f"_c{position}", name) for position, name in enumerate(newColumns)]
  renamed = df
  for oldName, newName in renamePairs:
    renamed = renamed.withColumnRenamed(oldName, newName)
  return renamed

def changeTypeColumn(df, columnName, newType):
  """Cast one column of `df` to `newType`, keeping its name.

  NOTE(review): with Spark's default (non-ANSI) settings, values that cannot
  be cast (e.g. malformed numeric text) become null rather than raising.

  Args:
    df: input DataFrame.
    columnName: column to convert; it is replaced in the result.
    newType: target type passed to `Column.cast` (e.g. DoubleType()).

  Returns:
    A new DataFrame in which `columnName` holds the cast values.
  """
  castColumn = df[columnName].cast(newType)
  return df.withColumn(columnName, castColumn)

def formatDateType(df, columnName, setString, dateFormat="yyyyMMdd"):
  """Parse a compact date column (e.g. 20210131) into a Spark DateType column.

  The column is first cast to `setString` (callers pass StringType()) because
  `func.to_date` parses text. NOTE(review): with inferSchema=True the raw
  yyyyMMdd values were presumably read as integers — confirm per column.

  If the column is already DateType, `df` is returned unchanged. Without this
  guard, re-running the conversion cell would cast the dates to "yyyy-MM-dd"
  text, which does not parse as "yyyyMMdd" and would turn every date into null.

  Args:
    df: input DataFrame.
    columnName: column to convert; it is replaced in the result.
    setString: Spark type the column is cast to before parsing.
    dateFormat: Spark datetime pattern of the raw values (default "yyyyMMdd",
      the previously hard-coded pattern).

  Returns:
    A DataFrame in which `columnName` is a DateType column.
  """
  from pyspark.sql.types import DateType

  if isinstance(df.schema[columnName].dataType, DateType):
    return df
  return df.withColumn(
      columnName, func.to_date(df[columnName].cast(setString), dateFormat))

def replaceValueColumn(df, columnName, oldValue, newValue):
  """Replace every match of `oldValue` in a column with `newValue`.

  Implemented with `func.regexp_replace`, so `oldValue` is a (Java) regular
  expression, not a literal string: metacharacters such as '.' or '|' must be
  escaped by the caller. The result replaces `columnName` and is a string
  column.

  Args:
    df: input DataFrame.
    columnName: column to rewrite.
    oldValue: regex pattern to search for.
    newValue: replacement text.

  Returns:
    A new DataFrame with the rewritten column.
  """
  replacedColumn = func.regexp_replace(columnName, oldValue, newValue)
  return df.withColumn(columnName, replacedColumn)


In [12]:
# Give the header-less CSV columns (_c0, _c1, ...) their descriptive names.
companies, partners, establishments = (
    renameColumns(frame, names)
    for frame, names in (
        (companies, companiesColNames),
        (partners, partnersColNames),
        (establishments, establishmentsColNames),
    )
)


In [13]:
# Same preview after renaming: columns now use the names from companiesColNames.
companies.limit(5).toPandas()

Unnamed: 0,cnpj_base,companies_name,legal_nature,qualification_responsible,companies_share_capital,size_companies,federal_responsible
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [14]:
# Inspect the inferred types; companies_share_capital is still a string here
# and is converted to double in the next cell.
companies.printSchema()

root
 |-- cnpj_base: integer (nullable = true)
 |-- companies_name: string (nullable = true)
 |-- legal_nature: integer (nullable = true)
 |-- qualification_responsible: integer (nullable = true)
 |-- companies_share_capital: string (nullable = true)
 |-- size_companies: integer (nullable = true)
 |-- federal_responsible: string (nullable = true)



In [15]:
# Share capital arrives as text with a decimal comma: switch it to a dot,
# then cast to double.
companies = changeTypeColumn(
    replaceValueColumn(companies, "companies_share_capital", ",", "."),
    "companies_share_capital",
    DoubleType(),
)

# Parse the compact yyyyMMdd date columns into DateType.
establishments = formatDateType(
    formatDateType(establishments, "date", StringType()),
    "date_start_activity",
    StringType(),
)
partners = formatDateType(partners, "date_entry_society", StringType())

companies.printSchema()

root
 |-- cnpj_base: integer (nullable = true)
 |-- companies_name: string (nullable = true)
 |-- legal_nature: integer (nullable = true)
 |-- qualification_responsible: integer (nullable = true)
 |-- companies_share_capital: double (nullable = true)
 |-- size_companies: integer (nullable = true)
 |-- federal_responsible: string (nullable = true)



In [16]:
# Check that `date` and `date_start_activity` are now DateType.
# NOTE(review): the saved output lists `motivo_situacao_cadastral`, while
# establishmentsColNames uses `reason_cadastral_status`; the output predates
# the current column list. Restart and run all to refresh it.
establishments.printSchema()

root
 |-- cnpj_base: integer (nullable = true)
 |-- cnpj_origin: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identifier_matriz_filial: integer (nullable = true)
 |-- fantasy_name: string (nullable = true)
 |-- situation_cadastral: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- overseas_city_name: string (nullable = true)
 |-- country: integer (nullable = true)
 |-- date_start_activity: date (nullable = true)
 |-- cnae_main_tax: integer (nullable = true)
 |-- cnae_fiscal_secondary: string (nullable = true)
 |-- street_type: string (nullable = true)
 |-- street: string (nullable = true)
 |-- number: string (nullable = true)
 |-- complement: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- city: integer (nullable = true)
 |-- ddd_1: string (nullable = true)
 |-- phone_1: string (nullable =

In [17]:
# Check that date_entry_society is now DateType.
partners.printSchema()

root
 |-- cnpj_base: integer (nullable = true)
 |-- partners_identifier: integer (nullable = true)
 |-- partners_name_or_social_reason: string (nullable = true)
 |-- cnpj_or_partners_cpf: string (nullable = true)
 |-- partners_qualification: integer (nullable = true)
 |-- date_entry_society: date (nullable = true)
 |-- country: integer (nullable = true)
 |-- legal_representative: string (nullable = true)
 |-- representative_name: string (nullable = true)
 |-- qualification_legal_representative: integer (nullable = true)
 |-- age_group: integer (nullable = true)



In [18]:
# Preview after conversion: companies_share_capital is now a double.
companies.limit(5).toPandas()

Unnamed: 0,cnpj_base,companies_name,legal_nature,qualification_responsible,companies_share_capital,size_companies,federal_responsible
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [27]:
# Partners ordered by most recent entry year, then by highest age_group code.
(
    partners
    .select(
        "partners_name_or_social_reason",
        "age_group",
        func.year("date_entry_society").alias("year_entry_society"),
    )
    .orderBy(func.desc("year_entry_society"), func.desc("age_group"))
    .show(5, False)
)

+------------------------------+---------+------------------+
|partners_name_or_social_reason|age_group|year_entry_society|
+------------------------------+---------+------------------+
|EDGAR FRANCISCO DA SILVA      |9        |2021              |
|WILLIAM WHITING BEACH VEALE   |9        |2021              |
|ANTONIO TAVARES DE ANDRADE    |9        |2021              |
|AURA MARIA DE ANDRADE         |9        |2021              |
|ANTONIA DE SOUSA VIEIRA       |9        |2021              |
+------------------------------+---------+------------------+
only showing top 5 rows



In [21]:
# Null count per column: `when` yields 1 for null cells (null otherwise) and
# `count` skips nulls, so each output value is that column's null total.
(
    partners
    .select([
        func.count(func.when(func.col(colName).isNull(), 1)).alias(colName)
        for colName in partners.columns
    ])
    .show()
)

+---------+-------------------+------------------------------+--------------------+----------------------+------------------+-------+--------------------+-------------------+----------------------------------+---------+
|cnpj_base|partners_identifier|partners_name_or_social_reason|cnpj_or_partners_cpf|partners_qualification|date_entry_society|country|legal_representative|representative_name|qualification_legal_representative|age_group|
+---------+-------------------+------------------------------+--------------------+----------------------+------------------+-------+--------------------+-------------------+----------------------------------+---------+
|        0|                  0|                           208|                1234|                     0|                 1|2038255|                   0|            1995432|                                 0|        0|
+---------+-------------------+------------------------------+--------------------+----------------------+--------------

In [28]:
# Companies whose declared share capital is exactly 50.
(
    companies
    .where(func.col("companies_share_capital") == 50)
    .show(5, False)
)

+---------+------------------------------------+------------+-------------------------+-----------------------+--------------+-------------------+
|cnpj_base|companies_name                      |legal_nature|qualification_responsible|companies_share_capital|size_companies|federal_responsible|
+---------+------------------------------------+------------+-------------------------+-----------------------+--------------+-------------------+
|17350147 |ERIK MARCELO DOS SANTOS 42107848858 |2135        |50                       |50.0                   |1             |null               |
|17833214 |ALEXANDRE MACHADO LIMA 73750123772  |2135        |50                       |50.0                   |1             |null               |
|20860830 |YASMIN MOURA DA FONSECA 13457709793 |2135        |50                       |50.0                   |1             |null               |
|22242856 |JOAO CESAR MESSIAS 08707149883      |2135        |50                       |50.0                   |1      

In [41]:
# Partner names that start with MARIA and end with SILVA. limit(20) before
# show(20) also keeps show() from printing the "only showing top 20 rows" footer.
(
    partners
    .select("partners_name_or_social_reason")
    .where(
        partners["partners_name_or_social_reason"].startswith("MARIA")
        & partners["partners_name_or_social_reason"].endswith("SILVA")
    )
    .limit(20)
    .show(20, False)
)

+----------------------------------+
|partners_name_or_social_reason    |
+----------------------------------+
|MARIA LUIZA DA SILVA              |
|MARIA ANGELICA DO NASCIMENTO SILVA|
|MARIA ALEXANDRINA DA SILVA        |
|MARIA DE FATIMA DA SILVA          |
|MARIA VALDETE FERNANDES DA SILVA  |
|MARIA ISABEL DA SILVA             |
|MARIA DE FATIMA BATISTA DA SILVA  |
|MARIA DE FATIMA DA SILVA          |
|MARIA DA PENHA HENRIQUE DA SILVA  |
|MARIA ANTONIA NOGUEIRA SILVA      |
|MARIA APARECIDA DA SILVA          |
|MARIA EMILIA PEREIRA DA SILVA     |
|MARIA GORETI BOTELHO DA SILVA     |
|MARIA DA PIEDADE SILVA            |
|MARIA MADALENA VIEIRA DA SILVA    |
|MARIA NAILZA DA SILVA             |
|MARIA APARECIDA DUARTE DA SILVA   |
|MARIA DO SOCORRO TAVARES DA SILVA |
|MARIA CRISTINA SANCHES DA SILVA   |
|MARIA DO SOCORRO DA SILVA         |
+----------------------------------+



In [50]:
# Partner names whose first word is exactly ERIK (pattern "ERIK %").
(
    partners
    .select("partners_name_or_social_reason")
    .where("partners_name_or_social_reason LIKE 'ERIK %'")
    .show(truncate=False)
)

+-------------------------------+
|partners_name_or_social_reason |
+-------------------------------+
|ERIK GUSTAF SAMUEL ANDERSSON   |
|ERIK CARNEIRO DA SILVA         |
|ERIK PEROTTA BIRKELAND         |
|ERIK CRANSTON WOODHEAD SIEGMANN|
|ERIK JEAN DE ASSIS MENEZES     |
|ERIK DOUGLAS DE ANDRADA        |
|ERIK ROCUMBACK                 |
|ERIK LIMA VALENTIM             |
|ERIK KURKOWSKI WEBER           |
|ERIK KAZUO TAKARA              |
|ERIK BRUNO VILELA DOS SANTOS   |
|ERIK DE MARIA                  |
|ERIK JAYSON DE QUEIROZ SORIANO |
|ERIK DUARTE BIANCHI            |
|ERIK DUARTE RODRIGUES          |
|ERIK DA SILVA                  |
|ERIK PETERSON DE JESUS         |
|ERIK STEIN VIEIRA MANICOBA     |
|ERIK SPADA DA SILVA            |
|ERIK GALARDI                   |
+-------------------------------+
only showing top 20 rows



In [53]:
# Number of partner entries per year, from 2010 onwards, oldest year first.
(
    partners
    .select(func.year("date_entry_society").alias("entry_year"))
    .filter(func.col("entry_year") >= 2010)
    .groupBy("entry_year")
    .count()
    .orderBy(func.asc("entry_year"))
    .show()
)

+----------+------+
|entry_year| count|
+----------+------+
|      2010| 79337|
|      2011| 83906|
|      2012| 80101|
|      2013| 83919|
|      2014| 80590|
|      2015| 80906|
|      2016| 81587|
|      2017| 90221|
|      2018| 99935|
|      2019|118248|
|      2020|125927|
|      2021| 56316|
+----------+------+



In [54]:
# Mean share capital and company count per size_companies code (nulls grouped too).
(
    companies
    .select("cnpj_base", "size_companies", "companies_share_capital")
    .groupBy("size_companies")
    .agg(
        func.avg("companies_share_capital").alias("mean_share_capital"),
        func.count("cnpj_base").alias("frequency"),
    )
    .orderBy(func.asc("size_companies"))
    .show()
)

+--------------+------------------+---------+
|size_companies|mean_share_capital|frequency|
+--------------+------------------+---------+
|          null|  8.35421888053467|     5985|
|             1|339994.53313506936|  3129043|
|             3|2601001.7677092673|   115151|
|             5| 708660.4208249798|  1335500|
+--------------+------------------+---------+



In [65]:
# Inner join: keep only establishments whose cnpj_base matches a company row.
companiesWithEstablishments = establishments.join(companies, on="cnpj_base", how="inner")
companiesWithEstablishments.printSchema()

root
 |-- cnpj_base: integer (nullable = true)
 |-- cnpj_origin: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identifier_matriz_filial: integer (nullable = true)
 |-- fantasy_name: string (nullable = true)
 |-- situation_cadastral: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- overseas_city_name: string (nullable = true)
 |-- country: integer (nullable = true)
 |-- date_start_activity: date (nullable = true)
 |-- cnae_main_tax: integer (nullable = true)
 |-- cnae_fiscal_secondary: string (nullable = true)
 |-- street_type: string (nullable = true)
 |-- street: string (nullable = true)
 |-- number: string (nullable = true)
 |-- complement: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- city: integer (nullable = true)
 |-- ddd_1: string (nullable = true)
 |-- phone_1: string (nullable =

In [68]:
# Rows of the establishment/company join per year of start of activity,
# restricted to 2010 onwards, in ascending year order.
freq = (
    companiesWithEstablishments
    .select(
        "cnpj_base",
        func.year("date_start_activity").alias("start_date"),
    )
    .where("start_date >= 2010")
    .groupBy("start_date")
    .agg(func.count("cnpj_base").alias("frequency"))
    # Was `ascening=True`: orderBy only reads the `ascending` keyword, so the
    # misspelled flag was silently ignored.
    .orderBy("start_date", ascending=True)
)


In [71]:
# Append a grand-total row; the union widens start_date to a string column so
# the "Total" label fits.
(
    freq
    .union(
        freq.select(
            func.lit("Total").alias("start_date"),
            func.sum("frequency").alias("frequency"),
        )
    )
    .show()
)



+----------+---------+
|start_date|frequency|
+----------+---------+
|      2010|   154159|
|      2011|   172677|
|      2012|   232480|
|      2013|   198424|
|      2014|   202276|
|      2015|   212523|
|      2016|   265417|
|      2017|   237292|
|      2018|   275435|
|      2019|   325922|
|      2020|   400654|
|      2021|   153275|
|     Total|  2830534|
+----------+---------+



In [72]:
# Register the joined DataFrame as a temp view so it can be queried via spark.sql.
companiesWithEstablishments.createOrReplaceTempView("companiesWithEstablishmentsView")



In [75]:
# The same per-year count as `freq`, expressed in Spark SQL over the temp view.
freqSQL = spark.sql("""
      SELECT YEAR(date_start_activity) AS start_date, COUNT(cnpj_base) AS frequency
        FROM companiesWithEstablishmentsView
        WHERE YEAR(date_start_activity) >= 2010
        GROUP BY start_date
        ORDER BY start_date
    """)
freqSQL.show()

+----------+---------+
|start_date|frequency|
+----------+---------+
|      2010|   154159|
|      2011|   172677|
|      2012|   232480|
|      2013|   198424|
|      2014|   202276|
|      2015|   212523|
|      2016|   265417|
|      2017|   237292|
|      2018|   275435|
|      2019|   325922|
|      2020|   400654|
|      2021|   153275|
+----------+---------+



In [81]:
# Expose the per-year counts as a view so SQL can append a Total row to them.
freqSQL.createOrReplaceTempView("freqSQLView")

In [82]:
# Append a grand-total row to the per-year counts with SQL UNION ALL.
(
    spark
    .sql("""
    SELECT * 
      FROM freqSQLView
    UNION ALL 
    SELECT 'Total' AS start_date, SUM(frequency) AS frequency 
      FROM freqSQLView
  """)
    .show()
)

+----------+---------+
|start_date|frequency|
+----------+---------+
|      2010|   154159|
|      2011|   172677|
|      2012|   232480|
|      2013|   198424|
|      2014|   202276|
|      2015|   212523|
|      2016|   265417|
|      2017|   237292|
|      2018|   275435|
|      2019|   325922|
|      2020|   400654|
|      2021|   153275|
|     Total|  2830534|
+----------+---------+



In [88]:
# Persist the cleaned companies table as Parquet, replacing any previous output.
companies.write.mode("overwrite").parquet("/content/drive/MyDrive/mock-spark/output/companies")

In [89]:
# Persist the cleaned partners table as Parquet, replacing any previous output.
partners.write.mode("overwrite").parquet("/content/drive/MyDrive/mock-spark/output/partners")

In [95]:
# Export establishments as semicolon-separated CSV with a header row.
(
    establishments.write
    .mode("overwrite")
    .options(sep=";", header=True)
    .csv("/content/drive/MyDrive/mock-spark/output/establishments")
)

In [96]:
# Export the joined table as a single CSV part file: coalesce(1) funnels the
# whole write through one partition, trading parallelism for a single file.
(
    companiesWithEstablishments
    .coalesce(1)
    .write
    .mode("overwrite")
    .options(sep=";", header=True)
    .csv("/content/drive/MyDrive/mock-spark/output/companies_establishments")
)