
## Projeto de Engenharia de Dados

**1. Introdução**

Este projeto tem como objetivo processar e analisar dados históricos sobre pessoas, utilizando o Apache Spark e a arquitetura de Data Lake. A base de dados inclui informações como ano de nascimento, falecimento e profissões, e será organizada em um modelo de Data Warehouse com camadas Bronze, Silver e Gold. O pipeline de dados será responsável pela ingestão, transformação e modelagem dos dados, para permitir a criação de métricas e insights, como expectativa de vida, profissões mais comuns e pessoas que viveram mais de 90 anos.

Tecnologias Utilizadas:
Apache Spark: Para processamento distribuído e transformações em grandes volumes de dados.

**Data Lake:** Para armazenar dados brutos antes das transformações.

**Data Warehouse:** Para estruturar os dados em um modelo dimensional, com camadas Bronze, Silver e Gold.

**Delta Lake:** Para garantir consistência, controle de versões e transações ACID durante a carga e atualização dos dados.

O projeto visa responder a perguntas de negócio relacionadas à longevidade, profissões e tendências históricas de nascimento e falecimento.

**2. Objetivo do Trabalho**

O objetivo deste projeto é realizar a ingestão, transformação, modelagem e análise de dados históricos sobre pessoas, com informações como ano de nascimento, falecimento e profissões. Através da utilização do Apache Spark e da arquitetura Data Lake, o projeto visa construir uma arquitetura escalável para armazenar e processar grandes volumes de dados, utilizando uma plataforma de nuvem.

O projeto se concentrará na criação de um Data Warehouse com camadas Bronze, Silver e Gold, possibilitando a análise eficiente das informações e a geração de métricas de interesse, como expectativa de vida média, profissões mais comuns e longevidade das pessoas. A modelagem será feita com base em um modelo estrela, estruturando os dados em tabelas de dimensão e fatos.

As principais atividades do projeto incluem a ingestão e validação dos dados de diferentes fontes, a transformação dos dados brutos em um formato estruturado, a modelagem dimensional para facilitar consultas analíticas, e a análise de métricas que possam responder a perguntas de negócio relacionadas à longevidade, profissões e padrões temporais dos indivíduos.

Este projeto permitirá a geração de insights sobre a evolução histórica da população, além de fornecer uma base robusta para futuras análises.

**Perguntas a Serem Respondidas**

- Qual é a expectativa de vida média das pessoas ao longo das décadas?
- Quais são as profissões mais comuns entre as pessoas na base de dados?
- Quais pessoas viveram mais de 90 anos e qual foi a longevidade delas?
- Há alguma tendência histórica no nascimento e falecimento das pessoas com base nos séculos e décadas?
- Quais pessoas viveram mais tempo?
- Como o tempo (século, década) impacta na longevidade das pessoas?



**3. Etapas do Projeto**

**3.1. Ingestão de Dados**

A ingestão dos dados será realizada de forma flexível, a partir de diferentes fontes de dados, que podem incluir uma tabela existente no Apache Spark ou arquivos no formato TSV. Caso os arquivos não estejam disponíveis ou haja necessidade de integração com outras fontes de dados, o Data Lake será utilizado como fonte alternativa para garantir a continuidade do fluxo de trabalho. O processo de ingestão será conduzido em etapas distintas, conforme descrito abaixo:

**Leitura de Tabelas:** O processo de leitura inicial consistirá em acessar a tabela _name_basics__1__tsv_gz_, que contém os dados de interesse sobre pessoas (como nome, nascimento, falecimento e profissão). Essa tabela será lida diretamente através do comando spark.table para importar os dados para o ambiente de processamento do Apache Spark. Caso a tabela esteja em um formato diferente, como TSV comprimido, será utilizado o comando apropriado para descompactação e leitura, garantindo a flexibilidade para lidar com múltiplos formatos de dados.

**Visualização Inicial:** Após a leitura e carga dos dados para o ambiente de trabalho, será realizado um processo de inspeção preliminar utilizando o comando _display(df)_. Este comando permite a visualização das primeiras linhas do DataFrame carregado, possibilitando uma análise rápida da estrutura dos dados, verificando se todas as colunas de interesse estão presentes e se os dados estão no formato correto. A visualização inicial é essencial para a identificação de inconsistências nos dados, como valores ausentes, formatos incorretos ou anomalias que possam impactar as etapas subsequentes de transformação e modelagem.

**Validação e Limpeza de Dados**: A partir da visualização inicial, serão realizadas verificações adicionais para garantir a integridade dos dados. Isso inclui a validação de tipos de dados, a identificação de registros duplicados e a limpeza de entradas errôneas ou inconsistentes. Caso sejam encontrados problemas, como campos ausentes ou dados fora de formato esperado, as correções serão aplicadas antes de avançar para a próxima fase de transformação dos dados.



In [0]:
# Ler a tabela diretamente
df = spark.table("default.name_basics__1__tsv_gz")

# Mostrar os dados
display(df)

nconst,primaryName,birthYear,deathYear,primaryProfession,knownForTitles
nm0000001,Fred Astaire,1899,1987,"actor,miscellaneous,producer","tt0072308,tt0050419,tt0027125,tt0031983"
nm0000002,Lauren Bacall,1924,2014,"actress,soundtrack,archive_footage","tt0037382,tt0075213,tt0117057,tt0038355"
nm0000003,Brigitte Bardot,1934,\N,"actress,music_department,producer","tt0057345,tt0049189,tt0056404,tt0054452"
nm0000004,John Belushi,1949,1982,"actor,writer,music_department","tt0072562,tt0077975,tt0080455,tt0078723"
nm0000005,Ingmar Bergman,1918,2007,"writer,director,actor","tt0050986,tt0069467,tt0050976,tt0083922"
nm0000006,Ingrid Bergman,1915,1982,"actress,producer,soundtrack","tt0034583,tt0038109,tt0036855,tt0038787"
nm0000007,Humphrey Bogart,1899,1957,"actor,producer,miscellaneous","tt0034583,tt0043265,tt0037382,tt0033870"
nm0000008,Marlon Brando,1924,2004,"actor,director,writer","tt0078788,tt0068646,tt0047296,tt0070849"
nm0000009,Richard Burton,1925,1984,"actor,producer,director","tt0061184,tt0087803,tt0059749,tt0057877"
nm0000010,James Cagney,1899,1986,"actor,director,producer","tt0029870,tt0031867,tt0042041,tt0034236"


**3.2. Preparação e Transformação dos Dados**

Após a ingestão dos dados, o processo de transformação e preparação das informações será fundamental para garantir que o conjunto de dados esteja estruturado de maneira eficiente e adequada para análises posteriores. O fluxo de trabalho seguirá várias etapas, segmentando e organizando os dados nas diferentes camadas do modelo de dados (Bronze, Silver e Gold), a fim de permitir uma análise detalhada das informações. As principais transformações incluem:

_Análise de Longevidade:_
Uma das primeiras transformações será calcular a longevidade (expectativa de vida) das pessoas, utilizando as colunas birthYear (ano de nascimento) e deathYear (ano de falecimento). Para isso, será criada uma nova coluna chamada lifespan, que representará a diferença entre o ano de falecimento e o ano de nascimento das pessoas falecidas. O cálculo será realizado apenas para aqueles registros em que a coluna deathYear não é nula, ou seja, para as pessoas que já faleceram. Para os indivíduos ainda vivos (onde deathYear é nulo), a coluna lifespan será preenchida com um valor nulo ou com o ano atual, dependendo da lógica adotada.

Essa análise permitirá entender a longevidade média da população representada no conjunto de dados, além de oferecer insights sobre possíveis padrões de longevidade associados a determinados períodos ou características.

_Estatísticas Descritivas:_ Em seguida, serão realizadas análises estatísticas descritivas sobre a variável lifespan, a fim de obter uma visão geral do comportamento dos dados. As estatísticas a serem calculadas incluem:

_Média:_ A expectativa média de vida para o conjunto de pessoas presentes no dataset.

_Desvio Padrão:_ Medirá a variabilidade ou dispersão dos valores de longevidade em relação à média.

