In [1]:
import time
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
import lxml
import os
import json
from tqdm import tqdm
from selenium.webdriver.chrome.service import Service

import pandas as pd
import gspread
from oauth2client.service_account import ServiceAccountCredentials
from datetime import date

In [2]:
# указать пользователя
user = 'MerinovDV'
path_to_credential = f'C:/Users/{user}/Downloads/auto-monitoring-367212-64ec4ad9d3a5.json' 

In [3]:
# Данные для доступа к Google Spreadsheets

# Specify path to your file with credentials
path_to_credential = path_to_credential 

# Specify name of table in google sheets
table_name = 'Сбор данных для презы hh2022'

scope = ['https://spreadsheets.google.com/feeds',
         'https://www.googleapis.com/auth/drive']

credentials = ServiceAccountCredentials.from_json_keyfile_name(path_to_credential, scope)

gs = gspread.authorize(credentials)
work_table = gs.open(table_name)


In [4]:
def get_links(url, folder, user, add_folder=None, prof_obl=False):
    
    """
    функция проходит с помощью selenium по всем ссылкам, считает кол-во страниц и на основании этого собирает
    все страницы с вакансиями для заданного запроса. После этого сохраняет страницы в папки
    
    url - str, ссылка с запросом
    folder - str, название папки
    prof_obl - boolб если парсим из вкладки профобласти - True
    user - str, текущий пользователь
    """
    
    options = webdriver.ChromeOptions()
    options.add_argument("user-agent=Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:84.0) Gecko/20100101 Firefox/84.0")
    options.add_argument("--disable-blink-features=AutomationControlled")

    s = Service(executable_path=f"C:/Users/{user}/Parsing_folder/chromedriver/chromedriver.exe")
    driver = webdriver.Chrome(service=s, options=options)
    try:
        driver.get(url)
        soup = BeautifulSoup(driver.page_source, 'lxml')
        try:
            pages = int(soup.find_all(attrs={'class':'bloko-button','rel': 'nofollow', 'data-qa':'pager-page'})[-1].getText())
        except:
            pages = 1
        for page in range(pages):
            if prof_obl:
                driver.get(url+ '?page=' + str(page))
                time.sleep(0.25)
            else:
                driver.get(url+ '&page=' + str(page))
                time.sleep(0.25)
            
            if add_folder is not None:
                with open(f"C:/Users/{user}/hh/{add_folder}/{folder}/page_{page}_{date.today()}.html", 
                            "w", encoding='utf-8') as file:
                    file.write(driver.page_source)
            else:
                with open(f"C:/Users/{user}/hh/{folder}/page_{page}_{date.today()}.html", 
                            "w", encoding='utf-8') as file:
                    file.write(driver.page_source)
    except Exception as ex:
                print(ex)

    finally:
        driver.close()
        driver.quit()

In [5]:
def get_list_of_vacancy(folder, user, add_folder=None):
    
    """
    вовзращает список с ссылками на на вакансии
    folder - str, название папки
    user - str, текущий пользователь
    """
    
    links_list = []
    if add_folder is None:
        path = f"C:/Users/{user}/hh/{folder}"
    else:
        path = f"C:/Users/{user}/hh/{add_folder}/{folder}"
    for html in os.listdir(path):
        with open(f"{path}/{html}", encoding='utf-8') as file:
            src = file.read()

            try:
                soup = BeautifulSoup(src, "lxml")
                result = soup.find_all("a", class_="serp-item__title")
                if len(result) != 0:
                    for i in range(len(result)):
                        link = result[i].get('href')
                        links_list.append(link)
                else:
                    print(f'[INFO_PARSER] нет данных по вакансиям {html}')

            except:
                print('[INFO] проблема с парсингом soup')  
    return links_list

