In [1]:
%%capture
!apt-get update
# dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!apt-get install maven -qq
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark
!wget https://repo1.maven.org/maven2/com/google/guava/guava/30.1-jre/guava-30.1-jre.jar

In [2]:
# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

In [3]:
# importar pyspark e funções
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import *

In [4]:
# iniciar uma sessão local 
#Start session Context com nome edc
spark = SparkSession.builder.appName("EDC").getOrCreate()

In [8]:
!wget --quiet --show-progress https://storage.googleapis.com/desafio-final/F.K03200%24Z.D10710.CNAE.csv



In [25]:
schema = 'codcnae STRING, descricao STRING'
# carregar dados do cnae
cnae = spark.read.format('csv').schema(schema).options(header=False, sep=";" , encoding ='latin1').load("./F.K03200$Z.D10710.CNAE.csv")                         

# ver algumas informações sobre os tipos de dados de cada coluna
cnae.printSchema()


root
 |-- codcnae: string (nullable = true)
 |-- descricao: string (nullable = true)



In [27]:
cnae.show()

+-------+--------------------+
|codcnae|           descricao|
+-------+--------------------+
|0111301|    Cultivo de arroz|
|0111302|    Cultivo de milho|
|0111303|    Cultivo de trigo|
|0111399|Cultivo de outros...|
|0112101|Cultivo de algodã...|
|0112102|     Cultivo de juta|
|0112199|Cultivo de outras...|
|0113000|Cultivo de cana-d...|
|0114800|     Cultivo de fumo|
|0115600|     Cultivo de soja|
|0116401| Cultivo de amendoim|
|0116402| Cultivo de girassol|
|0116403|   Cultivo de mamona|
|0116499|Cultivo de outras...|
|0119901|  Cultivo de abacaxi|
|0119902|     Cultivo de alho|
|0119903|Cultivo de batata...|
|0119904|   Cultivo de cebola|
|0119905|   Cultivo de feijão|
|0119906| Cultivo de mandioca|
+-------+--------------------+
only showing top 20 rows



In [26]:
cnae.count()

1359

In [28]:
!wget --quiet --show-progress https://storage.googleapis.com/desafio-final/F.K03200%24Z.D10710.MUNIC.csv



In [31]:
#schema através de DLL programaticamente
schema = 'codmuni STRING, cidade STRING'
# carregar dados do municipios
municipio = spark.read.format('csv').schema(schema).options(header=False, sep=";" ).load("/content/F.K03200$Z.D10710.MUNIC.csv")
# ver algumas informações sobre os tipos de dados de cada coluna
municipio.printSchema()

root
 |-- codmuni: string (nullable = true)
 |-- cidade: string (nullable = true)



In [32]:
municipio.show(10)

+-------+--------------------+
|codmuni|              cidade|
+-------+--------------------+
|   0001|       GUAJARA-MIRIM|
|   0002|ALTO ALEGRE DOS P...|
|   0003|         PORTO VELHO|
|   0004|             BURITIS|
|   0005|           JI-PARANA|
|   0006|         CHUPINGUAIA|
|   0007|           ARIQUEMES|
|   0008|             CUJUBIM|
|   0009|              CACOAL|
|   0010|          NOVA UNIAO|
+-------+--------------------+
only showing top 10 rows



