### Создаём свой собственный DAG:
[Airflow DAG: Coding your first DAG for Beginners](https://www.youtube.com/watch?v=IH1-0hwFZRQ);
[Airflow Data Pipelines with Github Copilot?!](https://www.youtube.com/watch?v=1p1oE_MNA9A);
[How to write your first DAG in Apache Airflow - Airflow tutorials](https://www.youtube.com/watch?v=2nhdhIYueIE).

А теперь к заданию, которое и закрепит пройденный материал:

Сначала определим год, за какой будем смотреть данные.

- Сделать это можно так:
    в питоне выполнить 1994 + hash(f‘{login}') % 23,  где {login} - ваш логин (или же папка с дагами)

Дальше нужно составить DAG из нескольких тасок, в результате которого нужно будет найти ответы на следующие вопросы:

1. Какая игра была самой продаваемой в этом году во всем мире?
2. Игры какого жанра были самыми продаваемыми в Европе? Перечислить все, если их несколько
3. На какой платформе было больше всего игр, которые продались более чем миллионным тиражом в Северной Америке?
Перечислить все, если их несколько
4. У какого издателя самые высокие средние продажи в Японии?
Перечислить все, если их несколько
5. Сколько игр продались лучше в Европе, чем в Японии?


Оформлять DAG можно как угодно, важно чтобы финальный таск писал в лог ответ на каждый вопрос. Ожидается, что в DAG будет 7 тасков. По одному на каждый вопрос, таск с загрузкой данных и финальный таск который собирает все ответы. Дополнительный бонус за настройку отправки сообщений в телеграмм по окончанию работы DAG


In [1]:
#Для датасета
import pandas as pd
from datetime import timedelta
from datetime import datetime

#Для DAGS и отчетов в telegram
import telegram
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.models import Variable

##### Описание данных
Имеются следующие поля:

1. Rank – место по объему продаж
2. Name – название игры
3. Platform – платформа, на которой выпущена игра
4. Year – год релиза
5. Genre – жанр
6. Publisher – издатель
7. NA_Sales – продажи в Северной Америке, в млн.
8. EU_Sales – продажи в Европе, в млн.
9. JP_Sales – продажи в Японии, в млн.
10. Other_Sales – продажи в остальном мире, в млн.
11. Global_Sales – продажи по всему миру, в млн.

Как и в прошлый раз, я сначала выполню задачки, а после соберу свой Dag:
1) Но прежде всего нужно выбрать год исходя из своего логина.

In [2]:
year = 1994 + hash(f'{"v-saharov-20"}') % 23
year

2003

In [3]:
df = pd.read_csv("vgsales.csv").query("Year == @year")
df.head()

Unnamed: 0,Rank,Name,Platform,Year,Genre,Publisher,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales
104,105,Need for Speed Underground,PS2,2003.0,Racing,Electronic Arts,3.27,2.83,0.08,1.02,7.2
110,111,Mario Kart: Double Dash!!,GC,2003.0,Racing,Nintendo,4.12,1.77,0.87,0.19,6.95
174,175,Final Fantasy X-2,PS2,2003.0,Role-Playing,Electronic Arts,1.92,1.08,2.11,0.17,5.29
182,183,Super Mario Bros. 3,GBA,2003.0,Platform,Nintendo,2.93,1.25,0.83,0.2,5.2
190,191,Medal of Honor: Rising Sun,PS2,2003.0,Shooter,Electronic Arts,1.98,2.23,0.13,0.8,5.13


In [4]:
df.Name.unique().shape[0]

546

Есть несколько строк с повторением, то есть с повторными продажами:

А теперь преступим к вопросам:

2. Какая игра была самой продаваемой в этом году во всем мире?


In [5]:
df.groupby("Name",as_index=False).agg({"Global_Sales":"sum"})\
    .sort_values("Global_Sales",ascending=False).head(1).Name

316    Need for Speed Underground
Name: Name, dtype: object

3. Игры какого жанра были самыми продаваемыми в Европе? Перечислить все, если их несколько

In [6]:
df.groupby("Genre",as_index=False).agg({"EU_Sales":"sum"})\
    .sort_values("EU_Sales",ascending=False).head(1).Genre

0    Action
Name: Genre, dtype: object

4. На какой платформе было больше всего игр, которые продались более чем миллионным тиражом в Северной Америке?
Перечислить все, если их несколько

In [7]:
df.query("NA_Sales > 1").groupby("Platform",as_index = False)\
    .agg({"Name":pd.Series.nunique})\
    .sort_values("Name",ascending=False).head(1).Platform

3    PS2
Name: Platform, dtype: object

5. У какого издателя самые высокие средние продажи в Японии?
Перечислить все, если их несколько

In [8]:
df.groupby("Publisher",as_index = False).agg({"JP_Sales":"mean"})\
    .sort_values("JP_Sales",ascending=False).head(1).Publisher

63    SquareSoft
Name: Publisher, dtype: object

6. Сколько игр продались лучше в Европе, чем в Японии?

In [9]:
df.groupby("Name",as_index = False)\
    .agg({"EU_Sales":"sum","JP_Sales":"sum"})\
    .query("EU_Sales > JP_Sales")\
    .shape[0]

447

На все вопросы я ответил и теперь это можно встроить в Dag, при этом я хочу попробывать архитектуру airflow 2.0

In [10]:
default_args = {
    'owner': 'v-saharov-20',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2022, 6, 5),
    'schedule_interval': '0 12 * * *'
}

Помимо всех запросов, которые нам нужно реализовать, нам также надо реализовать функцию с обработкой датасета и вывода всех ответов:

In [None]:
CHAT_ID = -429299660
BOT_TOKEN = Variable.get('telegram_secret')

def send_message(context):
    date = context['ds']
    dag_id = context['dag'].dag_id
    message = f"Well Done! Dag {dag_id} completed on {date}. It's fine."
    bot = telegram.Bot(token=BOT_TOKEN)
    bot.send_message(chat_id=CHAT_ID, text=message)

@dag(default_args=default_args, catchup=False)
def lesson3_v_saharov_20():
    
    @task(retries=3)
    def get_data():
        year = 1994 + hash(f'{"v-saharov-20"}') % 23
        games = pd.read_csv("vgsales.csv").query("Year == @year")
        return games
    
    #Какая игра была самой продаваемой в этом году во всем мире?
    @task(retries=4)
    def top_game(games):
        top_game_df = games.groupby("Name",as_index=False).agg({"Global_Sales":"sum"})\
    .sort_values("Global_Sales",ascending=False).head(1).Name
        return top_game_df
    
    #Игры какого жанра были самыми продаваемыми в Европе? Перечислить все, если их несколько
    @task(retries=4)
    def top_genres(games):
        top_genres_df = games.groupby("Genre",as_index=False).agg({"EU_Sales":"sum"})\
    .sort_values("EU_Sales",ascending=False).head(1).Genre
        return top_genres_df
    
    #На какой платформе было больше всего игр, которые продались более чем миллионным тиражом в Северной Америке? Перечислить все, если их несколько
    @task(retries=4)
    def top_platform_na(games):
        top_platform_na_df = games.query("NA_Sales > 1").groupby("Platform",as_index = False)\
            .agg({"Name":pd.Series.nunique})\
            .sort_values("Name",ascending=False).head(1).Platform
        return top_platform_na_df
    
    #У какого издателя самые высокие средние продажи в Японии? Перечислить все, если их несколько
    @task(retries=4)
    def top_publisher_japan(games):
        top_publisher_japan_df = games.groupby("Publisher",as_index = False).agg({"JP_Sales":"mean"})\
            .sort_values("JP_Sales",ascending=False).head(1).Publisher
        return top_publisher_japan_df
    
    #Сколько игр продались лучше в Европе, чем в Японии?
    @task(retries=4)
    def europe_bigger_japan(games):
        europe_bigger_japan_df = games.groupby("Name",as_index = False)\
            .agg({"EU_Sales":"sum","JP_Sales":"sum"})\
            .query("EU_Sales > JP_Sales")\
            .shape[0]
        return europe_bigger_japan_df
    
    @task(on_success_callback=send_message)
    def print_data(top_game_df, top_genres_df, top_platform_na_df, top_publisher_japan_df, europe_bigger_japan_df):
        context = get_current_context()
        date = context['ds']
        year = 1994 + hash('v-saharov-20') % 23

        print(f"The most popular game in {year} is {top_game_df}")
        print(f"The most popular genres in Europe in {year} is {top_genres_df}")
        print(f"The most popular platform with the biggest amount of games which have been sold more than 1 millions copies in {year} in NA is {top_platform_na_df}")
        print(f"The best publisher of mean sales in Japan in {year} is {top_publisher_japan_df}")
        print(f"{europe_bigger_japan_df} were sold more in Europe than in Japan in {year}")
        

    data = get_data()
    world_top_game = top_game(data)
    top_genres_eu = top_genres(data)
    top_platform_na_1m = top_platform_na(data)
    top_publisher_japan_sales = top_publisher_japan(data)
    number_europe_bigger_japan = europe_bigger_japan(data)
    print_data(world_top_game, top_genres_eu, top_platform_na_1m, top_publisher_japan_sales, number_europe_bigger_japan)
    
lesson3_v_saharov_20 = lesson3_v_saharov_20()


### Выводы:
____
В ходе я ответил на несколько рутинных вопросов, которые после собрал в DAGS, при этом я использовал новый способ назначение тасков, а именно с помощью декодаров.

### Полезные ссылки для изучения:

Базовые вещи:
____
1. [Основные термины и концепты airflow](https://airflow.apache.org/docs/apache-airflow/stable/concepts/index.html);
2. [Best practices от самого airflow](https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html);
3. [Еще практика](https://medium.com/datareply/airflow-lesser-known-tips-tricks-and-best-practises-cf4d4a90f8f ).

Advanced:
____
1. [Разные гайды в доке](https://airflow.apache.org/docs/apache-airflow/stable/howto/index.html);
2. [Немного про нотификации](https://www.astronomer.io/guides/error-notifications-in-airflow);
3. [Целый учебник](https://livebook.manning.com/book/data-pipelines-with-apache-airflow/welcome/v-4/).