_Mínimo e Máximo:_ Identificarão a pessoa mais jovem e a pessoa mais velha do conjunto de dados, oferecendo uma visão dos extremos da longevidade.

Essas estatísticas serão úteis para avaliar se há algum padrão ou tendência nas idades de falecimento e qual a variabilidade dessas idades entre os indivíduos da base de dados.

_Top 10 Pessoas que Viveram Mais:_
Para identificar os indivíduos com maior longevidade, será realizada uma ordenação decrescente dos dados pela coluna lifespan. As 10 pessoas que viveram mais serão selecionadas com base nesse critério. Essa análise é importante para destacar casos excepcionais de longevidade, podendo até fornecer informações adicionais sobre fatores que contribuem para uma vida mais longa, como profissão, período histórico ou outras variáveis associadas ao conjunto de dados. A seleção das 10 pessoas mais longevas será uma métrica de interesse para possíveis análises mais profundas, como a correlação entre longevidade e outras características demográficas ou socioeconômicas.

Além disso, todas essas transformações serão realizadas de forma incremental nas camadas Silver e Gold do modelo de dados, garantindo que os dados sejam gradualmente refinados e organizados para análises mais avançadas, como a criação de métricas de negócios e o desenvolvimento de dashboards analíticos.

Essa fase de transformação é essencial para refinar e organizar os dados, preparando-os para as consultas analíticas mais avançadas nas próximas fases do projeto.

In [0]:
from pyspark.sql import functions as F

# Criar DataFrame básico
df = spark.table("default.name_basics__1__tsv_gz").select(
    "primaryName", 
    "birthYear", 
    "deathyear"
)

# Análise básica de longevidade (para pessoas falecidas)
analysis_df = df.filter(F.col("deathyear").isNotNull()).withColumn(
    "lifespan", 
    F.col("deathyear") - F.col("birthYear")
)

# Estatísticas descritivas
display(analysis_df.select(
    F.mean("lifespan").alias("avg_lifespan"),
    F.stddev("lifespan").alias("stddev_lifespan"),
    F.min("lifespan").alias("min_lifespan"),
    F.max("lifespan").alias("max_lifespan")
))

# Top 10 pessoas que viveram mais
display(
    analysis_df.orderBy(F.desc("lifespan")).limit(10)
)

avg_lifespan,stddev_lifespan,min_lifespan,max_lifespan
71.29835795010746,17.325887464643536,-90.0,1945.0


primaryName,birthYear,deathyear,lifespan
Rosita Royce,9,1954,1945.0
Jeanne Louise Calment,1875,1997,122.0
Tatzumbia Dupea,1849,1970,121.0
Kabir Das,1398,1518,120.0
Sankardev,1449,1568,119.0
Durgabai Kamat,1879,1997,118.0
Fakir Lalon Shah,1772,1890,118.0
Maria Branyas,1907,2024,117.0
Misao Ogawa,1898,2015,117.0
Charlotte Hughes,1877,1993,116.0


**3.3. Criação das Tabelas Dimensões e Fato**

A modelagem dos dados seguirá o modelo dimensional tradicional, utilizando tabelas de dimensão e fato:

**DIM_PESSOAS:** Contém as informações sobre as pessoas, com um identificador único _pessoa_id_. As colunas incluem nome, ano de nascimento, ano de falecimento, profissão e outros dados relevantes.

**DIM_TEMPO:** Armazena as informações temporais, como o ano, década e século de nascimento e falecimento.

**FATO_BIOS:** Contém os eventos biográficos das pessoas, associando as tabelas de dimensões _DIM_PESSOAS_ e _DIM_TEMPO_ aos dados de nascimento e falecimento.

**3.4. Otimização das Tabelas**

A otimização das tabelas é uma etapa crucial para garantir um desempenho eficiente nas consultas ao longo do tempo, especialmente quando lidamos com grandes volumes de dados. A abordagem adotada no modelo envolve o uso do comando OPTIMIZE e da técnica ZORDER, que visam melhorar significativamente a velocidade de leitura e o tempo de resposta das consultas executadas sobre as tabelas armazenadas no formato Delta Lake.

_Comando OPTIMIZE:_

O comando OPTIMIZE é utilizado no Delta Lake para compactar os arquivos de dados e melhorar a performance das operações de leitura. Quando grandes volumes de dados são processados, especialmente em arquiteturas de Data Lake, os dados podem ser armazenados em muitos pequenos arquivos, o que torna a leitura desses dados mais lenta, pois o sistema precisa acessar múltiplos arquivos pequenos para responder a uma consulta. O comando OPTIMIZE reorganiza esses arquivos de maneira eficiente, consolidando-os e criando arquivos maiores e mais eficientes para leitura.

Ao aplicar o OPTIMIZE em tabelas Delta, é possível reduzir o tempo de resposta das consultas, aumentando a eficiência geral da plataforma. Esse processo também minimiza o custo computacional ao evitar o acesso a uma grande quantidade de arquivos pequenos durante a execução de queries.

_Técnica ZORDER:_

A técnica ZORDER é usada em conjunto com o OPTIMIZE para melhorar ainda mais o desempenho das consultas que envolvem filtros baseados em colunas específicas. O ZORDER cria um índice multidimensional, que organiza os dados em um padrão mais eficiente para consultas de leitura, agrupando os dados de maneira inteligente de acordo com uma ou mais colunas.

Ao ordenar os dados em um padrão ZORDER, as colunas mais utilizadas nas consultas são fisicamente agrupadas no armazenamento, o que melhora significativamente o desempenho nas consultas de leitura. Por exemplo, se as consultas frequentes filtram os dados com base na coluna pessoa_id ou birthYear, é possível aplicar o ZORDER para otimizar o acesso a essas colunas. Isso reduz a quantidade de dados lidos e melhora a eficiência da execução das queries, especialmente quando há grandes volumes de dados em tabelas Delta.

**Implementação Prática**

No contexto do projeto, a otimização será aplicada principalmente nas tabelas DIM_PESSOAS, DIM_TEMPO e FATO_BIOS. Abaixo estão os detalhes de como as otimizações podem ser aplicadas:

**DIM_PESSOAS:** Como o pessoa_id será um identificador chave em várias consultas, pode-se aplicar o ZORDER para ordenar os dados pela coluna pessoa_id. Isso facilitará o acesso rápido a dados individuais ou grupos específicos de pessoas, quando essas informações forem frequentemente consultadas.

**DIM_TEMPO:** Como as análises de tempo (por exemplo, década, século) são fundamentais para muitas consultas, o ZORDER pode ser aplicado à coluna ano, decada, ou seculo, melhorando a performance nas consultas que envolvem análise temporal.

**FATO_BIOS:** A tabela de fatos é onde os dados mais volumosos são armazenados, então otimizações nesta tabela são essenciais. A coluna pessoa_id (referenciando a tabela de dimensões DIM_PESSOAS) e a coluna birth_tempo_id (referenciando a tabela de dimensões DIM_TEMPO) podem ser priorizadas para o uso do ZORDER. Isso agiliza as consultas que buscam eventos específicos associados a uma pessoa e a uma linha do tempo.

**Benefícios Esperados**

Redução de tempo de resposta: Consultas mais rápidas ao acessar dados com base nas colunas mais utilizadas.

Eficiência de armazenamento: Compactação de dados, resultando em um uso mais eficiente do armazenamento em disco.

Escalabilidade: Melhor desempenho em consultas, mesmo com grandes volumes de dados, o que permite a escalabilidade do sistema à medida que mais dados são carregados.


**4. Detalhamento da Modelagem dos Dados**

**4.1. Tabelas de Dimensão**

**DIM_PESSOAS:**

_pessoa_id (BIGINT)_: Identificador único de cada pessoa.

_primary_name (STRING)_: Nome principal da pessoa.

_birthYear (INT)_: Ano de nascimento.

_deathYear (INT)_: Ano de falecimento (se aplicável).

_etl_date (TIMESTAMP)_: Data de carga dos dados.