In [6]:
def get_vacancy_info(dict_with_links_list):
    
    """
    функция возвращает DataFrame с описанием вакансии
    dict_with_links_list - dict, словарь, в котором ключ - папка, а значение список ссылок на вакансии по заданному ключу
    """
    
    headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36"}
    
    dict_with_df = {}
    
    for key in dict_with_links_list:
        name = []
        salary_from = []
        salary_to = []
        salary_currency = []
        salary_gross = []
        experience_name = []
        schedule_name = []
        employment_name = []
        description = []
        key_skills = []
        employer = []
        published_at = []
        url = []
        views = []
        
        for link in tqdm(dict_with_links_list[key]):
            flag = False
            retry = 0
            while flag == False:
                r = requests.get(link, headers=headers).text
                soup = BeautifulSoup(r, 'lxml')
                if soup.find(attrs={'data-qa': 'vacancy-title'}) is not None:
                    
                    # название вакансии
                    title = soup.find(attrs={'data-qa': 'vacancy-title'})
                    if title is not None:
                        name.append(title.getText())
                    else:
                        name.append('null')

                    # зарплата
                    salary = soup.find(attrs={'data-qa': 'vacancy-salary'})
                    if salary is not None:
                        salary = salary.getText().replace(u'\xa0', u'').split(' ')
                        if 'от' in salary:
                            salary_from_i = salary.index('от') + 1
                            salary_from.append(salary[salary_from_i])
                        else:
                            salary_from.append('null')
                        if 'до' in salary:
                            salary_to_i = salary.index('до') + 1
                            if salary[salary_to_i] == 'вычета':
                                salary_to.append('null')
                            else:
                                salary_to.append(salary[salary_to_i])
                        else:
                            salary_to.append('null')
                    else:
                        salary_from.append('null')
                        salary_to.append('null')

                    # валюта
                    sal_currency = soup.find(attrs={'data-qa': 'vacancy-salary'})
                    if sal_currency is not None:
                        sal_currency = sal_currency.getText().split(' ')
                        if 'руб.' in sal_currency:
                            currency = 'RUB'
                        elif 'USD' in sal_currency:
                            currency = 'USD'
                        elif 'EUR' in sal_currency:
                            currency = 'EUR'
                        elif 'KZT' in sal_currency:
                            currency = 'KZT'
                        elif 'бел. руб' in sal_currency:
                            currency = 'BYN'
                        elif 'KGS' in sal_currency:
                            currency = 'KGS'
                        elif 'сум' in sal_currency:
                            currency = 'UZS'
                        elif 'AZN' in sal_currency:
                            currency = 'AZN'
                        else:
                            currency = 'null'
                        salary_currency.append(currency)
                    else:
                        salary_currency.append('null')

                    # тип зарплаты
                    salary_type = soup.find('span', attrs={'class': 'vacancy-salary-compensation-type'})
                    if salary_type is not None:
                        salary_gross.append(salary_type.getText().strip())
                    else: 
                            salary_gross.append('null')

                    # experience_name
                    experience = soup.find(attrs={'data-qa': 'vacancy-experience'})
                    if experience is not None:
                        experience_name.append(experience.getText())
                    else: 
                        experience_name.append('null')

                    # employment_name, schedule_name
                    employment_schedule = soup.find(attrs={'data-qa': 'vacancy-view-employment-mode'})
                    if employment_schedule is not None:
                        employment_name.append(employment_schedule.getText().split(', ')[0])
                        schedule_name.append(employment_schedule.getText().split(', ')[1])
                    else: 
                        employment_name.append('null')
                        schedule_name.append('null')

                    # описание вакансии
                    descrip = soup.find('div', class_='vacancy-section')
                    if descrip is not None:
                        description.append(descrip.getText())
                    else:
                        description.append('null')

                    # key_skills
                    key_skills_i = []
                    skills = soup.find_all(attrs={'class': 'bloko-tag bloko-tag_inline'})
                    if len(skills) != 0:
                        for i in soup.find_all(attrs={'class': 'bloko-tag bloko-tag_inline'}):
                            key_skills_i.append(i.getText())
                        key_skills.append(key_skills_i)
                    else:
                        key_skills.append('null')

                    # название компании
                    emp = soup.find('span', class_='vacancy-company-name')
                    if emp is not None:
                        employer.append(emp.getText())
                    else:
                        employer.append('null')

                    # кол-во просмотров сейчас
                    view = soup.find('span', class_='vacancy-viewers-count')
                    if view is not None:
                        views.append(view.getText().replace(u'\xa0', u' ').split(' ')[0])
                    else:
                        views.append('null')

                    # опубликована
                    dt = soup.find('p', class_='vacancy-creation-time-redesigned')
                    if dt is not None:
                        published_at.append(dt.getText().replace(u'\xa0', u' ')\
                                                    .split('опубликована')[1]\
                                                    .split('в')[0].strip())
                    else:
                        published_at.append('null')

                    url.append(link)

                    time.sleep(0.25)

                    
                    flag = True

                    
                retry += 1
                if retry == 5:
                    break

        output = pd.DataFrame()
        output['name'] = name
        output['salary_from'] = salary_from
        output['salary_to'] = salary_to
        output['salary_currency'] = salary_currency
        output['salary_gross'] = salary_gross
        output['experience_name'] = experience_name
        output['schedule_name'] = schedule_name
        output['employment_name'] = employment_name
        output['description'] = description
        output['key_skills'] = key_skills
        output['employer'] = employer
        output['published_at'] = published_at
        output['url'] = url
        output['views'] = views
        output['type'] = key
        
        dict_with_df[key] = output

    return dict_with_df

