Skip to content

Commit

Permalink
Merge pull request #69 from SergeyBychkov/MG-4228
Browse files Browse the repository at this point in the history
MG-4228: Bump Airflow to 2.6
  • Loading branch information
SergeyBychkov committed Jun 5, 2023
2 parents 9641a16 + 1b42696 commit 51aa96d
Show file tree
Hide file tree
Showing 20 changed files with 2,029 additions and 1,368 deletions.
29 changes: 21 additions & 8 deletions data-detective-airflow/dags/dags/dummy/code/code.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
import yaml
from pandas import DataFrame
from airflow.utils.context import Context
from pandas import DataFrame, concat


def val_translate(context: Context, in_df: DataFrame, file_name: str) -> DataFrame:
    """Translate values in the ``test`` column through a YAML mapping file.

    :param context: Airflow context (must contain the current ``task``)
    :param in_df: incoming df; must have a ``test`` column
    :param file_name: name of the translate file under the DAG's etc dir
    :return: a copy of ``in_df`` with ``test`` values mapped through the file
    :raises KeyError: if a ``test`` value is absent from the mapping
    """
    task = context.get("task")
    out_df = in_df.copy()
    # Mapping file lives in the DAG's etc dir (provided by the TDag instance).
    with open(f"{task.dag.etc_dir}/{file_name}", encoding="utf-8") as cfg:  # type: ignore
        config = yaml.safe_load(cfg)
    # Series.map is a single per-value pass — cheaper than a row-wise
    # apply(axis=1).  The lambda (rather than passing the dict itself)
    # keeps the original KeyError behaviour for unknown values instead
    # of silently producing NaN.
    out_df["test"] = out_df["test"].map(lambda value: config[value])
    return out_df


def append_dfs(_context: "Context", *sources: DataFrame) -> DataFrame:
    """Append several dfs into one (row-wise concatenation).

    :param _context: Airflow context (unused; kept for the transformer signature)
    :param sources: DataFrames to append, in order; original indices are kept
    :return: one DataFrame with all rows of ``sources``
    :raises ValueError: if no DataFrames are given (pandas.concat contract)
    """
    # concat accepts any iterable, so the argument tuple is passed
    # directly instead of being copied into a throwaway list first.
    return concat(sources)
3 changes: 1 addition & 2 deletions data-detective-airflow/dags/dags/dummy/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ tasks:
source:
- df_now1
- transform
transformer_callable: >
lambda _context, df_now1, transform: df_now1.append(transform, sort=False)
transformer_callable: append_dfs
- task_id: sink
description: Write the result
type: data_detective_airflow.operators.sinks.PgSCD1
Expand Down
42 changes: 20 additions & 22 deletions data-detective-airflow/dags/dags/dummy_python/code/code.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,43 @@
from data_detective_airflow.constants import PG_CONN_ID

from data_detective_airflow.dag_generator.dags import TDag
from data_detective_airflow.operators.extractors import DBDump
from data_detective_airflow.operators.sinks import PgSCD1
from data_detective_airflow.operators.transformers import PyTransform
from data_detective_airflow.dag_generator.dags import TDag
from data_detective_airflow.operators.transformers import Append, PyTransform


def fill_dag(tdag: TDag):
    """Build the dummy_python DAG: dump two tables, transform, append, sink.

    :param tdag: TDag instance the tasks are attached to
    """
    # First source dump.
    DBDump(
        task_id="test1",
        conn_id=PG_CONN_ID,
        sql="/code/test1.sql",
        dag=tdag,
    )

    # Second source dump.
    # NOTE(review): this also reads /code/test1.sql — confirm test2 is
    # really meant to reuse the same query as test1.
    DBDump(
        task_id="test2",
        conn_id=PG_CONN_ID,
        sql="/code/test1.sql",
        dag=tdag,
    )

    # Identity transform over the second dump.
    PyTransform(
        task_id="transform",
        source=["test2"],
        transformer_callable=lambda _context, df: df,
        dag=tdag,
    )

    # Concatenate the transformed frame with the first dump.
    Append(
        task_id="append_all",
        source=["transform", "test1"],
        dag=tdag,
    )

    # Write the combined result with SCD1 semantics keyed on "test".
    PgSCD1(
        task_id="sink",
        source=["append_all"],
        conn_id=PG_CONN_ID,
        table_name="test2",
        key=["test"],
        dag=tdag,
    )
11 changes: 11 additions & 0 deletions data-detective-airflow/dags/dags/dummy_sql/code/code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from airflow.utils.context import Context
from pandas import DataFrame, concat