_etl_user (STRING)_: Usuário responsável pela carga.

In [0]:
%sql
CREATE TABLE IF NOT EXISTS DIM_PESSOAS (
    pessoa_id BIGINT,
    primary_name STRING NOT NULL,
    etl_date TIMESTAMP,
    etl_user STRING
)
USING DELTA
COMMENT 'Tabela de dimensão para armazenar informações das pessoas';

-- Otimização para consultas frequentes por ID
OPTIMIZE DIM_PESSOAS ZORDER BY (pessoa_id);

path,metrics
dbfs:/user/hive/warehouse/dim_pessoas,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, List(minCubeSize(107374182400), List(0, 0), List(0, 0), 0, List(0, 0), 0, null), 0, 0, 0, false, 0, 0, 1743110546587, 1743110550129, 8, 0, null, List(0, 0), 4, 4, 0)"


**DIM_TEMPO:**

_tempo_id (BIGINT):_ Identificador único para cada registro de tempo.

_ano (INT):_ Ano.

_decada (STRING):_ Década em que o ano pertence (ex: "1990s").

_seculo (STRING):_ Século ao qual o ano pertence (ex: "20º século").

_etl_date (TIMESTAMP)_: Data de carga dos dados.


In [0]:
%sql
CREATE TABLE IF NOT EXISTS DIM_TEMPO (
    tempo_id BIGINT,
    ano INT NOT NULL,
    decada STRING NOT NULL,
    seculo STRING NOT NULL,
    etl_date TIMESTAMP
)
USING DELTA
COMMENT 'Tabela de dimensão temporal para análise histórica';

-- Otimização para consultas por ano
OPTIMIZE DIM_TEMPO ZORDER BY (tempo_id, ano);

path,metrics
dbfs:/user/hive/warehouse/dim_tempo,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, List(minCubeSize(107374182400), List(0, 0), List(0, 0), 0, List(0, 0), 0, null), 0, 0, 0, false, 0, 0, 1743110569244, 1743110571278, 8, 0, null, List(0, 0), 5, 5, 0)"


**4.2. Tabela de Fatos**

**FATO_BIOS:**

_fato_id (BIGINT):_ Identificador único para cada registro de fato.

_pessoa_id (BIGINT):_ Referência ao identificador da pessoa na tabela DIM_PESSOAS.

_birth_tempo_id (BIGINT)_: Referência ao tempo de nascimento na tabela DIM_TEMPO.

_death_tempo_id (BIGINT):_ Referência ao tempo de falecimento na tabela DIM_TEMPO.

_lifespan_years (INT):_ Longevidade da pessoa (em anos).

_is_alive (BOOLEAN):_ Indica se a pessoa está viva.

_etl_date (TIMESTAMP):_ Data de carga dos dados.

In [0]:
%sql
CREATE TABLE IF NOT EXISTS FATO_BIOS (
    fato_id BIGINT,
    pessoa_id BIGINT NOT NULL,
    birth_tempo_id BIGINT NOT NULL,
    death_tempo_id BIGINT,
    lifespan_years INT,
    is_alive BOOLEAN,
    etl_date TIMESTAMP
)
USING DELTA
COMMENT 'Tabela fato com eventos biográficos';

-- Otimização para joins frequentes
OPTIMIZE FATO_BIOS ZORDER BY (pessoa_id, birth_tempo_id, death_tempo_id);

path,metrics
dbfs:/user/hive/warehouse/fato_bios,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, List(minCubeSize(107374182400), List(0, 0), List(0, 0), 0, List(0, 0), 0, null), 0, 0, 0, false, 0, 0, 1743110587192, 1743110588668, 8, 0, null, List(0, 0), 7, 7, 0)"


**5. Processamento e Armazenamento no Delta Lake**

O Delta Lake será a plataforma escolhida para o armazenamento e processamento das tabelas do projeto, pois oferece uma série de funcionalidades essenciais para garantir a integridade e eficiência do fluxo de dados, mesmo em ambientes de grande escala. Entre as vantagens principais do Delta Lake estão as transações ACID, o gerenciamento de esquemas dinâmicos e o histórico de versões, que possibilitam uma abordagem robusta e escalável para a ingestão, transformação e análise de dados.

**Transações ACID**

O Delta Lake garante transações ACID (Atomicidade, Consistência, Isolamento e Durabilidade), o que significa que todas as operações de leitura e escrita nas tabelas são feitas de forma transacional, garantindo que o sistema esteja sempre em um estado consistente, mesmo em casos de falhas durante o processamento. Isso é especialmente importante em um ambiente de dados em constante atualização, como é o caso do processamento em Data Lakes, onde múltiplos processos podem escrever ou ler dados simultaneamente.

A atomicidade garante que uma série de operações de escrita seja tratada como uma única transação, evitando que os dados fiquem em um estado inconsistente caso algum erro ocorra durante o processo. A durabilidade assegura que, uma vez que os dados foram gravados, eles não serão perdidos, mesmo em caso de falhas no sistema.

**Gerenciamento de Esquemas**

O Delta Lake também oferece gerenciamento de esquemas dinâmico. Quando novos dados são ingeridos, o Delta Lake verifica automaticamente o esquema dos dados e pode aplicar alterações sem necessidade de reescrever toda a tabela. A capacidade de gerenciar e alterar esquemas de forma incremental permite que a estrutura de dados se adapte rapidamente a mudanças nos dados de origem, como a adição de novas colunas ou a alteração do tipo de dado de uma coluna existente.

Para garantir que o esquema das tabelas seja mantido atualizado, a opção mergeSchema=true será utilizada durante a operação de escrita dos dados. Isso permite que novas colunas ou alterações de esquema sejam automaticamente integradas nas tabelas, sem a necessidade de reescrever completamente os dados ou interromper as operações de leitura/escrita em andamento. Esse processo facilita a manutenção do esquema à medida que os dados evoluem ao longo do tempo.

**Histórico de Versões**

Outra funcionalidade importante do Delta Lake é o histórico de versões. Cada vez que uma tabela é alterada (seja por inserção, atualização ou exclusão de dados), o Delta Lake cria uma nova versão dos dados, mantendo um histórico completo de todas as modificações realizadas. Isso permite reverter as tabelas para estados anteriores, realizar auditorias detalhadas e garantir a rastreabilidade completa das transformações aplicadas aos dados ao longo do tempo.

Além disso, o histórico de versões oferece flexibilidade para realizar operações de time travel, ou seja, acessar o estado da tabela em um ponto específico no tempo. Esse recurso pode ser útil em casos de necessidade de recuperação de dados ou para análise de como os dados evoluíram ao longo do tempo.

**Processamento Incremental**

Com o Delta Lake, o processamento de dados pode ser feito de maneira incremental, o que significa que apenas os novos dados ou as alterações realizadas nas fontes de dados precisam ser carregados e processados. Esse tipo de abordagem reduz significativamente o tempo de processamento e os recursos necessários para a atualização das tabelas, uma vez que evita a necessidade de reprocessar dados antigos que não sofreram modificações.

O _mergeSchema=true_ é especialmente útil neste cenário, pois permite que novos dados sejam integrados de forma incremental ao modelo de dados existente, sem precisar recriar as tabelas ou reprocessar todo o conjunto de dados. Esse comportamento é ideal para cenários em que os dados estão sendo constantemente atualizados ou adicionados.

**Armazenamento no Formato Delta**

As tabelas de dados serão armazenadas no formato Delta, que é uma extensão do formato Parquet, permitindo otimizações de leitura e gravação. O Delta Lake organiza os dados em arquivos Parquet altamente compactados, que são ideais para grandes volumes de dados, garantindo que as leituras e gravações sejam rápidas e eficientes. O formato Delta adiciona a camada de transações e controle de versões sobre o Parquet, tornando-o adequado para sistemas que requerem consistência e flexibilidade no gerenciamento de dados.

**5.1. Criação de Métricas na Camada "Gold"**

