# Desafio modulo 3

#### Lendo dados do bucket publico

In [39]:
import pyspark.sql.functions as f

In [40]:
def read_data():
    path = ['gs://desafio-final/F.K03200$Z.D10710.CNAE.csv',
            'gs://desafio-final/F.K03200$Z.D10710.MUNIC.csv',
            'gs://desafio-final/estabelecimentos'
           ]
    schemas = ['cod_cnae STRING, descricao_cnae STRING',
              'cod_municipio INT, nm_cidade STRING',
               """
                cod_cnpj_basico STRING, 
                cod_cnpj_ordem STRING, 
                cod_cnpj_dv STRING, 
                cod_matriz_filial INT, 
                nm_fantasia STRING,
                cod_situacao INT,
                dt_situacao_cadastral STRING,
                cod_motivo_situacao INT,
                nm_cidade_exterior STRING,
                cod_pais STRING,
                dt_atividade STRING,
                cod_CNAE_principal STRING,
                cod_CNAE_secundario STRING,
                ds_tipo_logradouro STRING,
                ds_logradouro STRING,
                num_endereco INT,
                ds_complemento STRING,
                nm_bairro STRING,
                cod_cep STRING,
                sg_uf STRING,
                cod_municipio STRING,
                num_ddd STRING,
                num_telefone STRING,
                num_ddd2 STRING,
                num_telefone2 STRING,
                num_ddd_fax STRING,
                num_fax STRING,
                nm_email STRING,
                ds_situacao_especial STRING,
                dt_situacao_especial STRING
               """
              ]
    
    df_list = []
    c = 0
    for i in path:
        df = (
            spark
            .read
            .format('csv')
            .option('sep', ';')
            .option('encoding', 'ISO-8859-1')
            .option('escape', "\"")
            .schema(schemas[c])
            .load(path[c])
        )
        df_list.append(df)
        c+=1
    return df_list[0], df_list[1], df_list[2]
    

In [41]:
df_cnae, df_municipio, df_estabelecimento = read_data()

In [42]:
def processing(df):
    #limpa string
    df_estabelecimento_processado = df
    str_cols = ['cod_cnpj_basico', 
                'cod_cnpj_ordem', 
                'cod_cnpj_dv', 
                'nm_fantasia', 
                'nm_cidade_exterior', 
                'cod_pais',
                'cod_CNAE_principal',
                'cod_CNAE_secundario',
                'ds_tipo_logradouro',
                'ds_logradouro',
                'ds_complemento',
                'nm_bairro',
                'cod_cep',
                'sg_uf',
                'cod_municipio',
                'num_ddd', 
                'num_telefone',
                'num_ddd2',
                'num_telefone2',
                'num_ddd_fax',
                'num_fax', 
                'nm_email',
                'ds_situacao_especial',
                'dt_situacao_especial']
    for c in str_cols:
        df_estabelecimento_processado = (
            df_estabelecimento_processado
            .withColumn(c, f.trim(f.col(c)))
        )
    #converte data e trata numeros
    df_estabelecimento_processado = (
        df_estabelecimento_processado
        .withColumn("dt_situacao_cadastral", f.to_date(f.col("dt_situacao_cadastral"),"yyyyMMdd"))
        .withColumn("dt_atividade", f.to_date(f.col("dt_atividade"),"yyyyMMdd"))
        .withColumn("dt_situacao_especial", f.to_date(f.col("dt_situacao_especial"),"yyyyMMdd"))
        .withColumn("num_endereco", f.coalesce(f.col("num_endereco"), f.lit(0)))
    )
    
    #cuida das datas anteriores a 1900
    df_estabelecimento_processado = (
        df_estabelecimento_processado
        .withColumn("dt_atividade", f.when(f.col("dt_atividade") <= "1850-01-01", None)
                                        .otherwise(f.col("dt_atividade")))
        .withColumn("dt_situacao_cadastral", f.when(f.col("dt_situacao_cadastral") <= "1850-01-01", None)
                                        .otherwise(f.col("dt_situacao_cadastral")))
    
    
    )
    return df_estabelecimento_processado






        
               

