
## [PRÉ REQUISITO] Atividades Desenvolvidas Fora do Ambiente Databricks

|Sequência|Ação|Detalhamento
|---|---|---|
|SEQ-01|Provisionamento do Azure SQL|Provisionamento de um banco de dados no Azure|
|SEQ-02|Configuração do Ambiente Azure|Criação de toda a estrutura de tabelas e dados|


## Atividades Desenvolvidas no Notebook - 01_ingestao_dados_azure_sql

|Sequência|Ação|Detalhamento
|---|---|---|
|SEQ-01 / SEQ-02|Configuração de Biblioteca|Instalação da Biblioteca "sqlalchemy"|
|SEQ-03|Consumindo Arquivo JSON|Arquivo com as credenciais de acesso ao Azure SQL Database|
|SEQ-04|Selação das Tabelas|Identificar quais serão as tabelas usadas durante o processo de ingestão|
|SEQ-05|Extração dos Dados do Azure SQL|Coleta dos Dados do Azure SQL|
|SEQ-06|Persistir os Dados em Parquet|Os dados deverão ser persistidos no diretório de cada tabela em formato parquet.|



In [0]:
pip install sqlalchemy

Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
from sqlalchemy import __version__ as sa_version, create_engine, text
import json
import pandas as pd



In [0]:
dfjson =  pd.read_json("https://raw.githubusercontent.com/dbaassists/Projeto_BI_Zero_TO_DW/main/04_ARQUIVO_CONFIG/config_azure_sql.json")

server = dfjson['Config']['server']
database = dfjson['Config']['database']
username = dfjson['Config']['username']
password = dfjson['Config']['password']

In [0]:
df = spark.read \
.format("jdbc") \
.option("url", f"jdbc:sqlserver://{server};database={database}") \
.option("query", """SELECT s.name + '.' +  t.NAME AS Nome_Tabela 
                        FROM sys.tables t
                        INNER JOIN sys.schemas s
                        ON t.schema_id = s.schema_id
                        AND s.name = 'dbo'""") \
.option("user", f"{username}") \
.option("password", f"{password}") \
.load()

In [0]:
display(df)

Nome_Tabela
dbo.TB_CATEGORIA_PRODUTO
dbo.TB_CLIENTE
dbo.TB_FORMA_PAGAMENTO
dbo.TB_ITEM_VENDA
dbo.TB_LOJA
dbo.TB_PRODUTO
dbo.TB_VENDA
dbo.TB_VENDEDOR
dbo.Tempo


In [0]:
lista_tabelas = df.collect()

for tabela in lista_tabelas:

    tabela_paquet = tabela['Nome_Tabela'].replace('dbo.','').lower()

    print(tabela_paquet)

    diretorio_parquet = "dbfs:/FileStore/tables/landing_zone/{0}".format(tabela_paquet)

    df = (spark.read
    .format("jdbc")
    .option("url", f"jdbc:sqlserver://{server};database={database}") \
    .option("query", "SELECT * FROM {0}".format(tabela['Nome_Tabela']))
    .option("user", f"{username}") \
    .option("password", f"{password}") \
    .load()
    )
    
    (df.write
    .format('parquet')
    .mode("overwrite")
    .save(diretorio_parquet)
    )

tb_categoria_produto
tb_cliente
tb_forma_pagamento
tb_item_venda
tb_loja
tb_produto
tb_venda
tb_vendedor
tempo


In [0]:
dbutils.fs.ls("dbfs:/FileStore/tables/landing_zone")

Out[9]: [FileInfo(path='dbfs:/FileStore/tables/landing_zone/tb_categoria_produto/', name='tb_categoria_produto/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/landing_zone/tb_cliente/', name='tb_cliente/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/landing_zone/tb_forma_pagamento/', name='tb_forma_pagamento/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/landing_zone/tb_item_venda/', name='tb_item_venda/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/landing_zone/tb_loja/', name='tb_loja/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/landing_zone/tb_produto/', name='tb_produto/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/landing_zone/tb_venda/', name='tb_venda/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/landing_zone/tb_vendedor/', name='tb_vendedor/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables