Сначала определим год, за какой будем смотреть данные.
Дальше нужно составить DAG из нескольких тасок, в результате которого нужно будет найти ответы на следующие вопросы:

   - Какая игра была самой продаваемой в этом году во всем мире?
   - Игры какого жанра были самыми продаваемыми в Европе?
   - На какой платформе было больше всего игр,которые продались более чем миллионным тиражом в Северной    
    Америке?Перечислить все, если их несколько
   - У какого издателя самые высокие средние продажи в Японии? Перечислить все, если их несколько
   - Сколько игр продались лучше в Европе, чем в Японии?

Ожидается, что в DAG будет 7 тасков. По одному на каждый вопрос, таск с загрузкой данных и финальный таск который собирает все ответы. 



In [3]:
import pandas as pd
from datetime import timedelta
from datetime import datetime
from io import StringIO
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.models import Variable

path =  '/var/lib/airflow/airflow.git/dags/a.batalov/vgsales.csv'
name = 'v-skurlatova-33'
year = 1994 + hash(f'{name}') % 23


default_args = {
    'owner': 'v.skurlatova',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 4, 15),
    'schedule_interval': '45 14 * * *'
}


@dag(default_args=default_args, catchup=False)
def airflow_3():
    
    @task()
    def get_data():
        df = pd.read_csv(path)
        df = df.query('Year==@year')
        return df

    @task()
    def most_pop(df):
        most_pop = df.groupby('Name',as_index=False)['Global_Sales'].sum().max().to_frame().reset_index()
        return most_pop
    
    @task()
    def eu_genre(df):
        eu_genre = df.groupby('Genre',as_index=False)['EU_Sales'].sum().sort_values('EU_Sales',ascending=False)
        eu_genre = eu_genre.head(5)
        return eu_genre
    
    @task()
    def us_platform(df):
        na_sales = df.query('NA_Sales> 1')[['Name','Platform']]
        us_platform = na_sales.value_counts('Platform')
        us_platform = us_platform.to_frame().reset_index()
        us_platform = us_platform.head(1)
        return us_platform

    @task()
    def jp(df):
        jp = df. groupby('Publisher',as_index=False).agg({'JP_Sales':'mean'}).sort_values('JP_Sales',ascending=False)
        jp = jp.head(1)
        return jp
    
    @task()
    def better_sold(df):
        better_sold = df.query('EU_Sales>JP_Sales')['Name'].count()
        return better_sold
    
    @task()
    def print_data(most_pop, eu_genre, us_platform, jp, better_sold):
        
        date = year
        
        print(f'The most popular Global Sales game in {date}')
        print(most_pop)
        
        print(f'Top sold in EU genres year {date}')
        print(eu_genre)
        
        print(f'The most popular NA platform in {date}')
        print(us_platform)
        
        print(f'Japanese top-selling publisher in {date}')
        print(jp)
        
        print(better_sold)
        print(f'games was sold in {date} better in EU than in JP')
        
    df = get_data()
    most_pop = most_pop(df)
    eu_genre = eu_genre(df)
    us_platform = us_platform(df)
    jp = jp(df)
    better_sold = better_sold(df)
    print_data(most_pop, eu_genre, us_platform, jp, better_sold)

    
airflow_3 = airflow_3()