# Задача

Дальше нужно составить DAG из нескольких тасок, в результате которого нужно будет найти ответы на следующие вопросы:

1. Какая игра была самой продаваемой в этом году во всем мире?
2. Игры какого жанра были самыми продаваемыми в Европе? Перечислить все, если их несколько
3. На какой платформе было больше всего игр, которые продались более чем миллионным тиражом в Северной Америке? Перечислить все, если их несколько
4. У какого издателя самые высокие средние продажи в Японии? Перечислить все, если их несколько
5. Сколько игр продались лучше в Европе, чем в Японии?

In [4]:
import requests
from zipfile import ZipFile
from io import BytesIO
import pandas as pd
from datetime import timedelta
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


In [5]:
# Посмотрим с какими данными нам предстоит работать
df = pd.read_csv('/mnt/HC_Volume_18315164/home-jupyter/jupyter-p-kievskij/airflow/dags/p-kievskij/vgsales.csv')

In [12]:
df.head()

Unnamed: 0,Rank,Name,Platform,Year,Genre,Publisher,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales
0,1,Wii Sports,Wii,2006.0,Sports,Nintendo,41.49,29.02,3.77,8.46,82.74
1,2,Super Mario Bros.,NES,1985.0,Platform,Nintendo,29.08,3.58,6.81,0.77,40.24
2,3,Mario Kart Wii,Wii,2008.0,Racing,Nintendo,15.85,12.88,3.79,3.31,35.82
3,4,Wii Sports Resort,Wii,2009.0,Sports,Nintendo,15.75,11.01,3.28,2.96,33.0
4,5,Pokemon Red/Pokemon Blue,GB,1996.0,Role-Playing,Nintendo,11.27,8.89,10.22,1.0,31.37


