**Note:** You can save SQL queries and scripts as .sql files and run them in the DBeaver SQL editor.

### 1. Creating the data source

Create a source that reads messages from the Kafka topic "user_events" used in the previous exercise.

In [None]:
CREATE SOURCE IF NOT EXISTS user_events_source (
    timestamp TIMESTAMP,
    user_id INT,
    action VARCHAR,
    bank VARCHAR,
    transaction_type VARCHAR,
    amount REAL,
    message_key VARCHAR
)
WITH (
   connector='kafka',
   topic='user_events',
   properties.bootstrap.server='redpanda-0:9092'
) FORMAT PLAIN ENCODE JSON;

### 2. Creating materialized views

Create materialized views on top of user_events_source so that each message type has its own view: one for activity messages and one for transaction messages.

In [None]:
CREATE MATERIALIZED VIEW activity_data AS
SELECT timestamp, user_id, action, bank
FROM user_events_source
WHERE message_key = 'activity';

CREATE MATERIALIZED VIEW transaction_data AS
SELECT timestamp, user_id, transaction_type, amount
FROM user_events_source
WHERE message_key = 'transaction';
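
The split by `message_key` can be pictured in plain Python. The sketch below is only an illustration: the sample messages and their values are made up, and it assumes that fields absent from a message end up as NULL (`None` here).

```python
import json

# Hypothetical sample messages; the field names follow the columns of
# user_events_source, the values are invented for illustration.
raw_messages = [
    '{"timestamp": "2024-01-01 10:00:00", "user_id": 1, "action": "take_product", "message_key": "activity"}',
    '{"timestamp": "2024-01-01 10:00:02", "user_id": 1, "transaction_type": "deposit", "amount": 50.0, "message_key": "transaction"}',
    '{"timestamp": "2024-01-01 10:00:03", "user_id": 2, "action": "leave_product", "message_key": "activity"}',
]
events = [json.loads(message) for message in raw_messages]


def project(event, columns):
    """Keep only the listed columns; missing fields become None."""
    return {column: event.get(column) for column in columns}


# Same filter and column selection as the two materialized views.
activity_data = [
    project(e, ("timestamp", "user_id", "action", "bank"))
    for e in events if e.get("message_key") == "activity"
]
transaction_data = [
    project(e, ("timestamp", "user_id", "transaction_type", "amount"))
    for e in events if e.get("message_key") == "transaction"
]

print(len(activity_data), len(transaction_data))  # 2 1
```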

### 3. Aggregating data per user

After creating the views, aggregate the data from activity_data and transaction_data. Create the following materialized views:

3.1. Number of actions per user (user_activity_count)  
Create a materialized view that shows the total number of actions for each user, regardless of action type.


In [None]:
CREATE MATERIALIZED VIEW user_activity_count AS
SELECT user_id, COUNT(*) AS total_actions
FROM activity_data
GROUP BY user_id;

3.2. Aggregating user activity (user_activity_summary)  
Create a materialized view that shows activity statistics per user. For each user, compute:  
- Number of products taken (`take_product`)  
- Number of products returned (`leave_product`)  
- ... 

In [None]:
CREATE MATERIALIZED VIEW user_activity_summary AS
SELECT user_id,
       COUNT(CASE WHEN action = 'take_product' THEN 1 END) AS take_product_count,
       COUNT(CASE WHEN action = 'leave_product' THEN 1 END) AS leave_product_count
FROM activity_data
GROUP BY user_id;

In [None]:
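-- Alternative to the previous cell using FILTER; run only one of the two definitions,
-- since the view name would already exist.
CREATE MATERIALIZED VIEW user_activity_summary AS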
SELECT user_id,
       COUNT(*) FILTER (WHERE action = 'take_product') AS take_product_count,
       COUNT(*) FILTER (WHERE action = 'leave_product') AS leave_product_count
FROM activity_data
GROUP BY user_id;

3.3. Transaction overview per user (user_transaction_summary)  
Create a materialized view that summarizes each user's financial activity. For each user, compute:  
- Count and total amount of deposits  
- Count and total amount of withdrawals
- ... 

In [None]:
CREATE MATERIALIZED VIEW user_transaction_summary AS
SELECT user_id,
       COUNT(CASE WHEN transaction_type = 'deposit' THEN 1 END) AS deposit_count,
       SUM(CASE WHEN transaction_type = 'deposit' THEN amount ELSE 0 END) AS deposit_amount,

       COUNT(CASE WHEN transaction_type = 'withdrawal' THEN 1 END) AS withdrawal_count,
       SUM(CASE WHEN transaction_type = 'withdrawal' THEN amount ELSE 0 END) AS withdrawal_amount,

       COUNT(CASE WHEN transaction_type = 'transfer' THEN 1 END) AS transfer_count,
       SUM(CASE WHEN transaction_type = 'transfer' THEN amount ELSE 0 END) AS transfer_amount,
       
       COUNT(CASE WHEN transaction_type = 'payment' THEN 1 END) AS payment_count,
       SUM(CASE WHEN transaction_type = 'payment' THEN amount ELSE 0 END) AS payment_amount
FROM transaction_data
GROUP BY user_id;

In [None]:
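-- Alternative to the previous cell using FILTER; run only one of the two definitions,
-- since the view name would already exist. Unlike the ELSE 0 variant, SUM(...) FILTER
-- yields NULL (not 0) for a user with no rows of that type.
CREATE MATERIALIZED VIEW user_transaction_summary AS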
SELECT user_id,
       COUNT(*) FILTER (WHERE transaction_type = 'deposit') AS deposit_count,
       SUM(amount) FILTER (WHERE transaction_type = 'deposit') AS deposit_amount,
       
       COUNT(*) FILTER (WHERE transaction_type = 'withdrawal') AS withdrawal_count,
       SUM(amount) FILTER (WHERE transaction_type = 'withdrawal') AS withdrawal_amount,
       
       COUNT(*) FILTER (WHERE transaction_type = 'transfer') AS transfer_count,
       SUM(amount) FILTER (WHERE transaction_type = 'transfer') AS transfer_amount,
       
       COUNT(*) FILTER (WHERE transaction_type = 'payment') AS payment_count,
       SUM(amount) FILTER (WHERE transaction_type = 'payment') AS payment_amount
FROM transaction_data
GROUP BY user_id;
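
The two formulations of user_transaction_summary differ in one detail: `SUM(CASE ... ELSE 0 END)` gives 0 for a user with no matching rows, while a sum over only the matching rows gives NULL. The sketch below illustrates this with Python's built-in `sqlite3` as a stand-in database (not RisingWave). A `CASE` without `ELSE` is used to mimic the `FILTER` behaviour, and the sample rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE transaction_data (user_id INT, transaction_type TEXT, amount REAL)"
)
# Hypothetical rows: user 1 has two deposits, user 2 has no deposits at all.
conn.executemany(
    "INSERT INTO transaction_data VALUES (?, ?, ?)",
    [(1, "deposit", 50.0), (1, "deposit", 25.0), (2, "withdrawal", 10.0)],
)

rows = conn.execute(
    """
    SELECT user_id,
           COUNT(CASE WHEN transaction_type = 'deposit' THEN 1 END) AS deposit_count,
           SUM(CASE WHEN transaction_type = 'deposit' THEN amount ELSE 0 END) AS sum_else_zero,
           SUM(CASE WHEN transaction_type = 'deposit' THEN amount END) AS sum_no_else
    FROM transaction_data
    GROUP BY user_id
    ORDER BY user_id
    """
).fetchall()
conn.close()

for row in rows:
    print(row)
# User 2 gets 0 in sum_else_zero but None (NULL) in sum_no_else.
```

If downstream consumers expect numbers rather than NULL, wrapping the filtered sum in `COALESCE(..., 0)` is one way to make both variants agree.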

### 4. Joining the data

Create a materialized view that contains all the information computed by the previous aggregations: user_activity_summary and user_transaction_summary.

In [None]:
CREATE MATERIALIZED VIEW merged_dataset AS
SELECT ua.user_id,
       ua.take_product_count,
       ua.leave_product_count,
       ts.deposit_count,
       ts.deposit_amount,
       ts.withdrawal_count,
       ts.withdrawal_amount,
       ts.transfer_count,
       ts.transfer_amount,
       ts.payment_count,
       ts.payment_amount
FROM user_activity_summary ua
JOIN user_transaction_summary ts ON ua.user_id = ts.user_id;
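
Note that a plain `JOIN` is an inner join, so merged_dataset only contains users that appear in both summaries. A minimal Python sketch of that behaviour, using made-up summary values keyed by user_id:

```python
# Hypothetical per-user summaries keyed by user_id (values are invented).
user_activity_summary = {
    1: {"take_product_count": 3, "leave_product_count": 1},
    2: {"take_product_count": 0, "leave_product_count": 2},
}
user_transaction_summary = {
    1: {"deposit_count": 2, "deposit_amount": 75.0},
    3: {"deposit_count": 1, "deposit_amount": 10.0},
}

# Inner join: keep only user_ids present on both sides.
common_users = user_activity_summary.keys() & user_transaction_summary.keys()
merged_dataset = {
    user_id: {**user_activity_summary[user_id], **user_transaction_summary[user_id]}
    for user_id in common_users
}

print(sorted(merged_dataset))  # [1]
```

Users 2 and 3 are dropped because each is missing from one side. If such users should be kept, an outer join would be the usual alternative.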

### 5. Tracking each user's latest activity (latest_activity_per_user)
Create a materialized view that shows the latest activity record for each user.   
Each record keeps all activity details (user_id, action, timestamp); the row is chosen by the most recent timestamp per user.

In [None]:
CREATE MATERIALIZED VIEW IF NOT EXISTS latest_activity_per_user AS
SELECT * 
FROM activity_data ad
WHERE timestamp = (
    SELECT MAX(timestamp)
    FROM activity_data
    WHERE ad.user_id = user_id
);
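
The correlated subquery can be read as "keep a row only if its timestamp equals the maximum timestamp of the same user". The same logic in plain Python, with made-up sample rows, might look like this:

```python
from datetime import datetime

# Hypothetical rows from activity_data.
activity_data = [
    {"user_id": 1, "action": "take_product", "timestamp": datetime(2024, 1, 1, 10, 0, 1)},
    {"user_id": 1, "action": "leave_product", "timestamp": datetime(2024, 1, 1, 10, 0, 9)},
    {"user_id": 2, "action": "take_product", "timestamp": datetime(2024, 1, 1, 10, 0, 3)},
]

# Step 1: the inner query, MAX(timestamp) per user_id.
latest_timestamp = {}
for row in activity_data:
    user_id = row["user_id"]
    if user_id not in latest_timestamp or row["timestamp"] > latest_timestamp[user_id]:
        latest_timestamp[user_id] = row["timestamp"]

# Step 2: the outer query, keep rows whose timestamp matches that maximum.
latest_activity_per_user = [
    row for row in activity_data
    if row["timestamp"] == latest_timestamp[row["user_id"]]
]

for row in latest_activity_per_user:
    print(row["user_id"], row["action"])
```

As in the SQL version, two rows of the same user that share the maximum timestamp would both be kept.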

### 6. The TUMBLE function

6.1. Generating time windows   
Write a query that splits the data from `activity_data` into 5-second time windows using the TUMBLE function.

In [None]:
SELECT *
FROM TUMBLE(activity_data, timestamp, INTERVAL '5 SECONDS');
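
A tumbling window assigns every row to exactly one fixed-size, non-overlapping interval. The sketch below shows how `window_start` and `window_end` could be derived for a single timestamp. It assumes windows are aligned to the Unix epoch with no extra offset, and the helper name `tumble_window` is hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumble_window(ts, size):
    """Return (window_start, window_end) of the epoch-aligned window containing ts."""
    offset = (ts - EPOCH) % size  # how far ts lies into its window
    start = ts - offset
    return start, start + size


start, end = tumble_window(datetime(2024, 1, 1, 10, 0, 7), timedelta(seconds=5))
print(start, end)  # 2024-01-01 10:00:05 2024-01-01 10:00:10
```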

6.2. Filtering the latest time window  
Show all activities that belong to the most recent time window.

In [None]:
SELECT *
FROM TUMBLE(activity_data, timestamp, INTERVAL '5 SECONDS')
WHERE window_start = (
    SELECT MAX(window_start)
    FROM TUMBLE(activity_data, timestamp, INTERVAL '5 SECONDS')
);

6.3. Aggregating per time window  
Compute the number of activities per user in each time window.

In [None]:
SELECT
    user_id,
    window_start,
    window_end,
    count(action) as actions_count
FROM TUMBLE(activity_data, timestamp, INTERVAL '5 SECONDS')
GROUP BY user_id, window_start, window_end;
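
Grouping by `user_id`, `window_start` and `window_end` amounts to counting rows per (user, window) pair. A rough Python equivalent, assuming epoch-aligned 5-second windows and made-up sample rows:

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=5)
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    """Start of the epoch-aligned 5-second window that contains ts."""
    return ts - (ts - EPOCH) % WINDOW


# Hypothetical (user_id, timestamp) pairs from activity_data.
activity = [
    (1, datetime(2024, 1, 1, 10, 0, 1)),
    (1, datetime(2024, 1, 1, 10, 0, 4)),
    (2, datetime(2024, 1, 1, 10, 0, 4)),
    (1, datetime(2024, 1, 1, 10, 0, 6)),
]

# One counter entry per (user_id, window_start) group.
actions_count = Counter((user_id, window_start(ts)) for user_id, ts in activity)

for (user_id, start), count in sorted(actions_count.items()):
    print(user_id, start, start + WINDOW, count)
```

Since `window_end` is always `window_start` plus the window size, it does not change the grouping; it is included in the SQL `GROUP BY` so that it can appear in the `SELECT` list.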

### 7. Defining a function for executing SQL queries

Define a function `execute_query` that executes a SQL query against the given database. Use the `psycopg2` library to connect to the database and run the query.  
Run the previous queries using this function.

In [1]:
import psycopg2


RISINGWAVE_DB = "dev"
RISINGWAVE_HOST = "localhost"
RISINGWAVE_USER = "root"
RISINGWAVE_PASSWORD = "root"
RISINGWAVE_PORT = 4566


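def execute_query(query):
    """Run a single SQL statement against RisingWave.

    Returns (True, rows) for statements that produce a result set,
    (True, affected_row_count) for other statements, and (False, None) on error.
    """
    connection_params = {
        "host": RISINGWAVE_HOST,
        "port": RISINGWAVE_PORT,
        "user": RISINGWAVE_USER,
        "password": RISINGWAVE_PASSWORD,
        "database": RISINGWAVE_DB
    }

    conn = None
    try:
        conn = psycopg2.connect(**connection_params)
        with conn.cursor() as cursor:
            cursor.execute(query)

            # cursor.description is set only when the statement returned rows
            if cursor.description is not None:
                return True, cursor.fetchall()

            conn.commit()
            return True, cursor.rowcount
    except Exception as e:
        print(f"Error executing query: {e}")
        return False, None
    finally:
        # The psycopg2 connection context manager does not close the connection
        if conn is not None:
            conn.close()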
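
To run the earlier queries through `execute_query`, one option is a small helper that splits a script into statements and passes them on one by one. The sketch below is an assumption-laden illustration: `split_statements` and `run_script` are hypothetical names, the splitting is naive (it assumes no semicolons inside string literals or comments), and the executor is passed in as a parameter so the helper can be tried without a database.

```python
def split_statements(script):
    """Naively split a SQL script on ';' and drop empty fragments."""
    return [part.strip() for part in script.split(";") if part.strip()]


def run_script(script, executor):
    """Run each statement with executor and stop at the first failure.

    executor is any callable taking one SQL string, for example execute_query.
    Its result is treated as successful if it is a tuple whose first element
    is truthy, or a truthy non-tuple value.
    """
    results = []
    for statement in split_statements(script):
        outcome = executor(statement)
        ok = bool(outcome[0]) if isinstance(outcome, tuple) else bool(outcome)
        results.append((statement, ok))
        if not ok:
            break  # later statements usually depend on earlier ones
    return results


# Example usage (requires a running RisingWave instance and a saved script):
# with open("views.sql") as f:   # "views.sql" is a hypothetical file name
#     print(run_script(f.read(), execute_query))
```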


### Task: Creating a RisingWave utils module

Create a Python module `rw_utils.py` that contains helper functions for working with RisingWave using the psycopg2 package.

*Study the [psycopg - PostgreSQL database adapter for Python documentation](https://www.psycopg.org/docs/).*

The module should contain:
- A function for connecting to the database
- One or more functions for executing SQL queries
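
One possible starting point for `rw_utils.py` is sketched below. It is not a reference solution: the function names `connection_params`, `connect` and `run_query` are assumptions, the defaults simply mirror the connection settings used earlier in this exercise, and autocommit is enabled on the assumption that each statement should take effect immediately.

```python
# rw_utils.py (sketch)

DEFAULTS = {
    "host": "localhost",
    "port": 4566,
    "user": "root",
    "password": "root",
    "dbname": "dev",
}


def connection_params(**overrides):
    """Return the default connection settings, updated with any overrides."""
    return {**DEFAULTS, **overrides}


def connect(**overrides):
    """Open a psycopg2 connection to RisingWave with autocommit enabled."""
    import psycopg2  # imported here so the module loads even without the driver

    conn = psycopg2.connect(**connection_params(**overrides))
    conn.autocommit = True
    return conn


def run_query(conn, query, params=None):
    """Execute one statement; return rows if it produced any, otherwise None."""
    with conn.cursor() as cursor:
        cursor.execute(query, params)
        if cursor.description is not None:  # statement returned a result set
            return cursor.fetchall()
        return None


# Example usage (requires a running RisingWave instance):
# conn = connect()
# print(run_query(conn, "SELECT * FROM user_activity_count"))
# conn.close()
```

Keeping the connection separate from query execution lets several queries reuse one connection, unlike `execute_query`, which opens a new one per call.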