In [1]:
import os
os.environ.get('JAVA_HOME')
os.environ.get('SPARK_HOME')

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Starting with Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

23/06/14 20:20:27 WARN Utils: Your hostname, Viniciuss-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.16.0.149 instead (on interface en0)
23/06/14 20:20:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/14 20:20:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

In [5]:
## Simple example, how to create a Spark DataFrame

data = [('Will', '23'), ('Bill', '32')]
colNames = ['name', 'age']
df = spark.createDataFrame(data, colNames)
df.show()

                                                                                

+----+---+
|name|age|
+----+---+
|Will| 23|
|Bill| 32|
+----+---+



In [6]:
# Using the Brazil Receita Federal data about business registration number (CNPJ);

business = spark.read.csv('data/empresas/', sep=';', inferSchema=True)

business_col_names = ['root_cnpj', 'company_legal_name', 'legal_nature', 'responsible_qualification', \
    'company_share_capital', 'business_size', 'federative_entity_responsible']

for index, col_name in enumerate(business_col_names):
    business = business.withColumnRenamed(f"_c{index}", col_name)

                                                                                

In [7]:
establishments = spark.read.csv('data/estabelecimentos/', sep=';', inferSchema=True)

estab_col_names = ['root_cnpj', 'order_cnpj', 'cnpj_dv', 'id_hq_or_branch', 'company_fantasy_name',\
    'registration_status', 'date_registration_status', 'reason_registration_status', 'city_outside_name',\
        'country', 'activity_start_date', 'main_cnae', 'secundary_cnae',\
            'street_type', 'street', 'number', 'complement', 'neighborhood', 'zip_code', 'state_code', \
                'city', 'ddd_1', 'phone_1', 'ddd_2', 'phone_2', 'ddd_fax', 'fax', \
                    'email', 'special_situation', 'date_special_situation']

for index, col_name in enumerate(estab_col_names):
    establishments = establishments.withColumnRenamed(f"_c{index}", col_name)

                                                                                

In [8]:
partners = spark.read.csv('data/socios/', sep=';', inferSchema=True)

partners_col_names = ['root_cnpj', 'partner_id', 'partner_name_or_legal_name', \
    'cnpj_cpf_partner', 'partner_qualification', 'entry_date_partnership', 'country', 'legal_representative',\
        'representative_name', 'legal_representative_qualification', 'age_group']

for index, col_name in enumerate(partners_col_names):
    partners = partners.withColumnRenamed(f"_c{index}", col_name)

                                                                                

In [9]:
business.printSchema()

root
 |-- root_cnpj: integer (nullable = true)
 |-- company_legal_name: string (nullable = true)
 |-- legal_nature: integer (nullable = true)
 |-- responsible_qualification: integer (nullable = true)
 |-- company_share_capital: string (nullable = true)
 |-- business_size: integer (nullable = true)
 |-- federative_entity_responsible: string (nullable = true)



In [10]:
partners.printSchema()

root
 |-- root_cnpj: integer (nullable = true)
 |-- partner_id: integer (nullable = true)
 |-- partner_name_or_legal_name: string (nullable = true)
 |-- cnpj_cpf_partner: string (nullable = true)
 |-- partner_qualification: integer (nullable = true)
 |-- entry_date_partnership: integer (nullable = true)
 |-- country: integer (nullable = true)
 |-- legal_representative: string (nullable = true)
 |-- representative_name: string (nullable = true)
 |-- legal_representative_qualification: integer (nullable = true)
 |-- age_group: integer (nullable = true)



In [11]:
establishments.printSchema()

root
 |-- root_cnpj: integer (nullable = true)
 |-- order_cnpj: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- id_hq_or_branch: integer (nullable = true)
 |-- company_fantasy_name: string (nullable = true)
 |-- registration_status: integer (nullable = true)
 |-- date_registration_status: integer (nullable = true)
 |-- reason_registration_status: integer (nullable = true)
 |-- city_outside_name: string (nullable = true)
 |-- country: integer (nullable = true)
 |-- activity_start_date: integer (nullable = true)
 |-- main_cnae: integer (nullable = true)
 |-- secundary_cnae: string (nullable = true)
 |-- street_type: string (nullable = true)
 |-- street: string (nullable = true)
 |-- number: string (nullable = true)
 |-- complement: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- city: integer (nullable = true)
 |-- ddd_1: string (nullable = true)
 |-- phone_

In [12]:
# Converting string to double type into the company_share_capital capital into business dataset

from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

business = business.withColumn('company_share_capital', f.regexp_replace('company_share_capital', ',', '.'))
business = business.withColumn('company_share_capital', business['company_share_capital'].cast(DoubleType()))

In [13]:
# Converting date columns in the establishments DataFrame to date type

