In [None]:
from datetime import datetime, timedelta
import pandas as pd
from io import StringIO
import requests
from datetime import date


from airflow.decorators import dag, task
from airflow.operators.python import get_current_context


In [2]:
# Get data from clickhouse


def ch_get_data(query='Select 1',
                host='https://clickhouse.lab.karpov.courses',
                user='student',
                password='dpo_python_2020'):
    """Run ``query`` over the ClickHouse HTTP interface and return a DataFrame.

    The response body is parsed as tab-separated text and, per the pandas
    default, its first line is used as the column header. Queries should
    therefore request a header-bearing format such as ``TSVWithNames``.

    Parameters
    ----------
    query : str
        SQL text, sent UTF-8 encoded as the POST body.
    host : str
        URL of the ClickHouse HTTP endpoint.
    user, password : str
        Credentials passed as HTTP basic auth.

    Returns
    -------
    pandas.DataFrame
        The parsed response.

    Raises
    ------
    requests.HTTPError
        If the server answers with a 4xx/5xx status (e.g. a SQL error).
    """
    # NOTE(review): the credentials are hard-coded defaults and verify=False
    # turns off TLS certificate verification. Both are kept for backward
    # compatibility; consider reading secrets from the environment and
    # enabling verification.
    r = requests.post(host,
                      data=query.encode("utf-8"),
                      auth=(user, password),
                      verify=False)
    # Fail loudly on HTTP errors: otherwise the error text returned by the
    # server would be handed to read_csv and come back as a meaningless frame.
    r.raise_for_status()
    result = pd.read_csv(StringIO(r.text), sep='\t')
    return result

In [3]:
# Tasks default parameters
# See the BaseOperator documentation for the full list of supported arguments:
# https://airflow.apache.org/docs/apache-airflow/1.10.6/_api/airflow/models/index.html#airflow.models.BaseOperator

# Arguments applied to every task of the DAG unless a task overrides them.
default_args = dict(
    owner='v-hudokormov-21',
    depends_on_past=False,
    retries=2,
    retry_delay=timedelta(minutes=5),
    start_date=datetime(2022, 2, 11),
)

In [87]:
def extract():
    """Fetch the previous day's per-user metrics together with user attributes.

    Sends a single query through ``ch_get_data`` and returns its result
    unchanged. The query joins per-user attributes (age, gender, os) taken
    from both the feed and the message tables with counters computed for
    ``today() - 1``: views, likes, messages_sent, messages_received,
    users_sent and users_received.
    """
    # The SQL text is kept verbatim (including the indented last line) so the
    # request body sent to the server is unchanged.
    query = """
-- We will process two tables in parallel. In the feed_actions table, we will calculate the number of views and likes
-- for each user's content. In the message_actions table, we will calculate how many messages each user receives and
-- sends, how many people he writes to, and how many people write to him. Each extraction should be in a separate task.

SELECT * 
FROM (
    SELECT *
    FROM (
        --the full list of users from message_actions
        SELECT user_id,
            any(age) AS age,
            any(gender) AS gender,
            any(os) AS os
        FROM simulator_20230120.message_actions
        GROUP BY user_id
        ) AS info_from_messages
    FULL OUTER JOIN
        (
        --the full list of users from feed_actions
        SELECT user_id,
        any(age) AS age,
        any(gender) AS gender,
        any(os) AS os
        FROM simulator_20230120.feed_actions
        GROUP BY user_id
        ) AS info_from_feed
    USING user_id, age, gender, os
    ) AS user_dimensions
RIGHT JOIN 
    (-- right join to the table with metrics information
        SELECT *
        FROM 
        (SELECT user_id, 
        views,
        likes,
        messages_sent,
        messages_received
        FROM (
        -- list of user likes&views from feed
        SELECT
            user_id,
            countIf(action = 'view') AS views,
            countIf(action = 'like') AS likes
        FROM simulator_20230120.feed_actions
        WHERE toDate(time) = toDate(today())-1
        GROUP BY user_id
            ) AS user_likes_views
    FULL OUTER JOIN
        (
        -- list of user messages sent/received 
        SELECT user_id,
            q1.messages_sent,
            q2.messages_received
        FROM (
            -- list of user messages sent
            SELECT user_id,
                    COUNT(user_id) AS messages_sent
            FROM simulator_20230120.message_actions
            WHERE toDate(time) = today() - 1
            GROUP BY user_id
            ) AS q1
        FULL OUTER JOIN (
            -- list of user messages received
            SELECT reciever_id AS user_id,
                    COUNT(reciever_id) AS messages_received
            FROM simulator_20230120.message_actions
            WHERE toDate(time) = today() -1
            GROUP BY user_id
            ) AS q2
        USING user_id
        ) AS user_messages
        USING user_id
        ) AS user_likes_views_messages
        FULL OUTER JOIN 
        (
        SELECT user_id,
        users_sent,
        users_received
        FROM (
            -- count users_sent - how much users received the message from the current user
            SELECT user_id, COUNT(DISTINCT reciever_id) AS users_sent
            FROM simulator_20230120.message_actions
            WHERE toDate(time) = today() - 1
            GROUP BY user_id
            ) AS user_messages_sent
        FULL OUTER JOIN 
            (
            -- count users_received - how much users sent the messages to the current user
            SELECT reciever_id AS user_id, users_received
            FROM (
                SELECT reciever_id, COUNT(DISTINCT user_id) AS users_received
                FROM simulator_20230120.message_actions
                WHERE toDate(time) = today() - 1
                GROUP BY reciever_id
                )
            ) AS user_messages_received
        USING user_id
        ) AS user_likes_views_messages_sender
        USING user_id
    ) AS user_messages_views_combined
    USING user_id
format TSVWithNames
        """
    return ch_get_data(query=query)

