#### OBJETIVO

O objetivo deste trabalho é construir um pipeline de dados utilizando tecnologias na nuvem para analisar dados do censo populacional fornecidos pelo IBGE. O pipeline será capaz de buscar, coletar, modelar, carregar e analisar esses dados para fornecer insights sobre a demografia e outras características da população brasileira.

#### PERGUNTAS A SEREM RESPONDIDAS

##### Qual é a distribuição populacional por estado e município?

Determinar a quantidade de habitantes em diferentes regiões do Brasil.

##### Qual é a composição etária da população?

Analisar a distribuição etária e identificar tendências de envelhecimento ou rejuvenescimento da população.

##### Quais são as taxas de alfabetização e escolaridade?

Avaliar os níveis de alfabetização e educação em diferentes regiões.

##### Qual é a distribuição de renda da população?

Analisar a distribuição de renda e identificar disparidades econômicas.

#### LEITURA E PROCESSAMENTO DA BASE DE DADOS

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Criação de uma SparkSession
spark = SparkSession.builder \
    .appName("IBGE Census Data Analysis") \
    .getOrCreate()

# Caminho do arquivo após o upload para o Databricks
file_path = "/FileStore/tables/dados.csv"

# Leitura dos dados do arquivo CSV
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(file_path)

# Limpeza de dados: Remoção de nulos e duplicatas
df_cleaned = df.dropna().dropDuplicates()

# Criação da tabela temporária
df_cleaned.createOrReplaceTempView("census_data")

# Exibir o esquema do DataFrame para entender as colunas disponíveis
df.printSchema()

# Exibir as primeiras linhas do DataFrame
df.show()


root
 |-- UF: integer (nullable = true)
 |-- Sexo: integer (nullable = true)
 |-- Idade: integer (nullable = true)
 |-- Cor: integer (nullable = true)
 |-- Anos de Estudo: integer (nullable = true)
 |-- Renda: integer (nullable = true)
 |-- Altura: double (nullable = true)

+---+----+-----+---+--------------+-----+-----------+
| UF|Sexo|Idade|Cor|Anos de Estudo|Renda|     Altura|
+---+----+-----+---+--------------+-----+-----------+
| 11|   0|   23|  8|            12|  800|1.603807616|
| 11|   1|   23|  2|            12| 1150|1.739789827|
| 11|   1|   35|  8|            15|  880|1.760443822|
| 11|   0|   46|  2|             6| 3500|1.783157945|
| 11|   1|   47|  8|             9|  150|1.690630954|
| 11|   1|   34|  8|            12|  790|1.637905598|
| 11|   0|   57|  8|            12| 3150|1.570078218|
| 11|   1|   60|  8|            12| 1700|1.608495442|
| 11|   1|   50|  4|            14| 1800|1.780328883|
| 11|   0|   26|  8|            12| 1150|1.793202625|
| 11|   1|   46|  8|   

#### CATÁLOGO DOS DADOS

In [0]:
# Descrição detalhada dos dados
catalog = {
    "UF": {
        "description": "Unidade Federativa",
        "categories": df.select("UF").distinct().rdd.flatMap(lambda x: x).collect()
    },
    "Sexo": {
        "description": "Sexo da pessoa",
        "categories": df.select("Sexo").distinct().rdd.flatMap(lambda x: x).collect()
    },
    "Idade": {
        "description": "Idade da pessoa",
        "min_value": df.agg({"Idade": "min"}).collect()[0][0],
        "max_value": df.agg({"Idade": "max"}).collect()[0][0]
    },
    "Cor": {
        "description": "Cor/raça da pessoa",
        "categories": df.select("Cor").distinct().rdd.flatMap(lambda x: x).collect()
    },
    "Anos de Estudo": {
        "description": "Número de anos de estudo",
        "min_value": df.agg({"Anos de Estudo": "min"}).collect()[0][0],
        "max_value": df.agg({"Anos de Estudo": "max"}).collect()[0][0]
    },
    "Renda": {
        "description": "Renda da pessoa",
        "min_value": df.agg({"Renda": "min"}).collect()[0][0],
        "max_value": df.agg({"Renda": "max"}).collect()[0][0]
    },
    "Altura": {
        "description": "Altura da pessoa",
        "min_value": df.agg({"Altura": "min"}).collect()[0][0],
        "max_value": df.agg({"Altura": "max"}).collect()[0][0]
    }
}

# Exibindo o catálogo de dados
import json
print(json.dumps(catalog, indent=4))


