In [6]:
import pandas as pd
from datetime import timedelta
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

path = '/var/lib/airflow/airflow.git/dags/a.batalov/vgsales.csv'
login = 'v-tsuzoj'
year = 1994 + hash(f'{login}') % 23

default_args = {
    'owner': 'v-tsuzoj',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2024, 3, 9)
}

@dag(default_args=default_args, schedule_interval = '0 12 * * *', catchup=False)
def v_tsuzoy_dag_l3():
    @task(retries=3)
    def get_data():
        df_year = pd.read_csv(path).query('Year == @year')
        df_year['Year'] = df_year['Year'].astype(int)
        return df_year

    @task()
    def most_sales_game(df_year):
        most_sales_game = df_year.groupby('Name', as_index = False) \
                         .agg({'Global_Sales' : 'sum'}) \
                         .sort_values('Global_Sales', ascending = False) \
                         .head(1)['Name']
        return most_sales_game

    @task()
    def top_genre_eu(df_year):
        top_genre_eu = df_year.groupby('Genre', as_index = False) \
                              .agg({'EU_Sales' : 'sum'}) \
                              .sort_values('EU_Sales', ascending = False) \
                              .query("EU_Sales == EU_Sales.max()")['Genre'].to_list()
        return top_genre_eu
    
    @task()
    def top_NA_platform(df_year):
        top_NA_platform = df_year.query("NA_Sales > 1") \
                                 .groupby('Platform', as_index = False) \
                                 .agg({'Name' : 'count'}) \
                                 .sort_values('Name', ascending = False) \
                                 .query('Name == Name.max()').Platform.to_list() 
        return top_NA_platform
    
    @task()
    def jp_publisher_mean_sales(df_year):
        jp_publisher_mean_sales = df_year.groupby('Publisher', as_index = False) \
                                         .agg({'JP_Sales' : 'mean'}) \
                                         .sort_values('JP_Sales', ascending = False) \
                                         .query("JP_Sales == JP_Sales.max()").Publisher.to_list()
        return jp_publisher_mean_sales
    
    @task()
    def games_count_eu_vs_jp(df_year):
        games_count_eu_vs_jp = df_year.query("EU_Sales > JP_Sales").Name.nunique()
    return games_count_eu_vs_jp

    @task()
    def print_data(most_sales_game, top_genre_eu, top_NA_platform, jp_publisher_mean_sales, games_count_eu_vs_jp):

        context = get_current_context()
        date = context['ds']
        
        print(f'Игра {most_sales_game} была самой продаваемой в {year} году во всем мире')
        print(f'Игры жанра {top_genre_eu} были самыми продаваемыми в {year} году в Европе')
        print(f'На платформе {top_NA_platform} в {year} году было больше всего игр, которые продались более чем миллионным тиражом в Северной Америке')
        print(f'У издателя {jp_publisher_mean_sales} в {year} году самые высокие средние продажи в Японии')
        print(f'{games_count_eu_vs_jp} игр в {year} году продались лучше в Европе, чем в Японии')


    df_year = get_data()
    max_sale = most_sales_game(df_year)
    eu_max_sale = top_genre_eu(df_year)
    na_max_platform = top_NA_platform(df_year)
    jp_max_publisher_sales = jp_publisher_mean_sales(df_year)
    eu_japan_sales = games_count_eu_vs_jp(df_year)
    print_data(max_sale, eu_max_sale, na_max_platform, jp_max_publisher_sales, eu_japan_sales)


v_tsuzoy_dag_lesson_3 = v_tsuzoy_dag_l3()