Na camada Gold, o foco é gerar métricas e agregações que oferecem respostas detalhadas e valiosas às perguntas de negócios formuladas anteriormente. Essas métricas são cruciais para a análise de dados e para a extração de insights relevantes, que ajudarão a entender melhor o comportamento, as tendências e as características da população representada nos dados. A camada Gold é a camada final de processamento, onde os dados estão limpos, agregados e prontos para análise avançada.

- **Expectativa de Vida Média por Década:** 

Uma das principais métricas geradas na camada Gold será a expectativa de vida média por década. Este cálculo visa determinar a média da longevidade das pessoas com base no ano de nascimento. Para isso, será necessário:

Agrupar os dados de longevidade por década de nascimento. A década será derivada do ano de nascimento de cada pessoa, ou seja, por exemplo, para uma pessoa nascida em 1945, a década seria "1940s".

Calcular a média da expectativa de vida (coluna lifespan) para cada uma dessas décadas.

- **Profissões Mais Comuns**: 

Outro ponto importante é identificar as profissões mais comuns entre as pessoas na base de dados. A análise de profissão pode revelar padrões interessantes, como as ocupações mais prevalentes ao longo do tempo ou a relação entre profissão e longevidade.

Agrupar os dados por profissão (utilizando a coluna primaryProfession).

Calcular a frequência de cada profissão, identificando as mais recorrentes.

- **Pessoas que Viveram Mais de 90 Anos**: 

Uma análise importante para entender a longevidade das pessoas é identificar aquelas que viveram mais de 90 anos. Isso pode fornecer informações sobre características ou padrões de saúde e longevidade que merecem atenção. Para isso:

Filtrar os dados para selecionar as pessoas cuja expectativa de vida (coluna lifespan) seja maior que 90 anos.

Realizar uma análise adicional sobre esse subconjunto de pessoas, por exemplo, distribuindo-as por profissão, década de nascimento, ou outros atributos.

Esse filtro ajudará a destacar as pessoas que viveram por mais de 90 anos, um grupo que pode ter condições ou contextos de vida específicos que merecem ser analisados mais profundamente.

In [0]:
# Expectativa de vida por década
display(spark.sql("""
SELECT 
  t.decada,
  AVG(f.lifespan_years) as avg_lifespan,
  COUNT(*) as sample_size
FROM FATO_BIOS f
JOIN DIM_TEMPO t ON f.birth_tempo_id = t.tempo_id
WHERE f.lifespan_years IS NOT NULL
GROUP BY t.decada
ORDER BY t.decada
"""))

decada,avg_lifespan,sample_size
1800s,64.33333333333333,135
1810s,66.38888888888889,180
1820s,67.71162790697674,215
1830s,68.69364161849711,346
1840s,70.66934189406099,623
1850s,72.20498220640569,1405
1860s,71.40232047663845,3189
1870s,70.96968165740273,5937
1880s,71.33158765159868,10884
1890s,72.15223880597014,16080


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import LongType, IntegerType, StringType, TimestampType, BooleanType

# 1. Criar DIM_PESSOAS com schema explícito
nomes_df = spark.table("default.name_basics__1__tsv_gz").select(
    F.col("primaryName").alias("primary_name").cast(StringType())
).distinct()

window = Window.orderBy("primary_name")
pessoas_df = nomes_df.withColumn("pessoa_id", F.row_number().over(window).cast(LongType())) \
                   .withColumn("etl_date", F.current_timestamp().cast(TimestampType())) \
                   .withColumn("etl_user", F.lit("system_user").cast(StringType()))

# Escrever com mergeSchema para evitar conflitos
pessoas_df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .option("overwriteSchema", "true") \
    .saveAsTable("DIM_PESSOAS")

# 2. Criar DIM_TEMPO com schema explícito
anos_df = spark.range(1800, 2101).select(F.col("id").alias("ano").cast(IntegerType()))

tempo_df = anos_df.withColumn("tempo_id", F.monotonically_increasing_id().cast(LongType())) \
                 .withColumn("decada", F.concat((F.floor(F.col("ano")/10)*10).cast(IntegerType()), F.lit("s"))) \
                 .withColumn("seculo", F.concat((F.floor(F.col("ano")/100)+1).cast(IntegerType()), F.lit("º século"))) \
                 .withColumn("etl_date", F.current_timestamp().cast(TimestampType()))

tempo_df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .option("overwriteSchema", "true") \
    .saveAsTable("DIM_TEMPO")

# 3. Criar FATO_BIOS com schema explícito e tratamento de conflitos
dados_originais = spark.table("default.name_basics__1__tsv_gz").select(
    F.col("primaryName").alias("primary_name").cast(StringType()),
    F.col("birthYear").cast(IntegerType()),
    F.col("deathYear").cast(IntegerType())
)

fato_bios_df = dados_originais.join(
    spark.table("DIM_PESSOAS").select(
        F.col("pessoa_id").cast(LongType()),
        F.col("primary_name").cast(StringType())
    ),
    "primary_name",
    "inner"
).join(
    spark.table("DIM_TEMPO").select(
        F.col("tempo_id").cast(LongType()),
        F.col("ano").cast(IntegerType())
    ),
    F.col("birthYear") == F.col("ano"),
    "inner"
).join(
    spark.table("DIM_TEMPO").select(
        F.col("tempo_id").cast(LongType()).alias("death_tempo_id"),
        F.col("ano").cast(IntegerType()).alias("death_ano")
    ),
    F.col("deathYear") == F.col("death_ano"),
    "left"
).select(
    F.monotonically_increasing_id().cast(LongType()).alias("fato_id"),
    F.col("pessoa_id").cast(LongType()),
    F.col("tempo_id").cast(LongType()).alias("birth_tempo_id"),
    F.col("death_tempo_id").cast(LongType()),
    (F.col("deathYear") - F.col("birthYear")).cast(IntegerType()).alias("lifespan_years"),
    F.col("deathYear").isNull().cast(BooleanType()).alias("is_alive"),
    F.current_timestamp().cast(TimestampType()).alias("etl_date")
)

# Escrever com tratamento de schema
fato_bios_df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .option("overwriteSchema", "true") \
    .saveAsTable("FATO_BIOS")

# Otimizar tabelas
for table in ["DIM_PESSOAS", "DIM_TEMPO", "FATO_BIOS"]:
    spark.sql(f"OPTIMIZE {table}")

In [0]:
# Verificar se as tabelas foram criadas corretamente
display(spark.sql("SHOW TABLES"))
display(spark.sql("DESCRIBE DETAIL DIM_PESSOAS"))
display(spark.sql("SELECT COUNT(*) FROM FATO_BIOS"))

database,tableName,isTemporary
default,dim_pessoas,False
default,dim_tempo,False
default,fato_bios,False
default,name_basics__1__tsv_gz,False


format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics
delta,23cd06e1-87b4-4dcb-923c-2a47ac183ce2,spark_catalog.default.dim_pessoas,Tabela de dimensão para armazenar informações das pessoas,dbfs:/user/hive/warehouse/dim_pessoas,2025-03-27T21:19:47.313+0000,2025-03-27T21:53:43.000+0000,List(),1,138595735,Map(),1,6,"List(appendOnly, changeDataFeed, checkConstraints, generatedColumns, identityColumns, invariants)",Map()


count(1)
641111


**6. Processos Adicionais**

**Verificação de Dados Duplicados:** Serão realizadas consultas para garantir que não há duplicações nas tabelas de dimensões, especialmente na tabela _DIM_PESSOAS_, que pode ter entradas repetidas com o mesmo _pessoa_id_.

In [0]:
# Verificar duplicatas em DIM_PESSOAS
display(spark.sql("""
SELECT pessoa_id, COUNT(*) as count 
FROM DIM_PESSOAS 
GROUP BY pessoa_id 
HAVING COUNT(*) > 1
"""))

pessoa_id,count


**Verificação de Correspondências Faltantes**: Será verificado se existem registros na tabela _FATO_BIOS_ que não possuem correspondência na tabela_ DIM_PESSOAS_, garantindo a integridade dos dados.