In [43]:
df_estabelecimento = processing(df_estabelecimento)

In [44]:
def write_trusted(df, path):
    for c in range(3):
        try:
            (
                df[c]
                .write
                .format('parquet')
                .mode('overwrite')
                .save(path[c])
            )
            print("Operacao concluida com sucesso no path:  " + path[c])
        except Exception as e:
            print(e)
    
        
    

In [45]:
write_trusted([df_estabelecimento, df_cnae, df_municipio], 
              ["gs://datalake-demo-spark/trusted/estabelecimento",
               "gs://datalake-demo-spark/trusted/cnae",
               "gs://datalake-demo-spark/trusted/municipio"])

                                                                                

Operacao concluida com sucesso no path:  gs://datalake-demo-spark/trusted/estabelecimento


                                                                                

Operacao concluida com sucesso no path:  gs://datalake-demo-spark/trusted/cnae


                                                                                

Operacao concluida com sucesso no path:  gs://datalake-demo-spark/trusted/municipio


In [53]:
def read_data_trusted():
    path = ["gs://datalake-demo-spark/trusted/estabelecimento",
               "gs://datalake-demo-spark/trusted/cnae",
               "gs://datalake-demo-spark/trusted/municipio"]
    df_list = []
    c = 0
    for i in path:
        df = (
            spark
            .read
            .format('parquet')
            .load(path[c])
        )
        df_list.append(df)
        c+=1
    return df_list[0], df_list[1], df_list[2]

In [54]:
df_estabelecimento, df_cnae, df_municipio = read_data_trusted()

                                                                                

In [60]:
def join_data(df_estabelecimento, df_cnae, df_municipio):
    df_final = (
        df_estabelecimento
        .join(df_cnae.hint('broadcast'), f.col("cod_CNAE_principal") == f.col("cod_cnae"), 'left')
        .join(df_municipio.hint('broadcast'), "cod_municipio", 'left')    
    )
    return df_final


In [61]:
df_final = join_data(df_estabelecimento, df_cnae, df_municipio)

In [64]:
def write_refined(df):
    try:
        (
            df
            .write
            .format('parquet')
            .partitionBy('sg_uf')
            .mode('overwrite')
            .save("gs://datalake-demo-spark/refined")
        )
        print("Operacao concluida com sucesso")
    except:
        print("Erro. Operacao nao concluida")
    

In [65]:
write_refined(df_final)

                                                                                

Operacao concluida com sucesso


In [72]:
(
    df
    .filter('cod_situacao = 2')
    .groupBy("cod_CNAE_principal")
    .count()
    .orderBy(f.col("count").desc())
    .limit(5)
    .show()
)



+------------------+------+
|cod_CNAE_principal| count|
+------------------+------+
|           4781400|991316|
|           9602501|754449|
|           4399103|458648|
|           7319002|453914|
|           4712100|444980|
+------------------+------+



                                                                                

In [74]:
#Quantos CNPJs não ativos existem no estado de São Paulo? 
(
    df
    .filter('cod_situacao != 2')
    .filter('sg_uf = "SP"')
    .count()
)

                                                                                

7966472

In [82]:
#Quantas empresas de “Consultoria em tecnologia da informação” existem em Belo Horizonte? 
(
    df
    .filter('nm_cidade = "BELO HORIZONTE"')
    .filter('descricao_cnae = "Consultoria em tecnologia da informação"')
    .count()
)

                                                                                

2325

In [91]:
 #Qual o CNAE primário do IGTI?
(
    df
    .select('cod_CNAE_principal')
    .filter('nm_cidade = "BELO HORIZONTE"')
    .filter('nm_fantasia like "%IGTI%"')
    .limit(5)
    .toPandas()
)

                                                                                

Unnamed: 0,cod_CNAE_principal
0,8532500


In [94]:
#Quantas empresas foram abertas desde 2020?
(
    df
    .withColumn("ano", f.year(f.col("dt_atividade")))
    .filter('ano >= 2020')
    .count()
)

                                                                                

6314456