establishments = establishments\
    .withColumn('date_registration_status',\
        f.to_date(establishments.date_registration_status.cast(StringType()), 'yyyyMMdd'))\
    .withColumn('activity_start_date',\
        f.to_date(establishments.activity_start_date.cast(StringType()), 'yyyyMMdd'))\
    .withColumn('date_special_situation',\
        f.to_date(establishments.date_special_situation.cast(StringType()), 'yyyyMMdd'))


In [14]:
# Converting date columns in the partners DataFrame to date type

partners = partners\
    .withColumn('entry_date_partnership',\
        f.to_date(partners.entry_date_partnership.cast(StringType()), 'yyyyMMdd'))

In [15]:
# Query the business dataset

business.select('root_cnpj', 'company_legal_name', 'legal_nature', 'company_share_capital', 'business_size')\
    .filter(business.company_share_capital > 1000000000)\
    .show(5, False)

[Stage 9:>                                                          (0 + 1) / 1]

+---------+-----------------------------------------------------------------+------------+---------------------+-------------+
|root_cnpj|company_legal_name                                               |legal_nature|company_share_capital|business_size|
+---------+-----------------------------------------------------------------+------------+---------------------+-------------+
|6352117  |REBOUCAS E CIA LTDA.                                             |2062        |2.9202702116E10      |3            |
|21199157 |TECHNO-CELLS INDUSTRIA DE SEMICONDUTORES SOLARES ES LTDA.        |2062        |3.0159349E9          |5            |
|34298743 |TVGD HOLDING LTDA                                                |2240        |1.0E10               |5            |
|9203283  |GILDASIO FERNANDES FONSECA                                       |2135        |5.2102967809E10      |1            |
|8842690  |ATVOS AGROINDUSTRIAL PARTICIPACOES S.A. - EM RECUPERACAO JUDICIAL|2054        |8.19789190757E9      

                                                                                

In [17]:
partners\
    .select('partner_name_or_legal_name', 'entry_date_partnership', f.year('entry_date_partnership').alias('year_entry_date_partnership'))\
        .show(5, False)

+-------------------------------+----------------------+---------------------------+
|partner_name_or_legal_name     |entry_date_partnership|year_entry_date_partnership|
+-------------------------------+----------------------+---------------------------+
|LILIANA PATRICIA GUASTAVINO    |1994-07-25            |1994                       |
|CRISTINA HUNDERTMARK           |1994-07-25            |1994                       |
|CELSO EDUARDO DE CASTRO STEPHAN|1994-05-16            |1994                       |
|EDUARDO BERRINGER STEPHAN      |1994-05-16            |1994                       |
|HANNE MAHFOUD FADEL            |1994-06-09            |1994                       |
+-------------------------------+----------------------+---------------------------+
only showing top 5 rows



In [22]:
establishments\
    .select('company_fantasy_name', 'city', f.year('activity_start_date').alias('year_activity_start_date'),\
    f.year('date_registration_status').alias('year_date_registration_status'))\
        .show(5, False)

+--------------------+----+------------------------+-----------------------------+
|company_fantasy_name|city|year_activity_start_date|year_date_registration_status|
+--------------------+----+------------------------+-----------------------------+
|PIRAMIDE M. C.      |7107|1994                    |2001                         |
|null                |7107|1994                    |2008                         |
|null                |7107|1994                    |1997                         |
|null                |7107|1994                    |2008                         |
|EMBROIDERY & GIFT   |7075|1995                    |1998                         |
+--------------------+----+------------------------+-----------------------------+
only showing top 5 rows



In [23]:
# Searching for Null values in the partners dataset

partners\
    .select([f.count(f.when(f.isnull(c), 1)).alias(c) \
        for c in partners.columns])\
    .show()



+---------+----------+--------------------------+----------------+---------------------+----------------------+-------+--------------------+-------------------+----------------------------------+---------+
|root_cnpj|partner_id|partner_name_or_legal_name|cnpj_cpf_partner|partner_qualification|entry_date_partnership|country|legal_representative|representative_name|legal_representative_qualification|age_group|
+---------+----------+--------------------------+----------------+---------------------+----------------------+-------+--------------------+-------------------+----------------------------------+---------+
|        0|         0|                       208|            1234|                    0|                     1|2038255|                   0|            1995432|                                 0|        0|
+---------+----------+--------------------------+----------------+---------------------+----------------------+-------+--------------------+-------------------+----------------

                                                                                

In [24]:
business\
    .select([f.count(f.when(f.isnull(c), 1)).alias(c) \
        for c in business.columns])\
    .show()



+---------+------------------+------------+-------------------------+---------------------+-------------+-----------------------------+
|root_cnpj|company_legal_name|legal_nature|responsible_qualification|company_share_capital|business_size|federative_entity_responsible|
+---------+------------------+------------+-------------------------+---------------------+-------------+-----------------------------+
|        0|                 0|           0|                        0|                    0|         5985|                      4579678|
+---------+------------------+------------+-------------------------+---------------------+-------------+-----------------------------+



                                                                                

