Нужно составить DAG из нескольких тасков, который найдёт ответы на следующие вопросы:

- Какая игра была самой продаваемой в этом году во всем мире?
- Игры какого жанра были самыми продаваемыми в Европе? Перечислить все, если их несколько
- На какой платформе было больше всего игр, которые продались более чем миллионным тиражом в Северной Америке? Перечислить все, если их несколько
- У какого издателя самые высокие средние продажи в Японии? Перечислить все, если их несколько
- Сколько игр продались лучше в Европе, чем в Японии?

In [None]:
import requests
from zipfile import ZipFile
from io import BytesIO
import pandas as pd
import numpy as np
from datetime import timedelta
from datetime import datetime
from io import StringIO
import telegram

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.models import Variable

# Where the zipped domain ranking is downloaded from, and the name of the
# CSV member that is read out of that archive.
TOP_1M_DOMAINS = 'http://s3.amazonaws.com/alexa-static/top-1m.csv.zip'
TOP_1M_DOMAINS_FILE = 'top-1m.csv'

# Settings handed to the DAG through ``default_args``.
# NOTE(review): 'schedule_interval' is normally a DAG-level argument; kept in
# this dict it presumably does not schedule the DAG -- confirm against the
# Airflow documentation.
default_args = dict(
    owner='a.batalov',
    depends_on_past=False,
    retries=2,
    retry_delay=timedelta(minutes=5),
    start_date=datetime(2021, 10, 7),
    schedule_interval='0 12 * * *',
)

def send_message(context):
    """Report a successful DAG run to a Telegram chat.

    Intended to be used as an ``on_success_callback``.

    Args:
        context: Task context mapping. ``context['ds']`` and
            ``context['dag'].dag_id`` are read to build the message text.

    Returns:
        None. Nothing is sent when the module-level ``BOT_TOKEN`` /
        ``CHAT_ID`` settings are not defined or the token is empty.
    """
    try:
        token, chat_id = BOT_TOKEN, CHAT_ID
    except NameError:
        # The credentials are not configured in this module. The notification
        # is optional, so a missing setting must not surface as a NameError.
        return
    if not token:
        return

    date = context['ds']
    dag_id = context['dag'].dag_id
    message = f'Huge success! Dag {dag_id} completed on {date}'
    bot = telegram.Bot(token=token)
    bot.send_message(chat_id=chat_id, text=message)

# The cron schedule is passed to @dag directly: default_args is forwarded to
# the tasks, so a 'schedule_interval' key stored there does not schedule the DAG.
@dag(default_args=default_args, schedule_interval='0 12 * * *', catchup=False)
def top_10_airflow_2():
    """Compare rank statistics of .ru and .com domains in the top-1M list.

    Pipeline: download the ranking, select the .ru and .com rows, compute the
    average and median rank of each zone, then print both summaries.
    """

    def _select_zone(top_data, suffix):
        """Return, as CSV text, the ranking rows whose domain ends with *suffix*."""
        ranking = pd.read_csv(StringIO(top_data), names=['rank', 'domain'])
        zone_rows = ranking[ranking['domain'].str.endswith(suffix)]
        return zone_rows.to_csv(index=False)

    def _rank_stats(zone_csv, prefix):
        """Return the integer mean and median rank under ``<prefix>_avg`` / ``<prefix>_median``."""
        ranks = pd.read_csv(StringIO(zone_csv))['rank']
        return {
            f'{prefix}_avg': int(ranks.mean()),
            f'{prefix}_median': int(ranks.median()),
        }

    @task(retries=3)
    def get_data():
        """Download the zipped ranking and return the CSV inside it as text."""
        response = requests.get(TOP_1M_DOMAINS, timeout=60)
        # Fail on an HTTP error status instead of trying to unzip an error
        # page, so that the task retries handle the failed download.
        response.raise_for_status()
        with ZipFile(BytesIO(response.content)) as archive:
            return archive.read(TOP_1M_DOMAINS_FILE).decode('utf-8')

    # A bare timedelta(10) means ten days; the retry pause is ten minutes.
    @task(retries=4, retry_delay=timedelta(minutes=10))
    def get_table_ru(top_data):
        """Keep only the .ru domains of the ranking (CSV text in, CSV text out)."""
        return _select_zone(top_data, '.ru')

    @task()
    def get_stat_ru(top_data_ru):
        """Compute the average and median rank of the .ru domains."""
        return _rank_stats(top_data_ru, 'ru')

    @task()
    def get_table_com(top_data):
        """Keep only the .com domains of the ranking (CSV text in, CSV text out)."""
        return _select_zone(top_data, '.com')

    @task()
    def get_stat_com(top_data_com):
        """Compute the average and median rank of the .com domains."""
        return _rank_stats(top_data_com, 'com')

    @task(on_success_callback=send_message)
    def print_data(ru_stat, com_stat):
        """Print both zone summaries for the run date taken from the task context."""
        context = get_current_context()
        date = context['ds']

        ru_avg, ru_median = ru_stat['ru_avg'], ru_stat['ru_median']
        com_avg, com_median = com_stat['com_avg'], com_stat['com_median']

        print(f'''Data from .RU for {date}
                  Avg rank: {ru_avg}
                  Median rank: {ru_median}''')

        print(f'''Data from .COM for {date}
                          Avg rank: {com_avg}
                          Median rank: {com_median}''')

    top_data = get_data()
    top_data_ru = get_table_ru(top_data)
    ru_data = get_stat_ru(top_data_ru)

    top_data_com = get_table_com(top_data)
    com_data = get_stat_com(top_data_com)

    print_data(ru_data, com_data)