def append_dfs(_context: "Context", *sources: DataFrame) -> DataFrame:
    """Append several dfs into one (row-wise concatenation).

    :param _context: Airflow context (unused; kept for the transformer signature)
    :param sources: DataFrames to append, in order; original indices are kept
    :return: one DataFrame with all rows of ``sources``
    :raises ValueError: if no DataFrames are given (pandas.concat contract)
    """
    # concat accepts any iterable, so the argument tuple is passed
    # directly instead of being copied into a throwaway list first.
    return concat(sources)
3 changes: 1 addition & 2 deletions data-detective-airflow/dags/dags/dummy_sql/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ tasks:
source:
- test1
- transform
transformer_callable: >
lambda _context, df_now1, transform: df_now1.append(transform, sort=False)
transformer_callable: append_dfs
- task_id: sink
description: Write the result
type: data_detective_airflow.operators.sinks.PgSCD1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,30 @@
from airflow import AirflowException
from typing import Any, Optional

from pandas import concat

from data_detective_airflow.operators.tbaseoperator import TBaseOperator


class Append(TBaseOperator):
    """Operator that merges the DataFrames produced by several upstream
    tasks into a single one via ``pandas.concat``.

    :param source: task_ids of the upstream tasks whose results are merged
    :param append_options: Additional options for pandas.concat() method
    :param kwargs: Additional params for the TBaseOperator
    """

    ui_color = "#8f75d1"

    def __init__(self, source: list[str], append_options: Optional[dict[str, Any]] = None, **kwargs):
        super().__init__(**kwargs)
        self.source = source
        self.append_options = append_options or {}
        # Wire every listed upstream task into this operator's dependency graph.
        for upstream in self.source:
            self.dag.task_dict[upstream] >> self  # pylint: disable=pointless-statement

    def execute(self, context):
        self.log.info("Start appending...")
        # Pull each upstream result, then concatenate them in one pass,
        # honouring any user-supplied concat options.
        frames = [self.dag.task_dict[name].result.read(context) for name in self.source]
        self.log.info("Writing pickle result")
        self.result.write(concat(frames, **self.append_options), context)
        self.log.info("Finish")
3,213 changes: 1,958 additions & 1,255 deletions data-detective-airflow/poetry.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions data-detective-airflow/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "data_detective_airflow"
license = "Apache-2.0"
version = "2.1.1"
version = "2.2.0"
description = "Framework with task testing over Apache Airflow"
authors = ["Tinkoff Data Detective Team"]
readme = 'README.md' # Markdown files are supported
Expand All @@ -21,13 +21,14 @@ classifiers = [

[tool.poetry.dependencies]
python = "^3.9"
apache-airflow = "~2.4"
apache-airflow = "~2.6"
apache-airflow-providers-amazon = "*"
botocore = "^1.29.144"
apache-airflow-providers-celery = "*"
apache-airflow-providers-postgres = "*"
apache-airflow-providers-redis = "*"
apache-airflow-providers-ssh = "*"
pandas = "^1.1, <1.6"
pandas = "~2.0"
petl = "^1.7"


Expand Down
4 changes: 0 additions & 4 deletions data-detective-airflow/tests_data/dags/dummy/append_all.md

This file was deleted.

3 changes: 0 additions & 3 deletions data-detective-airflow/tests_data/dags/dummy/df_now1.md

This file was deleted.

3 changes: 0 additions & 3 deletions data-detective-airflow/tests_data/dags/dummy/df_now2.md

This file was deleted.

4 changes: 0 additions & 4 deletions data-detective-airflow/tests_data/dags/dummy/sink.md

This file was deleted.

3 changes: 0 additions & 3 deletions data-detective-airflow/tests_data/dags/dummy/transform.md

This file was deleted.

This file was deleted.

3 changes: 0 additions & 3 deletions data-detective-airflow/tests_data/dags/dummy_petl/dump1.md

This file was deleted.

3 changes: 0 additions & 3 deletions data-detective-airflow/tests_data/dags/dummy_petl/dump2.md

This file was deleted.

4 changes: 0 additions & 4 deletions data-detective-airflow/tests_data/dags/dummy_petl/sink.md

This file was deleted.

This file was deleted.

This file was deleted.

12 changes: 0 additions & 12 deletions data-detective-airflow/tests_data/dags/dummy_processed/sink.md

This file was deleted.

0 comments on commit 51aa96d

Please sign in to comment.