In [88]:
# Run the extraction query once; `df` holds one row per user with the
# attribute columns and the six metric columns used by the slices below.
df = extract()

In [92]:
df.shape

(20453, 10)

In [91]:
df.head()

Unnamed: 0,user_id,age,gender,os,views,likes,messages_sent,messages_received,users_sent,users_received
0,116662,34,1,iOS,0,0,5,0,2,0
1,116156,28,1,iOS,16,6,0,0,0,0
2,123072,31,1,Android,75,14,0,0,0,0
3,115112,25,1,Android,0,0,4,5,4,5
4,107534,21,1,Android,25,4,0,0,0,0


In [7]:
# Collapse the per-user metrics into one row per age value, stamped with
# yesterday's date (the same day the query filters on via today() - 1).
yesterday = date.today() - timedelta(days=1)
df_age = (
    df[['age', 'views', 'likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']]
    .groupby('age', as_index=False)
    .sum()
    .assign(event_date=yesterday, dimension='age')
    .rename(columns={'age': 'dimension_value'})
    .loc[:, ['event_date', 'dimension', 'dimension_value', 'views', 'likes',
             'messages_received', 'messages_sent', 'users_received', 'users_sent']]
)

Date - event_date  
Slice name - dimension  
Slice value - dimension_value  
Number of views - views  
Number of likes - likes  
Number of received messages - messages_received  
Number of sent messages - messages_sent  
Number of users the messages were received from - users_received  
Number of users the messages were sent to - users_sent  
The breakdown is by OS, gender, and age.  

In [8]:
df_age.head()

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,2023-02-12,age,14,11857,2508,59,210,57,127
1,2023-02-12,age,15,24251,5262,277,470,229,372
2,2023-02-12,age,16,27648,6044,321,565,296,511
3,2023-02-12,age,17,38108,8130,499,670,467,642
4,2023-02-12,age,18,41114,9083,522,621,502,594


In [12]:
# Alias only (no copy): the slice cells below read the extract through `data`.
data = df

In [13]:
# TODO: inside the Airflow task, take the date from the run context
# (get_current_context()) instead of the wall clock.
# NOTE(review): an earlier draft subtracted a timedelta from context['ds'];
# that value looks like a string in Airflow - confirm and parse it first.

# One row per gender value with the summed metrics, stamped with yesterday's date.
yesterday = date.today() - timedelta(days=1)
df_gender = (
    data[['gender', 'views', 'likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']]
    .groupby('gender', as_index=False)
    .sum()
    .assign(event_date=yesterday, dimension='gender')
    .rename(columns={'gender': 'dimension_value'})
    .loc[:, ['event_date', 'dimension', 'dimension_value', 'views', 'likes',
             'messages_received', 'messages_sent', 'users_received', 'users_sent']]
)