In [73]:
# download do http para arquivo local
!wget --quiet --show-progress https://storage.googleapis.com/desafio-final/estabelecimentos/K3241.K03200Y0.D10710.ESTABELE.csv 
!wget --quiet --show-progress https://storage.googleapis.com/desafio-final/estabelecimentos/K3241.K03200Y1.D10710.ESTABELE.csv
!wget --quiet --show-progress https://storage.googleapis.com/desafio-final/estabelecimentos/K3241.K03200Y2.D10710.ESTABELE.csv
!wget --quiet --show-progress https://storage.googleapis.com/desafio-final/estabelecimentos/K3241.K03200Y3.D10710.ESTABELE.csv
!wget --quiet --show-progress https://storage.googleapis.com/desafio-final/estabelecimentos/K3241.K03200Y4.D10710.ESTABELE.csv
!wget --quiet --show-progress https://storage.googleapis.com/desafio-final/estabelecimentos/K3241.K03200Y5.D10710.ESTABELE.csv
!wget --quiet --show-progress https://storage.googleapis.com/desafio-final/estabelecimentos/K3241.K03200Y6.D10710.ESTABELE.csv
!wget --quiet --show-progress https://storage.googleapis.com/desafio-final/estabelecimentos/K3241.K03200Y7.D10710.ESTABELE.csv
!wget --quiet --show-progress https://storage.googleapis.com/desafio-final/estabelecimentos/K3241.K03200Y8.D10710.ESTABELE.csv
!wget --quiet --show-progress https://storage.googleapis.com/desafio-final/estabelecimentos/K3241.K03200Y9.D10710.ESTABELE.csv                         



In [74]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

In [75]:
schema = 'cnpj_basico STRING, cnpj_ordem STRING, cnpj_dv STRING, matriz_filial STRING, nome STRING, ' \
          ' situacao STRING, data STRING, motivo STRING, exterior STRING, pais STRING, inicio STRING, ' \
          ' cnae_principal STRING, cnae_secundario STRING, tipo STRING, logradouro STRING, numero STRING,' \
          ' compl STRING, bairro STRING, cep STRING, estado STRING, codmuni STRING, ddd STRING, telefone STRING, dd2 STRING, telefone2 STRING,' \
          ' ddd_fax STRING, fax STRING , email STRING, situacao_esp STRING, data_sit STRING'
         