In [0]:
# Verificar pessoas sem correspondência
display(spark.sql("""
SELECT count(*) as missing_pessoas
FROM FATO_BIOS f
LEFT JOIN DIM_PESSOAS p ON f.pessoa_id = p.pessoa_id
WHERE p.pessoa_id IS NULL
"""))

missing_pessoas
0


In [0]:
# Listar todas as tabelas no schema default
tables = spark.sql("SHOW TABLES IN default").collect()
print("Tabelas disponíveis no schema default:")
for table in tables:
    print(f"- {table.tableName}")

Tabelas disponíveis no schema default:
- dim_pessoas
- dim_tempo
- fato_bios
- name_basics__1__tsv_gz


In [0]:
# Procurar tabelas com 'name' ou 'person' no nome
possible_tables = [t.tableName for t in tables 
                  if 'name' in t.tableName.lower() 
                  or 'person' in t.tableName.lower()
                  or 'basics' in t.tableName.lower()]

if possible_tables:
    print("\nTabelas possíveis encontradas:")
    for table in possible_tables:
        print(f"- {table}")
        # Mostrar estrutura da tabela
        print(f"  Estrutura: {spark.table(f'default.{table}').columns}")
else:
    print("\nNenhuma tabela com nome similar encontrada.")


Tabelas possíveis encontradas:
- name_basics__1__tsv_gz
  Estrutura: ['nconst', 'primaryName', 'birthYear', 'deathYear', 'primaryProfession', 'knownForTitles']


In [0]:
# Substitua pelo nome correto da tabela encontrada
df = spark.table(f"default.{possible_tables[0]}")
display(df.limit(5))

nconst,primaryName,birthYear,deathYear,primaryProfession,knownForTitles
nm0000001,Fred Astaire,1899,1987,"actor,miscellaneous,producer","tt0072308,tt0050419,tt0027125,tt0031983"
nm0000002,Lauren Bacall,1924,2014,"actress,soundtrack,archive_footage","tt0037382,tt0075213,tt0117057,tt0038355"
nm0000003,Brigitte Bardot,1934,\N,"actress,music_department,producer","tt0057345,tt0049189,tt0056404,tt0054452"
nm0000004,John Belushi,1949,1982,"actor,writer,music_department","tt0072562,tt0077975,tt0080455,tt0078723"
nm0000005,Ingmar Bergman,1918,2007,"writer,director,actor","tt0050986,tt0069467,tt0050976,tt0083922"


In [0]:
schemas = ["bronze", "silver", "gold", "raw", "source", "default", "staging"]

for schema in schemas:
    try:
        print(f"\nTabelas no schema {schema}:")
        display(spark.sql(f"SHOW TABLES IN {schema}"))
    except:
        print(f"Schema {schema} não existe ou não pode ser acessado")


Tabelas no schema bronze:
Schema bronze não existe ou não pode ser acessado

Tabelas no schema silver:
Schema silver não existe ou não pode ser acessado

Tabelas no schema gold:
Schema gold não existe ou não pode ser acessado

Tabelas no schema raw:
Schema raw não existe ou não pode ser acessado

Tabelas no schema source:
Schema source não existe ou não pode ser acessado

Tabelas no schema default:


database,tableName,isTemporary
default,dim_pessoas,False
default,dim_tempo,False
default,fato_bios,False
default,name_basics__1__tsv_gz,False



Tabelas no schema staging:
Schema staging não existe ou não pode ser acessado


In [0]:
from pyspark.sql import functions as F

def find_and_load_table(table_name):
    """Tenta encontrar a tabela em vários schemas possíveis"""
    schemas_to_try = ["bronze", "default", "raw", "source"]
    
    for schema in schemas_to_try:
        try:
            df = spark.table(f"{schema}.{table_name}")
            print(f"Tabela encontrada em: {schema}.{table_name}")
            return df
        except:
            continue
    
    # Se não encontrar, mostra as tabelas disponíveis
    print("Tabela não encontrada. Schemas e tabelas disponíveis:")
    display(spark.sql("SHOW SCHEMAS"))
    for schema in schemas_to_try:
        try:
            display(spark.sql(f"SHOW TABLES IN {schema}"))
        except:
            continue
    raise ValueError(f"Tabela {table_name} não encontrada nos schemas: {', '.join(schemas_to_try)}")

# 1. Tentar carregar a tabela
try:
    df = find_and_load_table("name_basics")
    
    # 2. Análise básica dos dados
    print("\nAnálise dos dados encontrados:")
    print(f"Total de registros: {df.count()}")
    print("\nVisualização dos dados:")
    display(df.limit(5))
    
    # 3. Criar dimensão temporal temporária
    min_year = df.select(F.min("birthYear")).collect()[0][0] or 1800
    max_year = df.select(F.max("deathYear")).collect()[0][0] or 2100
    
    dim_tempo = spark.range(min_year, max_year + 1).select(
        F.col("id").alias("ano"),
        F.concat((F.floor(F.col("id")/10)*10).cast("int"), F.lit("s")).alias("decada"),
        F.concat((F.floor(F.col("id")/100)+1).cast("int"), F.lit("º século")).alias("seculo")
    )
    
    # 4. Análise combinada
    analysis = df.join(
        dim_tempo.alias("nasc"),
        df["birthYear"] == F.col("nasc.ano"),
        "left"
    ).select(
        df["*"],
        F.col("nasc.decada").alias("decada_nascimento"),
        F.col("nasc.seculo").alias("seculo_nascimento"),
        (F.col("deathYear") - F.col("birthYear")).alias("anos_vida")
    )
    
    print("\nAnálise combinada com dimensão temporal:")
    display(analysis.limit(5))
    
except Exception as e:
    print(f"Erro: {str(e)}")

Tabela não encontrada. Schemas e tabelas disponíveis:


databaseName
default


database,tableName,isTemporary
default,dim_pessoas,False
default,dim_tempo,False
default,fato_bios,False
default,name_basics__1__tsv_gz,False


Erro: Tabela name_basics não encontrada nos schemas: bronze, default, raw, source


**7. Criação de Schemas e Carregamento de Dados**

A arquitetura de camadas do Data Lake será implementada com o objetivo de proporcionar um fluxo de dados eficiente, escalável e estruturado, utilizando as camadas Bronze, Silver e Gold. Cada uma dessas camadas tem um papel específico no processo de ingestão, transformação e análise de dados, garantindo que o processo de ETL (extração, transformação e carregamento) seja realizado de maneira eficiente e com qualidade.

**Bronze:** A camada Bronze é a camada inicial do pipeline de dados, responsável pela ingestão de dados brutos. Nessa camada, os dados são carregados diretamente de suas fontes originais, como arquivos TSV ou tabelas de um Data Lake. O objetivo principal dessa camada é armazenar os dados sem grandes transformações, o que garante que todas as informações originais sejam preservadas para auditoria, rastreabilidade e posteriores ajustes.

Processo de ingestão: Os dados são carregados de forma semi-estruturada ou bruta, com mínima limpeza ou alteração, o que mantém a flexibilidade para ajustes e transformações posteriores.

Armazenamento: Nessa camada, os dados podem ser armazenados em seu formato nativo, como arquivos CSV, TSV ou Parquet, e também podem ser convertidos para o formato Delta para garantir maior consistência e controle de versões.

**Silver:** Na camada Silver, os dados são limpos, transformados e estruturados. Aqui, são realizadas as primeiras transformações importantes para preparar os dados para análises mais complexas. As principais tarefas dessa camada incluem:

Limpeza de dados: Remoção de registros duplicados, tratamento de valores ausentes e correção de inconsistências nos dados. Isso assegura que os dados a serem utilizados nas próximas etapas estejam consistentes e prontos para análises.

Estruturação dos dados: Criação de tabelas de dimensões, como DIM_PESSOAS e DIM_TEMPO, onde as informações são modeladas e organizadas de acordo com as necessidades de negócios.

DIM_PESSOAS incluirá dados como identificação única das pessoas, nome, data de nascimento, data de falecimento, e outras informações relevantes.

DIM_TEMPO incluirá informações temporais, como ano, década e século, para facilitar análises históricas e tendências.

