# Airflow практика

Задание:
    
Нужно составить DAG из нескольких тасок, в результате которого нужно будет найти ответы на следующие вопросы:

- Какая игра была самой продаваемой в этом году во всем мире?
- Игры какого жанра были самыми продаваемыми в Европе? Перечислить все, если их несколько
- На какой платформе было больше всего игр, которые продались более чем миллионным тиражом в Северной Америке? Перечислить все, если их несколько
- У какого издателя самые высокие средние продажи в Японии? Перечислить все, если их несколько
- Сколько игр продались лучше в Европе, чем в Японии?

In [1]:
# импортируем все необходимые библиотеки
import pandas as pd
import numpy as np
from datetime import timedelta
from datetime import datetime

from airflow.decorators import dag, task

In [2]:
login = 'v-hodzitskij'
date = 1994 + hash(f'{login}') % 23  # определим год, за которы будем смотреть данные
path =  '/var/lib/airflow/airflow.git/dags/a.batalov/vgsales.csv'

In [3]:
default_args = {
    'owner': login,
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 1, 13)
}  # зададим дефольные аргументы для дага
schedule_interval = '0 12 * * *'  # установим интервал выполнения дага каждый день в 12 часов 

In [6]:
# задаем даг и таски через декораторы
@dag(default_args=default_args, catchup=False, schedule_interval=schedule_interval)
def v_hodzitskij():
    
    @task()  # таска, которая считывает файл
    def load_data(): 
        data =  pd.read_csv(path).query('Year == @date')
        return data
    
    @task()  # таска, определяющая самую популярную игру в этом году
    def get_pop_game(data):
        pop_game = data.sort_values(by='Global_Sales', ascending=False).head(1).Name
        return pop_game
    
    @task()  # таска, определяющая игры какого жанра были самыми продаваемыми в Европе
    def get_pop_genres_eu(data):
        # находим максимальную сумму продаж среди всех жанров
        max_eu_sales = (data.groupby('Genre', as_index=False)
                            .EU_Sales
                            .sum()
                            .max()
                            .EU_Sales)
        # отбираем жанры с максимальной суммой продаж
        pop_genres_eu = (data.groupby('Genre', as_index=False)
                            .EU_Sales
                            .sum()
                            .query('EU_Sales == @max_eu_sales')
                            .Genre)
        return pop_genres_eu
    
    # таска, определяющая на какой платформе было больше всего игр, которые продались более чем 
    # миллионным тиражом в Северной Америке
    @task()
    def get_pop_platform(data):
        # отбираем только игры, которые продались более чем миллионным тиражом в Северной Америке
        data = data.query('NA_Sales > 1') 
        # находим максимальное количество игр по платформам
        max_sales_count = (data.groupby('Platform', as_index=False).NA_Sales.count()
                               .NA_Sales
                               .max())
        # отбираем платформы с максимальным количеством игр
        platfroms = (data.groupby('Platform', as_index=False)
                         .NA_Sales.count()
                         .query('NA_Sales == @max_sales_count')
                         .Platform)
        return platfroms
    
    # таска, определяющая у каких издателей самые высокие средние продажи в Японии
    @task()
    def get_pop_publisher(data):
        # находим максимальную величину средних продаж в Японии по издателям
        max_sales_publishers = (data.groupby('Publisher', as_index=False)
                                    .JP_Sales
                                    .mean()
                                    .JP_Sales
                                    .max())
        # отбираем издателей с максимальной величиной средних продаж
        publishers = (data.groupby('Publisher', as_index=False)
                          .JP_Sales
                          .mean()
                          .query('JP_Sales == @max_sales_publishers')
                          .Publisher)
              
        return publishers
    
    # таска, определяющая cколько игр продались лучше в Европе, чем в Японии
    @task()
    def get_diff(data):
        diff = data.query('EU_Sales > JP_Sales').Name.count()
        return diff
    
    # финальный таск, который собирает все ответы
    @task()
    def get_result(pop_game, pop_genres_eu, platfroms, publishers, diff):
        print(f'Самая популярная игра в этом году: {pop_game}')
        print(f'Самые продаваемые жанры в Европе: {pop_genres_eu}')
        print(f'Платформа, на которой было больше всего игр, которые продались более чем миллионным тиражом в Северной Америке: {platfroms}')
        print(f'Издатели с самыми высокими средними продажами в Японии: {publishers}')
        print(f'Сколько игр продались лучше в Европе, чем в Японии: {diff}')
    
    # задаем последовательность выполнения тасков
    data = load_data()
    
    pop_game = get_pop_game(data)     
    pop_genres_eu = get_pop_genres_eu(data)
    platfroms = get_pop_platform(data)
    publishers = get_pop_publisher(data)
    diff = get_diff(data)
    
    get_result(pop_game, pop_genres_eu, platfroms, publishers, diff)
    
v_hodzitskij = v_hodzitskij()
    