# In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from clickhouse_driver import Client
from datetime import datetime, timedelta

# ClickHouse connection settings and the tables this DAG reads and writes.
CLICKHOUSE_CONN_ID = 'clickhouse_conn_id'  # id of the ClickHouse connection
SOURCE_TABLE = 'smlogs'                    # table the rows are selected from
TARGET_TABLE = 'smboard'                   # table the transformed rows go into

# Defaults inherited by every task in the DAG: one retry after five minutes,
# no e-mail on failure or retry.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    # NOTE(review): days_ago() yields a start_date that moves every day and is
    # deprecated in newer Airflow 2.x releases; a fixed datetime is the
    # documented recommendation.
    start_date=days_ago(1),
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Hourly DAG; with catchup disabled, missed intervals are not backfilled.
dag = DAG(
    dag_id='smlogs_to_smboard_clickhouse',
    default_args=default_args,
    description='Extract data from smlogs, transform and load into smboard in ClickHouse',
    schedule_interval='@hourly',
    catchup=False,
)

def transform_and_load_data():
    """Copy rows from ``SOURCE_TABLE`` into ``TARGET_TABLE`` inside ClickHouse.

    The transformation runs entirely server-side as one ``INSERT ... SELECT``:
    ``timestamp`` is cast to ``DateTime`` (via Float64 -> Int64, i.e. it is
    presumably Unix epoch seconds, possibly fractional — confirm), and five
    fields are extracted from the JSON ``parameters`` column with
    ``JSONExtractString``.

    Credentials are read from the Airflow connection ``CLICKHOUSE_CONN_ID``
    rather than being hard-coded. The host and database fall back to the
    values this task previously used when the connection leaves them empty.

    Raises:
        Whatever ``BaseHook.get_connection`` raises when the connection is not
        configured, and ``clickhouse_driver`` errors if connecting or the
        query fails.
    """
    # Local import: the module path differs between Airflow 1.10 and 2.x.
    try:
        from airflow.hooks.base import BaseHook
    except ImportError:  # Airflow 1.10.x
        from airflow.hooks.base_hook import BaseHook

    conn = BaseHook.get_connection(CLICKHOUSE_CONN_ID)

    # clickhouse_driver speaks the native TCP protocol. ClickHouse Cloud serves
    # it over TLS on port 9440; 8443 is the HTTPS interface, which this client
    # cannot use.
    client = Client(
        host=conn.host or 'nz1lf3twa0.europe-west4.gcp.clickhouse.cloud',
        port=conn.port or 9440,
        user=conn.login,
        password=conn.password,
        database=conn.schema or 'SM',
        secure=True,
    )

    # NOTE(review): there is no WHERE clause, so every scheduled run re-inserts
    # the whole source table. Unless TARGET_TABLE deduplicates (e.g. a
    # ReplacingMergeTree) or SOURCE_TABLE is emptied elsewhere, rows will be
    # duplicated — consider filtering on the run's data interval.
    # The column list and SELECT list are matched by position.
    query = f'''
        INSERT INTO {TARGET_TABLE}
            (timestamp,
            user,
            communication_number,
            communication_id,
            script_id,
            script_name,
            mrf,
            client_mrf,
            script_owner,
            current_script_owner,
            script_responsible,
            current_script_responsible,
            crm_departament,
            ACCOUNT_NUMBER,
            CALLER_ID,
            COMMUNICATION_THEME,
            COMMUNICATION_DETAIL,
            COMMUNICATION_RESULT)
        SELECT
            toDateTime(toInt64(toFloat64(s.timestamp))),
            user,
            communication_number,
            communication_id,
            script_id,
            script_name,
            mrf,
            client_mrf,
            script_owner,
            current_script_owner,
            script_responsible,
            current_script_responsible,
            crm_departament,
            JSONExtractString(s.parameters, 'ACCOUNT_NUMBER') as ACCOUNT_NUMBER,
            JSONExtractString(s.parameters, 'CALLER_ID') as CALLER_ID,
            JSONExtractString(s.parameters, 'COMMUNICATION_THEME') as COMMUNICATION_THEME,
            JSONExtractString(s.parameters, 'COMMUNICATION_DETAIL') as COMMUNICATION_DETAIL,
            JSONExtractString(s.parameters, 'COMMUNICATION_RESULT') as COMMUNICATION_RESULT
        FROM {SOURCE_TABLE} AS s
    '''
    try:
        client.execute(query)
    finally:
        # Release the connection even when the INSERT fails.
        client.disconnect()

# Wrap the load function in a PythonOperator attached to ``dag``.
transform_and_load_task = PythonOperator(
    dag=dag,
    task_id='transform_and_load_data',
    python_callable=transform_and_load_data,
)

# Bare reference left over from the notebook export (it displays the task in a
# cell); it has no effect when the file is imported as a module.
transform_and_load_task