# Incremental update by partition

## Get a session

In [None]:
# Import python packages used by the notebook
import streamlit as st
import pandas as pd

# Snowpark session bound to the notebook's active Snowflake connection; used below
# to create the demo source table.
from snowflake.snowpark.context import get_active_session
session = get_active_session()


## Create a table with data to work with

In [None]:
# Synthetic demo data: one row for every (store, transaction id, day) combination,
# i.e. 3 stores x 5 transaction ids x 2 days = 30 rows. Written to table SOURCE,
# replacing it if it already exists.
source_rows = [
    {"store": store, "transaction": transaction, "day": day}
    for store in ["MikeyD", "BobPants", "SuperM"]
    for transaction in range(5)
    for day in ["2024-06-01", "2024-06-02"]
]
session.create_dataframe(source_rows).write.save_as_table("source", mode="overwrite")

## Enable change tracking (only needed if we want to query changes by timestamp instead of through a stream)

In [None]:
-- Turn on change tracking for SOURCE (needed to read its changes by timestamp rather than via a stream).
alter table source set change_tracking = true;

## Create a stream for simplicity in the demo

In [None]:
-- Standard (not append-only) stream on SOURCE, so deletes are captured as well as inserts.
-- SHOW_INITIAL_ROWS = TRUE: the first consumption also returns the rows present at creation.
create or replace stream source_stream
    on table source
    show_initial_rows = true
    append_only = false

## Initialize the aggregation

In [None]:
-- Initial full aggregation: transactions per (store, day).
-- Reading from the stream (not from SOURCE) consumes it, so later reads of the stream
-- only show changes made after this point.
create or replace table aggregation as
select
    store,
    day,
    count(transaction) as nb_transactions
from source_stream
group by store, day;

## Look at the data

In [None]:
-- Current per-store, per-day transaction counts.
select store, day, nb_transactions
from aggregation

## Delete data for the demo

In [None]:
-- Remove transaction id 2 for MikeyD (one row on each demo day) so the stream has changes.
delete from source
where store = 'MikeyD'
  and transaction = 2

## Show the change feed using the stream
Note that a plain SELECT doesn't advance the stream, so the changed rows are still available for the next step.

In [None]:
-- Inspect the change feed: the rows deleted above should show up here.
-- A plain SELECT does not consume the stream, so these changes remain available.
select * from source_stream

## Identify the partitions to recalculate

In [None]:
-- Distinct (store, day) partitions touched since the stream's last offset. Every change
-- row (deleted or inserted) marks its partition as needing recomputation.
-- Reading the stream inside this DDL/DML statement consumes it and advances its offset.
-- "or replace" keeps the notebook re-runnable, matching the other cells; a plain
-- CREATE TABLE fails once the table exists from a previous run.
create or replace table changed_store_days as
select distinct store, day
from source_stream

## The stream has advanced
Creating `changed_store_days` from the stream consumed it, so this query now returns no rows.

In [None]:
-- Expected to return no rows now: creating changed_store_days consumed the stream.
select * from source_stream

## Do the aggregation for the partitions that changed

In [None]:
-- Recompute counts from SOURCE, restricted to the (store, day) partitions that changed.
-- Note: a partition whose rows were all deleted yields no row here.
create or replace table staging_aggregation as
select
    src.store,
    src.day,
    count(src.transaction) as nb_transactions
from source as src
where exists (
    select 1
    from changed_store_days as chg
    where chg.store = src.store
      and chg.day = src.day
)
group by src.store, src.day;

## Show the new data for the changed partitions

In [None]:
-- Recomputed counts for the changed partitions only.
select store, day, nb_transactions
from staging_aggregation

## Update the aggregation by overwriting the changed partitions with a delete + insert in a single transaction

In [None]:
-- Overwrite the changed partitions atomically: delete their old rows, insert the recomputed ones.
begin;
-- Delete every partition flagged as changed, not just those present in staging_aggregation:
-- a partition whose source rows were all deleted has no staging row and would otherwise
-- keep a stale count.
delete from aggregation using changed_store_days
where aggregation.store = changed_store_days.store
  and aggregation.day = changed_store_days.day;
-- Explicit column list so the insert does not depend on the tables' column order.
insert into aggregation (store, day, nb_transactions)
select store, day, nb_transactions
from staging_aggregation;
commit;

## The resulting table is the same as doing the aggregation from scratch

In [None]:
-- Incrementally maintained result.
select store, day, nb_transactions
from aggregation
order by store, day

In [None]:
-- Reference result: full recomputation from SOURCE, to compare with the query above.
select
    store,
    day,
    count(transaction) as nb_transactions
from source
group by store, day
order by store, day;