top_10_airflow_2 = top_10_airflow_2()


In [7]:
# Preview of the year that the login string maps to.
# NOTE(review): str hashes are salted per interpreter process unless
# PYTHONHASHSEED is fixed, so this value presumably differs between
# sessions -- confirm before relying on the number shown below.
1994 + hash('d-prokofev') % 23

2013

In [None]:
import zlib
from datetime import datetime
from datetime import timedelta
from io import BytesIO
from io import StringIO
from zipfile import ZipFile

import numpy as np
import pandas as pd
import requests
import telegram
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.operators.python import get_current_context



# Settings handed to the DAG through ``default_args``.
default_args = dict(
    owner='d-prokofev',
    depends_on_past=False,
    retries=2,
    retry_delay=timedelta(minutes=5),
    start_date=datetime(2021, 10, 7),
)

# Year of the dataset to analyse, derived from the login (range 1994..2016).
# A CRC32 checksum replaces the built-in hash(): str hashes are salted per
# interpreter process, so every task process could otherwise compute a
# different year and the tasks would disagree about what is being analysed.
my_year = 1994 + zlib.crc32(b'd-prokofev') % 23

@dag(default_args=default_args, catchup=False)
def d_prokofev_lesson_3_second_retry():
    """Answer five questions about video game sales in the year ``my_year``.

    ``get_vg_data`` loads and filters the sales table. Five independent tasks
    each compute one answer from it, and ``print_results`` prints the report.
    Tables travel between tasks as CSV text.
    """

    @task(retries=3)
    def get_vg_data():
        """Load the sales table and keep only the rows of the analysed year."""
        vg_data = pd.read_csv('/var/lib/airflow/airflow.git/dags/a.batalov/vgsales.csv')
        # Fill gaps only in the two columns that need it, instead of
        # rewriting the whole frame just to read one column back.
        # Missing years become 0 so the column can be stored as integers.
        vg_data['Year'] = vg_data['Year'].fillna(0).astype('int')
        vg_data['Publisher'] = vg_data['Publisher'].fillna('Unknown')
        vg_data = vg_data.query('Year == @my_year')
        return vg_data.to_csv(index=False)

    @task()
    def world_bestseller(vg_data):
        """Return the title with the largest summed ``Global_Sales``."""
        vg_data = pd.read_csv(StringIO(vg_data))
        # Sales are summed per title, so releases on several platforms count together.
        bestseller = (
            vg_data.groupby('Name', as_index=False)
            .agg({'Global_Sales': 'sum'})
            .sort_values('Global_Sales', ascending=False)
            .head(1)
        )
        return bestseller.to_csv(index=False)

    @task()
    def europes_main_genre(vg_data):
        """Return every genre that ties for the largest summed ``EU_Sales``."""
        vg_data = pd.read_csv(StringIO(vg_data))
        main_genre = (
            vg_data.groupby('Genre', as_index=False)
            .agg({'EU_Sales': 'sum'})
            .query('EU_Sales == EU_Sales.max()')
        )
        return main_genre.to_csv(index=False)

    @task()
    def NA_best_platform(vg_data):
        """Return every platform tying for the most rows with ``NA_Sales`` above 1.0."""
        vg_data = pd.read_csv(StringIO(vg_data))
        best_platform = (
            vg_data.query('NA_Sales > 1.0')
            .groupby('Platform', as_index=False)
            .agg({'Name': 'count'})
            .query('Name == Name.max()')
        )
        return best_platform.to_csv(index=False)

    @task()
    def JP_best_publisher(vg_data):
        """Return every publisher that ties for the highest mean ``JP_Sales``."""
        vg_data = pd.read_csv(StringIO(vg_data))
        best_publisher = (
            vg_data.groupby('Publisher', as_index=False)
            .agg({'JP_Sales': 'mean'})
            .query('JP_Sales == JP_Sales.max()')
        )
        return best_publisher.to_csv(index=False)

    @task()
    def EU_vs_JP_sales(vg_data):
        """Count the titles whose summed ``EU_Sales`` exceed their summed ``JP_Sales``."""
        vg_data = pd.read_csv(StringIO(vg_data))
        EU_vs_JP = (
            vg_data.groupby('Name', as_index=False)
            .agg({'JP_Sales': 'sum', 'EU_Sales': 'sum'})
            .query('JP_Sales < EU_Sales')
            .shape[0]
        )
        return EU_vs_JP

    @task()
    def print_results(bestseller, main_genre, best_platform, best_publisher, EU_vs_JP):
        """Print the five answers together with the run date and the analysed year."""
        context = get_current_context()
        date = context['ds']

        print(f'''Data for: {date}\n
                Analysed year: {my_year}\n
                  1. Globally, the bestselling game is \n
                  {bestseller}\n
                  2. Bestselling genre(s) in Europe is(are)\n
                  {main_genre}\n
                  3. Platform(s) with the highest number of games, that were sold in more than a million copies (for North America)\n
                  {best_platform}\n
                  4. Publisher(s) with the highest mean value of sales in Japan\n
                  {best_publisher}\n
                  5. {EU_vs_JP} games were sold in higher quantity in Europe in comparison to Japan''')

    # Results get their own names so that the task callables defined above
    # are not rebound to their own outputs.
    vg_data = get_vg_data()
    bestseller = world_bestseller(vg_data)
    main_genre = europes_main_genre(vg_data)
    best_platform = NA_best_platform(vg_data)
    best_publisher = JP_best_publisher(vg_data)
    eu_vs_jp = EU_vs_JP_sales(vg_data)

    print_results(bestseller, main_genre, best_platform, best_publisher, eu_vs_jp)