{
    "UF": {
        "description": "Unidade Federativa",
        "categories": [
            31,
            53,
            28,
            26,
            27,
            12,
            22,
            52,
            13,
            16,
            41,
            15,
            43,
            17,
            35,
            23,
            51,
            50,
            25,
            24,
            29,
            21,
            32,
            11,
            33,
            14,
            42
        ]
    },
    "Sexo": {
        "description": "Sexo da pessoa",
        "categories": [
            1,
            0
        ]
    },
    "Idade": {
        "description": "Idade da pessoa",
        "min_value": 13,
        "max_value": 99
    },
    "Cor": {
        "description": "Cor/ra\u00e7a da pessoa",
        "categories": [
            6,
            4,
            8,
            2,
            0
        ]
    },
    "Anos de Estudo": {
        "description": "N\u00

#### LINHAGEM DOS DADOS

In [0]:
data_lineage = {
    "data_source": "IBGE",
    "data_url": "https://www.ibge.gov.br",
    "techniques_used": [
        "Download de dados CSV",
        "Leitura e processamento com PySpark",
        "Limpeza de dados (remoção de nulos e duplicatas)"
    ]
}

# Exibindo a linhagem dos dados
print(json.dumps(data_lineage, indent=4))


{
    "data_source": "IBGE",
    "data_url": "https://www.ibge.gov.br",
    "techniques_used": [
        "Download de dados CSV",
        "Leitura e processamento com PySpark",
        "Limpeza de dados (remo\u00e7\u00e3o de nulos e duplicatas)"
    ]
}


#### LIMPEZA E TRANSFORMAÇÃO DA BASE DE DADOS

In [0]:
# Limpeza de dados: Remoção de nulos e duplicatas
df_cleaned = df.dropna().dropDuplicates()

# Criação da tabela temporária
df_cleaned.createOrReplaceTempView("census_data")

# Vamos supor que queremos analisar a coluna 'Renda'
df_transformed = df_cleaned.withColumn("Renda", col("Renda").cast("integer"))

# Exibir as primeiras linhas do DataFrame transformado
df_transformed.show()


+---+----+-----+---+--------------+-----+-----------+
| UF|Sexo|Idade|Cor|Anos de Estudo|Renda|     Altura|
+---+----+-----+---+--------------+-----+-----------+
| 11|   0|   53|  8|            16| 7000|1.673665133|
| 11|   0|   69|  8|             1| 3030|1.746351196|
| 11|   1|   52|  2|             6| 1200|1.736442076|
| 11|   0|   57|  2|            12| 2000|1.715137632|
| 11|   0|   23|  8|            12|  800|1.603807616|
| 11|   0|   34|  2|            16| 6000|1.711825037|
| 11|   0|   57|  8|            12| 3150|1.570078218|
| 11|   1|   34|  8|            12|  790|1.637905598|
| 11|   0|   45|  2|            12| 2000|1.780450392|
| 11|   1|   62|  8|            12| 1150| 1.76597309|
| 11|   0|   46|  2|             6| 3500|1.783157945|
| 11|   1|   43|  8|            10| 1050|1.789011804|
| 11|   1|   35|  8|            15|  880|1.760443822|
| 11|   1|   23|  2|            12| 1150|1.739789827|
| 11|   0|   38|  8|            12| 2000|1.750826666|
| 11|   1|   46|  8|        

#### CARREGAMENTO DOS DADOS EM UM DATA WAREHOUSE

In [0]:
# Carregar os dados transformados em um Data Warehouse na nuvem
df_transformed.write.format("parquet") \
    .mode("overwrite") \
    .save("dbfs:/path/to/transformed-data")


#### ANÁLISE DOS DADOS

A análise dos dados ocorre através de consultas SQL.

In [0]:
# Análise de dados com SQL

# Exemplo de consulta: Distribuição de renda por estado (UF)
renda_distribution = spark.sql("""
    SELECT UF, AVG(Renda) as avg_renda
    FROM census_data
    GROUP BY UF
    ORDER BY avg_renda DESC
""")

# Exibição dos resultados
renda_distribution.show()


+---+------------------+
| UF|         avg_renda|
+---+------------------+
| 53| 4241.954722492697|
| 35|2638.1049859550562|
| 33|2496.4031677465805|
| 41| 2493.870752984389|
| 42| 2470.854945054945|
| 43|2315.1583359696297|
| 50|2262.6041666666665|
| 51| 2130.652777777778|
| 31|2056.4320843091336|
| 32|2026.3838517538054|
| 52| 1994.580793559517|
| 16|    1861.353515625|
| 11| 1789.761223162004|
| 14| 1783.588888888889|
| 17|1771.0949464012251|
| 26|1527.0793193717277|
| 12|1506.0917822838846|
| 13|1445.1300997280146|
| 29|1429.6450935805492|
| 15|1399.0768712070128|
+---+------------------+
only showing top 20 rows



In [0]:
# Distribuição populacional por estado
populacao_estado = spark.sql("""
    SELECT UF, COUNT(*) as population
    FROM census_data
    GROUP BY UF
    ORDER BY UF
""")

# Exibição da distribuição da população por estado
populacao_estado.show()

+---+----------+
| UF|population|
+---+----------+
| 11|      1537|
| 12|       937|
| 13|      2206|
| 14|       540|
| 15|      4449|
| 16|       512|
| 17|      1306|
| 21|      1787|
| 22|      1211|
| 23|      3359|
| 24|       973|
| 25|      1274|
| 26|      3820|
| 27|       903|
| 28|      1287|
| 29|      5717|
| 31|      7686|
| 32|      1511|
| 33|      5556|
| 35|      8544|
+---+----------+
only showing top 20 rows



In [0]:
# Composição etária da população
distribuicao_idade = spark.sql("""
    SELECT CASE 
             WHEN Idade BETWEEN 0 AND 14 THEN '0-14'
             WHEN Idade BETWEEN 15 AND 24 THEN '15-24'
             WHEN Idade BETWEEN 25 AND 34 THEN '25-34'
             WHEN Idade BETWEEN 35 AND 44 THEN '35-44'
             WHEN Idade BETWEEN 45 AND 54 THEN '45-54'
             WHEN Idade BETWEEN 55 AND 64 THEN '55-64'
             ELSE '65+' 
           END as age_group,
           COUNT(*) as population
    FROM census_data
    GROUP BY age_group
    ORDER BY age_group
""")

# 
distribuicao_idade.show()

+---------+----------+
|age_group|population|
+---------+----------+
|     0-14|         2|
|    15-24|      3340|
|    25-34|     15870|
|    35-44|     21505|
|    45-54|     19920|
|    55-64|     11881|
|      65+|      4322|
+---------+----------+



In [0]:
# Taxas de alfabetização e escolaridade
spark.sql("""
    SELECT UF, 
           AVG(CASE WHEN `Anos de Estudo` >= 1 THEN 1 ELSE 0 END) as literacy_rate,
           AVG(`Anos de Estudo`) as avg_years_of_education
    FROM census_data
    GROUP BY UF
    ORDER BY UF
""").show()



+---+-------------+----------------------+
| UF|literacy_rate|avg_years_of_education|
+---+-------------+----------------------+
| 11|          1.0|     8.811971372804164|
| 12|          1.0|     8.398078975453576|
| 13|          1.0|     9.361740707162285|
| 14|          1.0|     9.916666666666666|
| 15|          1.0|     8.672285906945381|
| 16|          1.0|           10.05078125|
| 17|          1.0|     9.027565084226646|
| 21|          1.0|     7.518746502518187|
| 22|          1.0|     7.014037985136251|
| 23|          1.0|     8.665078892527537|
| 24|          1.0|     8.823227132579651|
| 25|          1.0|     8.165620094191523|
| 26|          1.0|     9.214921465968587|
| 27|          1.0|     7.838316722037653|
| 28|          1.0|     7.337995337995338|
| 29|          1.0|     8.725730278117894|
| 31|          1.0|     9.129846474108769|
| 32|          1.0|     9.466578424884183|
| 33|          1.0|    10.547516198704104|
| 35|          1.0|    10.639044943820224|
+---+------

In [0]:
# Distribuição de renda da população
spark.sql("""
    SELECT UF,
           PERCENTILE_APPROX(Renda, 0.5) as median_income,
           AVG(Renda) as avg_income,
           STDDEV(Renda) as income_stddev
    FROM census_data
    GROUP BY UF
    ORDER BY UF
""").show()


+---+-------------+------------------+------------------+
| UF|median_income|        avg_income|     income_stddev|
+---+-------------+------------------+------------------+
| 11|         1200| 1789.761223162004| 2406.161161153445|
| 12|          900|1506.0917822838846|2276.2334150835873|
| 13|          900|1445.1300997280146|1757.9355914283663|
| 14|         1000| 1783.588888888889|2079.6592381481846|
| 15|          850|1399.0768712070128|2053.7795545208614|
| 16|         1200|    1861.353515625|2020.6886316340306|
| 17|         1000|1771.0949464012251| 2934.590740977675|
| 21|          700|1019.4320089535535| 1887.816904937352|
| 22|          750|  1074.55078447564|2373.3557261812466|
| 23|          789|1255.4036915748734|1821.9635356536114|
| 24|          800|  1344.72147995889|1651.8055003631348|
| 25|          788| 1293.370486656201|1950.2724310934977|
| 26|          900|1527.0793193717277|  2389.62249744054|
| 27|          788|1144.5526024363235| 1237.856196693837|
| 28|         

#### CONCLUSÃO
Este pipeline demonstra a capacidade de manipular, limpar, modelar e analisar grandes conjuntos de dados usando PySpark no Databricks, e serve como um exemplo robusto de como estruturar e documentar um projeto de análise de dados de ponta a ponta. Essas consultas SQL permitirão que você responda às perguntas sobre a distribuição populacional, composição etária, taxas de alfabetização e escolaridade, e distribuição de renda da população com base nos dados disponíveis.