In [8]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16598 entries, 0 to 16597
Data columns (total 11 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   Rank          16598 non-null  int64  
 1   Name          16598 non-null  object 
 2   Platform      16598 non-null  object 
 3   Year          16327 non-null  float64
 4   Genre         16598 non-null  object 
 5   Publisher     16540 non-null  object 
 6   NA_Sales      16598 non-null  float64
 7   EU_Sales      16598 non-null  float64
 8   JP_Sales      16598 non-null  float64
 9   Other_Sales   16598 non-null  float64
 10  Global_Sales  16598 non-null  float64
dtypes: float64(6), int64(1), object(4)
memory usage: 1.4+ MB


In [9]:
df.describe()

Unnamed: 0,Rank,Year,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales
count,16598.0,16327.0,16598.0,16598.0,16598.0,16598.0,16598.0
mean,8300.605254,2006.406443,0.264667,0.146652,0.077782,0.048063,0.537441
std,4791.853933,5.828981,0.816683,0.505351,0.309291,0.188588,1.555028
min,1.0,1980.0,0.0,0.0,0.0,0.0,0.01
25%,4151.25,2003.0,0.0,0.0,0.0,0.0,0.06
50%,8300.5,2007.0,0.08,0.02,0.0,0.01,0.17
75%,12449.75,2010.0,0.24,0.11,0.04,0.04,0.47
max,16600.0,2020.0,41.49,29.02,10.22,10.57,82.74


Видим, что:
1. Есть пропущенные значения в годе издания и в издателе. Но их не много.
2. Год имеет формат float64, лучше заменить на int для лучшего восприятия
3. Однозначных аномалий в значениях года издания и продаж нет.

In [1]:
# определяем тестовый год (используем хеш логина как определитель варианта)
login = 'p-kievskij'
test_year = 1994 + hash(f'{login}') % 23

In [113]:
# обьявляем функции
def get_data(): #считываем данные, преобразуем год, оставляем только нужный нам год.
    df = pd.read_csv('/mnt/HC_Volume_18315164/home-jupyter/jupyter-p-kievskij/airflow/dags/p-kievskij/vgsales.csv')
    df['Year'] = df['Year'].astype('Int64')
    df = df.query("Year == @test_year").reset_index().drop(columns='index')
    df.to_csv('year_vgsales.csv', index=False)

# Какая игра была самой продаваемой в этом году во всем мире?
def top_sale_def():
    top_sale_df = pd.read_csv('year_vgsales.csv')
    g_top_sale_df = top_sale_df.groupby('Name', as_index=False).agg({"Global_Sales":"sum"})
    g_top_sale_df = g_top_sale_df.query("Global_Sales == @g_top_sale_df.Global_Sales.max()")
    with open('top_sale_df.csv', 'w') as f:
        f.write(g_top_sale_df.to_csv(index=False, header=True))

# Игры какого жанра были самыми продаваемыми в Европе?
def top_genre_eu_def():
    eu_df = pd.read_csv('year_vgsales.csv')
    eu_df = eu_df.groupby("Genre", as_index=False).agg({'EU_Sales':'sum'})
    eu_df = eu_df.query("EU_Sales == @eu_df.EU_Sales.max()")
    with open('top_genre_eu.csv', 'w') as f:
        f.write(eu_df.to_csv(index=False, header=True))

# На какой платформе было больше всего игр, которые продались более чем миллионным тиражом в Северной Америке?
def M_platform_def():
    na_df = pd.read_csv('year_vgsales.csv')
    na_df = na_df.query("NA_Sales > 1")
    na_df = na_df.groupby("Platform", as_index=False).agg({'Name':'count'}).rename(columns={'Name':'Games'})
    na_df = na_df.query("Games == @na_df.Games.max()")
    with open('1M_platform_NE.csv', 'w') as f:
        f.write(na_df.to_csv(index=False, header=True))        

# У какого издателя самые высокие средние продажи в Японии?
def best_sales_JP_def():
    jp_df = pd.read_csv('year_vgsales.csv')
    jp_df = jp_df.groupby("Publisher", as_index=False).agg({'JP_Sales':'mean'})
    jp_df = jp_df.query("JP_Sales == @jp_df.JP_Sales.max()")
    with open('best_sales_JP.csv', 'w') as f:
        f.write(jp_df.to_csv(index=False, header=True))

# Сколько игр продались лучше в Европе, чем в Японии?
def EU_JP_sale_def():
    EU_JP_sale = pd.read_csv('year_vgsales.csv')
    EU_JP_sale = str(EU_JP_sale.query("EU_Sales > JP_Sales").shape[0])
    with open('EU_JP_sale.txt', 'w') as f:
        f.write(EU_JP_sale)

# выводим полученные ответы в лог исполнения DAGа
def print_data():
    with open('top_sale_df.csv', 'r') as f:
        data_top_sale = f.read()
    with open('top_genre_eu.csv', 'r') as f:
        data_top_genre_eue = f.read()
    with open('1M_platform_NE.csv', 'r') as f:
        data_1M_platform_NE = f.read()
    with open('best_sales_JP.csv', 'r') as f:
        data_best_sales_JP = f.read()
    with open('EU_JP_sale.txt', 'r') as f:
        data_EU_JP_sale = f.read()

    print(f'Самая продоваемая в {test_year} году игра:')
    print(data_top_sale)

    print(f'Самые продаваемые в Европе жанры в {test_year} году:')
    print(data_top_genre_eue)

    print(f'Платформы с наибольшим количеством игр и миллионным тиражом Северной Америке в {test_year} году:')
    print(data_1M_platform_NE)
    
    print(f'Издатель с самыми высокими средними продажами в Японии в {test_year} году:')
    print(data_best_sales_JP)
    
    print(f'Игр которые продались в Европпе лучше чем в Японии в {test_year} году: {data_EU_JP_sale}')

In [139]:
# параметры DAGа
default_args = {
    'owner': 'p-kievskij',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 1, 16),
    'schedule_interval': '0 15 * * *'
}
dag = DAG('p-kievskij_less_3', default_args=default_args)

In [137]:
# прописываем таски в DAG
t1 = PythonOperator(task_id='get_data',
                    python_callable=get_data,
                    dag=dag)

t2 = PythonOperator(task_id='top_sale',
                    python_callable=top_sale_def,
                    dag=dag)

t3 = PythonOperator(task_id='top_genre_eu',
                        python_callable=top_genre_eu_def,
                        dag=dag)

t4 = PythonOperator(task_id='1M_platform_NE',
                        python_callable=M_platform_def,
                        dag=dag)

t5 = PythonOperator(task_id='best_sales_JP',
                        python_callable=best_sales_JP_def,
                        dag=dag)

t6 = PythonOperator(task_id='EU_JP_sale',
                        python_callable=EU_JP_sale_def,
                        dag=dag)
t7 = PythonOperator(task_id='print_data',
                    python_callable=print_data,
                    dag=dag)


In [138]:
# последовательность выполнения тасок
t1 >> [t2, t3, t4, t5, t6] >> t7

<Task(PythonOperator): print_data>