d_prokofev_lesson_3_second_retry = d_prokofev_lesson_3_second_retry()

In [19]:
import pandas as pd
import numpy as np

# Load the full sales table for interactive exploration.
vg_data = pd.read_csv('~/vgsales.csv')
my_year = 1994 + hash('d-prokofev') % 23

# Missing years become 0 so the column can be stored as integers;
# missing publishers get an explicit placeholder.
vg_data['Year'] = vg_data.fillna(0)['Year'].astype('int')
vg_data['Publisher'] = vg_data['Publisher'].fillna('Unknown')
vg_data.head(15)

Unnamed: 0,Rank,Name,Platform,Year,Genre,Publisher,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales
0,1,Wii Sports,Wii,2006,Sports,Nintendo,41.49,29.02,3.77,8.46,82.74
1,2,Super Mario Bros.,NES,1985,Platform,Nintendo,29.08,3.58,6.81,0.77,40.24
2,3,Mario Kart Wii,Wii,2008,Racing,Nintendo,15.85,12.88,3.79,3.31,35.82
3,4,Wii Sports Resort,Wii,2009,Sports,Nintendo,15.75,11.01,3.28,2.96,33.0
4,5,Pokemon Red/Pokemon Blue,GB,1996,Role-Playing,Nintendo,11.27,8.89,10.22,1.0,31.37
5,6,Tetris,GB,1989,Puzzle,Nintendo,23.2,2.26,4.22,0.58,30.26
6,7,New Super Mario Bros.,DS,2006,Platform,Nintendo,11.38,9.23,6.5,2.9,30.01
7,8,Wii Play,Wii,2006,Misc,Nintendo,14.03,9.2,2.93,2.85,29.02
8,9,New Super Mario Bros. Wii,Wii,2009,Platform,Nintendo,14.59,7.06,4.7,2.26,28.62
9,10,Duck Hunt,NES,1984,Shooter,Nintendo,26.93,0.63,0.28,0.47,28.31


In [20]:
# Show the dtype of every column of vg_data.
vg_data.dtypes

Rank              int64
Name             object
Platform         object
Year              int64
Genre            object
Publisher        object
NA_Sales        float64
EU_Sales        float64
JP_Sales        float64
Other_Sales     float64
Global_Sales    float64
dtype: object