Armazenamento: Os dados transformados são armazenados em Delta Lake para garantir transações ACID e controle de versões, oferecendo maior flexibilidade para integração de novos dados e recuperação de versões anteriores, sem reescrever completamente as tabelas.

**Gold:** A camada Gold é a camada final, onde são realizadas as análises avançadas e a geração de métricas de valor para negócios. As principais atividades desta camada incluem:

Cálculo de métricas: Baseado nas tabelas transformadas nas camadas anteriores, são realizadas as agregações e cálculos necessários para responder às perguntas de negócios e gerar insights.

Por exemplo, o cálculo da expectativa de vida média por década, a análise das profissões mais comuns entre as pessoas da base de dados, e a identificação de pessoas que viveram mais de 90 anos.

Armazenamento de resultados finais: As métricas calculadas e as tabelas de análises são armazenadas na camada Gold, organizadas para consultas rápidas e eficientes. Esses dados são os mais "refinados", prontos para serem acessados por ferramentas analíticas ou dashboards de visualização de dados.

Utilização do Delta Lake: A camada Gold também se beneficia do Delta Lake para garantir que os dados permanecem consistentes e versionados, permitindo ajustes incrementais sem a necessidade de reprocessamento total dos dados.

In [0]:
# Criar schemas se não existirem
spark.sql("CREATE SCHEMA IF NOT EXISTS bronze")
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

Out[72]: DataFrame[]

In [0]:
from pyspark.sql import functions as F

# Carregar dados - ajuste para sua fonte real
try:
    df = spark.table("bronze.name_basics")
except:
    # Se não encontrar no bronze, tentar carregar de arquivo
    try:
        df = spark.read.format("csv") \
            .option("header", "true") \
            .option("sep", "\t") \
            .load("/FileStore/tables/name_basics.tsv")
    except Exception as e:
        print(f"Erro ao carregar dados: {str(e)}")
        raise

In [0]:
from pyspark.sql import functions as F

# 1. Criar schemas se não existirem
spark.sql("CREATE SCHEMA IF NOT EXISTS bronze")
spark.sql("CREATE SCHEMA IF NOT EXISTS silver") 
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

# 2. Carregar dados de origem (ajuste conforme necessário)
try:
    df = spark.table("bronze.name_basics")
except:
    df = spark.read.csv("/FileStore/tables/name_basics.tsv", header=True, sep="\t")

# 3. Criar DIM_PESSOAS
pessoas_df = df.select(
    F.col("nconst").alias("pessoa_id"),
    F.col("primaryName").alias("nome_principal"),
    F.col("birthYear").cast("int"),
    F.col("deathYear").cast("int"),
    F.col("primaryProfession").alias("profissao_principal"),
    F.col("knownForTitles").alias("titulos_conhecidos"),
    F.current_timestamp().alias("dt_carga")
)

pessoas_df.write.format("delta").mode("overwrite").saveAsTable("silver.DIM_PESSOAS")

# 4. Criar DIM_TEMPO (versão corrigida)
anos = range(1800, 2101)
tempo_data = [(ano, f"{ano//10*10}s", f"{(ano//100)+1}º século") for ano in anos]
tempo_df = spark.createDataFrame(tempo_data, ["ano", "decada", "seculo"])
tempo_df = tempo_df.withColumn("dt_carga", F.current_timestamp())

# Escrita da tabela em formato simplificado
tempo_df.write.format("delta").mode("overwrite").saveAsTable("silver.DIM_TEMPO")

# 5. Criar métricas na camada OURO
spark.sql("""
CREATE OR REPLACE TABLE gold.metrica_expectativa_vida
AS
SELECT 
    FLOOR(birthYear/10)*10 AS decada_nascimento,
    AVG(deathYear - birthYear) AS expectativa_vida_media,
    COUNT(*) AS quantidade_pessoas,
    CURRENT_TIMESTAMP() AS dt_metrica
FROM silver.DIM_PESSOAS
WHERE deathYear IS NOT NULL
GROUP BY FLOOR(birthYear/10)*10
ORDER BY decada_nascimento
""")

print("Modelo estrela criado com sucesso!")
display(spark.sql("SHOW TABLES IN silver"))
display(spark.sql("SHOW TABLES IN gold"))

Modelo estrela criado com sucesso!


database,tableName,isTemporary
silver,dim_pessoas,False
silver,dim_tempo,False


database,tableName,isTemporary
gold,metrica_expectativa_vida,False


In [0]:
# Métrica 1: Expectativa de vida por década
spark.sql("""
CREATE OR REPLACE TABLE gold.metrica_expectativa_vida
AS
SELECT 
    FLOOR(birthYear/10)*10 AS decada_nascimento,
    AVG(deathYear - birthYear) AS expectativa_vida_media,
    COUNT(*) AS quantidade_pessoas,
    CURRENT_TIMESTAMP() AS dt_metrica
FROM silver.DIM_PESSOAS
WHERE deathYear IS NOT NULL
GROUP BY FLOOR(birthYear/10)*10
ORDER BY decada_nascimento
""")

# Métrica 2: Profissões mais comuns
spark.sql("""
CREATE OR REPLACE TABLE gold.metrica_profissoes
AS
SELECT 
    profissao_principal,
    COUNT(*) AS total_pessoas,
    CURRENT_TIMESTAMP() AS dt_metrica
FROM silver.DIM_PESSOAS
WHERE profissao_principal IS NOT NULL
GROUP BY profissao_principal
ORDER BY total_pessoas DESC
""")

# Métrica 3: Pessoas que viveram mais de 90 anos
spark.sql("""
CREATE OR REPLACE TABLE gold.metrica_longevos
AS
SELECT 
    pessoa_id,
    nome_principal,
    (deathYear - birthYear) AS anos_vida,
    profissao_principal,
    CURRENT_TIMESTAMP() AS dt_metrica
FROM silver.DIM_PESSOAS
WHERE deathYear IS NOT NULL 
AND (deathYear - birthYear) >= 90
ORDER BY anos_vida DESC
""")

Out[76]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
print("Tabelas criadas com sucesso:")
display(spark.sql("SHOW TABLES IN silver"))
display(spark.sql("SHOW TABLES IN gold"))

print("\nExemplo de métricas:")
display(spark.table("gold.metrica_expectativa_vida").limit(5))
display(spark.table("gold.metrica_profissoes").limit(5))

Tabelas criadas com sucesso:


database,tableName,isTemporary
silver,dim_pessoas,False
silver,dim_tempo,False


database,tableName,isTemporary
gold,metrica_expectativa_vida,False
gold,metrica_longevos,False
gold,metrica_profissoes,False



Exemplo de métricas:


decada_nascimento,expectativa_vida_media,quantidade_pessoas,dt_metrica
,,15173,2025-03-27T22:27:43.323+0000
0.0,1003.0,2,2025-03-27T22:27:43.323+0000
20.0,30.0,1,2025-03-27T22:27:43.323+0000
30.0,58.0,1,2025-03-27T22:27:43.323+0000
40.0,25.0,2,2025-03-27T22:27:43.323+0000


profissao_principal,total_pessoas,dt_metrica
\N,2792699,2025-03-27T22:27:50.602+0000
actor,2521125,2025-03-27T22:27:50.602+0000
actress,1617925,2025-03-27T22:27:50.602+0000
miscellaneous,823627,2025-03-27T22:27:50.602+0000
producer,488979,2025-03-27T22:27:50.602+0000


In [0]:
from pyspark.sql import functions as F

# 1. Criar schemas se não existirem
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

# 2. Carregar dados de origem
try:
    df = spark.table("bronze.name_basics")
except:
    df = spark.read.csv("/FileStore/tables/name_basics.tsv", sep="\t", header=True)

# 3. Criar DIM_PESSOAS
pessoas_df = df.select(
    F.col("nconst").alias("pessoa_id"),
    F.col("primaryName").alias("nome_principal"),
    F.col("birthYear").cast("int"),
    F.col("deathYear").cast("int"),
    F.col("primaryProfession").alias("profissao_principal"),
    F.col("knownForTitles").alias("titulos_conhecidos"),
    F.current_timestamp().alias("dt_carga")
)

