In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

In [None]:
#Criando uma session com acesso ao gcp
spark = (
    SparkSession
    .builder
    .appName("Desafio-edc-format-trusted")
    .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    .config("spark.jars","./jars/gcs-connector-hadoop3-latest.jar")
    .config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("fs.AbstractFileSystem.gs.impl","com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    .config("fs.gs.auth.service.account.enable","true")
    .config("fs.gs.auth.service.account.json.keyfile", "/mnt/d/EngDados/gcp/gcp-estudos-engdados-20f2cdfffed8.json")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

## Formatação dos dados de CNAE - Atividade Economica
Ajustar os tipos de dados para melhor leitura/joins

In [None]:
df_cnae = spark.read.parquet("gs://bootcamp-edc/raw/cnae.parquet/*.parquet")

In [None]:
df_cnae.limit(5).toPandas()

In [None]:
df_cnae = (
    df_cnae
    .withColumn("cod_atividade", f.col("cod_atividade").cast("int"))
    .withColumn("desc_atividade", f.trim(f.col("desc_atividade")))
    )

In [None]:
df_cnae.limit(10).toPandas()

In [None]:
df_cnae.printSchema()

In [None]:
df_cnae.write.mode("overwrite").parquet("gs://bootcamp-edc/trusted/cnae.parquet")

In [None]:
del(df_cnae)

## Leitura dos dados de Município

In [None]:
df_municipio = spark.read.parquet("gs://bootcamp-edc/raw/municipio.parquet/*.parquet")

In [None]:
df_municipio.limit(5).toPandas()

In [None]:
df_municipio = (
    df_municipio
    .withColumn("cod_municipio", f.col("cod_municipio").cast("int"))
    .withColumn("desc_municipio", f.initcap(f.trim(f.col("desc_municipio"))))
)

In [None]:
df_municipio.limit(10).toPandas()

In [None]:
df_municipio.printSchema()

In [None]:
df_municipio.write.mode("overwrite").parquet("gs://bootcamp-edc/trusted/municipio.parquet")

In [None]:
del(df_municipio)

## Leitura dos dados de estabelecimentos

In [None]:
df_estabelecimento = spark.read.parquet("gs://bootcamp-edc/raw/estabelecimento.parquet/*.parquet")

In [None]:
df_estabelecimento.limit(5).toPandas()

In [None]:
#Formata colunas inteiras
int_columns = ["idc_matriz_filial","cod_situacao_cadastral","motivo_situacao_cadastral","cod_pais","cod_municipio"]
for column in int_columns:
    df_estabelecimento = df_estabelecimento.withColumn(column, f.col(column).cast("int"))

In [None]:
#Formata colunas strings
str_columns = ["nome_fantasia","nome_cidade_exterior"]
for column in str_columns:
    df_estabelecimento = df_estabelecimento.withColumn(column, f.initcap(f.trim(f.col(column))))
    
df_estabelecimento = df_estabelecimento.withColumn("email", f.lower(f.trim(f.col("email"))))

In [None]:
df_estabelecimento.limit(10).toPandas()

In [None]:
df_estabelecimento.printSchema()

In [None]:
df_estabelecimento.write.mode("overwrite").parquet("gs://bootcamp-edc/trusted/estabelecimento.parquet")

In [None]:
del(df_estabelecimento)

In [None]:
spark.stop()