# Building an Analytical Database

**Author:**  

Pavel Grigoryev

**Project Description:**

Our startup's messenger and news feed app generates extensive user interaction data. To enable product monitoring, we need to build an optimized analytical database that will power our business intelligence dashboards.

**Project Goal:**

To design and implement a performant database structure that serves as the backend for product dashboards, enabling tracking of key metrics like DAU, retention, and engagement across both messaging and feed features.

**Data Sources:**

- `feed_actions` - News feed activity
- `message_actions` - Messaging activity 

**Main Conclusion:**

- **Star Schema Designed:** Developed a star schema for the analytical database
- **Metric Queries Created:** Wrote queries against the product database that calculate the metrics required by the analytical database
- **Star Schema Tables Implemented:** Created all necessary tables for the star schema in the analytical database
- **Data Migration Completed:** Extracted required data from the product database and loaded it into the analytical database
- **Dashboard Optimization:** Created materialized views to optimize dashboard performance
- **Automation Pipeline Built:** Developed an Airflow DAG that incrementally loads the previous day's data every day
- **Goal Achieved:** Established analytical database and ETL process for continuous data updates

# Importing Libraries

In [None]:
from datetime import datetime, timedelta
import pandas as pd
from io import StringIO
import requests
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy import text
import os
import itertools
from clickhouse_driver import Client
from tqdm import tqdm
import time
load_dotenv();

# Data Description

In the product database on ClickHouse, the data is stored in the following tables.

Table feed_actions

Field | Description
-|-
user_id | User ID
post_id | Post ID
action | Action: view or like
time | Timestamp
gender | User's gender (1 = Male)
age | User's age
os | User's OS
source | Traffic source
country | User's country
city | User's city
exp_group | A/B test group

Table message_actions

Field | Description
-|-
user_id | Sender's ID
receiver_id | Receiver's ID
time | Send timestamp
gender | Sender's gender (1 = Male)
age | Sender's age
os | Sender's OS
source | Sender's traffic source
country | Sender's country
city | Sender's city
exp_group | Sender's A/B test group

# Database Connection

Let's create clients for connecting to the source and destination databases.

In [None]:
src_ch_client = Client(
    host=os.getenv('SRC_HOST'),
    user=os.getenv('SRC_USER'),
    password=os.getenv('SRC_PASSWORD'), 
    database=os.getenv('SRC_DATABASE'),   
    compression=False, 
)

dst_ch_client = Client(
    host=os.getenv('DST_HOST'),
    user=os.getenv('DST_USER'),
    password=os.getenv('DST_PASSWORD'), 
    database=os.getenv('DST_DATABASE'),  
    compression=False,     
)

Create a function that migrates data from one database to another in chunks.

In [None]:
def ch_migrate_db_chunked(
    src_client,
    dst_client,
    src_query,
    dst_table_name,
    chunk_size=50_000,
):
    """
    Migrates data from source to destination in chunks
    
    Args:
        src_client: clickhouse_driver.Client for source
        dst_client: clickhouse_driver.Client for destination  
        src_query (str): SQL query to source
        dst_table_name (str): Table name in destination
        chunk_size (int): Chunk size for data migration
    """
    
    total_rows = 0
    chunk_number = 0
    
    print(f"🚀 Starting data migration to table: {dst_table_name}")
    print(f"📊 Chunk size: {chunk_size:,} rows")
    
    try:
        # Determine total row count
        print(f"📈 Calculating total rows for migration...")     
        count_query = f"SELECT count() FROM ({src_query})"
        total_count = src_client.execute(count_query)[0][0]
        print(f"📈 Total rows to migrate: {total_count:,}")        
        # Use execute_iter for streaming data reading
        result_iter = src_client.execute_iter(
            src_query, 
            with_column_types=False  
        )    
        progress_bar = tqdm(
            total=total_count,
            desc="📦 Data migration",
            unit=" rows",
            unit_scale=True,
            ncols=100,
            mininterval=1.0, 
            maxinterval=5.0             
        )      
        
        while True:
            # Read chunk_size rows per operation
            # Creates iterator slice as another iterator
            chunk = list(itertools.islice(result_iter, chunk_size))
            # If chunk is empty - exit loop
            if not chunk:
                break            
            chunk_number += 1
            dst_client.execute(
                f"INSERT INTO {dst_table_name} VALUES",
                chunk,
                types_check=True
            )
            
            total_rows += len(chunk)
            progress_bar.update(len(chunk))
            progress_bar.set_postfix({
                'chunk': chunk_number,
                'total': f"{total_rows:,}"
            })
            
        progress_bar.close()
        print(f"🎉 Migration completed!\nTotal migrated: {total_rows:,} rows\nChunks: {chunk_number}")
                
    except Exception as e:
        print(f"❌ Error migrating table {dst_table_name}: {e}")
        if 'progress_bar' in locals():
            progress_bar.close()                    
        raise
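The chunking step in the function above can be looked at in isolation. The sketch below is a minimal illustration, not part of the migration code: `iter_chunks` is a hypothetical helper name, and a plain `range` stands in for the row iterator returned by `execute_iter`.

```python
import itertools


def iter_chunks(rows, chunk_size):
    """Yield lists of at most chunk_size items from any iterable (hypothetical helper)."""
    iterator = iter(rows)
    while True:
        # islice consumes the next chunk_size items without loading the rest
        chunk = list(itertools.islice(iterator, chunk_size))
        if not chunk:
            break
        yield chunk


# A range stands in for the streamed query result
for chunk in iter_chunks(range(7), 3):
    print(chunk)
```

Because only one chunk is materialized at a time, memory use is bounded by `chunk_size` rather than by the size of the full result set.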

# Designing the Analytical Database

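Let's design the analytical database that will back the dashboards.

We will use a star schema: fact tables store daily aggregates, and dimension tables store user and date attributes. This layout is well suited to analytical queries.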

## Creating fact_daily_feed

Create a fact table for news feed metrics.

Create a query to the product database to calculate the required metrics.

In [None]:
QUERY_FACT_DAILY_FEED = """
    WITH user_actions as (
        SELECT
            user_id
            , post_id
            , action
            , time
            , lagInFrame(time) OVER (PARTITION BY  user_id, toDate(time), action ORDER BY time) prev_time
            , minIf(time, action = 'view') OVER (PARTITION BY  user_id, toDate(time), post_id) first_view_in_day
            , minIf(time, action = 'like') OVER (PARTITION BY  user_id, toDate(time), post_id) first_like_in_day
        FROM 
            feed_actions 
        WHERE 
            toDate(time) <= yesterday()  
    )
    SELECT
        toDate(time) as date
        , user_id
        , minIf(time, action = 'view') as first_view_time
        , minIf(time, action = 'like') as first_like_time
        , maxIf(time, action = 'view') as last_view_time
        , maxIf(time, action = 'like') as last_like_time    
        , uniq(post_id) as posts
        , countIf(action = 'view') as views
        , countIf(action = 'like') as likes
        , avgIf(time - prev_time, action = 'view' and prev_time != toDateTime(0)) as avg_time_between_views
        , avgIf(time - prev_time, action = 'like' and prev_time != toDateTime(0)) as avg_time_between_likes
        , avgIf(
            first_like_in_day - first_view_in_day
            , first_like_in_day != toDateTime(0) 
            and first_view_in_day != toDateTime(0) 
            and first_like_in_day >= first_view_in_day
        ) as avg_view_to_like_seconds    
    FROM 
        user_actions
    GROUP BY
        date
        , user_id
    ORDER BY 
        date
        , user_id     
"""

Create a table in the analytical database.

In [None]:
query_create = """
    CREATE TABLE fact_daily_feed (
        date Date
        , user_id UInt32
        , first_view_time DateTime
        , first_like_time DateTime
        , last_view_time DateTime
        , last_like_time DateTime        
        , posts UInt16
        , views UInt16
        , likes UInt16
        , avg_time_between_views Float32
        , avg_time_between_likes Float32
        , avg_view_to_like_seconds Float32
    ) ENGINE = MergeTree()
    PARTITION BY toYYYYMM(date)
    ORDER BY (date, user_id, views, likes)
"""

In [None]:
dst_ch_client.execute(query_create)

Populate the table with data.

In [None]:
ch_migrate_db_chunked(
    src_client=src_ch_client,
    dst_client=dst_ch_client,
    src_query=QUERY_FACT_DAILY_FEED,
    dst_table_name='fact_daily_feed',
)

## Creating fact_daily_posts

Create a fact table for post metrics.

Create a query to the product database to calculate the required metrics.

In [None]:
QUERY_FACT_DAILY_POSTS = """
    WITH post_actions as (
        SELECT
            user_id
            , post_id
            , action
            , time
            , lagInFrame(time) OVER (PARTITION BY  post_id, toDate(time), action ORDER BY time) prev_time
            , minIf(time, action = 'view') OVER (PARTITION BY toDate(time), post_id) first_view_in_day
            , minIf(time, action = 'like') OVER (PARTITION BY toDate(time), post_id) first_like_in_day
        FROM 
            feed_actions 
        WHERE 
            toDate(time) <= yesterday()    
    )
    SELECT
        toDate(time) as date
        , post_id
        , min(time) as first_action_time
        , max(time) as last_action_time
        , countIf(action = 'view') as views
        , countIf(action = 'like') as likes
        , uniqIf(user_id, action = 'view') as unique_viewers
        , uniqIf(user_id, action = 'like') as unique_likers
        , avgIf(time - prev_time, action = 'view' and prev_time != toDateTime(0)) as avg_time_between_views
        , avgIf(time - prev_time, action = 'like' and prev_time != toDateTime(0)) as avg_time_between_likes
        , avgIf(
            first_like_in_day - first_view_in_day
            , first_like_in_day != toDateTime(0) 
            and first_view_in_day != toDateTime(0) 
            and first_like_in_day >= first_view_in_day
        ) as avg_view_to_like_seconds    
    FROM 
        post_actions
    GROUP BY
        date
        , post_id
    ORDER BY 
        date
        , post_id     
"""    

Create a table in the analytical database.

In [None]:
query_create = """
    CREATE TABLE fact_daily_posts (
        date Date
        , post_id UInt32
        , first_action_time DateTime
        , last_action_time DateTime
        , views UInt32
        , likes UInt16
        , unique_viewers UInt16
        , unique_likers UInt16
        , avg_time_between_views Float32
        , avg_time_between_likes Float32
        , avg_view_to_like_seconds Float32
    ) ENGINE = MergeTree()
    PARTITION BY toYYYYMM(date)
    ORDER BY (date, post_id, views, likes)
"""

In [None]:
dst_ch_client.execute(query_create)

Populate the table with data.

In [None]:
ch_migrate_db_chunked(
    src_client=src_ch_client,
    dst_client=dst_ch_client,
    src_query=QUERY_FACT_DAILY_POSTS,
    dst_table_name='fact_daily_posts',
)

## Creating fact_daily_messenger

Create a fact table for messenger metrics.

Create a query to the product database to calculate the required metrics.

In [None]:
QUERY_FACT_DAILY_MESSENGER = """
    WITH user_actions as (
        SELECT 
            time
            , user_id
            , receiver_id 
            , lagInFrame(time) OVER (PARTITION BY  toDate(time), user_id ORDER BY time) user_prev_time
            , lagInFrame(time) OVER (PARTITION BY  toDate(time), receiver_id ORDER BY time) receiver_prev_time
        FROM 
            message_actions 
        WHERE 
            toDate(time) <= yesterday()    
        )
        , senders as (
        SELECT
            toDate(time) as date
            , user_id 
            , min(time) as first_sent_time
            , max(time) as last_sent_time    
            , uniq(receiver_id) as users_sent
            , count() as messages_sent
            , avgIf(time - user_prev_time, user_prev_time != toDateTime(0)) avg_time_between_messages_sent
        FROM 
            user_actions   
        GROUP BY 
            date
            , user_id
        )
    , receivers as (
        SELECT
            toDate(time) as date
            , receiver_id as user_id 
            , min(time) as first_received_time
            , max(time) as last_received_time      
            , uniq(user_id) as users_received
            , count() as messages_received
            , avgIf(time - receiver_prev_time, receiver_prev_time != toDateTime(0)) avg_time_between_messages_received
        FROM 
            user_actions  
        GROUP BY 
            date
            , receiver_id
    )
    SELECT 
        if(s.user_id != 0, s.date, r.date) as date
        , if(s.user_id != 0, s.user_id, r.user_id) as user_id
        , s.first_sent_time
        , s.last_sent_time
        , s.users_sent
        , s.messages_sent
        , s.avg_time_between_messages_sent
        , r.first_received_time
        , r.last_received_time
        , r.users_received
        , r.messages_received
        , r.avg_time_between_messages_received  
    FROM 
        senders s
        FULL JOIN receivers r ON s.date = r.date AND s.user_id = r.user_id
    ORDER BY 
        date
        , user_id      
"""    

Create a table in the analytical database.

In [None]:
query_create = """
    CREATE TABLE fact_daily_messenger (
        date Date
        , user_id UInt32
        , first_sent_time DateTime
        , last_sent_time DateTime
        , users_sent UInt16
        , messages_sent UInt16
        , avg_time_between_messages_sent Float32
        , first_received_time DateTime
        , last_received_time DateTime
        , users_received UInt16
        , messages_received UInt16
        , avg_time_between_messages_received Float32
    ) ENGINE = MergeTree()
    PARTITION BY toYYYYMM(date)
    ORDER BY (date, user_id, messages_sent, messages_received)
"""

In [None]:
dst_ch_client.execute(query_create)

Populate the table with data.

In [None]:
ch_migrate_db_chunked(
    src_client=src_ch_client,
    dst_client=dst_ch_client,
    src_query=QUERY_FACT_DAILY_MESSENGER,
    dst_table_name='fact_daily_messenger',
)

## Creating fact_user_connections

Create a fact table for user connections in the messenger.

Create a query to the product database to calculate the required metrics.

In [None]:
QUERY_FACT_USER_CONNECTIONS = """
    SELECT 
        toDate(time) as date
        , user_id as sender_id
        , receiver_id
        , count() as messages_count
    FROM 
        message_actions
    WHERE 
        toDate(time) <= yesterday()  
    GROUP BY 
        date
        , sender_id
        , receiver_id
    ORDER BY 
        date
        , sender_id
        , receiver_id     
"""  

Create a table in the analytical database.

In [None]:
query_create = """
    CREATE TABLE fact_user_connections (
        date Date
        , sender_id UInt32
        , receiver_id UInt32
        , messages_count UInt16
    ) ENGINE = MergeTree()
    PARTITION BY toYYYYMM(date)
    ORDER BY (date, sender_id, receiver_id, messages_count)
"""

In [None]:
dst_ch_client.execute(query_create)

Populate the table with data.

In [None]:
ch_migrate_db_chunked(
    src_client=src_ch_client,
    dst_client=dst_ch_client,
    src_query=QUERY_FACT_USER_CONNECTIONS,
    dst_table_name='fact_user_connections',
)

## Creating dim_users

Create a dimension table for users.

Create a query to the product database that collects each user's most recent attributes from both source tables.

In [None]:
QUERY_DIM_USERS = """
    SELECT 
        user_id
        , argMax(gender, time) as gender
        , argMax(age, time) as age
        , argMax(source, time) as source
        , argMax(os, time) as os
        , argMax(city, time) as city
        , argMax(country, time) as country
        , argMax(exp_group, time) as exp_group
        , toDate(now()) as version
    FROM (
        SELECT
            time
            , user_id
            , gender
            , age
            , source
            , os 
            , city 
            , country
            , exp_group 
        FROM 
            feed_actions 
        UNION ALL
        SELECT
            time
            , user_id
            , gender
            , age
            , source
            , os 
            , city 
            , country
            , exp_group       
        FROM 
            message_actions    
    )
    WHERE 
        toDate(time) <= yesterday()
    GROUP BY 
        user_id
    ORDER BY 
        user_id
"""  

Create a table in the analytical database.

In [None]:
query_create = """
    CREATE TABLE dim_users (
        user_id UInt32
        , gender UInt8
        , age UInt8
        , source LowCardinality(String)
        , os LowCardinality(String)
        , city String
        , country LowCardinality(String)
        , exp_group UInt8
        , version Date  
    ) ENGINE = ReplacingMergeTree(version)
    ORDER BY (user_id)
"""

In [None]:
dst_ch_client.execute(query_create)

Populate the table with data.

In [None]:
ch_migrate_db_chunked(
    src_client=src_ch_client,
    dst_client=dst_ch_client,
    src_query=QUERY_DIM_USERS,
    dst_table_name='dim_users',
)
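`dim_users` uses `ReplacingMergeTree(version)`: among rows with the same sorting key (`user_id`), ClickHouse keeps the row with the highest `version`, and on a tie the most recently inserted one. Deduplication happens during background merges, so duplicates can remain visible until a merge runs unless a query uses `FINAL`. The sketch below imitates the end state after a merge; `collapse_by_version` is a hypothetical name and the rows are sample data.

```python
def collapse_by_version(rows):
    """Keep the highest-version row per user_id; on ties the later row wins."""
    kept = {}
    for row in rows:
        current = kept.get(row["user_id"])
        if current is None or row["version"] >= current["version"]:
            kept[row["user_id"]] = row
    return sorted(kept.values(), key=lambda r: r["user_id"])


rows = [
    {"user_id": 1, "version": 1, "city": "Moscow"},
    {"user_id": 1, "version": 2, "city": "Kazan"},
    {"user_id": 2, "version": 1, "city": "Minsk"},
]
for row in collapse_by_version(rows):
    print(row)
```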

## Creating dim_dates

Create a dimension table for dates.

In [None]:
query_create = """
    CREATE TABLE dim_dates (
        date Date
        , day_of_week UInt8
        , day_name String
        , month UInt8
        , month_name String
        , quarter UInt8
        , year UInt16
        , is_weekend UInt8
        , week_number UInt8
    ) ENGINE = MergeTree()
    ORDER BY (date)
    PRIMARY KEY (date);
"""

In [None]:
dst_ch_client.execute(query_create)

Populate the table with data.

In [None]:
query_insert = """
    INSERT INTO dim_dates
    SELECT 
        date
        , toDayOfWeek(date) as day_of_week
        , dateName('weekday', date) as day_name
        , toMonth(date) as month
        , dateName('month', date) as month_name
        , toQuarter(date) as quarter
        , toYear(date) as year
        , if(day_of_week IN (6, 7), 1, 0) as is_weekend
        , toISOWeek(date) as week_number
    FROM (
        SELECT toDate('2025-01-01') + number as date
        FROM numbers(730)  
);
"""

In [None]:
dst_ch_client.execute(query_insert)
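For reference, the same calendar can be generated in plain Python, which is handy for spot-checking a few rows of `dim_dates`. This is a sketch under two assumptions: `day_name` is meant to hold the weekday name, and weekdays are numbered Monday = 1 through Sunday = 7. `build_dim_dates` is a hypothetical helper.

```python
from datetime import date, timedelta


def build_dim_dates(start, days):
    """Build calendar rows for `days` consecutive dates starting at `start`."""
    rows = []
    for offset in range(days):
        d = start + timedelta(days=offset)
        day_of_week = d.isoweekday()  # Monday = 1 ... Sunday = 7
        rows.append({
            "date": d,
            "day_of_week": day_of_week,
            "day_name": d.strftime("%A"),    # locale-dependent name
            "month": d.month,
            "month_name": d.strftime("%B"),  # locale-dependent name
            "quarter": (d.month - 1) // 3 + 1,
            "year": d.year,
            "is_weekend": 1 if day_of_week in (6, 7) else 0,
            "week_number": d.isocalendar()[1],  # ISO week number
        })
    return rows


rows = build_dim_dates(date(2025, 1, 1), 730)
print(rows[0]["date"], rows[-1]["date"], len(rows))
```

Starting on 2025-01-01, 730 rows cover the two non-leap years 2025 and 2026.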

As a result, we obtained the following schema.

<img src="er.png" alt="">

# Creating Materialized Views

To optimize data loading into the dashboard, let's create materialized views.

## App Daily Activity Materialized View

Create a materialized view for app daily activity.

In [None]:
QUERY_APP_DAILY_ACTIVITY_MV = """
    CREATE MATERIALIZED VIEW mv_app_daily_activity AS
    WITH merged_stats AS ( 
        SELECT 
            COALESCE(f.date, m.date) AS date
            , COALESCE(f.user_id, m.user_id) AS user_id
            , f.likes
            , f.views
            , f.posts
            , m.messages_sent
        FROM
            fact_daily_feed f
            FULL JOIN fact_daily_messenger m ON f.date = m.date AND f.user_id = m.user_id 
    )
    , daily_metrics AS (
        SELECT 
            date
            , user_id
            , date - MIN(date) OVER (PARTITION BY user_id) AS lifetime
            , views
            , likes 
            , posts    
            , messages_sent 
        FROM
            merged_stats
    )
    , base_metrics AS (
        SELECT 
            m.date
            , u.gender
            , u.age 
            , u.source
            , u.os
            , u.city 
            , u.country
            , COUNT(DISTINCT m.user_id) AS total_users
            , COUNT(DISTINCT m.user_id) FILTER (WHERE messages_sent IS NULL) AS feed_only_users
            , COUNT(DISTINCT m.user_id) FILTER (WHERE views IS NULL) AS messenger_only_users
            , COUNT(DISTINCT m.user_id) FILTER (WHERE views IS NOT NULL AND messages_sent IS NOT NULL) AS both_services_users
            , COUNT(DISTINCT m.user_id) FILTER (WHERE messages_sent IS NOT NULL) AS messenger_users
            , COUNT(DISTINCT m.user_id) FILTER (WHERE lifetime = 0) AS new_users_lifetime_0
            , COUNT(DISTINCT m.user_id) FILTER (WHERE lifetime = 7) AS users_lifetime_7d
            , SUM(views) AS views
            , SUM(likes) AS likes
            , SUM(messages_sent) AS messages_sent
        FROM
            daily_metrics m
            LEFT JOIN dim_users u ON m.user_id = u.user_id 
        GROUP BY
            m.date
            , u.gender
            , u.age 
            , u.source
            , u.os
            , u.city 
            , u.country
    )
    , lag_metrics AS (
        SELECT 
            date + INTERVAL '7 days' as date
            , gender
            , age 
            , source
            , os
            , city 
            , country
            , new_users_lifetime_0 as new_users_7_days_ago
        FROM 
            base_metrics
    )
    SELECT 
        COALESCE(b.date, l.date) as date
        , COALESCE(b.gender, l.gender) as gender
        , COALESCE(b.age, l.age) as age
        , COALESCE(b.source, l.source) as source
        , COALESCE(b.os, l.os) as os
        , COALESCE(b.city, l.city) as city
        , COALESCE(b.country, l.country) as country
        , COALESCE(b.total_users, 0) as total_users
        , COALESCE(b.feed_only_users, 0) as feed_only_users
        , COALESCE(b.messenger_only_users, 0) as messenger_only_users
        , COALESCE(b.both_services_users, 0) as both_services_users
        , COALESCE(b.messenger_users, 0) as messenger_users
        , COALESCE(b.new_users_lifetime_0, 0) as new_users_lifetime_0
        , COALESCE(b.users_lifetime_7d, 0) as users_lifetime_7d
        , COALESCE(l.new_users_7_days_ago, 0) as new_users_7_days_ago
        , COALESCE(b.views, 0) as views
        , COALESCE(b.likes, 0) as likes
    FROM 
        base_metrics b
        FULL JOIN lag_metrics l ON 
            b.date = l.date 
            AND b.gender = l.gender
            AND b.age = l.age
            AND b.source = l.source
            AND b.os = l.os
            AND b.city = l.city
            AND b.country = l.country
    ORDER BY
        date
        , gender
        , age
        , source
        , os
        , city
        , country
"""    

In [None]:
dst_ch_client.execute(QUERY_APP_DAILY_ACTIVITY_MV)
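The view exposes `new_users_lifetime_0`, `users_lifetime_7d`, and `new_users_7_days_ago` side by side. One plausible use, which the source does not spell out, is a day-7 retention ratio: users active exactly 7 days after their first activity, divided by the users who were new 7 days earlier. The following simplified sketch ignores the demographic breakdown, and `lifetime_counts` is a hypothetical name.

```python
from datetime import date, timedelta


def lifetime_counts(activity):
    """Count new users and day-7 returners per date.

    activity: iterable of (date, user_id) pairs, one per active user-day.
    """
    activity = list(activity)
    first_seen = {}
    for day, user_id in activity:
        if user_id not in first_seen or day < first_seen[user_id]:
            first_seen[user_id] = day

    counts = {}
    for day, user_id in set(activity):  # each user counted once per day
        lifetime = (day - first_seen[user_id]).days
        bucket = counts.setdefault(day, {"new_users": 0, "users_lifetime_7d": 0})
        if lifetime == 0:
            bucket["new_users"] += 1
        elif lifetime == 7:
            bucket["users_lifetime_7d"] += 1
    return counts


activity = [
    (date(2025, 1, 1), 1),
    (date(2025, 1, 1), 2),
    (date(2025, 1, 8), 1),
    (date(2025, 1, 8), 3),
]
counts = lifetime_counts(activity)
today = date(2025, 1, 8)
cohort = counts[today - timedelta(days=7)]["new_users"]
# Assumed retention formula, not stated in the source
print(counts[today]["users_lifetime_7d"] / cohort)
```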

## Feed Daily Activity Materialized View

Create a materialized view for feed daily activity.

In [None]:
QUERY_FEED_DAILY_ACTIVITY_MV = """
    CREATE MATERIALIZED VIEW mv_feed_daily_activity AS
    WITH daily_metrics AS (
        SELECT 
            date
            , user_id
            , date - MIN(date) OVER (PARTITION BY user_id) AS lifetime
            , views
            , likes 
            , posts
            , avg_view_to_like_seconds        
        FROM
            fact_daily_feed
    )
    , base_metrics AS (
        SELECT 
            m.date
            , u.gender
            , u.age 
            , u.source
            , u.os
            , u.city 
            , u.country
            , COUNT(DISTINCT m.user_id) AS feed_users
            , COUNT(DISTINCT m.user_id) FILTER (WHERE lifetime = 0) AS feed_new_users_lifetime_0
            , COUNT(DISTINCT m.user_id) FILTER (WHERE lifetime = 7) AS feed_users_lifetime_7d
            , SUM(views) AS views
            , SUM(likes) AS likes
            , SUM(posts) AS posts
            , AVG(avg_view_to_like_seconds) AS avg_view_to_like_seconds
        FROM
            daily_metrics m
            LEFT JOIN public.dim_users u ON m.user_id = u.user_id 
        GROUP BY
            m.date
            , u.gender
            , u.age 
            , u.source
            , u.os
            , u.city 
            , u.country
    )
    , lag_metrics AS (
        SELECT 
            date + INTERVAL '7 days' as date
            , gender
            , age 
            , source
            , os
            , city 
            , country
            , feed_new_users_lifetime_0 as feed_new_users_7_days_ago
        FROM 
            base_metrics
    )
    SELECT 
        COALESCE(b.date, l.date) as date
        , COALESCE(b.gender, l.gender) as gender
        , COALESCE(b.age, l.age) as age
        , COALESCE(b.source, l.source) as source
        , COALESCE(b.os, l.os) as os
        , COALESCE(b.city, l.city) as city
        , COALESCE(b.country, l.country) as country
        , COALESCE(b.views, 0) as views
        , COALESCE(b.likes, 0) as likes
        , COALESCE(b.posts, 0) as posts
        , COALESCE(b.avg_view_to_like_seconds, 0) as avg_view_to_like_seconds
        , COALESCE(b.feed_users, 0) as feed_users
        , COALESCE(b.feed_new_users_lifetime_0, 0) as feed_new_users_lifetime_0
        , COALESCE(b.feed_users_lifetime_7d, 0) as feed_users_lifetime_7d
        , COALESCE(l.feed_new_users_7_days_ago, 0) as feed_new_users_7_days_ago
    FROM 
        base_metrics b
        FULL JOIN lag_metrics l ON 
            b.date = l.date 
            AND b.gender = l.gender
            AND b.age = l.age
            AND b.source = l.source
            AND b.os = l.os
            AND b.city = l.city
            AND b.country = l.country
    ORDER BY
        date
        , gender
        , age
        , source
        , os
        , city
        , country
"""        

In [None]:
dst_ch_client.execute(QUERY_FEED_DAILY_ACTIVITY_MV)

## Messenger Daily Activity Materialized View

Create a materialized view for messenger daily activity.

In [None]:
QUERY_MESSENGER_DAILY_ACTIVITY_MV = """
    CREATE MATERIALIZED VIEW mv_messenger_daily_activity AS
    WITH daily_metrics AS (
        SELECT 
            date
            , user_id
            , date - MIN(date) OVER (PARTITION BY user_id) AS lifetime
            , messages_sent
            , messages_received
            , users_sent
            , users_received
            , avg_time_between_messages_sent
        FROM 
            fact_daily_messenger
    )
    , base_metrics AS (
        SELECT 
            m.date
            , u.gender
            , u.age 
            , u.source
            , u.os
            , u.city 
            , u.country
            , COUNT(DISTINCT m.user_id) FILTER (WHERE users_sent != 0) AS total_senders
            , COUNT(DISTINCT m.user_id) FILTER (WHERE users_sent = 0) AS total_receivers
            , COUNT(DISTINCT m.user_id) FILTER (WHERE lifetime = 0) AS new_users_lifetime_0
            , COUNT(DISTINCT m.user_id) FILTER (WHERE lifetime = 7) AS users_lifetime_7d        
            , SUM(messages_sent) AS messages_sent
            , AVG(avg_time_between_messages_sent) AS avg_time_between_messages_sent
        FROM 
            daily_metrics m
            LEFT JOIN public.dim_users u ON m.user_id = u.user_id 
        GROUP BY
            m.date
            , u.gender
            , u.age 
            , u.source
            , u.os
            , u.city 
            , u.country
    )
    , lag_metrics AS (
        SELECT 
            date + INTERVAL '7 days' as date
            , gender
            , age 
            , source
            , os
            , city 
            , country
            , new_users_lifetime_0 as new_messenger_users_7_days_ago
        FROM 
            base_metrics
    )
    SELECT 
        COALESCE(b.date, l.date) as date
        , COALESCE(b.gender, l.gender) as gender
        , COALESCE(b.age, l.age) as age
        , COALESCE(b.source, l.source) as source
        , COALESCE(b.os, l.os) as os
        , COALESCE(b.city, l.city) as city
        , COALESCE(b.country, l.country) as country
        , COALESCE(b.total_senders, 0) as total_senders
        , COALESCE(b.total_receivers, 0) as total_receivers
        , COALESCE(b.new_users_lifetime_0, 0) as new_users_lifetime_0
        , COALESCE(b.users_lifetime_7d, 0) as users_lifetime_7d
        , COALESCE(l.new_messenger_users_7_days_ago, 0) as new_messenger_users_7_days_ago
        , COALESCE(b.messages_sent, 0) as messages_sent
        , COALESCE(b.avg_time_between_messages_sent, 0) as avg_time_between_messages_sent
    FROM 
        base_metrics b
        FULL JOIN lag_metrics l ON 
            b.date = l.date 
            AND b.gender = l.gender
            AND b.age = l.age
            AND b.source = l.source
            AND b.os = l.os
            AND b.city = l.city
            AND b.country = l.country
    ORDER BY
        date
        , gender
        , age
        , source
        , os
        , city
        , country
"""        

In [None]:
dst_ch_client.execute(QUERY_MESSENGER_DAILY_ACTIVITY_MV)

# ETL Process (Airflow DAG)

To load yesterday's data each day, we will create an Airflow DAG.

- First, we create a file with the queries that extract yesterday's data.
- These queries mirror the ones used for the initial load of the analytical database. The only difference is the filter, which selects yesterday's data (`toDate(time) = yesterday()`) instead of the full history (`toDate(time) <= yesterday()`).
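Since the initial-load and daily queries differ only in the date filter, one optional way to avoid maintaining two copies is to build both from a single template. This is an assumed alternative, not what the notebook does. `QUERY_USER_CONNECTIONS_TEMPLATE` and `build_query` are hypothetical names, and the query body follows the user connections query.

```python
# Hypothetical template: {operator} is the only part that varies
QUERY_USER_CONNECTIONS_TEMPLATE = """
    SELECT
        toDate(time) as date
        , user_id as sender_id
        , receiver_id
        , count() as messages_count
    FROM
        message_actions
    WHERE
        toDate(time) {operator} yesterday()
    GROUP BY
        date
        , sender_id
        , receiver_id
"""


def build_query(template, incremental):
    """Return the daily query (incremental=True) or the full backfill query."""
    operator = "=" if incremental else "<="
    return template.format(operator=operator)


print(build_query(QUERY_USER_CONNECTIONS_TEMPLATE, incremental=True))
```

The trade-off is readability: explicit, duplicated queries are easier to copy into a SQL console, while a template keeps the two variants from drifting apart.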

In [None]:
# analytics_daily_etl_queries.py
"""
SQL queries for analytics_daily_etl dag
"""

# ==========================================================================
# Queries for extract
# ==========================================================================

QUERY_FACT_DAILY_FEED_EXTRACT = """
    WITH user_actions as (
        SELECT
            user_id
            , post_id
            , action
            , time
            , lagInFrame(time) OVER (PARTITION BY  user_id, toDate(time), action ORDER BY time) prev_time
            , minIf(time, action = 'view') OVER (PARTITION BY  user_id, toDate(time), post_id) first_view_in_day
            , minIf(time, action = 'like') OVER (PARTITION BY  user_id, toDate(time), post_id) first_like_in_day
        FROM 
            feed_actions 
        WHERE 
            toDate(time) = yesterday()    
    )
    SELECT
        toDate(time) as date
        , user_id
        , minIf(time, action = 'view') as first_view_time
        , minIf(time, action = 'like') as first_like_time
        , maxIf(time, action = 'view') as last_view_time
        , maxIf(time, action = 'like') as last_like_time    
        , uniq(post_id) as posts
        , countIf(action = 'view') as views
        , countIf(action = 'like') as likes
        , avgIf(time - prev_time, action = 'view' and prev_time != toDateTime(0)) as avg_time_between_views
        , avgIf(time - prev_time, action = 'like' and prev_time != toDateTime(0)) as avg_time_between_likes
        , avgIf(
            first_like_in_day - first_view_in_day
            , first_like_in_day != toDateTime(0) 
            and first_view_in_day != toDateTime(0) 
            and first_like_in_day >= first_view_in_day
        ) as avg_view_to_like_seconds    
    FROM 
        user_actions
    GROUP BY
        date
        , user_id
    ORDER BY 
        date
        , user_id     
"""

QUERY_FACT_DAILY_POSTS_EXTRACT = """
    WITH post_actions as (
        SELECT
            user_id
            , post_id
            , action
            , time
            , lagInFrame(time) OVER (PARTITION BY  post_id, toDate(time), action ORDER BY time) prev_time
            , minIf(time, action = 'view') OVER (PARTITION BY toDate(time), post_id) first_view_in_day
            , minIf(time, action = 'like') OVER (PARTITION BY toDate(time), post_id) first_like_in_day
        FROM 
            feed_actions 
        WHERE 
            toDate(time) = yesterday()    
    )
    SELECT
        toDate(time) as date
        , post_id
        , min(time) as first_action_time
        , max(time) as last_action_time
        , countIf(action = 'view') as views
        , countIf(action = 'like') as likes
        , uniqIf(user_id, action = 'view') as unique_viewers
        , uniqIf(user_id, action = 'like') as unique_likers
        , avgIf(time - prev_time, action = 'view' and prev_time != toDateTime(0)) as avg_time_between_views
        , avgIf(time - prev_time, action = 'like' and prev_time != toDateTime(0)) as avg_time_between_likes
        , avgIf(
            first_like_in_day - first_view_in_day
            , first_like_in_day != toDateTime(0) 
            and first_view_in_day != toDateTime(0) 
            and first_like_in_day >= first_view_in_day
        ) as avg_view_to_like_seconds    
    FROM 
        post_actions
    GROUP BY
        date
        , post_id
    ORDER BY 
        date
        , post_id           
"""    

QUERY_FACT_DAILY_MESSENGER_EXTRACT = """
    WITH user_actions as (
        SELECT 
            time
            , user_id
            , receiver_id 
            , lagInFrame(time) OVER (PARTITION BY  toDate(time), user_id ORDER BY time) user_prev_time
            , lagInFrame(time) OVER (PARTITION BY  toDate(time), receiver_id ORDER BY time) receiver_prev_time
        FROM 
            message_actions 
        WHERE 
            toDate(time) = yesterday()    
        )
        , senders as (
        SELECT
            toDate(time) as date
            , user_id 
            , min(time) as first_sent_time
            , max(time) as last_sent_time    
            , uniq(receiver_id) as users_sent
            , count() as messages_sent
            , avgIf(time - user_prev_time, user_prev_time != toDateTime(0)) avg_time_between_messages_sent
        FROM 
            user_actions   
        GROUP BY 
            date
            , user_id
        )
    , receivers as (
        SELECT
            toDate(time) as date
            , receiver_id as user_id 
            , min(time) as first_received_time
            , max(time) as last_received_time      
            , uniq(user_id) as users_received
            , count() as messages_received
            , avgIf(time - receiver_prev_time, receiver_prev_time != toDateTime(0)) avg_time_between_messages_received
        FROM 
            user_actions  
        GROUP BY 
            date
            , receiver_id
    )
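    -- With the default join_use_nulls = 0, unmatched FULL JOIN rows hold default values
    -- (user_id = 0), so a zero sender id means the row exists only on the receiver side
    SELECT 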
        if(s.user_id != 0, s.date, r.date) as date
        , if(s.user_id != 0, s.user_id, r.user_id) as user_id
        , s.first_sent_time
        , s.last_sent_time
        , s.users_sent
        , s.messages_sent
        , s.avg_time_between_messages_sent
        , r.first_received_time
        , r.last_received_time
        , r.users_received
        , r.messages_received
        , r.avg_time_between_messages_received  
    FROM 
        senders s
        FULL JOIN receivers r ON s.date = r.date AND s.user_id = r.user_id
    ORDER BY 
        date
        , user_id            
"""    

QUERY_FACT_USER_CONNECTIONS_EXTRACT = """
    SELECT 
        toDate(time) as date
        , user_id as sender_id
        , receiver_id
        , count() as messages_count
    FROM 
        message_actions
    WHERE 
        toDate(time) = yesterday()  
    GROUP BY 
        date
        , sender_id
        , receiver_id
    ORDER BY 
        date
        , sender_id
        , receiver_id         
"""  

QUERY_DIM_USERS_EXTRACT = """
    SELECT 
        user_id
        , argMax(gender, time) as gender
        , argMax(age, time) as age
        , argMax(source, time) as source
        , argMax(os, time) as os
        , argMax(city, time) as city
        , argMax(country, time) as country
        , argMax(exp_group, time) as exp_group
        , toDate(now()) as version
    FROM (
        SELECT
            time
            , user_id
            , gender
            , age
            , source
            , os 
            , city 
            , country
            , exp_group 
        FROM 
            feed_actions 
        UNION ALL
        SELECT
            time
            , user_id
            , gender
            , age
            , source
            , os 
            , city 
            , country
            , exp_group       
        FROM 
            message_actions    
    )
    WHERE 
        toDate(time) = yesterday()
    GROUP BY 
        user_id
    ORDER BY 
        user_id     
"""  

# ==========================================================================
# Queries for load
# ==========================================================================

QUERY_FACT_DAILY_FEED_LOAD = """
    INSERT INTO fact_daily_feed VALUES    
"""

QUERY_FACT_DAILY_POSTS_LOAD = """
    INSERT INTO fact_daily_posts VALUES      
"""    

QUERY_FACT_DAILY_MESSENGER_LOAD = """
    INSERT INTO fact_daily_messenger VALUES        
"""    

QUERY_FACT_USER_CONNECTIONS_LOAD = """
    INSERT INTO fact_user_connections VALUES
"""  

QUERY_DIM_USERS_LOAD = """
    INSERT INTO dim_users VALUES 
"""  
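
The load statements above omit the column list, so every row passed to the driver has to follow the target table's column order exactly. A minimal sketch of making that order explicit is shown below. The column list for `fact_user_connections` is assumed from the extract query, and the helpers `build_insert_query` and `to_insert_rows` are hypothetical, not part of the pipeline.

```python
from datetime import date

import pandas as pd

# Assumed column order; in practice it must mirror the target table's DDL
USER_CONNECTIONS_COLUMNS = ["date", "sender_id", "receiver_id", "messages_count"]


def build_insert_query(table: str, columns: list) -> str:
    """Builds an INSERT statement with an explicit column list (hypothetical helper)"""
    return f"INSERT INTO {table} ({', '.join(columns)}) VALUES"


def to_insert_rows(df: pd.DataFrame, columns: list) -> list:
    """Reorders DataFrame columns and returns plain Python rows (hypothetical helper)"""
    missing = [col for col in columns if col not in df.columns]
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    # itertuples yields one tuple per row in the requested column order
    return [list(row) for row in df[columns].itertuples(index=False, name=None)]


# Toy frame whose columns are deliberately in a different order than the table
df = pd.DataFrame({
    "messages_count": [3, 1],
    "receiver_id": [20, 30],
    "sender_id": [10, 10],
    "date": [date(2025, 9, 25), date(2025, 9, 25)],
})

query = build_insert_query("fact_user_connections", USER_CONNECTIONS_COLUMNS)
rows = to_insert_rows(df, USER_CONNECTIONS_COLUMNS)
print(query)
print(rows)
```

With an explicit column list, a later change to the DataFrame's column order or to the table definition fails loudly or is absorbed by the reordering instead of silently writing values into the wrong columns.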

Create an Airflow DAG that loads the previous day's data into the analytical database every day.
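
One detail matters for a daily job: the extract queries rely on ClickHouse's `yesterday()`, which is evaluated when the query runs, so a run that is re-triggered on a later day reads a different date. One possible alternative, shown only as a sketch, is to compute the target date in Python and bind it as a query parameter. The `%(target_date)s` placeholder follows the pyformat style used by `clickhouse_driver`. The query constant and both helper functions are hypothetical, and the choice of reference timestamp (for example the run's logical date) is an assumption.

```python
from datetime import date, datetime, timedelta

# Hypothetical variant of the user connections extract with a bound date parameter
QUERY_USER_CONNECTIONS_FOR_DATE = """
    SELECT
        toDate(time) as date
        , user_id as sender_id
        , receiver_id
        , count() as messages_count
    FROM
        message_actions
    WHERE
        toDate(time) = %(target_date)s
    GROUP BY
        date
        , sender_id
        , receiver_id
"""


def previous_day(reference: datetime) -> date:
    """Returns the calendar day before the reference timestamp"""
    return reference.date() - timedelta(days=1)


def build_extract_params(reference: datetime) -> dict:
    """Builds the parameter dictionary for the query above"""
    return {"target_date": previous_day(reference)}


# A run scheduled for 3 AM on 2025-09-26 would target 2025-09-25
params = build_extract_params(datetime(2025, 9, 26, 3, 0))
print(params)
```

With `clickhouse_driver`, such a dictionary can be passed as the second argument of `Client.execute` for SELECT queries. Whether the Airflow hook forwards it unchanged should be checked against the plugin version in use.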

In [None]:
# analytics_daily_etl.py
"""
Analytics Daily ETL DAG

Extracts the previous day's analytics data from the operational database,
validates it, and loads it into the analytics warehouse.
"""
from datetime import datetime, timedelta
from airflow.sdk import dag, task
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook
import pandas as pd
import logging
from textwrap import dedent
from elt_queries import (
    QUERY_FACT_DAILY_FEED_EXTRACT
    , QUERY_FACT_DAILY_POSTS_EXTRACT
    , QUERY_FACT_DAILY_MESSENGER_EXTRACT
    , QUERY_FACT_USER_CONNECTIONS_EXTRACT
    , QUERY_DIM_USERS_EXTRACT
    , QUERY_FACT_DAILY_FEED_LOAD
    , QUERY_FACT_DAILY_POSTS_LOAD
    , QUERY_FACT_DAILY_MESSENGER_LOAD
    , QUERY_FACT_USER_CONNECTIONS_LOAD
    , QUERY_DIM_USERS_LOAD    
)

# Database connections
SRC_DB_ID = 'ch_src'
DST_DB_ID = 'ch_dst'
src_hook = ClickHouseHook(clickhouse_conn_id=SRC_DB_ID)
dst_hook = ClickHouseHook(clickhouse_conn_id=DST_DB_ID)
logger = logging.getLogger(__name__)

# ==========================================================================
# DAG config
# ==========================================================================

default_args = {
    'owner': 'analytics_team',
    'depends_on_past': False,
    'start_date': datetime(2025, 9, 25),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email_on_retry': False,
}

dag_config = {
    'default_args': default_args,
    'description': 'Daily ETL pipeline for loading data from operational DB to analytics warehouse',
    'schedule': '0 3 * * *', # Runs at 3 AM daily
    'catchup': False,
    'tags': ['analytics', 'etl'],
    'max_active_runs': 1,
    'doc_md': dedent("""
    # Analytics Daily ETL with Data Validation
    
    Extracts, validates and loads daily analytics data with quality checks.
    """)
}

# ==========================================================================
# Constants for validation
# ==========================================================================

REQUIRED_COLUMNS = {
    'daily_feed': [
        'date', 'user_id', 'first_view_time', 'first_like_time', 
        'last_view_time', 'last_like_time', 'posts', 'views', 'likes',
        'avg_time_between_views', 'avg_time_between_likes', 'avg_view_to_like_seconds'
    ],
    'daily_posts': [
        'date', 'post_id', 'first_action_time', 'last_action_time', 
        'views', 'likes', 'unique_viewers', 'unique_likers',
        'avg_time_between_views', 'avg_time_between_likes', 'avg_view_to_like_seconds'
    ],
    'daily_messenger': [
        'date', 'user_id', 'first_sent_time', 'last_sent_time', 'users_sent', 
        'messages_sent', 'avg_time_between_messages_sent', 'first_received_time', 
        'last_received_time', 'users_received', 'messages_received', 
        'avg_time_between_messages_received'
    ],
    'user_connections': ['date', 'sender_id', 'receiver_id', 'messages_count'],
    'users': [
        'user_id', 'gender', 'age', 'source', 'os', 'city', 
        'country', 'exp_group', 'version'
    ]
}

# ==========================================================================
# Helper functions
# ==========================================================================

def extract_data(query: str, data_type: str) -> pd.DataFrame:
    """Common extraction logic"""
    try:
        logger.info(f"🚀 Starting {data_type} extraction...")
        records, column_types = src_hook.execute(query, with_column_types=True)
        columns = [col[0] for col in column_types]
        df = pd.DataFrame(records, columns=columns)
        logger.info(f"✅ Successfully extracted {len(df)} {data_type} records")
        logger.info(f"DataFrame shape: {df.shape}, columns: {list(df.columns)}")
        return df
    except Exception as e:
        logger.error(f"❌ Failed to extract {data_type} data: {str(e)}")
        raise

def validate_data(df: pd.DataFrame, data_type: str) -> bool:
    """Common validation logic; returns True when all checks pass"""
    try:
        logger.info(f"🔍 Validating {data_type} data...")
        
        if df.empty:
            raise ValueError(f"❌ {data_type.capitalize()} DataFrame is empty")
        
        required_columns = REQUIRED_COLUMNS[data_type]
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise ValueError(f"❌ Missing required columns in {data_type}: {missing_columns}")
        
        logger.info(f"✅ {data_type.capitalize()} validation passed: {len(df)} records")
        # The validate tasks are annotated with "-> bool", so return an explicit result
        return True
        
    except Exception as e:
        logger.error(f"❌ {data_type.capitalize()} validation failed: {str(e)}")
        raise

def load_data(df: pd.DataFrame, load_query: str, data_type: str) -> None:
    """Common loading logic"""
    try:
        logger.info(f"📥 Loading {len(df)} {data_type} records...")
        dst_hook.execute(load_query, df.values.tolist())  
        logger.info(f"✅ Successfully loaded {len(df)} {data_type} records")
    except Exception as e:
        logger.error(f"❌ Failed to load {data_type} records: {str(e)}")
        raise

def handle_etl_failure(context):
    """Logs details of a failed ETL task"""
    task_instance = context['task_instance']
    exception = context.get('exception')
    # Newer Airflow contexts expose 'logical_date'; fall back to 'execution_date' if present
    logical_date = context.get('logical_date') or context.get('execution_date')

    logger.error(f"ETL Task {task_instance.task_id} failed on {logical_date}")
    logger.error(f"Exception: {str(exception)}")
    logger.error(f"Task try number: {task_instance.try_number}")

    # The task instance has no method for sending email. Failure emails are sent
    # by Airflow itself when 'email_on_failure' is enabled in default_args and
    # an 'email' recipient is configured.
        
@dag(**dag_config)
def analytics_daily_etl():
    # ==========================================================================
    # Extract Tasks
    # ==========================================================================
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        on_failure_callback=handle_etl_failure
    )
    def extract_daily_feed() -> pd.DataFrame:
        """Extracts daily feed data"""
        return extract_data(QUERY_FACT_DAILY_FEED_EXTRACT, 'daily_feed')
    
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        on_failure_callback=handle_etl_failure
    )
    def extract_daily_posts() -> pd.DataFrame:
        """Extracts daily posts data"""
        return extract_data(QUERY_FACT_DAILY_POSTS_EXTRACT, 'daily_posts')
    
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        on_failure_callback=handle_etl_failure
    )
    def extract_daily_messenger() -> pd.DataFrame:
        """Extracts daily messenger data"""
        return extract_data(QUERY_FACT_DAILY_MESSENGER_EXTRACT, 'daily_messenger')
    
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        on_failure_callback=handle_etl_failure
    )
    def extract_user_connections() -> pd.DataFrame:
        """Extracts user connection data"""
        return extract_data(QUERY_FACT_USER_CONNECTIONS_EXTRACT, 'user_connections')
    
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        on_failure_callback=handle_etl_failure
    )
    def extract_users() -> pd.DataFrame:
        """Extracts users data"""
        return extract_data(QUERY_DIM_USERS_EXTRACT, 'users')

    # ==========================================================================
    # Validate Tasks
    # ==========================================================================
    @task(
        retries=2,
        retry_delay=timedelta(minutes=2),
        on_failure_callback=handle_etl_failure
    )
    def validate_daily_feed(df_daily_feed: pd.DataFrame) -> bool:
        """Validates daily feed data quality"""
        return validate_data(df_daily_feed, 'daily_feed')

    @task(
        retries=2,
        retry_delay=timedelta(minutes=2),
        on_failure_callback=handle_etl_failure
    )
    def validate_daily_posts(df_daily_posts: pd.DataFrame) -> bool:
        """Validates daily posts data quality"""
        return validate_data(df_daily_posts, 'daily_posts')

    @task(
        retries=2,
        retry_delay=timedelta(minutes=2),
        on_failure_callback=handle_etl_failure
    )
    def validate_daily_messenger(df_daily_messenger: pd.DataFrame) -> bool:
        """Validates daily messenger data quality"""
        return validate_data(df_daily_messenger, 'daily_messenger')

    @task(
        retries=2,
        retry_delay=timedelta(minutes=2),
        on_failure_callback=handle_etl_failure
    )
    def validate_user_connections(df_user_connections: pd.DataFrame) -> bool:
        """Validates user connections data quality"""
        return validate_data(df_user_connections, 'user_connections')

    @task(
        retries=2,
        retry_delay=timedelta(minutes=2),
        on_failure_callback=handle_etl_failure
    )
    def validate_users(df_users: pd.DataFrame) -> bool:
        """Validates users dimension data quality"""
        return validate_data(df_users, 'users')
        
    # ==========================================================================
    # Load Tasks
    # ==========================================================================
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        on_failure_callback=handle_etl_failure
    )
    def load_daily_feed(df_daily_feed: pd.DataFrame) -> None:
        """Loads daily feed data"""
        load_data(df_daily_feed, QUERY_FACT_DAILY_FEED_LOAD, 'daily_feed')
    
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        on_failure_callback=handle_etl_failure
    )
    def load_daily_posts(df_daily_posts: pd.DataFrame) -> None:
        """Loads daily posts data"""
        load_data(df_daily_posts, QUERY_FACT_DAILY_POSTS_LOAD, 'daily_posts')
   
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        on_failure_callback=handle_etl_failure
    )
    def load_daily_messenger(df_daily_messenger: pd.DataFrame) -> None:
        """Loads daily messenger data"""
        load_data(df_daily_messenger, QUERY_FACT_DAILY_MESSENGER_LOAD, 'daily_messenger')
        
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        on_failure_callback=handle_etl_failure
    )
    def load_user_connections(df_user_connections: pd.DataFrame) -> None:
        """Loads user connections data"""
        load_data(df_user_connections, QUERY_FACT_USER_CONNECTIONS_LOAD, 'user_connections')
        
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        on_failure_callback=handle_etl_failure
    )
    def load_users(df_users: pd.DataFrame) -> None:
        """Loads users data"""
        load_data(df_users, QUERY_DIM_USERS_LOAD, 'users')
                                       
    # ==========================================================================
    # WORKFLOW
    # ==========================================================================
    # Extract
    df_daily_feed = extract_daily_feed()
    df_daily_posts = extract_daily_posts()
    df_daily_messenger = extract_daily_messenger()
    df_user_connections = extract_user_connections()
    df_users = extract_users()

    # Validate
    feed_valid = validate_daily_feed(df_daily_feed)
    posts_valid = validate_daily_posts(df_daily_posts)
    messenger_valid = validate_daily_messenger(df_daily_messenger)
    connections_valid = validate_user_connections(df_user_connections)
    users_valid = validate_users(df_users)

    # Load facts
    load_daily_feed_task = load_daily_feed(df_daily_feed)
    load_daily_posts_task = load_daily_posts(df_daily_posts)
    load_daily_messenger_task = load_daily_messenger(df_daily_messenger)
    load_user_connections_task = load_user_connections(df_user_connections)
    
    # Load dimensions
    load_users_task = load_users(df_users)

    # Task dependencies
    feed_valid >> load_daily_feed_task
    posts_valid >> load_daily_posts_task
    messenger_valid >> load_daily_messenger_task
    connections_valid >> load_user_connections_task
    users_valid >> load_users_task
    
# ==========================================================================
# Instantiate the DAG
# ==========================================================================
analytics_daily_etl()
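
The validation step can be exercised without Airflow or ClickHouse. The sketch below mirrors the two checks in `validate_data` (non-empty frame, required columns present) on toy data and adds a duplicate-key check. The duplicate check, the `KEY_COLUMNS` mapping, and the `find_problems` helper are assumptions for illustration and are not part of the DAG.

```python
import pandas as pd

# Subset of REQUIRED_COLUMNS from the DAG, repeated to keep the sketch standalone
REQUIRED_COLUMNS = {
    'user_connections': ['date', 'sender_id', 'receiver_id', 'messages_count'],
}

# Hypothetical key columns used for the extra duplicate check
KEY_COLUMNS = {
    'user_connections': ['date', 'sender_id', 'receiver_id'],
}


def find_problems(df: pd.DataFrame, data_type: str) -> list:
    """Returns a list of problems; an empty list means the frame passed all checks"""
    problems = []
    if df.empty:
        problems.append('DataFrame is empty')
    missing = [col for col in REQUIRED_COLUMNS[data_type] if col not in df.columns]
    if missing:
        problems.append(f'Missing required columns: {missing}')
        # The duplicate check needs the key columns, so stop here
        return problems
    if df.duplicated(subset=KEY_COLUMNS[data_type]).any():
        problems.append('Duplicate key rows found')
    return problems


good = pd.DataFrame({
    'date': ['2025-09-25', '2025-09-25'],
    'sender_id': [1, 1],
    'receiver_id': [2, 3],
    'messages_count': [5, 1],
})
duplicated = pd.concat([good, good.iloc[[0]]], ignore_index=True)
incomplete = good.drop(columns=['messages_count'])

print(find_problems(good, 'user_connections'))
print(find_problems(duplicated, 'user_connections'))
print(find_problems(incomplete, 'user_connections'))
```

Returning a list of problems instead of raising on the first one makes it easier to log everything that is wrong with a batch in a single run. A task could still raise `ValueError` when the list is non-empty.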

# General Conclusion

- **Star Schema Designed:** Developed a star schema for the analytical database
- **Metric Queries Created:** Wrote queries against the product database to calculate the metrics required by the analytical database
- **Star Schema Tables Implemented:** Created all necessary tables for the star schema in the analytical database
- **Data Migration Completed:** Extracted required data from the product database and loaded it into the analytical database
- **Dashboard Optimization:** Created materialized views to optimize dashboard performance
- **Automation Pipeline Built:** Developed an Airflow DAG that incrementally loads the previous day's data every day
- **Goal Achieved:** Established the analytical database and an ETL process that keeps it continuously updated