> Необходимо внести оптимизацию в процесс: для срочных командировок перед заказом билетов сотрудник АХД должен загрузить по коду пункта назначения и времени вылета данные по задержкам рейсов из файла (flight_delays.csv) или таблицы flight_delays. В ответ надо вернуть целиком строку из flight_delays, но код месяца необходимо перекодировать по таблице Month.

Перед выполнением задания установить Apache Airflow.

Задание:

* Данные по задержками рейсов находятся в таблице public.flight_delays в базе данных PG (или файле flight_delays.csv). \li

* В этой таблице месяцы (столбец Month) закодированы в виде значений c-1, c-2 и т.д.

* Выполнить перекодировку месяцев – написать DAG для Apache Airflow. Данные по перекодировке месяцев будут находится в файле Month.csv на сервере с Apache Airflow.


Ожидаемый результат: DAG файл (Python), выполняющий все необходимые действия.
Файл приложить к текущему заданию.

Пример файла во вложении.

Полезные ссылки:

https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html

https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.join.html


1) Загружает данные из таблицы public.flight_delays (или файла flight_delays.csv)
2) Загружает данные из файла Month.csv
3) Выполняет merge по столбцам Month и month_code
4) Вывести результат функцией print


In [219]:
import pandas as pd

In [220]:
fight_delays = pd.read_csv('flight_delays.csv',encoding='cp1251')

In [221]:
Month_table = pd.read_csv('Month.csv', header=None, encoding='cp1251', sep=';',names = ['code', 'Month'])


In [222]:
merged_df = fight_delays.merge(Month_table, left_on='Month', right_on='code', how='left')
merged_df.Month_x = merged_df.Month_y
merged_df.drop(['code', 'Month_y'], axis=1, inplace=True)
merged_df.rename(columns={'Month_x': 'Month'}, inplace=True)

In [223]:

merged_df

Unnamed: 0,Month,DayofMonth,DayOfWeek,DepTime,UniqueCarrier,Origin,Dest,Distance
0,Jule,c-25,c-3,615,YV,MRY,PHX,598
1,April,c-17,c-2,739,WN,LAS,HOU,1235
2,December,c-2,c-7,651,MQ,GSP,ORD,577
3,March,c-25,c-7,1614,WN,BWI,MHT,377
4,June,c-6,c-3,1505,UA,ORD,STL,258
...,...,...,...,...,...,...,...,...
99995,June,c-5,c-2,852,WN,CRP,HOU,187
99996,November,c-24,c-6,1446,UA,ORD,LAS,1515
99997,January,c-30,c-2,1509,OO,ORD,SGF,438
99998,January,c-5,c-5,804,DL,LGA,ATL,761


In [228]:
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
from datetime import datetime

def load_fights(flight_path:str='flight_delays.csv') -> pd.DataFrame:
    fight_delays = pd.read_csv(flight_path, encoding='cp1251')
    return fight_delays

def load_moths(months_path:str='Month.csv') -> pd.DataFrame:
    Month_table = pd.read_csv(months_path, header=None, encoding='cp1251', sep=';', names = ['code', 'Month'])
    return Month_table

def merge_data_by_months(fight_delays:pd.DataFrame, Month_table:pd.DataFrame) -> pd.DataFrame:
    # fight_delays = load_fights(flight_path)
    # Month_table = load_moths(months_path)

    merged_df = fight_delays.merge(Month_table, left_on='Month', right_on='code', how='left')
    merged_df.Month_x = merged_df.Month_y
    merged_df.drop(['code', 'Month_y'], axis=1, inplace=True)
    merged_df.rename(columns={'Month_x': 'Month'}, inplace=True)

    return merged_df

def print_table(merged_df:pd.DataFrame) -> None:
    print(merged_df)

In [233]:
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 12, 11),
    'retries': 1
}

dag = DAG('flight_delay_analysis', default_args=default_args, schedule=None)

load_flight_delays_task = PythonOperator(
    task_id='load_flight_delays_data',
    python_callable=load_moths,
    dag=dag
)


load_month_data_task = PythonOperator(
    task_id='load_month_data',
    python_callable=load_moths,
    dag=dag
)

merge_data_task = PythonOperator(
    task_id='merge_flight_delays_with_month',
    python_callable=merge_data_by_months,
    # provide_context=True, provide_context is deprecated as of 2.0 and is no longer required
    dag=dag
)

print_result_task = PythonOperator(
    task_id='print_result_df',
    python_callable=print_table,
    # provide_context=True, provide_context is deprecated as of 2.0 and is no longer required
    dag=dag
)

In [234]:
load_flight_delays_task >> merge_data_task
load_month_data_task >> merge_data_task
merge_data_task >> print_result_task

<Task(PythonOperator): print_result_df>

In [224]:
from datetime import datetime
from airflow import DAG
import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
import sqlalchemy
from sqlalchemy import create_engine, types
from datetime import timedelta


# 1: global variables:
##################################################
SRC_CONN = 'postgres_default'  # Airflow connection name to Xavier PostgreSQL
##################################################

# 2: json-file:
##################################################
JSON='/app/airflow/files/table2.json'
##################################################

# 3: sql query:
##################################################
TABLE_IN_QUERY="select * from table1"

##################################################


def tuto():
 table2 = pd.read_json(JSON)
 postgres_hook = PostgresHook(postgres_conn_id=SRC_CONN)
 engine = postgres_hook.get_sqlalchemy_engine()
 table1 = pd.read_sql(
             TABLE_IN_QUERY,
             engine)
 table1.columns = [ "FLOAT_VALUE", "ID", "INT_VALUE","STRING_VALUE",  "TABLE2_ID"]
 print(table1.head().to_string())
 
 table2.columns = ['TABLE2_ID', 'ATTRIBUTE1']   

 print(table2.to_string()) 
 
 df=pd.merge(table1, table2,  how='inner', on=['TABLE2_ID'])
 df2 = df[["TABLE2_ID", "FLOAT_VALUE", "INT_VALUE","STRING_VALUE","ATTRIBUTE1"]]
 dataTypeSeries = df2.dtypes
 df2.to_sql('table3', con=engine, if_exists='replace', chunksize=500, index=False)


 return  'OK'

dag = DAG('POSTGRES_TUTORIAL', description='connect_to_postgres', schedule=timedelta(days=1), start_date=datetime(2017, 3, 20), catchup=False)


hello_operator = PythonOperator(task_id='POSTGRES_TUTORIAL', python_callable=tuto, dag=dag)