# Create data frame de estabelecimentos
estab1  = spark.read.format('csv').schema(schema).options(header=False, sep=";" ).load("/content/K3241.K03200Y0.D10710.ESTABELE.csv")
estab2  = spark.read.format('csv').schema(schema).options(header=False, sep=";" ).load("/content/K3241.K03200Y1.D10710.ESTABELE.csv")
estab3  = spark.read.format('csv').schema(schema).options(header=False, sep=";" ).load("/content/K3241.K03200Y2.D10710.ESTABELE.csv")
estab4  = spark.read.format('csv').schema(schema).options(header=False, sep=";" ).load("/content/K3241.K03200Y3.D10710.ESTABELE.csv")
estab5  = spark.read.format('csv').schema(schema).options(header=False, sep=";" ).load("/content/K3241.K03200Y4.D10710.ESTABELE.csv")
estab6  = spark.read.format('csv').schema(schema).options(header=False, sep=";" ).load("/content/K3241.K03200Y5.D10710.ESTABELE.csv")
estab7  = spark.read.format('csv').schema(schema).options(header=False, sep=";" ).load("/content/K3241.K03200Y6.D10710.ESTABELE.csv")
estab8  = spark.read.format('csv').schema(schema).options(header=False, sep=";" ).load("/content/K3241.K03200Y7.D10710.ESTABELE.csv")
estab9  = spark.read.format('csv').schema(schema).options(header=False, sep=";" ).load("/content/K3241.K03200Y8.D10710.ESTABELE.csv")
estab10 = spark.read.format('csv').schema(schema).options(header=False, sep=";" ).load("/content/K3241.K03200Y9.D10710.ESTABELE.csv"



DataFrame[cnpj_basico: string, cnpj_ordem: string, cnpj_dv: string, matriz_filial: string, nome: string, situacao: string, data: string, motivo: string, exterior: string, pais: string, inicio: string, cnae_principal: string, cnae_secundario: string, tipo: string, logradouro: string, numero: string, compl: string, bairro: string, cep: string, estado: string, codmuni: string, ddd: string, telefone: string, dd2: string, telefone2: string, ddd_fax: string, fax: string, email: string, situacao_esp: string, data_sit: string]

In [80]:
#unindo todos os dataframes
estab = estab1.union(estab2)
estab = estab.union(estab3)
estab = estab.union(estab4)
estab = estab.union(estab5)
estab = estab.union(estab6)
estab = estab.union(estab7)
estab = estab.union(estab8)
estab = estab.union(estab9)
estab.count()

44444992

In [97]:
#faltou o 10
estab = estab.union(estab10)

In [98]:
estab.count()

49198427

In [35]:
municipio.show()

+-------+--------------------+
|codmuni|              cidade|
+-------+--------------------+
|   0001|       GUAJARA-MIRIM|
|   0002|ALTO ALEGRE DOS P...|
|   0003|         PORTO VELHO|
|   0004|             BURITIS|
|   0005|           JI-PARANA|
|   0006|         CHUPINGUAIA|
|   0007|           ARIQUEMES|
|   0008|             CUJUBIM|
|   0009|              CACOAL|
|   0010|          NOVA UNIAO|
|   0011|       PIMENTA BUENO|
|   0012|             PARECIS|
|   0013|             VILHENA|
|   0014|PIMENTEIRAS DO OESTE|
|   0015|                JARU|
|   0016|PRIMAVERA DE ROND...|
|   0017| OURO PRETO DO OESTE|
|   0018|  SAO FELIPE D'OESTE|
|   0019|   PRESIDENTE MEDICI|
|   0020|SAO FRANCISCO DO ...|
+-------+--------------------+
only showing top 20 rows



In [99]:
#Qual o código do CNAE mais presente nas empresas ativas? #Quantas empresas utilizam esse CNAE?
from pyspark.sql.functions import col, concat_ws
consulta1 = (
    estab
    .filter(col('situacao') == '02')   
    .withColumn('cnae',concat_ws(',',col('cnae_principal'),col('cnae_secundario')))
    .withColumn('cnae',split(col('cnae'),','))
    .withColumn('cnae',explode(col('cnae')))
    .groupby('cnae')
    .agg(f.count('cnpj_basico').alias('qtde'))
    .orderBy(col('qtde').desc())
    )
consulta1.show(5)

+-------+-------+
|   cnae|   qtde|
+-------+-------+
|4781400|1781556|
|4772500|1021115|
|5611203| 937293|
|9602501| 893140|
|4723700| 854114|
+-------+-------+
only showing top 5 rows



In [101]:
#Quantos CNPJs não ativos existem no estado de São Paulo?
consulta3 = (
    estab
    .filter(col('situacao') != '02')   
    .filter(col('estado') == 'SP')

    )
consulta3.count()

7966464

In [105]:
estab.select('situacao').distinct().collect()

[Row(situacao='20140505'),
 Row(situacao='20180619'),
 Row(situacao='01'),
 Row(situacao='DIET & LIGHT"'),
 Row(situacao='20051103'),
 Row(situacao='20170109'),
 Row(situacao='08'),
 Row(situacao='03'),
 Row(situacao='02'),
 Row(situacao='BAZAR E PAPELARIA."'),
 Row(situacao='20181130'),
 Row(situacao='"'),
 Row(situacao='20210325'),
 Row(situacao='20100518'),
 Row(situacao='04'),
 Row(situacao='20190514'),
 Row(situacao='20210422'),
 Row(situacao='20050924')]

In [106]:
estab.show(10)

+-----------+----------+-------+-------------+--------------------+--------+--------+------+--------+----+--------+--------------+--------------------+-------+--------------------+------+--------------------+--------------------+--------+------+-------+---+--------+----+---------+-------+----+--------------------+------------+--------+
|cnpj_basico|cnpj_ordem|cnpj_dv|matriz_filial|                nome|situacao|    data|motivo|exterior|pais|  inicio|cnae_principal|     cnae_secundario|   tipo|          logradouro|numero|               compl|              bairro|     cep|estado|codmuni|ddd|telefone| dd2|telefone2|ddd_fax| fax|               email|situacao_esp|data_sit|
+-----------+----------+-------+-------------+--------------------+--------+--------+------+--------+----+--------+--------------+--------------------+-------+--------------------+------+--------------------+--------------------+--------+------+-------+---+--------+----+---------+-------+----+--------------------+---------

In [109]:
#CNAE DA CONSULTORIA
cnae.filter(col('descricao')=='Consultoria em tecnologia da informação').show()

+-------+--------------------+
|codcnae|           descricao|
+-------+--------------------+
|6204000|Consultoria em te...|
+-------+--------------------+



In [113]:
#belo horizonte
municipio.filter(upper(col('cidade')) == 'BELO HORIZONTE').show()

+-------+--------------+
|codmuni|        cidade|
+-------+--------------+
|   4123|BELO HORIZONTE|
+-------+--------------+



In [118]:
#Quantas empresas de “Consultoria em tecnologia da informação” existem em Belo Horizonte?
consulta6 = (
    estab
    .filter(col('codmuni') == '4123')   
    .withColumn('cnae',concat_ws(',',col('cnae_principal'),col('cnae_secundario')))
    .withColumn('cnae',split(col('cnae'),','))
    .withColumn('cnae',explode(col('cnae')))
    .filter(col('cnae') == '6204000')

)
consulta6.count()

6930

In [93]:
from pyspark.sql.functions import col, concat_ws
#Qual o código do CNAE mais presente nas empresas ativas? Quantas empresas utilizam esse CNAE?
 #.withColumn('cnae',explode(split('cnae_secundario',',')))
consulta1 = (
    estab10
    .filter(col('situacao') == '02') 
    .withColumn('cnae',concat_ws(',',col('cnae_principal'),col('cnae_secundario')))
    .withColumn('cnae',split(col('cnae'),','))
    .withColumn('cnae',explode(col('cnae')))
    )
consulta1.show(5)

+-----------+----------+-------+-------------+-------------+--------+--------+------+--------+----+--------+--------------+--------------------+----+-------------+------+-----+-----------+--------+------+-------+---+--------+----+---------+-------+----+--------------------+------------+--------+-------+
|cnpj_basico|cnpj_ordem|cnpj_dv|matriz_filial|         nome|situacao|    data|motivo|exterior|pais|  inicio|cnae_principal|     cnae_secundario|tipo|   logradouro|numero|compl|     bairro|     cep|estado|codmuni|ddd|telefone| dd2|telefone2|ddd_fax| fax|               email|situacao_esp|data_sit|   cnae|
+-----------+----------+-------+-------------+-------------+--------+--------+------+--------+----+--------+--------------+--------------------+----+-------------+------+-----+-----------+--------+------+-------+---+--------+----+---------+-------+----+--------------------+------------+--------+-------+
|   28625545|      0001|     02|            1|APTOS CLINICA|      02|20170913|    00|

In [130]:
#Qual o CNAE primário do IGTI?
#Dica: O IGTI está localizado em Belo Horizonte.
consulta7 = (
    estab
    .filter(col('codmuni') == '4123')   
    .withColumn('nome',upper(col('nome')))
    .filter( estab.nome.rlike ('.*IGTI.*')) 
    .select('nome','cnae_principal')
    )
consulta7.show(5)

+----+--------------+
|nome|cnae_principal|
+----+--------------+
|IGTI|       8532500|
+----+--------------+



In [131]:
cnae.filter(col('codcnae')=='8532500').show()

+-------+--------------------+
|codcnae|           descricao|
+-------+--------------------+
|8532500|Educação superior...|
+-------+--------------------+



In [136]:
#Quantas empresas foram abertas desde 2020?
estab.filter(col('data').rlike('202.*')).count()

12223867

In [145]:
#Quantas empresas foram abertas desde 2020?
consulta8 = (
    estab    
    .withColumn('abertura',to_date(col('inicio'),'yyyyMMdd'))
    .select('data','abertura')
    .filter(col('abertura')>='2020-01-01')
    )
consulta8.count()

6314458