pessoas_df.write.format("delta").mode("overwrite").saveAsTable("silver.DIM_PESSOAS")

# 4. Criar DIM_TEMPO 
anos = range(1800, 2101)
tempo_data = [(ano, f"{ano//10*10}s", f"{(ano//100)+1}º século") for ano in anos]
tempo_df = spark.createDataFrame(tempo_data, ["ano", "decada", "seculo"])
tempo_df = tempo_df.withColumn("dt_carga", F.current_timestamp())

tempo_df.write.format("delta").mode("overwrite").saveAsTable("silver.DIM_TEMPO")

# 5. Criar métricas na camada OURO
# Métrica 1: Expectativa de vida
spark.sql("""
CREATE OR REPLACE TABLE gold.metrica_expectativa_vida
AS
SELECT 
    FLOOR(birthYear/10)*10 AS decada_nascimento,
    AVG(deathYear - birthYear) AS expectativa_vida_media,
    COUNT(*) AS quantidade_pessoas,
    CURRENT_TIMESTAMP() AS dt_metrica
FROM silver.DIM_PESSOAS
WHERE deathYear IS NOT NULL
GROUP BY FLOOR(birthYear/10)*10
ORDER BY decada_nascimento
""")

# Métrica 2: Profissões mais comuns
spark.sql("""
CREATE OR REPLACE TABLE gold.metrica_profissoes
AS
SELECT 
    profissao_principal,
    COUNT(*) AS total_pessoas,
    CURRENT_TIMESTAMP() AS dt_metrica
FROM silver.DIM_PESSOAS
WHERE profissao_principal IS NOT NULL
GROUP BY profissao_principal
ORDER BY total_pessoas DESC
""")

print("Processo concluído com sucesso!")
display(spark.sql("SHOW TABLES IN silver"))
display(spark.sql("SHOW TABLES IN gold"))

Processo concluído com sucesso!


database,tableName,isTemporary
silver,dim_pessoas,False
silver,dim_tempo,False


database,tableName,isTemporary
gold,metrica_expectativa_vida,False
gold,metrica_longevos,False
gold,metrica_profissoes,False


**8- Catálogo de Dados**

O Catálogo de Dados será uma peça fundamental para garantir que todas as partes interessadas no projeto compreendam a estrutura e o conteúdo do modelo de dados utilizado no sistema. O catálogo irá funcionar como uma documentação viva, oferecendo uma descrição detalhada e precisa de todas as tabelas, campos e relações dentro do Data Warehouse. Ele permitirá que engenheiros de dados, analistas e outros stakeholders tenham acesso rápido e direto às informações, ajudando a realizar consultas eficientes e análises aprofundadas.

A seguir, um detalhamento de como o Catálogo de Dados será estruturado e como ele será utilizado:

**Estrutura do Catálogo de Dados**

O catálogo será organizado de maneira lógica e prática, com a divisão em três principais seções: Tabelas de Dimensão, Tabelas de Fato e Relacionamentos. Cada seção terá uma documentação abrangente que explicita as funções, características e conexões entre as tabelas.

**Tabelas de Dimensão**

As tabelas de dimensão armazenam informações sobre as entidades que são de interesse na análise. No caso deste projeto, as tabelas de dimensão incluem DIM_PESSOAS e DIM_TEMPO. Abaixo estão os detalhes de cada uma:

- **DIM_PESSOAS**

Descrição: Armazena informações detalhadas sobre as pessoas, incluindo identificação única, nome, ano de nascimento, ano de falecimento e outras características relevantes.

Campos:

- pessoa_id (BIGINT): Identificador único de cada pessoa.
- primary_name (STRING): Nome da pessoa.
- birthYear (INT): Ano de nascimento.
- deathYear (INT): Ano de falecimento.
- etl_date (TIMESTAMP): Data da carga dos dados.
- etl_user (STRING): Usuário que carregou os dados.

Chaves Primárias e Estrangeiras: O campo pessoa_id será a chave primária, sendo utilizado em outras tabelas, como a FATO_BIOS, como chave estrangeira para estabelecer a relação entre as tabelas.

- **DIM_TEMPO**

Descrição: Armazena as informações temporais associadas aos eventos de nascimento e falecimento das pessoas. Ela permite que se analise o comportamento dos dados ao longo do tempo.

Campos:

- tempo_id (BIGINT): Identificador único de cada unidade de tempo.
- ano (INT): O ano específico.
- decada (STRING): A década do ano (exemplo: "1990s").
- seculo (STRING): O século do ano (exemplo: "20º século").
- etl_date (TIMESTAMP): Data da carga dos dados.

Chaves Primárias e Estrangeiras: O campo tempo_id é a chave primária e será referenciado como chave estrangeira nas tabelas de fato, como FATO_BIOS, para indicar o tempo de nascimento e falecimento.

Tabelas de Fato

As tabelas de fato armazenam informações que são medidas quantitativas e eventos importantes, como nascimento, falecimento, longevidade e outros dados de interesse analítico. A tabela FATO_BIOS será a principal tabela de fatos neste projeto.

- **FATO_BIOS**

Descrição: Esta tabela armazena eventos biográficos das pessoas, conectando as dimensões de tempo e pessoas, bem como calculando informações como a longevidade das pessoas.

Campos:

- fato_id (BIGINT): Identificador único do evento biográfico.
- pessoa_id (BIGINT): Referência à tabela DIM_PESSOAS, identificando a pessoa relacionada ao evento.
- birth_tempo_id (BIGINT): Referência à tabela DIM_TEMPO, associando o tempo de nascimento.
- death_tempo_id (BIGINT): Referência à tabela DIM_TEMPO, associando o tempo de falecimento.
- lifespan_years (INT): A longevidade da pessoa em anos.
- is_alive (BOOLEAN): Indica se a pessoa está viva ou não (se deathYear é nulo, a pessoa está viva).
- etl_date (TIMESTAMP): Data da carga dos dados.

Chaves Primárias e Estrangeiras: A chave primária é fato_id, e as chaves estrangeiras são pessoa_id, birth_tempo_id e death_tempo_id, referenciando as respectivas tabelas de dimensões.

Relacionamentos entre Tabelas

O catálogo detalhará as relações de chaves primárias e estrangeiras entre as tabelas, que são essenciais para garantir a integridade referencial e a eficiência das consultas. Alguns exemplos de relacionamentos importantes incluem:

**Relacionamento entre DIM_PESSOAS e FATO_BIOS:**

A tabela FATO_BIOS faz referência a DIM_PESSOAS por meio da chave estrangeira pessoa_id. Cada registro de evento biográfico está vinculado a uma pessoa na tabela de dimensões.

**Relacionamento entre DIM_TEMPO e FATO_BIOS:**

As tabelas de tempo de nascimento e falecimento na FATO_BIOS fazem referência à tabela DIM_TEMPO para associar eventos temporais, como o ano, década e século do nascimento e falecimento.

**Agregações e Junções:**

A documentação indicará como unir as tabelas de dimensões com a tabela de fatos para gerar análises e métricas. Por exemplo, para calcular a expectativa de vida média por década, será necessário realizar um join entre a tabela FATO_BIOS e a DIM_TEMPO, utilizando o campo birth_tempo_id.

**Acesso e Utilização do Catálogo de Dados**

O Catálogo de Dados será acessível por meio de interfaces como ferramentas de documentação colaborativa (por exemplo, Confluence ou GitHub Wiki), além de poder ser integrado diretamente nas interfaces de consulta, como o Apache Spark e as interfaces SQL. Além disso, ele será atualizado de forma contínua para refletir qualquer alteração no modelo de dados, como adição de novas tabelas ou transformação de dados, garantindo que a documentação seja sempre precisa e atualizada.

**9. A Garantia da Qualidade dos Dados**

É um componente essencial para assegurar que as informações processadas e armazenadas no sistema sejam confiáveis, consistentes e utilizáveis para análise. Para garantir que os dados atendam a padrões elevados de qualidade, será implementada uma série de práticas de validação, controle de versão, auditoria e documentação.

