Задание:

В данных вы найдете информацию о событиях, которые произошли с объявлением 121288 за два дня. Рассчитайте следующие метрики в разрезе каждого дня: количество показов, количество кликов, CTR, сумма потраченных денег за 2019-04-01 и 2019-04-02, найдите процентную разницу между этими метриками. 

Создайте текстовый файл, в котором будет собрана информация о том, какие метрики наблюдаются 2 апреля, а также, на сколько процентов они уменьшились по сравнению со вчера. Отправьте получившийся текст к себе в личные сообщения во ВКонтакте в виде сообщения со сводкой метрик. Все предыдущие шаги оформите в виде исполняемого скрипта (скриптов) и скрипта для DAG-а, в котором при помощи BashOperator-а (или других операторов на ваше усмотрение) будет вызываться исполняемый скрипт (скрипты). В расписании для крона укажите каждый понедельник в 12 утра. Так, чтобы ваш скрипт с рассчетом дневных метрик отправлялся вам в личку каждый понедельник в 12 утра из Airflow.

Импорт библиотек

In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import pandas as pd
import numpy as np
from datetime import datetime
import vk_api
import random

Дефолтные аргументы

In [None]:
default_args = {
    'owner': 'dbabynicheva',
    'depends_on_past': False,
    'start_date': datetime(2020, 10, 1),
    'retries': 0
}

Создание DAG

In [None]:
dag = DAG(dag_id='dag_vk_project_dbabynicheva',
          default_args=default_args,
          catchup=False,
          schedule_interval='00 12 * * 1')

Функция, которая делает подсчет метрик и отправляет их в vk по api

In [None]:
def vk_report():

    #  считывание данных
    input_data = pd.read_csv('https://docs.google.com/spreadsheets/d/e/2PACX-1vR-ti6Su94955DZ4Tky8EbwifpgZf_dTjpBdiVH0Ukhsq94jZdqoHuUytZsFZKfwpXEUCKRFteJRc9P/pub?gid=889004448&single=true&output=csv')

    print('Данные импортированны')

    # подсчет уникальных рекламных событий
    events_number = input_data \
        .groupby(['date', 'event'], as_index=False) \
        .agg({'ad_id': 'count'}) \
        .rename(columns={'ad_id': 'events_count'}).reset_index()

    # транспонирование в pivot таблицу
    ad_data = events_number.pivot(index='date', columns='event', values='events_count')

    # подсчет метрик: CTR, суммы потраченных денег, процентной разницы
    ad_data['CTR'] = ad_data['click'] / ad_data['view']
    ad_data['budget'] = input_data.ad_cost[1]/1000 * ad_data.view
    click_ratio = round((ad_data.click[1] - ad_data.click[0]) / ad_data.click[0] * 100)
    view_ratio = round((ad_data.view[1] - ad_data.view[0]) / ad_data.view[0] * 100)
    CTR_ratio = round((ad_data.CTR[1] - ad_data.CTR[0]) / ad_data.CTR[0] * 100)
    budget_ratio = round((ad_data.budget[1] - ad_data.budget[0]) / ad_data.budget[0] * 100)

    print('Рекламные метрики рассчитаны'

Создание отчета vk

In [None]:
vk_message = f'''Отчет по объявлению 121288 за 2 апреля\n
            Траты: {ad_data.budget[1]} ({budget_ratio}%)
            Показы: {ad_data.view[1]} ({view_ratio}%)
            Клики: {ad_data.click[1]} ({click_ratio}%)
            CTR: {ad_data.CTR[1]} ({CTR_ratio}%)'''

    print('Отчет сформирован')

Отправка отчета в vk

In [None]:
token = '_____'
    chat_id = 1
    vk_session = vk_api.VkApi(token=token)
    vk = vk_session.get_api()

    vk.messages.send(
        chat_id=chat_id,
        random_id=random.randint(1, 2 ** 31),
        message=vk_message)

    print('Отчет отправлен')

Создаем таску

In [None]:
t1 = PythonOperator(task_id='vk_report', # создаем таску
                        python_callable=vk_report, # передаем функцию в первую таску для DAG'a
                        dag=dag) # передаем dag