In [21]:
# Missing-value report, one line per column:
# column name, whether it holds any NaN, and how many.
for name in vg_data.columns:
    series = vg_data[name]
    print(name, series.hasnans, series.isna().sum())

Rank False 0
Name False 0
Platform False 0
Year False 0
Genre False 0
Publisher False 0
NA_Sales False 0
EU_Sales False 0
JP_Sales False 0
Other_Sales False 0
Global_Sales False 0


In [23]:
# Narrow the table down to the year derived from the login.
my_year = 1994 + hash('d-prokofev') % 23
vg_data = vg_data[vg_data['Year'] == my_year]
vg_data

Unnamed: 0,Rank,Name,Platform,Year,Genre,Publisher,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales
16,17,Grand Theft Auto V,PS3,2013,Action,Take-Two Interactive,7.01,9.27,0.97,4.14,21.40
23,24,Grand Theft Auto V,X360,2013,Action,Take-Two Interactive,9.63,5.31,0.06,1.38,16.38
32,33,Pokemon X/Pokemon Y,3DS,2013,Role-Playing,Nintendo,5.17,4.05,4.34,0.79,14.35
61,62,Call of Duty: Ghosts,X360,2013,Shooter,Activision,6.72,2.63,0.04,0.82,10.21
67,68,Call of Duty: Ghosts,PS3,2013,Shooter,Activision,4.09,3.73,0.38,1.38,9.59
...,...,...,...,...,...,...,...,...,...,...,...
16468,16471,Exstetra,3DS,2013,Role-Playing,FuRyu,0.00,0.00,0.01,0.00,0.01
16507,16510,Romeo Vs. Juliet,PSP,2013,Adventure,Quinrose,0.00,0.00,0.01,0.00,0.01
16526,16529,Onigokko! Portable,PSP,2013,Adventure,Alchemist,0.00,0.00,0.01,0.00,0.01
16535,16538,Mushi Bugyou,3DS,2013,Action,Namco Bandai Games,0.00,0.00,0.01,0.00,0.01


In [39]:
# Sum worldwide sales per title (all platform releases together),
# order the titles from best to worst and keep the first one.
sales_by_title = vg_data.groupby('Name', as_index=False).agg({'Global_Sales': 'sum'})
ranked_titles = sales_by_title.sort_values('Global_Sales', ascending=False)
world_bestseller = ranked_titles.head(1)
print(world_bestseller)

                   Name  Global_Sales
115  Grand Theft Auto V         37.78


In [41]:
# Sum European sales per genre and keep every genre that reaches the maximum.
genre_totals = vg_data.groupby('Genre', as_index=False).agg({'EU_Sales': 'sum'})
top_eu_sales = genre_totals['EU_Sales'].max()
europes_main_genre = genre_totals[genre_totals['EU_Sales'] == top_eu_sales]
europes_main_genre

Unnamed: 0,Genre,EU_Sales
0,Action,45.21


In [44]:
# Count, per platform, the rows with NA_Sales above 1.0 and keep every
# platform that reaches the highest count.
# NOTE(review): presumably NA_Sales is in millions of copies -- confirm.
big_na_sellers = vg_data[vg_data['NA_Sales'] > 1.0]
titles_per_platform = big_na_sellers.groupby('Platform', as_index=False).agg({'Name': 'count'})
top_count = titles_per_platform['Name'].max()
best_platform = titles_per_platform[titles_per_platform['Name'] == top_count]
best_platform

Unnamed: 0,Platform,Name
5,X360,12


In [45]:
# Average Japanese sales per publisher; keep every publisher that reaches the maximum.
publisher_means = vg_data.groupby('Publisher', as_index=False).agg({'JP_Sales': 'mean'})
top_jp_mean = publisher_means['JP_Sales'].max()
best_publisher = publisher_means[publisher_means['JP_Sales'] == top_jp_mean]
best_publisher

Unnamed: 0,Publisher,JP_Sales
31,GungHo,0.775


In [52]:
# Sum Japanese and European sales per title, then count the titles whose
# European total is strictly larger than the Japanese one.
regional_totals = vg_data.groupby('Name', as_index=False).agg({'JP_Sales': 'sum', 'EU_Sales': 'sum'})
sold_better_in_eu = regional_totals[regional_totals['JP_Sales'] < regional_totals['EU_Sales']]
EU_vs_JP = sold_better_in_eu.shape[0]
EU_vs_JP

142