_Validação de Dados durante a Carga_

Antes de qualquer dado ser carregado no sistema, ele passará por um processo rigoroso de validação para garantir sua integridade e qualidade. A validação de dados é uma etapa crítica que pode ser dividida nas seguintes ações:

_Limpeza de Dados_: 

Todos os dados de entrada serão analisados em busca de valores nulos, inconsistentes ou malformados. Serão aplicados processos de limpeza, como a remoção de registros com campos obrigatórios ausentes ou com dados irrelevantes para o contexto de análise.

_Eliminação de Duplicatas:_ 

A detecção e remoção de duplicatas são essenciais para garantir a precisão das informações armazenadas. Serão utilizados algoritmos de detecção de duplicatas baseados em campos-chave como o nome e a data de nascimento, para garantir que cada pessoa ou evento biográfico seja registrado apenas uma vez.

_Verificação de Conformidade:_ 

Os dados serão validados para garantir que atendem às regras de conformidade definidas, como a verificação de intervalos válidos para anos de nascimento e falecimento (por exemplo, o ano de falecimento não pode ser anterior ao ano de nascimento).

_Automatização da Validação:_ 

Todo o processo de validação será automatizado através de scripts de ETL (Extract, Transform, Load), garantindo que qualquer dado carregado seja previamente verificado antes de ser inserido no sistema.

**Controle de Versão com Delta Lake**

O Delta Lake será a plataforma escolhida para garantir a consistência, rastreabilidade e integridade dos dados ao longo do tempo. O controle de versão, juntamente com os recursos de ACID (Atomicidade, Consistência, Isolamento, Durabilidade) transacionais, oferecerá as seguintes vantagens:

_Controle de Versão de Dados:_ O Delta Lake mantém um histórico de alterações nos dados, permitindo voltar a versões anteriores se necessário. Cada atualização, inserção ou exclusão de dados cria um novo "commit" no Delta Lake, preservando o histórico e facilitando a comparação entre versões.

_Rastreabilidade e Auditoria_: O controle de versão também proporciona uma rastreabilidade total de quem, quando e o que foi alterado nos dados. Isso será crucial para garantir a transparência no processo de manipulação de dados e para atender a qualquer auditoria futura.

_Transações ACID:_ Ao utilizar o Delta Lake, as transações ACID garantem que as operações de escrita e leitura nos dados sejam consistentes, mesmo em ambientes distribuídos, evitando a corrupção de dados durante operações de grande escala.

_Recuperação de Dados:_ Em caso de falha ou erro durante a carga de dados ou transformações, o Delta Lake facilita a recuperação dos dados, permitindo reverter para um estado anterior sem perda de integridade.


**Auditoria e Monitoramento**

Para garantir que os dados permaneçam íntegros e sem alterações indesejadas ao longo do tempo, será implementado um sistema de auditoria e monitoramento contínuo:

_Auditoria Periódica:_ Serão realizados processos regulares de auditoria para garantir que os dados carregados não tenham sido corrompidos ou alterados de maneira não autorizada. A auditoria incluirá a verificação de valores fora do esperado, inconsistências nos dados e problemas de integridade referencial entre as tabelas de dimensões e fatos.
_
Monitoramento de Integridade:_ A integridade dos dados será monitorada em tempo real através de ferramentas de monitoramento de dados. Isso incluirá a verificação de dados nulos, a validade de dados temporais (como datas de nascimento e falecimento) e a verificação da consistência entre as dimensões e as tabelas de fato.

_Alertas e Notificações_: Sistemas de alerta serão configurados para notificar os engenheiros de dados e analistas sempre que ocorrer uma anomalia no processo de carga ou qualquer violação nas regras de qualidade dos dados. Isso permite uma correção rápida e eficaz de quaisquer problemas identificados.

_Logs de Auditoria:_ Todos os processos de carga e transformação de dados serão registrados em logs detalhados, incluindo a origem dos dados, as transformações aplicadas e os responsáveis por cada etapa. Isso permitirá que qualquer discrepância seja facilmente rastreada até a origem, facilitando a resolução de problemas.

**Documentação e Registros de Alterações**

A documentação e o registro das transformações realizadas nos dados são vitais para garantir a rastreabilidade e a transparência do processo. A documentação será organizada e acessível a todos os membros da equipe e partes interessadas, e incluirá os seguintes elementos:

Histórico de Transformações: Cada transformação realizada nos dados será registrada, incluindo detalhes sobre os scripts utilizados, as tabelas afetadas e os resultados esperados. Essa documentação será acessível via ferramentas de gerenciamento de projetos, como Confluence ou GitHub, para garantir fácil acesso e rastreabilidade.

Versões de Scripts de ETL: Todos os scripts de ETL (processos de extração, transformação e carga) serão versionados e armazenados em um repositório de controle de versão, garantindo que qualquer alteração nos processos de dados seja documentada e audível.

Alterações de Modelo de Dados: Sempre que o modelo de dados sofrer alterações significativas, como a adição de novas tabelas ou alterações na estrutura de tabelas existentes, essas mudanças serão documentadas e comunicadas aos stakeholders. A documentação incluirá a descrição do motivo das mudanças e o impacto esperado no processo de análise de dados.

Definição de Políticas de Qualidade de Dados: A documentação também incluirá uma definição clara das políticas de qualidade dos dados, incluindo os critérios de aceitação para dados brutos, as regras de transformação e validação, e os procedimentos para manutenção da qualidade dos dados a longo prazo.


**Ferramentas e Tecnologia Utilizada**

Para garantir a execução eficaz das práticas descritas, serão utilizadas as seguintes ferramentas e tecnologias:

Apache Spark: Para processamento em larga escala e execução de validações e transformações dos dados.

Delta Lake: Para garantir transações ACID, controle de versão e manutenção da qualidade e integridade dos dados.

Apache Airflow ou Azure Data Factory: Para orquestrar e automatizar as cargas e transformações de dados, garantindo a execução eficiente e rastreável das pipelines de ETL.

Ferramentas de Monitoramento e Alertas: Como Datadog ou Prometheus, para monitoramento da integridade dos dados e notificação de falhas em tempo real.

Benefícios da Garantia da Qualidade dos Dados

Confiabilidade dos Dados: A validação rigorosa dos dados assegura que apenas dados corretos e completos sejam carregados e utilizados nas análises, melhorando a confiabilidade das métricas geradas.

Rastreabilidade Total: O controle de versão e a documentação detalhada das transformações e alterações garantem total rastreabilidade de qualquer dado ou transformação no sistema, facilitando auditorias e verificações de conformidade.

Prevenção de Erros: A auditoria contínua e o monitoramento em tempo real permitem detectar e corrigir problemas de forma proativa, evitando erros nos dados que possam impactar a tomada de decisões.

Transparência e Colaboração: A documentação completa e acessível sobre os dados e processos de transformação promove uma maior transparência e colaboração entre as equipes de desenvolvimento, análise e negócios.

**10. Conclusão**

Este projeto oferece uma visão abrangente e prática de todo o ciclo de engenharia de dados, cobrindo desde a ingestão inicial até a criação e otimização de um modelo dimensional no Delta Lake. Ele permite realizar análises detalhadas sobre um conjunto histórico de dados de pessoas, com foco em métricas como longevidade e profissões. Ao empregar o Delta Lake como formato de armazenamento, o projeto assegura que os dados sejam não apenas armazenados de maneira eficiente, mas também atualizados de forma incremental, sem perda de consistência. Isso garante que os dados sejam acessíveis para consultas rápidas e precisas, mesmo à medida que novas informações são integradas. O uso de Delta Lake também habilita transações ACID, o que proporciona integridade, rastreabilidade e a capacidade de fazer atualizações dinâmicas nas tabelas sem comprometer a consistência do sistema. Como resultado, o projeto oferece uma plataforma robusta e escalável para realizar análises profundas e gerar insights valiosos sobre o comportamento e características das pessoas em diferentes períodos históricos, além de garantir a qualidade e confiabilidade contínuas dos dados ao longo do tempo.