# Snowflake

1. Installing libraries and initializing variables from the env file
2. Connecting to the Snowflake data warehouse
3. Creating the database and connecting to it
4. Creating tables and populating them with data from the SQL scripts (1 to 4)
5. Creating views
6. Loading external data into Snowflake from an S3 bucket
7. Queries with Time Travel and Fail-safe
8. Creating tasks
9. Creating stream tables

<div class="alert alert-info">
     
**Notes**
 
- To run this notebook, you need a Snowflake account configured and running on AWS, plus an S3 bucket created to store the data.

- The sample data-load file is in the `dados` folder.

</div>

<div class="alert alert-danger">
     
**Note**
 
- This notebook was run with Python 3.10.9. With later versions, an error occurs when connecting to Snowflake (OpenSSL 3.0.9).
     
</div>
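
Because the note above ties the connection error to the Python and OpenSSL versions, it can help to print both before connecting. This is a minimal sketch using only the standard library; the helper name `runtime_versions` is hypothetical and not part of the original notebook.

```python
import platform
import ssl


def runtime_versions() -> dict:
    """Return the Python and OpenSSL versions used by the running kernel."""
    return {
        "python": platform.python_version(),
        "openssl": ssl.OPENSSL_VERSION,
    }


# Print the versions so they can be compared with the ones in the note.
print(runtime_versions())
```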


In [1]:
%pip install snowflake-connector-python
%pip install python-dotenv

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
import snowflake.connector
from dotenv import load_dotenv
import os

load_dotenv()

True

## Initializing the variables used in the code with values from the env file

In [3]:
user = os.getenv('SNOWFLAKE_USER')
password = os.getenv('SNOWFLAKE_PASSWORD')
account = os.getenv('SNOWFLAKE_ACCOUNT')
warehouse = os.getenv('SNOWFLAKE_WAREHOUSE')
database = os.getenv('SNOWFLAKE_DATABASE')
schema = os.getenv('SNOWFLAKE_SCHEMA')

s3_bucket = os.getenv('SNOWFLAKE_S3_BUCKET')
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
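
`os.getenv` returns `None` for any variable missing from the env file, and the failure then only shows up later at connection time. A small check such as the one below can surface that earlier. It is a sketch: the helper `missing_env_vars` is hypothetical, and the list of names simply mirrors the variables read above.

```python
import os

# Names read by the notebook; adjust if your env file differs.
REQUIRED_VARS = [
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_WAREHOUSE",
    "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_SCHEMA",
    "SNOWFLAKE_S3_BUCKET",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
]


def missing_env_vars(names, environ=os.environ):
    """Return the names that are unset or empty in the given mapping."""
    return [name for name in names if not environ.get(name)]


missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print("Missing variables:", ", ".join(missing))
```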

In [47]:
conn = snowflake.connector.connect(
    user=user,
    password=password,
    account=account
)

cur = conn.cursor()

## Creating the database and schema, and connecting to the created schema


In [5]:
create_database_sql = f"CREATE DATABASE IF NOT EXISTS {database}"
cur.execute(create_database_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b20fd87f0>

In [4]:
conn = snowflake.connector.connect(
    user=user,
    password=password,
    account=account,
    warehouse=warehouse,
    database=database,
    schema=schema
)

cur = conn.cursor()

## Creating tables and populating them with data

### Creating tables in Snowflake


In [9]:
query = """
CREATE TABLE categories (
    category_id SMALLINT NOT NULL,
    category_name STRING NOT NULL,
    description STRING
);"""

cur.execute(query)

query = """
CREATE TABLE customers (
    customer_id STRING NOT NULL,
    company_name STRING NOT NULL,
    contact_name STRING,
    contact_title STRING,
    address STRING,
    city STRING,
    region STRING,
    postal_code STRING,
    country STRING,
    phone STRING
);
"""

cur.execute(query)

query = """
CREATE TABLE employees (
    employee_id SMALLINT NOT NULL,
    last_name STRING NOT NULL,
    first_name STRING NOT NULL,
    title STRING,
    title_of_courtesy STRING,
    birth_date DATE,
    hire_date DATE,
    address STRING,
    city STRING,
    region STRING,
    postal_code STRING,
    country STRING,
    home_phone STRING,
    extension STRING,
    notes STRING,
    reports_to SMALLINT,
    photo_path STRING,
    salary FLOAT
);
"""

cur.execute(query)

query = """
CREATE TABLE order_details (
    order_id SMALLINT NOT NULL,
    product_id SMALLINT NOT NULL,
    unit_price FLOAT NOT NULL,
    quantity SMALLINT NOT NULL,
    discount FLOAT NOT NULL
);
"""

cur.execute(query)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b7e441420>

In [10]:
query = """
CREATE TABLE orders (
    order_id SMALLINT NOT NULL,
    customer_id STRING,
    employee_id SMALLINT,
    order_date DATE ,
    required_date DATE,
    shipped_date DATE,
    ship_via SMALLINT,
    freight FLOAT,
    ship_name STRING,
    ship_address STRING,
    ship_city STRING,
    ship_region STRING,
    ship_postal_code STRING,
    ship_country STRING
)
CLUSTER BY (order_date);
"""

cur.execute(query)

query = """
CREATE TABLE products (
    product_id SMALLINT NOT NULL,
    product_name STRING NOT NULL,
    supplier_id SMALLINT,
    category_id SMALLINT,
    quantity_per_unit STRING,
    unit_price FLOAT,
    units_in_stock SMALLINT,
    units_on_order SMALLINT,
    reorder_level SMALLINT,
    discontinued BOOLEAN NOT NULL
);
"""

cur.execute(query)

query = """
CREATE TABLE shippers (
    shipper_id SMALLINT NOT NULL,
    company_name STRING NOT NULL,
    phone STRING
);
"""

cur.execute(query)

query = """
CREATE TABLE suppliers (
    supplier_id SMALLINT NOT NULL,
    company_name STRING NOT NULL,
    contact_name STRING,
    contact_title STRING,
    address STRING,
    city STRING,
    region STRING,
    postal_code STRING,
    country STRING,
    phone STRING,
    fax STRING,
    homepage STRING
);
"""

cur.execute(query)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b7e441420>

In [11]:
cur.execute(
    """SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = 'PUBLIC';""")

tables = cur.fetchall()

for table in tables:
    print(table)

('PUBLIC', 'CUSTOMERS')
('PUBLIC', 'ORDERS')
('PUBLIC', 'PRODUCTS')
('PUBLIC', 'SHIPPERS')
('PUBLIC', 'CATEGORIES')
('PUBLIC', 'EMPLOYEES')
('PUBLIC', 'SUPPLIERS')
('PUBLIC', 'ORDER_DETAILS')


### Loading data into the Snowflake tables from the SQL scripts (1 to 4)


In [25]:
with open('1.categories_shippers_suppliers.sql', 'r') as file:
    query1 = file.read()

with open('2.customers_employees_products.sql', 'r') as file:
    query2 = file.read()

with open('3.orders.sql', 'r') as file:
    query3 = file.read()

with open('4.orderdetails.sql', 'r') as file:
    query4 = file.read()

cur.execute(query1, num_statements=3)
cur.execute(query2, num_statements=3)
cur.execute(query3)
cur.execute(query4)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b7e441420>
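
The `num_statements=3` values above have to match the number of statements in each script. One way to avoid hard-coding them is to count the statements before calling `execute`. The helper below is a sketch with a hypothetical name, `count_statements`. It assumes the scripts use only single-quoted string literals and contain no comments with quotes or semicolons.

```python
def count_statements(sql: str) -> int:
    """Count statements separated by ';', ignoring ';' inside single quotes."""
    count = 0
    in_string = False
    has_content = False  # True once the current statement has non-blank text
    for ch in sql:
        if ch == "'":
            # A doubled quote ('') toggles twice, so the state stays correct.
            in_string = not in_string
            has_content = True
        elif ch == ";" and not in_string:
            if has_content:
                count += 1
            has_content = False
        elif not ch.isspace():
            has_content = True
    # A final statement without a trailing ';' still counts.
    if has_content:
        count += 1
    return count


sample = "insert into a values ('x;y'); insert into b values (1);\n"
print(count_statements(sample))  # 2
```

With the files loaded as above, the call would then look like `cur.execute(query1, num_statements=count_statements(query1))`.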

## Views


In [8]:
cur.execute("""
create view sales_by_category as
select 
cat.category_name,
sum(od.unit_price * od.quantity) as total_vendas
from order_details od 
join products p on od.product_id = p.product_id
join categories cat on p.category_id = cat.category_id
group by cat.category_name
""")

<snowflake.connector.cursor.SnowflakeCursor at 0x7fe99bbc6b60>

In [9]:
cur.execute("""select * from sales_by_category""")

tables = cur.fetchall()

for table in tables:
    print(table)

('Dairy Products', 251330.5)
('Grains/Cereals', 100726.8)
('Produce', 105268.6)
('Seafood', 141623.09)
('Condiments', 113694.75)
('Confections', 177099.1)
('Beverages', 286526.95)
('Meat/Poultry', 178188.80000000002)


## Best-selling products by category

List the 3 best-selling products in each category.


In [10]:
cur.execute("""
with sales_data as (
    select
    p.product_name,
    c.category_name,
    sum(od.quantity),
    rank() over (partition by c.category_name order by sum(od.quantity) desc) as rank
    from order_details od
    join products p on p.product_id = od.product_id
    join categories c on c.category_id = p.category_id
    group by p.product_name, category_name
)
select * from sales_data
where rank <= 3
order by category_name, rank
""")

tables = cur.fetchall()

for table in tables:
    print(table)

('Rhnbru Klosterbier', 'Beverages', 1155, 1)
('Guaran Fantstica', 'Beverages', 1125, 2)
('Chang', 'Beverages', 1057, 3)
('Original Frankfurter grne Soe', 'Condiments', 791, 1)
('Louisiana Fiery Hot Pepper Sauce', 'Condiments', 745, 2)
('Sirop drable', 'Condiments', 603, 3)
('Pavlova', 'Confections', 1158, 1)
('Tarte au sucre', 'Confections', 1083, 2)
('Sir Rodneys Scones', 'Confections', 1016, 3)
('Camembert Pierrot', 'Dairy Products', 1577, 1)
('Raclette Courdavault', 'Dairy Products', 1496, 2)
('Gorgonzola Telino', 'Dairy Products', 1397, 3)
('Gnocchi di nonna Alice', 'Grains/Cereals', 1263, 1)
('Wimmers gute Semmelkndel', 'Grains/Cereals', 740, 2)
('Singaporean Hokkien Fried Mee', 'Grains/Cereals', 697, 3)
('Alice Mutton', 'Meat/Poultry', 978, 1)
('Pt chinois', 'Meat/Poultry', 903, 2)
('Tourtire', 'Meat/Poultry', 755, 3)
('Manjimup Dried Apples', 'Produce', 886, 1)
('Uncle Bobs Organic Dried Pears', 'Produce', 763, 2)
('Rssle Sauerkraut', 'Produce', 640, 3)
('Boston Crab Meat', 'Sea
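
The ranking performed by `rank() over (partition by ...)` can be sketched in plain Python to make its tie behaviour explicit: rows with equal quantities share a rank, and the following rank is skipped. The function name `top_n_per_category` and the sample rows are made up for illustration and do not come from the Northwind data.

```python
from collections import defaultdict


def top_n_per_category(rows, n=3):
    """Apply RANK()-style ranking per category and keep rows with rank <= n.

    rows: iterable of (product_name, category_name, total_quantity).
    Returns a list of (product_name, category_name, total_quantity, rank).
    """
    by_category = defaultdict(list)
    for product, category, quantity in rows:
        by_category[category].append((product, quantity))

    result = []
    for category in sorted(by_category):
        items = sorted(by_category[category], key=lambda item: item[1], reverse=True)
        rank = 0
        previous_quantity = None
        for position, (product, quantity) in enumerate(items, start=1):
            if quantity != previous_quantity:
                # A new quantity takes the row position as its rank.
                rank = position
                previous_quantity = quantity
            if rank > n:
                break
            result.append((product, category, quantity, rank))
    return result


# Made-up rows: two products tie for first place in category "X".
rows = [("A", "X", 10), ("B", "X", 10), ("C", "X", 5), ("D", "X", 1)]
print(top_n_per_category(rows, n=2))
```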

## Total and average sales per month


In [11]:
cur.execute("""
select 
extract(year from o.order_date) as ano,
extract(month from o.order_date) as mes,
sum(od.unit_price * od.quantity) as total,
avg(od.unit_price * od.quantity) as media
from order_details od
join orders o on od.order_id = o.order_id
group by ano, mes
order by ano, mes
""")

tables = cur.fetchall()

for table in tables:
    print(table)

(2020, 7, 30192.1, 511.73050847457625)
(2020, 8, 26609.399999999998, 385.64347826086953)
(2020, 9, 27636.0, 484.8421052631579)
(2020, 10, 41203.6, 564.4328767123287)
(2020, 11, 49704.0, 753.0909090909091)
(2020, 12, 50953.4, 629.0543209876544)
(2021, 1, 66692.8, 784.6211764705882)
(2021, 2, 41207.2, 521.6101265822784)
(2021, 3, 39979.9, 519.2194805194805)
(2021, 4, 55699.39, 687.6467901234568)
(2021, 5, 56823.7, 591.9135416666667)
(2021, 6, 39088.0, 514.3157894736842)
(2021, 7, 55464.93, 720.3237662337663)
(2021, 8, 49981.69, 595.0201190476191)
(2021, 9, 59733.020000000004, 628.7686315789474)
(2021, 10, 70328.5, 663.4764150943396)
(2021, 11, 45913.36, 515.8804494382023)
(2021, 12, 77476.26, 679.6163157894737)
(2022, 1, 100854.72, 663.5178947368421)
(2022, 2, 104561.95, 857.0651639344262)
(2022, 3, 109825.45, 616.9969101123595)
(2022, 4, 134630.56, 747.9475555555556)
(2022, 5, 19898.66, 337.2654237288136)


## Loading external data into Snowflake from an S3 bucket

Importing data from a Parquet file.


In [15]:
cur.execute("""
CREATE TABLE employeespq (
    employeeid SMALLINT,
    lastname STRING,
    firstname STRING,
    title STRING,
    titleofcourtesy STRING,
    birthdate STRING,
    hiredate STRING,
    address STRING,
    city STRING,
    region STRING,
    postalcode STRING,
    country STRING,
    homephone STRING,
    extension STRING,
    notes STRING,
    reportsto SMALLINT,
    photopath STRING,
    salary FLOAT
)""")

<snowflake.connector.cursor.SnowflakeCursor at 0x7fe93da2c5e0>

In [17]:
cur.execute(f"""
COPY INTO employeespq
FROM {s3_bucket}redshift/employees.parquet
CREDENTIALS=(aws_key_id='{aws_access_key_id}' aws_secret_key='{aws_secret_access_key}')
FILE_FORMAT=(type='PARQUET')
MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE""")

<snowflake.connector.cursor.SnowflakeCursor at 0x7fe93da2c5e0>

In [18]:
cur.execute("""select * from employeespq""")

tables = cur.fetchall()

for table in tables:
    print(table)

(1, 'Davolio', 'Nancy', 'Sales Representative', 'Ms.', '-664761600000000', '704678400000000', '507 - 20th Ave. E.Apt. 2A', 'Seattle', 'WA', '98122', 'USA', '(206) 555-9857', '5467', 'Education includes a BA in psychology from Colorado State University in 1970.  She also completed "The Art of the Cold Call."  Nancy is a member of Toastmasters International.', 2, 'http://accweb/emmployees/davolio.bmp', 2954.55)
(2, 'Fuller', 'Andrew', 'Vice President, Sales', 'Dr.', '-563846400000000', '713750400000000', '908 W. Capital Way', 'Tacoma', 'WA', '98401', 'USA', '(206) 555-9482', '3457', 'Andrew received his BTS commercial in 1974 and a Ph.D. in international marketing from the University of Dallas in 1981.  He is fluent in French and Italian and reads German.  He joined the company as a sales representative, was promoted to sales manager in January 1992 and to vice president of sales in March 1993.  Andrew is a member of the Sales Management Roundtable, the Seattle Chamber of Commerce, and t
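
In the output above, `birthdate` and `hiredate` arrive as numeric strings because the columns were declared as `STRING`. The values look like microseconds since the Unix epoch; that is an assumption based on their magnitude, not something the Parquet file confirms here. Under that assumption, they can be converted on the Python side as sketched below (the helper name `micros_to_date` is hypothetical).

```python
from datetime import date, datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def micros_to_date(value: str) -> date:
    """Convert a string holding microseconds since 1970-01-01 into a date."""
    return (EPOCH + timedelta(microseconds=int(value))).date()


# Values taken from the first row printed above.
print(micros_to_date("-664761600000000"))  # 1948-12-08
print(micros_to_date("704678400000000"))   # 1992-05-01
```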

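## Time Travel and Fail-safe

Insert a row into the table and then query the table as of 90 seconds ago, to verify that the row was not yet in the table at that point.

Query the data as of earlier timestamps to recover lost data. Note that `at` and `before` queries are part of Time Travel. Fail-safe is a separate recovery period that begins after Time Travel retention ends, and its data cannot be queried directly; recovery goes through Snowflake support.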
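
The offset form of a Time Travel query takes a negative number of seconds relative to the current time. A small helper can build that query text for any table and interval. This is a sketch: `time_travel_select` is a hypothetical name, and the identifier check is deliberately strict because the table name is interpolated into the SQL.

```python
def time_travel_select(table: str, seconds_ago: int) -> str:
    """Build a SELECT that reads `table` as it was `seconds_ago` seconds ago."""
    # Only allow plain identifiers, since the name is placed directly in the SQL.
    if not table.replace("_", "").isalnum():
        raise ValueError(f"unexpected table name: {table!r}")
    if seconds_ago < 0:
        raise ValueError("seconds_ago must be non-negative")
    return f"select * from {table} at (offset => -{seconds_ago})"


print(time_travel_select("shippers", 90))
# select * from shippers at (offset => -90)
```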


In [19]:
cur.execute("""select * from shippers""")

tables = cur.fetchall()

for table in tables:
    print(table)

(1, 'Speedy Express', '(503) 555-9831')
(2, 'United Package', '(503) 555-3199')
(3, 'Federal Shipping', '(503) 555-9931')


In [20]:
cur.execute(
    f"""insert into shippers values (4, 'SnowFlake Express', '(51) 4545-89745')""")

<snowflake.connector.cursor.SnowflakeCursor at 0x7fe93da2c5e0>

In [22]:
cur.execute("""select * from shippers at (offset => -90)""")

tables = cur.fetchall()

for table in tables:
    print(table)

(1, 'Speedy Express', '(503) 555-9831')
(2, 'United Package', '(503) 555-3199')
(3, 'Federal Shipping', '(503) 555-9931')


In [50]:
cur.execute("""select current_timestamp();""")

tables = cur.fetchall()

for table in tables:
    print(table)

(datetime.datetime(2023, 10, 18, 3, 25, 14, 848000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>),)


In [54]:
cur.execute(
    """select * from shippers before (timestamp => '2023-10-18 3:24:47.142 -0700'::timestamp)""")

tables = cur.fetchall()

for table in tables:
    print(table)

(1, 'Speedy Express', '(503) 555-9831')
(2, 'United Package', '(503) 555-3199')
(3, 'Federal Shipping', '(503) 555-9931')
(4, 'SnowFlake Express', '(51) 4545-89745')


## Tasks and dependent tasks

### Single task


Listing the tasks already created in the database


In [30]:
cur.execute("""show tasks""")

tables = cur.fetchall()

for table in tables:
    print(table)

(datetime.datetime(2023, 10, 21, 10, 49, 53, 393000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), 'INSERT_ID', '01afcd0d-ceb1-ea21-0000-000000000012', 'NORTHWIND2', 'PUBLIC', 'ACCOUNTADMIN', '', 'WAREHOUSEDEV', '1 MINUTE', '[]', 'started', 'INSERT INTO IDS SELECT random()', None, 'false', 'null', datetime.datetime(2023, 10, 21, 10, 49, 53, 577000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), None, 'ROLE', None, None)


Create a simple table and a task that inserts a row into it every minute.

A task is created in the suspended state; to start it, resume it with `alter task ... resume`.


In [24]:
cur.execute("""create or replace table ids (my_id int)""")

cur.execute("""
create or replace task insert_id 
warehouse = WAREHOUSEDEV
SCHEDULE = '1 MINUTE' 
AS INSERT INTO IDS SELECT random()""")

cur.execute("""alter task insert_id resume""")

<snowflake.connector.cursor.SnowflakeCursor at 0x7f545ac7f3d0>

Viewing the data the task inserted into the table.

In [31]:
cur.execute("""select * from ids;""")

tables = cur.fetchall()

for table in tables:
    print(table)

(-1289401867754936316,)


### Dependent task

Dependent tasks are tasks that rely on other tasks to run: a task only runs after its predecessor has run.

Create a simple table and a dependent task that inserts a row into it. The task has no schedule of its own; it runs only after the insert_id task runs, which in practice means once per minute.

To create a dependent task, the task that runs first must be suspended to allow changes, so suspend it before linking another task to it.
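
The order of operations matters here: suspend the first task, create the dependent one, then resume the dependent task before the first. The sketch below only builds the statements in that order. The function name `chained_task_statements` is hypothetical, and the SQL text follows the same pattern the notebook uses.

```python
def chained_task_statements(root: str, child: str, warehouse: str, child_sql: str) -> list:
    """Return the statements that attach `child` to run after `root`, in order."""
    return [
        # 1. The predecessor must be suspended before it gains a dependent task.
        f"alter task {root} suspend",
        # 2. Create the dependent task; it uses AFTER instead of SCHEDULE.
        f"create or replace task {child} warehouse = {warehouse} "
        f"after {root} as {child_sql}",
        # 3. Resume the dependent task first, then the predecessor.
        f"alter task {child} resume",
        f"alter task {root} resume",
    ]


for statement in chained_task_statements(
    "insert_id", "insert_id2", "WAREHOUSEDEV", "INSERT INTO IDS2 SELECT random()"
):
    print(statement)
```

Each statement could then be passed to `cur.execute` in sequence.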


In [35]:
cur.execute("""alter task insert_id suspend""")

cur.execute("""create or replace table ids2 (my_id int)""")

cur.execute("""
create or replace task insert_id2 
warehouse = WAREHOUSEDEV
after insert_id AS INSERT INTO IDS2 SELECT random()""")

cur.execute("""alter task insert_id2 resume""")
cur.execute("""alter task insert_id resume""")

<snowflake.connector.cursor.SnowflakeCursor at 0x7f545ac7f3d0>

In [40]:
cur.execute("""show tasks""")

tables = cur.fetchall()

for table in tables:
    print(table)

(datetime.datetime(2023, 10, 21, 10, 49, 53, 393000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), 'INSERT_ID', '01afcd0d-ceb1-ea21-0000-000000000012', 'NORTHWIND2', 'PUBLIC', 'ACCOUNTADMIN', '', 'WAREHOUSEDEV', '1 MINUTE', '[]', 'started', 'INSERT INTO IDS SELECT random()', None, 'false', 'null', datetime.datetime(2023, 10, 21, 10, 51, 56, 91000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), datetime.datetime(2023, 10, 21, 10, 51, 33, 567000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), 'ROLE', None, None)
(datetime.datetime(2023, 10, 21, 10, 51, 55, 818000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), 'INSERT_ID2', '01afcd0f-4e2c-dacc-0000-000000000019', 'NORTHWIND2', 'PUBLIC', 'ACCOUNTADMIN', '', 'WAREHOUSEDEV', None, '[\n  "NORTHWIND2.PUBLIC.INSERT_ID"\n]', 'started', 'INSERT INTO IDS2 SELECT random()', None, 'null', 'null', None, None, 'ROLE', None, None)


In [41]:
cur.execute("""select * from ids2""")

tables = cur.fetchall()

for table in tables:
    print(table)

(-7091646375063312208,)


Suspending the tasks and then dropping them.

In [43]:
cur.execute("""alter task insert_id suspend""")
cur.execute("""alter task insert_id2 suspend""")

cur.execute("""drop task insert_id""")
cur.execute("""drop task insert_id2""")

<snowflake.connector.cursor.SnowflakeCursor at 0x7f545ac7f3d0>

## Stream and log table

A log table will be created to store the changes made to a table, along with a stream that captures those changes so they can be written to the log table.

The stream is consumed manually here. In a production environment, it would be better to create a task that reads the stream periodically and writes the captured changes to the log table.
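
As a rough illustration of that production setup, the sketch below builds the SQL for a task that runs only when the stream has data and applies the changes with a single `merge`. It has not been executed against Snowflake. The task name `sync_shippers_log` is hypothetical, the warehouse name is reused from the task examples, and the stream and table names are the ones created in the next cell. The filter drops the `DELETE` half of each update so every key appears once.

```python
# MERGE that applies inserts, updates and deletes captured by the stream.
MERGE_SQL = """
merge into shippers_log sl
using (
    select shipper_id, company_name, phone, metadata$action as action
    from shipper_stream
    where not (metadata$action = 'DELETE' and metadata$isupdate)
) s
on sl.shipper_id = s.shipper_id
when matched and s.action = 'DELETE' then delete
when matched and s.action = 'INSERT' then
    update set sl.company_name = s.company_name, sl.phone = s.phone
when not matched and s.action = 'INSERT' then
    insert (shipper_id, company_name, phone)
    values (s.shipper_id, s.company_name, s.phone)
""".strip()

# Task that checks the stream every minute and skips the run when it is empty.
TASK_SQL = f"""
create or replace task sync_shippers_log
warehouse = WAREHOUSEDEV
schedule = '1 MINUTE'
when system$stream_has_data('SHIPPER_STREAM')
as
{MERGE_SQL}
""".strip()

print(TASK_SQL)
```

The task would be created with `cur.execute(TASK_SQL)` and, like any task, would need `alter task sync_shippers_log resume` before it starts running.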


In [44]:
cur.execute("""
CREATE TABLE shippers_log (
    shipper_id SMALLINT NOT NULL,
    company_name STRING NOT NULL,
    phone STRING
);""")

cur.execute("""create stream shipper_stream on table shippers""")

<snowflake.connector.cursor.SnowflakeCursor at 0x7f545ac7f3d0>

Running insert, update, and delete operations on the shippers table so that the stream records them, and applying each change to the log table.

In [51]:
cur.execute("""insert into shippers values (5, 'XYZ Express', '77-7777-7777')""")
cur.execute("""insert into shippers_log
select shipper_id, company_name, phone from shipper_stream
where metadata$action = 'INSERT' and metadata$isupdate = false""")


<snowflake.connector.cursor.SnowflakeCursor at 0x7f545ac7f3d0>

In [52]:
cur.execute("""update shippers set company_name = 'XYZW Express' where shipper_id = 5""")
cur.execute("""update shippers_log sl
set sl.company_name = s.company_name,
sl.phone = s.phone
from shipper_stream s
where s.shipper_id = sl.shipper_id
and s.metadata$action = 'INSERT' and s.metadata$isupdate = true""")


<snowflake.connector.cursor.SnowflakeCursor at 0x7f545ac7f3d0>

Checking the log table where the stream changes were written.

In [53]:
cur.execute("""select * from shippers_log""")

tables = cur.fetchall()

for table in tables:
    print(table)

(5, 'XYZW Express', '77-7777-7777')


In [54]:
cur.execute("""delete from shippers where shipper_id = 5""")
cur.execute("""delete from shippers_log
where shippers_log.shipper_id in (select shipper_id from shipper_stream where
shipper_stream.metadata$action = 'DELETE' and shipper_stream.metadata$isupdate = false)""")

<snowflake.connector.cursor.SnowflakeCursor at 0x7f545ac7f3d0>