In [7]:
# получение словаря с кол-вом ключевых навыков
def count_skills(column, to_df=True):
    
    """
    функция возвращает словарь: ключ - навык, значение - кол-во в выгрузке
    
    column - столбец в df, который содержит ключевые навыки
    to_df - bool, если
                    True - возвращает df
                    False - возвращает словарь
    """
    dct = {}
    for skill_list in column:
        for skill in skill_list:
            if skill not in dct.keys():
                dct[skill] = 1
            elif skill in dct.keys():
                dct[skill] += 1
    if to_df:
        df_skills = pd.DataFrame.from_dict(dct, orient='index')\
            .rename(columns={0:'num_skills'})\
            .sort_values('num_skills', ascending=False)
        return df_skills
    else:
        return dct

In [8]:
def upload_df(df, google_sheet, create_sheet=False):
    
    """
    функция загружает df в Google Spreadsheets
    
    df - DataFrame
    google_sheet - str, навзание листа
    create_sheet - bool, если
                    True - создает новый лист
                    False - загружает данные в текущий лист
    """
    
    df.key_skills = df.key_skills.astype('str')
    df.description = df.description.str.replace(u'\xa0', u' ').str.strip().str.replace(u'\n', u' ')
    
    if create_sheet:
        ws = work_table.add_worksheet(title=google_sheet, rows=100, cols=20)
    else:
        pass
    
    worksheet = work_table.worksheet(google_sheet)
    worksheet.append_row(df.columns.tolist(), value_input_option='USER_ENTERED')
    worksheet.append_rows(df.values.tolist(), value_input_option='USER_ENTERED')
    
    return print (f'Данные загружены на лист {google_sheet}')

# Пример выгрузки вакансий

In [46]:
# Получаем список ссылок
worksheet = work_table.worksheet('Ссылки')
val = worksheet.get_values('B204:C213')
columns = val.pop(0)
val = pd.DataFrame(val, columns=columns)

# создаем из них словарь
g_links_2 = dict(zip(val['название'].values, val['ссылки'].values))

# название папки для выгрузки
add_folder = 'аналитика'

# создаем папки для страниц, если таких папок еще не было
for new_folder in val['название'].values:
    try:
        # Create target Directory
        os.mkdir(f'C:/Users/{user}/hh/{add_folder}/{new_folder}')
        print("Directory " , new_folder ,  " Created ") 
    except FileExistsError:
        print("Directory " , new_folder ,  " already exists")
        
