In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os

In [4]:
spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.executor.heartbeatInterval ", "36000")
    .config("spark.sql.broadcastTimeout", "360000")
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID"))
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY"))
    .config("spark.hadoop.fs.s3a.fast.upload", True)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.1.2')
    .config('spark.sql.repl.eagerEval.enabled', True)
    .getOrCreate()
)
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite","LEGACY")
spark.conf.set("spark.sql.repl.eagerEval.enabled",True) #ele mostra o dataframe só de escrever seu nome
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows",100) #numero de linhas a mostrar quando pedir para aparecer o dataframe

# IBGE

## Regiões

In [5]:
# Ler arquivo da zona raw
regions = (
    spark
    .read
    .option("inferSchema", True)
    .option("multiline", True)
    .format("json")
    .load("s3a://raw-zone/ibge/regions/")
)

21/10/08 23:22:48 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [7]:
# Salva como parquet na zona de processamento
(
    regions
    .write
    .mode("overwrite")
    .format("parquet")
    .save("s3a://processing-zone/ibge/regions/")
)

                                                                                

## Mesorregiões

In [8]:
# Ler arquivo da zona raw
mesoregions = (
    spark
    .read
    .option("inferSchema", True)
    .option("multiline", True)
    .format("json")
    .load("s3a://raw-zone/ibge/mesoregions/")
)

                                                                                

In [10]:
# Desaninhando campos
mesoregions_norm = mesoregions.select(
    "id",
    "nome",
    col("UF.id").alias("UFId"),
    col("UF.nome").alias("UFName"),
    col("UF.sigla").alias("UFSigla"),
    col("UF.regiao.id").alias("UFRegiaoId"),
    col("UF.regiao.nome").alias("UFRegiaoNome"),
    col("UF.regiao.sigla").alias("UFRegiaoSigla")
)

In [12]:
# Salva como parquet na zona de processamento
(
    mesoregions_norm
    .write
    .mode("overwrite")
    .format("parquet")
    .save("s3a://processing-zone/ibge/mesoregions/")
)

                                                                                

## Microrregiões

In [13]:
# Ler arquivo da zona raw
microregions = (
    spark
    .read
    .option("inferSchema", True)
    .option("multiline", True)
    .format("json")
    .load("s3a://raw-zone/ibge/microregions/")
)

                                                                                

In [16]:
microregions_norm = microregions.select(
    "id",
    "nome",
    col("mesorregiao.id").alias("mesorregiaoId"),
    col("mesorregiao.nome").alias("mesorregiaoNome"),
    col("mesorregiao.UF.id").alias("mesorregiaoUFId"),
    col("mesorregiao.UF.Nome").alias("mesorregiaoUFNome"),
    col("mesorregiao.UF.sigla").alias("mesorregiaoUFSigla"),
    col("mesorregiao.UF.regiao.id").alias("mesorregiaoUFRegiaoId"),
    col("mesorregiao.UF.regiao.nome").alias("mesorregiaoUFRegiaoNome"),
    col("mesorregiao.UF.regiao.sigla").alias("mesorregiaoUFRegiaoSigla"),
    
)

In [18]:
# Salva como parquet na zona de processamento
(
    microregions_norm 
    .write
    .mode("overwrite")
    .format("parquet")
    .save("s3a://processing-zone/ibge/microregions/")
)

                                                                                

# PNADC

In [19]:
# Ler arquivo da zona raw
pnadc = (
    spark
    .read
    .option("inferSchema", True)
    .option("multiline", True)
    .format("json")
    .load("s3a://raw-zone/pnadc/")
)


                                                                                

In [22]:
# Salva como parquet na zona de processamento
(
    pnadc 
    .write
    .mode("overwrite")
    .format("parquet")
    .save("s3a://processing-zone/pnadc/")
)

                                                                                