In [71]:
# Event date for this slice. It is computed rather than hard-coded so that
# re-running the notebook on another day does not stamp stale dates, and so
# that `event_date` holds `date` objects like the gender and OS slices do
# (a string here would mix types in the concatenated result).
date_previuos_day = date.today() - timedelta(days=1)

# One row per age value with the summed metrics. Note that this rebinds
# `df_age`, which an earlier cell also builds from `df`.
df_age = data[['age', 'views', 'likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']]\
        .groupby('age', as_index=False)\
        .sum()
df_age['event_date'] = date_previuos_day
df_age['dimension'] = 'age'
df_age = df_age.rename(columns = {'age':'dimension_value'})
df_age = df_age[['event_date', 'dimension', 'dimension_value', 'views', 'likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']]


In [84]:
df_age.head()

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,2023-02-12,age,14,11857,2508,59,210,57,127
1,2023-02-12,age,15,24251,5262,277,470,229,372
2,2023-02-12,age,16,27648,6044,321,565,296,511
3,2023-02-12,age,17,38108,8130,499,670,467,642
4,2023-02-12,age,18,41114,9083,522,621,502,594


In [14]:
df_gender

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,2023-02-12,gender,0,333563,72714,5771,5728,4524,4570
1,2023-02-12,gender,1,406092,88082,7129,7172,5706,5660


In [15]:
# TODO: inside the Airflow task, take the date from the run context
# (get_current_context()) instead of the wall clock.
# NOTE(review): an earlier draft subtracted a timedelta from context['ds'];
# that value looks like a string in Airflow - confirm and parse it first.

# One row per OS value with the summed metrics, stamped with yesterday's date.
yesterday = date.today() - timedelta(days=1)
df_os = (
    data[['os', 'views', 'likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']]
    .groupby('os', as_index=False)
    .sum()
    .assign(event_date=yesterday, dimension='os')
    .rename(columns={'os': 'dimension_value'})
    .loc[:, ['event_date', 'dimension', 'dimension_value', 'views', 'likes',
             'messages_received', 'messages_sent', 'users_received', 'users_sent']]
)


In [16]:
df_os

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,2023-02-12,os,Android,487219,105673,7564,8164,6276,6470
1,2023-02-12,os,iOS,252436,55123,5336,4736,3954,3760


In [85]:
# Stack the age, gender and OS slices into one long frame with a fresh
# 0..n-1 index. NOTE(review): this rebinds `df`, which until now held the
# raw extract, so the slice cells that read `df`/`data` cannot be re-run
# after this cell without re-running the extraction first.
df = pd.concat([df_age, df_gender, df_os], ignore_index=True)

In [68]:
# DDL for the result table: one row per (event_date, dimension,
# dimension_value) slice with the six summed metrics. `IF NOT EXISTS` makes
# the statement safe to execute repeatedly.
create_table_query = """
        CREATE TABLE IF NOT EXISTS test.v_khudokormov_etl_result
            (
            dimension String,
            dimension_value String,
            event_date Date,
            likes UInt64,
            views UInt64,
            messages_received UInt64,
            messages_sent UInt64,
            users_received UInt64,
            users_sent UInt64
            )
            ENGINE = MergeTree()
            ORDER BY event_date
        """

In [69]:
# Create the result table if it does not exist yet.
# NOTE(review): `ph` and `connection_test` are not defined in any visible
# cell, so on a fresh kernel this line raises NameError. Presumably `ph` is
# pandahouse and `connection_test` its connection dict - confirm and add the
# import and connection setup to the top cells.
ph.execute(create_table_query, connection=connection_test)

b''

In [56]:
# Load the combined slices into the table created by `create_table_query`.
# The previous target, 'test_connection', is not the table this notebook
# creates, so the ETL result never reached `v_khudokormov_etl_result`.
# Assumes `connection_test` points at the `test` database - TODO confirm.
ph.to_clickhouse(df=df, table='v_khudokormov_etl_result', index=False, connection=connection_test)

4