# <h1 align="center"><font color="gree">Building a Data Pipeline in Databricks and using Spark</font></h1>
---

<font color="pink">Senior Data Scientist.: Dr. Eddy Giusepe Chirinos Isidro</font>

Link de estudo:

* [Databricks Caderno de anota√ß√µes](https://docs.databricks.com/aws/pt/notebooks/)

# <font color="red">üöÄ Configura√ß√£o do Databricks Connect</font>

Este notebook est√° configurado para usar **Databricks Connect**, permitindo executar c√≥digo localmente mas processando dados no `cluster Databricks`.

## <font color="blue">üîê Informa√ß√µes necess√°rias</font>

1. `Server hostname`: URL do seu workspace Databricks
2. `Personal Access Token`: Token de acesso (User Settings ‚Üí Developer ‚Üí Access Tokens)
3. `Cluster ID`: ID do cluster ativo (copie da URL quando abrir um cluster)

## <font color="blue">Como configurar?</font>

* `Op√ß√£o A`: Voc√™ pode criar vari√°veis de ambiente:

```bash
export DATABRICKS_HOST="https://seu-workspace.cloud.databricks.com"
export DATABRICKS_TOKEN="dapi-seu-token-aqui"  
export DATABRICKS_CLUSTER_ID="cluster-id-aqui"
```

* `Op√ß√£o B`: Crie um arquivo `.env` na pasta do projeto:

```bash
DATABRICKS_HOST=https://seu-workspace.cloud.databricks.com
DATABRICKS_TOKEN=dapi-seu-token-aqui
DATABRICKS_CLUSTER_ID=cluster-id-aqui
```

**‚ö†Ô∏è IMPORTANTE**: Nunca fa√ßa commit de tokens/credenciais para Git!

# <font color="red">üìã COMO OBTER AS CREDENCIAIS DO DATABRICKS</font>

## <font color="blue">üîç 1. Server Hostname</font>

- Va para seu workspace Databricks

- Copie a URL do navegador (ex: `https://dbc-a1b2c3d4-e5f6.cloud.databricks.com`)


Voc√™ pode copiar a partir do browser s√≥ ate `...com`.

## <font color="blue">üîë 2. Personal Access Token</font>

1. No Databricks workspace ‚Üí clique no seu **avatar** (canto superior direito)

2. **User Settings**

3. **Developer** (no menu lateral)

4. **Access Tokens**

5. **Generate New Token**

6. D√™ um nome (ex: "Local Development")

7. Defina expira√ß√£o (recomendo 90 dias)

8. **Generate** ‚Üí copie o token (come√ßa com `dapi-...`)

## <font color="blue">üíª 3. Cluster ID</font>

1. No Databricks workspace ‚Üí **Compute** (menu lateral)

2. Clique no cluster que deseja usar

3. Copie o **Cluster ID** da URL ou das configura√ß√µes do cluster


`NOTA`:

Eu s√≥ consegui obter o ID do meu Cluster executando o seguinte comando na pr√≥pria c√©lula do Databricks:

```python
databricks_cluster_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")

print(f"Databricks Cluster ID: {databricks_cluster_id}")
```

# <font color="red">ETL Pipeline com dados armazenados no meu Volume Databricks</font>

In [1]:
import os
from dotenv import load_dotenv, find_dotenv


_ = load_dotenv(find_dotenv())

databricks_host = os.environ['DATABRICKS_HOST']
databricks_token = os.environ['DATABRICKS_TOKEN']
databricks_cluster_id = os.environ['DATABRICKS_CLUSTER_ID']

print("üîó Carregado minhas credenciais do Databricks, com sucesso!")

üîó Carregado minhas credenciais do Databricks, com sucesso!


A seguir vamos usar `Databricks Connect` para conectar ao Databricks e executar c√≥digo `Spark` localmente, mas processando os dados remotamente no cluster Databricks

In [2]:
from databricks.connect import DatabricksSession # Esta classe √© usada para conectar ao Databricks remotamente

spark = DatabricksSession.builder.remote(host=databricks_host, token=databricks_token, cluster_id=databricks_cluster_id).getOrCreate()


print("‚úÖ Conectado ao Databricks com sucesso!")
print(f"üîß Vers√£o Spark: {spark.version}")

‚úÖ Conectado ao Databricks com sucesso!
üîß Vers√£o Spark: 4.0.0


A seguir vamos carregar dados do `Volume Databricks`. Basicamente, vou carregar esses dados a partir do `Catalog` do meu workspace Databricks.

In [3]:
volume_path = "/Volumes/workspace/default_eddy/volumeeddy-tmp-sampledata/sample_data.csv"
print(f"\nüìÇ Carregando dados do Volume: {volume_path}")

df_spark = spark.read.csv(volume_path, header=True, inferSchema=True)


üìÇ Carregando dados do Volume: /Volumes/workspace/default_eddy/volumeeddy-tmp-sampledata/sample_data.csv


In [4]:
print("üé≤ DADOS DO VOLUME CARREGADOS COM SUCESSO!")
print("\nüìä Primeiros registros:")
df_spark.show()

üé≤ DADOS DO VOLUME CARREGADOS COM SUCESSO!

üìä Primeiros registros:
+-------+---+-----------+
|   Name|Age|       City|
+-------+---+-----------+
|  Alice| 25|   New York|
|    Bob| 17|Los Angeles|
|Charlie| 35|    Chicago|
|  Diana| 16|    Houston|
| Edward| 45|    Phoenix|
+-------+---+-----------+



In [6]:
print(f"üìà Informa√ß√µes do Dataset que tenho no meu Volume Databricks:\n")
print(f"   ‚Ä¢ Quantidade de linhas: {df_spark.count()}")
print(f"   ‚Ä¢ Quantidade de colunas: {len(df_spark.columns)}")
print(f"   ‚Ä¢ Nomes das colunas: {df_spark.columns}")

üìà Informa√ß√µes do Dataset que tenho no meu Volume Databricks:

   ‚Ä¢ Quantidade de linhas: 5
   ‚Ä¢ Quantidade de colunas: 3
   ‚Ä¢ Nomes das colunas: ['Name', 'Age', 'City']


In [7]:
print("üìã Schema do Dataset:\n")

df_spark.printSchema()

üìã Schema do Dataset:

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- City: string (nullable = true)



<font color="orange">A seguir vamos converter para `Pandas` para an√°lises adicionais (`opcional`).</font>

In [9]:
print(f"‚úÖ Dados tamb√©m dispon√≠veis como Pandas DataFrame para an√°lises locais!\n")

df_pandas = df_spark.toPandas()

df_pandas.head()

‚úÖ Dados tamb√©m dispon√≠veis como Pandas DataFrame para an√°lises locais!



Unnamed: 0,Name,Age,City
0,Alice,25,New York
1,Bob,17,Los Angeles
2,Charlie,35,Chicago
3,Diana,16,Houston
4,Edward,45,Phoenix


## <font color="gree">üìä An√°lise avan√ßada dos dados do Volume Databricks</font>

### <font color="yellow">Estat√≠sticas descritivas com Spark</font>

In [11]:
df_spark.describe().show()

+-------+------+------------------+-------+
|summary|  Name|               Age|   City|
+-------+------+------------------+-------+
|  count|     5|                 5|      5|
|   mean|  NULL|              27.6|   NULL|
| stddev|  NULL|12.361229712289955|   NULL|
|    min| Alice|                16|Chicago|
|    max|Edward|                45|Phoenix|
+-------+------+------------------+-------+



### <font color="yellow">An√°lises com Spark SQL</font>

In [12]:
# Registrar como view tempor√°ria para usar SQL:
df_spark.createOrReplaceTempView("pessoas") # Nome da view tempor√°ria

In [16]:
# A minha view tempor√°ria:
spark.sql("SELECT * FROM pessoas").show()

+-------+---+-----------+
|   Name|Age|       City|
+-------+---+-----------+
|  Alice| 25|   New York|
|    Bob| 17|Los Angeles|
|Charlie| 35|    Chicago|
|  Diana| 16|    Houston|
| Edward| 45|    Phoenix|
+-------+---+-----------+



OBS:

```sql
spark.sql("SELECT * FROM pessoas")           # Nome da tabela criada na view tempor√°ria
```

In [13]:
# 1. An√°lise de idade
print("üë• An√°lise de Idade:\n")

idade_stats = spark.sql("""
    SELECT 
        AVG(Age) as idade_media,
        MIN(Age) as idade_minima,
        MAX(Age) as idade_maxima,
        COUNT(*) as total_pessoas
    FROM pessoas
""")

idade_stats.show()

üë• An√°lise de Idade:

+-----------+------------+------------+-------------+
|idade_media|idade_minima|idade_maxima|total_pessoas|
+-----------+------------+------------+-------------+
|       27.6|          16|          45|            5|
+-----------+------------+------------+-------------+



In [14]:
# 2. Maiores de idade
print("üîû Pessoas maiores de idade:\n")

adults = spark.sql("SELECT * FROM pessoas WHERE Age >= 18")
adults.show()

üîû Pessoas maiores de idade:

+-------+---+--------+
|   Name|Age|    City|
+-------+---+--------+
|  Alice| 25|New York|
|Charlie| 35| Chicago|
| Edward| 45| Phoenix|
+-------+---+--------+



In [18]:
adult_count = adults.count()
total_count = df_spark.count()

print(f"{adult_count} de {total_count} pessoas s√£o maiores de idade ({adult_count/total_count*100:.1f}%)")


3 de 5 pessoas s√£o maiores de idade (60.0%)


In [None]:
# 3. An√°lise por cidade
print("üèôÔ∏è Contagem por Cidade:\n")

city_analysis = spark.sql("""
    SELECT City, COUNT(*) as quantidade
    FROM pessoas 
    GROUP BY City 
    ORDER BY quantidade DESC
""")
city_analysis.show()

In [None]:


# 4. An√°lises avan√ßadas com Window Functions
print("\nüìà An√°lises Avan√ßadas:")
advanced_analysis = spark.sql("""
    SELECT 
        Name,
        Age,
        City,
        CASE 
            WHEN Age >= 18 THEN 'Adulto'
            ELSE 'Menor de idade'
        END as categoria_idade,
        ROW_NUMBER() OVER (PARTITION BY City ORDER BY Age DESC) as ranking_idade_cidade
    FROM pessoas
    ORDER BY City, Age DESC
""")
advanced_analysis.show()

# Converter para Pandas para an√°lises complementares
print("\nüêº An√°lises complementares com Pandas:")
df_pandas = df_spark.toPandas()
print(f"   ‚Ä¢ Dataset convertido: {df_pandas.shape[0]} linhas, {df_pandas.shape[1]} colunas")
print(f"   ‚Ä¢ Idade m√©dia: {df_pandas['Age'].mean():.2f} anos")
print(f"   ‚Ä¢ Cidades √∫nicas: {df_pandas['City'].nunique()}")

print("\n‚úÖ Pipeline ETL com Databricks Volume conclu√≠do com sucesso!")
print("   ‚Ä¢ üå©Ô∏è Processamento: Cluster Databricks")
print("   ‚Ä¢ üìÅ Fonte: Volume Databricks")
print(f"   ‚Ä¢ üìä Registros: {df_spark.count()}")
print(f"   ‚Ä¢ üöÄ Engine: Spark {spark.version}")
print("   ‚Ä¢ üíæ Dispon√≠vel em: Spark DataFrame + Pandas DataFrame")