# проходим по всем ключам словаря (папка - url) и вызываем функцию
for key in g_links_2:
    get_links(g_links_2[key], key, add_folder=add_folder, user=user)
    
# создаем новый словарь, в котором ключ - папка, а значение список ссылок на вакансии по заданному ключу
dict_with_links_list_g_2 = {}
for key in g_links_2:
    dict_with_links_list_g_2[key] = get_list_of_vacancy(key, add_folder=add_folder, user=user)
    
all_df = get_vacancy_info(dict_with_links_list_g_2)

Directory  Системный аналитик  Created 
Directory  Бизнес - аналитик  Created 
Directory  Аналитик 1С  Created 
Directory  Аналитик данных  Created 
Directory  Аналитик информационной безопасности  Created 
Directory  Маркетолог-аналитик  Created 
Directory  Продуктовый аналитик  Created 
Directory  Data scientist  Created 
Directory  BI аналитик  Created 


100%|██████████████████████████████████████████████████████████████████████████████| 1157/1157 [16:13<00:00,  1.19it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 879/879 [12:17<00:00,  1.19it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 424/424 [05:56<00:00,  1.19it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 259/259 [03:33<00:00,  1.21it/s]
100%|██████████████████████████████████████████████████████████████████████████████████| 47/47 [00:40<00:00,  1.17it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 137/137 [01:51<00:00,  1.23it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 163/163 [02:13<00:00,  1.22it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 199/199 [02:40<00:00,  1.24it/s]
100%|███████████████████████████████████

In [47]:
best_df = pd.DataFrame(columns=['name', 'salary_from', 'salary_to', 'salary_currency', 'salary_gross',
       'experience_name', 'schedule_name', 'employment_name', 'description',
       'key_skills', 'employer', 'published_at', 'url', 'views', 'type'])
for key in all_df:
    best_df = pd.concat([best_df, all_df[key]])

In [61]:
skills = pd.DataFrame()
for key in all_df:
    t = count_skills(all_df[key].key_skills).assign(type = key)
    skills = pd.concat([t, skills])
    

In [62]:
skills

Unnamed: 0,num_skills,type
SQL,54,BI аналитик
l,30,BI аналитик
Power BI,28,BI аналитик
Python,25,BI аналитик
MS SQL,20,BI аналитик
...,...,...
Казначейство,1,Системный аналитик
Финансовый рынок,1,Системный аналитик
Депозитарная деятельность,1,Системный аналитик
Маркетинговые исследования,1,Системный аналитик


In [48]:
upload_df(best_df, 'аналитика', create_sheet=True)

Данные загружены на лист аналитика


In [65]:
ws = work_table.add_worksheet(title='аналитика_skills', rows=100, cols=20)
worksheet = work_table.worksheet('аналитика_skills')
worksheet.append_row(skills.reset_index().columns.tolist(), value_input_option='USER_ENTERED')
worksheet.append_rows(skills.reset_index().values.tolist(), value_input_option='USER_ENTERED')

{'spreadsheetId': '1DawPp9-eSygPpFe95pgOtVLF4x6lUaimtaH0ykg0DDA',
 'tableRange': "'аналитика_skills'!A1:C1",
 'updates': {'spreadsheetId': '1DawPp9-eSygPpFe95pgOtVLF4x6lUaimtaH0ykg0DDA',
  'updatedRange': "'аналитика_skills'!A2:C3445",
  'updatedRows': 3444,
  'updatedColumns': 3,
  'updatedCells': 10332}}

In [None]:
from datetime import datetime, timedelta
import pandas as pd
from airflow.decorators import dag, task
import pandahouse as ph
import telegram
import matplotlib.pyplot as plt
import seaborn as sns
import io

connection = {'host': 'https://clickhouse.lab.karpov.courses',
                      'database':'simulator_20221120',
                      'user':'student', 
                      'password':'dpo_python_2020'
                     }

from matplotlib import style
sns.set_theme(({**style.library["fivethirtyeight"]}))
plt.rcParams["figure.figsize"] = (15,8)


my_token = '5831544767:AAE-9VA_reObxmIb_FDZYeh9N4TiCslx-yc' 
bot = telegram.Bot(token=my_token) 

chat_id = -817095409

default_args = {
    'owner': 'd-merinov-24',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2022, 12, 16),
    'schedule_interval': '0 11 * * *'
    }

@dag(default_args=default_args, catchup=False)
def lesson_7_dag_2_merinov():

    @task()
    def get_dau_df_2():
        query = '''
                    SELECT COUNT (DISTINCT user_id ) as uniq_users,
                    day, os, gender, age, source 
                    FROM (

                            SELECT user_id,
                              toStartOfDay(toDateTime(time)) AS  day, os, gender, age, source 
                            FROM simulator_20221120.feed_actions 
                            GROUP BY user_id, day,os, gender, age, source 
                            HAVING day > (today()-1) - 7 and day != today()

                            UNION ALL

                            SELECT user_id,
                              toStartOfDay(toDateTime(time)) AS  day, os, gender, age, source 
                            FROM simulator_20221120.message_actions 
                            GROUP BY user_id, day, os, gender, age, source 
                            HAVING day > (today()-1) - 7 and day != today()
                              )
                    GROUP BY day, os, gender, age, source 
                '''
        dau_df = ph.read_clickhouse(query=query, connection=connection)
        dau_df.day = dau_df.day.dt.date
        return dau_df

    @task()
    def get_dau_info(dau_df):
        dau = dau_df.groupby('day', as_index=False).agg({'uniq_users':'sum'})
        dau['growth_rate'] = dau.uniq_users.pct_change()

        date = dau.day.max()
        dau_value = dau.query('day == @dau_df.day.max()').iloc[0][1]
        diff = round((dau.query('day == @dau_df.day.max()').iloc[0][2]*100), 2)

        if diff > 0:
            change = 'больше'
        else:
            change = 'меньше'

        title = '👥<b>Пользователи</b>'

        text_1 = f'За {date} DAU составил {dau_value}, что на {abs(diff)}% {change}, чем днем ранее.'

        source = dau_df.groupby(['day', 'source'], as_index=False)\
                    .agg({'uniq_users':'sum'})\
                    .sort_values('day')
        source['ads_growth_rate'] = source.query('source == "ads"').uniq_users.pct_change()
        source['organic_growth_rate'] = source.query('source == "organic"').uniq_users.pct_change()

        ads_users = source.query('day == @dau_df.day.max() and source == "ads"').iloc[0][2]
        organic_users = source.query('day == @dau_df.day.max() and source == "organic"').iloc[0][2]

        ads_growth = round(source.query('day == @dau_df.day.max() and source == "ads"').iloc[0][3] * 100, 2)
        organic_growth = round(source.query('day == @dau_df.day.max() and source == "organic"').iloc[0][4] * 100, 2)

        text_2 = f'Их них {ads_users} ({ads_growth}% к пред. дню) пользователей с рекламы и {organic_users} ({organic_growth}% к пред. дню) с органического трафика. '

        os = dau_df.groupby(['day', 'os'], as_index=False)\
                    .agg({'uniq_users':'sum'})\
                    .sort_values('day')
        os['androind_growth_rate'] = os.query('os == "Android"').uniq_users.pct_change()
        os['iOS_growth_rate'] = os.query('os == "iOS"').uniq_users.pct_change()

        android_users = os.query('day == @dau_df.day.max() and os == "Android"').iloc[0][2]
        ios_users = os.query('day == @dau_df.day.max() and os == "iOS"').iloc[0][2]

        android_growth = round(os.query('day == @dau_df.day.max() and os == "Android"').iloc[0][3] * 100, 2)
        ios_growth = round(os.query('day == @dau_df.day.max() and os == "iOS"').iloc[0][4] * 100, 2)

        text_3 = f'Лентой воспользовались {android_users} ({android_growth}% к пред. дню) пользователей с Android и {ios_users} пользователей с iOS({ios_growth}%  к пред. дню). '

        return title + '\n' + '\n' + text_1 + '\n' + text_2 + '\n' + text_3 + '\n'

    @task()
    def get_df_new_users():
        query = '''with mess as (Select    user_id,
                               min(toDate(time)) as bd,
                           os, gender, age, source
                            From simulator_20221120.message_actions 
                            Group by user_id, os, gender, age, source
                            having bd > (today()-1) - 7 and bd != today()),
                    feed as 
                                (Select    user_id,
                                           min(toDate(time)) as bd,
                                           os, gender, age, source
                                From simulator_20221120.feed_actions 
                                Group by user_id, os, gender, age, source
                                having bd > (today()-1) - 7 and bd != today())

            select count(distinct user_id) as users, bd, os,gender, age, source from feed l
            full Join mess r on l.user_id = r.user_id 
                        AND l.bd=r.bd 
                        AND l.os=r.os 
                        AND l.gender=r.gender 
                        AND l.age=r.age 
                        AND l.source = r.source
            group by bd, os,gender, age, source
            ORDER BY bd DESC'''
        df_new_users = ph.read_clickhouse(query=query, connection=connection)
        df_new_users.bd = df_new_users.bd.dt.date
        return df_new_users
    
    @task()
    def get_info_new_users(df_new_users):

        new_users = df_new_users.groupby('bd', as_index=False).agg({'users':'sum'})
        new_users['growth_rate'] = new_users.users.pct_change()

        date = new_users.bd.max()
        new_users_value = new_users.query('bd == @df_new_users.bd.max()').iloc[0][1]
        diff = round((new_users.query('bd == @df_new_users.bd.max()').iloc[0][2]*100), 2)

        if diff > 0:
            change = 'больше'
        else:
            change = 'меньше'

        title = "🆕<b>Новые пользователи</b>"

        text_1 = f'За день пришло {new_users_value} новых пользователей, что на {abs(diff)}% {change}, чем днем ранее.'

        source = df_new_users.groupby(['bd', 'source'], as_index=False)\
                    .agg({'users':'sum'})\
                    .sort_values('bd')

        source['ads_growth_rate'] = source.query('source == "ads"').users.pct_change()
        source['organic_growth_rate'] = source.query('source == "organic"').users.pct_change()

        ads_users = source.query('bd == @df_new_users.bd.max() and source == "ads"').iloc[0][2]
        organic_users = source.query('bd == @df_new_users.bd.max() and source == "organic"').iloc[0][2]

        ads_growth = round(source.query('bd == @df_new_users.bd.max() and source == "ads"').iloc[0][3] * 100, 2)
        organic_growth = round(source.query('bd == @df_new_users.bd.max() and source == "organic"').iloc[0][4] * 100, 2)

        text_2 = f'Их них {ads_users} ({ads_growth}% к пред. дню) пользователей с рекламы и {organic_users} ({organic_growth}% к пред. дню) с органического трафика. '

        df_new_users['age_cut'] = pd.cut(df_new_users.age, [0, 15, 21, 27, 35, 45, 60, 70, 150])

        male = df_new_users.groupby(['gender', 'bd'])['users'].sum()\
                        .to_frame().reset_index()\
                        .query('bd == @df_new_users.bd.max() and gender == 1')\
                        .iloc[0][2]
        female = df_new_users.groupby(['gender', 'bd'])['users'].sum()\
                        .to_frame().reset_index()\
                        .query('bd == @df_new_users.bd.max() and gender == 0')\
                        .iloc[0][2]

        age = df_new_users.groupby(['age_cut', 'bd'])['users'].sum()\
                        .to_frame().reset_index()\
                        .sort_values(['bd', 'users'], ascending=False)\
                        .iloc[0][0]
        male_share = round(male/(female+male)*100)
        female_share = round(female/(female+male)*100)

        text_3 = f'Среди новых пользователей мужчин - {male} ({male_share}%) человек, девушек - {female} ({female_share}%) человек.  Наибольшее число новых пользователей в возрасте {age}'


        return title+ '\n' + '\n' + text_1 + '\n' + text_2 + '\n' + text_3
    
    @task()
    def get_likes_views_df():
            query = '''SELECT toStartOfDay(toDateTime(time)) AS day,
                           count(user_id) as actions,
                           action 
                      FROM simulator_20221120.feed_actions
                      WHERE day > (today()-1) - 7 and day != today()
                      GROUP BY toStartOfDay(toDateTime(time)), action
                      ORDER BY day DESC '''

            likes_views_df = ph.read_clickhouse(query=query, connection=connection)
            likes_views_df.day = likes_views_df.day.dt.date
            return likes_views_df

    @task()
    def get_messages_df():
            query = '''SELECT toStartOfDay(toDateTime(time)) AS day,
                           count(user_id) as messages
                    FROM simulator_20221120.message_actions
                    WHERE day > (today()-1) - 7 and day != today()
                    GROUP BY toStartOfDay(toDateTime(time))
                    ORDER BY day DESC'''
            messages_df = ph.read_clickhouse(query=query, connection=connection)
            # messages_df.day = likes_views_df.day.dt.date

            return messages_df
    @task()
    def get_info_likes_views_mess(likes_views_df, messages_df):


        actions_df = likes_views_df.groupby(['day', 'action'], as_index=False)\
                    .agg({'actions':'sum'})\
                    .sort_values('day')
        actions_df['like_growth_rate'] = actions_df.query('action == "like"').actions.pct_change()
        actions_df['view_growth_rate'] = actions_df.query('action == "view"').actions.pct_change()

        likes = actions_df.query('day == @actions_df.day.max() and action == "like"').iloc[0][2]
        views = actions_df.query('day == @actions_df.day.max() and action == "view"').iloc[0][2]

        likes_growth = round(actions_df.query('day == @actions_df.day.max() and action == "like"').iloc[0][3] * 100, 2)
        views_growth = round(actions_df.query('day == @actions_df.day.max() and action == "view"').iloc[0][4] * 100, 2)

        ctr = round(likes/views, 4)*100

        title = "💖💬<b>Активность</b>"

        text_1 = f'За вчера было поставлено {likes} лайков ({likes_growth}% к пред. дню) и просмотрено {views} постов ({views_growth}% к пред. дню). CTR составил  {ctr}%'

        mes_1 = messages_df.sort_values('day', ascending=False).iloc[0][1]
        mes_0 = messages_df.sort_values('day', ascending=False).iloc[1][1]

        mes_diff = round(mes_1/mes_0 - 1, 2)*100

        text_2 = f'Также было отправлено {mes_1} сообщений ({mes_diff}% к пред. дню)'

        return title + '\n'+ '\n' + text_1 + '\n' + text_2
    
    @task()
    def send_plot_dau_df(dau_df):
        dau_df.day = pd.to_datetime(dau_df["day"])
        dau_df = dau_df.sort_values('day')
        dau_df.day = dau_df.day.dt.strftime('%d-%m')

        plt.subplot(212)
        plt.title('Динамика DAU')
        sns.lineplot(y = 'uniq_users', x='day', data=dau_df)
        plt.subplot(221)
        plt.title('Динамика DAU в разбивке по OS')
        sns.lineplot(y = dau_df.uniq_users, x=dau_df.day, hue=dau_df.os)
        plt.subplot(222)
        plt.title('Динамика DAU в разбивке по Source')
        sns.lineplot(y = dau_df.uniq_users, x=dau_df.day, hue=dau_df.source)

        plot_object = io.BytesIO()
        plt.savefig(plot_object)
        plot_object.seek(0)
        plot_object.name = 'dau.png'
        plt.close()
        bot.sendPhoto(chat_id=chat_id, photo=plot_object, parse_mode='HTML')
        
    @task()
    def send_plot_new_users_df(df_new_users):
        df_new_users.bd = pd.to_datetime(df_new_users.bd)
        df_new_users = df_new_users.sort_values('bd').query('bd > "1971-01-01"')
        df_new_users.bd = df_new_users.bd.dt.strftime('%d-%m')


        plt.subplot(212)
        plt.title('динамика привлечения новых пользователей')
        sns.lineplot(y = 'users', x='bd', data=df_new_users)
        plt.subplot(221)
        plt.title('Новые пользователи в разрезе OS')
        sns.lineplot(y = 'users', x='bd', hue='os', data=df_new_users)
        plt.subplot(222)
        plt.title('Новые пользователи в разрезе Source')
        sns.lineplot(y = 'users', x='bd', hue='source', data=df_new_users)

        plot_object = io.BytesIO()
        plt.savefig(plot_object)
        plot_object.seek(0)
        plot_object.name = 'dau.png'
        plt.close()
        bot.sendPhoto(chat_id=chat_id, photo=plot_object, parse_mode='HTML')
        
    @task()
    def send_plot_likes_views_df(likes_views_df, messages_df):
        likes_views_df.day = pd.to_datetime(likes_views_df["day"])
        likes_views_df = likes_views_df.sort_values('day')
        likes_views_df.day = likes_views_df.day.dt.strftime('%d-%m')

        messages_df.day = pd.to_datetime(messages_df["day"])
        messages_df = messages_df.sort_values('day')
        messages_df.day = messages_df.day.dt.strftime('%d-%m')

        ctr = likes_views_df.pivot_table(index='day', columns='action', values='actions').reset_index()
        ctr['ctr'] = ctr.like / ctr.view
        ctr['ctr'] = ctr.ctr.mul(100).round(2)

        plt.subplot(212)
        plt.title('Динамика CTR, %')
        sns.lineplot(y = 'ctr', x='day', data=ctr)
        plt.subplot(221)
        plt.title('Активность в ленте')
        sns.lineplot(y = 'actions', x='day', hue='action', data=likes_views_df)
        plt.subplot(222)
        plt.title('Кол-во отправленных сообщений')
        sns.lineplot(y = 'messages', x='day', data=messages_df)

        plot_object = io.BytesIO()
        plt.savefig(plot_object)
        plot_object.seek(0)
        plot_object.name = 'dau.png'
        plt.close()
        bot.sendPhoto(chat_id=chat_id, photo=plot_object, parse_mode='HTML')
        
    @task()
    def send_message(title):
        bot.sendMessage(chat_id=chat_id, text=title, parse_mode='HTML')
        
    @task()
    def send_message_title():
        ds = context['ds']
        bot.sendMessage(chat_id=chat_id, text=f"📄<b>Ежедневный отчет по ленте новостей, и по сервису отправки сообщений. Дата: {ds}</b>", parse_mode='HTML')
    
    send_message_title()
    dau_df = get_dau_df_2()
    title_1 = get_dau_info(dau_df)
    send_message(title_1)
    send_plot_dau_df(dau_df)
    df_new_users = get_df_new_users()
    title_2 = get_info_new_users(df_new_users)
    send_message(title_2)
    send_plot_new_users_df(df_new_users)
    likes_views_df = get_likes_views_df()
    messages_df = get_messages_df()
    title_3 = get_info_likes_views_mess(likes_views_df, messages_df)
    send_message(title_3)
    send_plot_likes_views_df(likes_views_df, messages_df)
    
lesson_7_dag_2_merinov = lesson_7_dag_2_merinov()