In [25]:
establishments\
    .select([f.count(f.when(f.isnull(c), 1)).alias(c) \
        for c in establishments.columns])\
    .show()

23/06/14 20:54:09 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+---------+----------+-------+---------------+--------------------+-------------------+------------------------+--------------------------+-----------------+-------+-------------------+---------+--------------+-----------+------+------+----------+------------+--------+----------+----+-------+-------+-------+-------+-------+-------+-------+-----------------+----------------------+
|root_cnpj|order_cnpj|cnpj_dv|id_hq_or_branch|company_fantasy_name|registration_status|date_registration_status|reason_registration_status|city_outside_name|country|activity_start_date|main_cnae|secundary_cnae|street_type|street|number|complement|neighborhood|zip_code|state_code|city|  ddd_1|phone_1|  ddd_2|phone_2|ddd_fax|    fax|  email|special_situation|date_special_situation|
+---------+----------+-------+---------------+--------------------+-------------------+------------------------+--------------------------+-----------------+-------+-------------------+---------+--------------+-----------+------+-----

                                                                                

In [34]:
partners\
    .select('partner_name_or_legal_name', 'entry_date_partnership',\
        f.year('entry_date_partnership').alias('year_entry_date_partnership'))\
            .orderBy(['year_entry_date_partnership', 'age_group'], ascending=True)\
                .show(5, False)



+-------------------------------+----------------------+---------------------------+
|partner_name_or_legal_name     |entry_date_partnership|year_entry_date_partnership|
+-------------------------------+----------------------+---------------------------+
|JOAO FRANCISCO DE AMORIM JUNCAL|null                  |null                       |
|NAIR YOKO HIRAI TAKAKI         |1900-01-01            |1900                       |
|MARIA SILENE BEZERRA DE AGUIAR |1900-01-01            |1900                       |
|VALMAR CARDOSO DE SANTANA      |1901-01-01            |1901                       |
|JOSE NELSON VIEIRA CAMPOS      |1901-01-01            |1901                       |
+-------------------------------+----------------------+---------------------------+
only showing top 5 rows



                                                                                

In [36]:
business\
    .select('company_legal_name', 'legal_nature', 'company_share_capital', 'business_size')\
        .filter(business.company_share_capital < 100)\
            .orderBy('company_share_capital', ascending=True)\
                .show(5, False)



+------------------------------------------+------------+---------------------+-------------+
|company_legal_name                        |legal_nature|company_share_capital|business_size|
+------------------------------------------+------------+---------------------+-------------+
|MC FERNANDES REPRESENTACAO E COMERCIO LTDA|2062        |0.0                  |5            |
|VANDERLEI FERREIRA MACHADO                |2135        |0.0                  |5            |
|LAR DOS IDOSOS ASTROGILDO RIBEIRO         |3999        |0.0                  |5            |
|ACOTRYLL COMERCIO DE ACO E METAIS LTDA    |2062        |0.0                  |5            |
|GISELA SPROVIERI                          |2135        |0.0                  |5            |
+------------------------------------------+------------+---------------------+-------------+
only showing top 5 rows



                                                                                

In [38]:
partners\
    .select('partner_name_or_legal_name', 'entry_date_partnership')\
        .filter(partners.partner_name_or_legal_name.contains('ARANTES'))\
            .orderBy('entry_date_partnership', ascending=False)\
                .show(5, False)

+-------------------------------------+----------------------+
|partner_name_or_legal_name           |entry_date_partnership|
+-------------------------------------+----------------------+
|ANTONIO PADUA ARANTES                |2021-05-11            |
|NATACHA BARZAN ARANTES               |2021-04-30            |
|MARCELO FABIANO ARANTES GARCIA MORENO|2021-04-29            |
|THIAGO ARANTES                       |2021-04-26            |
|CLODOALDO DOS SANTOS ARANTES         |2021-04-26            |
+-------------------------------------+----------------------+
only showing top 5 rows



                                                                                

In [42]:
partners\
    .select('partner_name_or_legal_name', 'entry_date_partnership')\
        .where(f.upper(partners.partner_name_or_legal_name).like('%ARANTES%'))\
            .show(5, False)

+-------------------------------+----------------------+
|partner_name_or_legal_name     |entry_date_partnership|
+-------------------------------+----------------------+
|LUCIANO ARANTES BARROS         |1998-07-28            |
|FABIANA ARANTES QUEIROS        |1997-12-08            |
|LUCIANO LANA ARANTES           |2000-11-21            |
|CLEONICE MARIA ARANTES DE CICCO|2006-08-30            |
|ROBERTO ARANTES                |2006-12-23            |
+-------------------------------+----------------------+
only showing top 5 rows

