In [1]:
import sys
from pathlib import Path

# Make the repository root importable so the `src.wfx...` imports in the next
# cell resolve. This assumes the kernel's working directory (Path.cwd()) is a
# direct subdirectory of the repository root.
current_dir = Path.cwd()
parent_dir = current_dir.parent
print(f"root_dir: {parent_dir}")
# Only add the entry once: re-running this cell would otherwise append the same
# path to sys.path again on every execution.
if str(parent_dir) not in sys.path:
    sys.path.append(str(parent_dir))

root_dir: /Users/afaque.ahmad/Documents/Projects/Masan/Repos/wfx


In [2]:
from src.wfx.processors.airflow import AirflowProcessor
from src.wfx.processors.gsheet import GsheetProcessor
import json

In [3]:
# Run configuration: which DAG to migrate and where its inputs/outputs live.
# All locations are resolved relative to the repository root (`parent_dir`).
dag_name = "data_processing"
output_workflow_name = "sample_databricks_etl"

gsheet_path = parent_dir / "inputs" / "mappings" / dag_name / "task_list_all.csv"
input_dag_path = parent_dir / "inputs" / "airflow_dags"
output_workflow_path = parent_dir / "resources" / "jobs"

# Echo the resolved settings, one "name: value" line each, so the run log
# records exactly which inputs and outputs were used.
print(
    "\n".join(
        f"{label}: {value}"
        for label, value in (
            ("dag_name", dag_name),
            ("gsheet_path", gsheet_path),
            ("input_dag_path", input_dag_path),
            ("output_workflow_path", output_workflow_path),
            ("output_workflow_name", output_workflow_name),
        )
    )
)

dag_name: data_processing
gsheet_path: /Users/afaque.ahmad/Documents/Projects/Masan/Repos/wfx/inputs/mappings/data_processing/task_list_all.csv
input_dag_path: /Users/afaque.ahmad/Documents/Projects/Masan/Repos/wfx/inputs/airflow_dags
output_workflow_path: /Users/afaque.ahmad/Documents/Projects/Masan/Repos/wfx/resources/jobs
output_workflow_name: sample_databricks_etl


In [6]:
# Wire the processors together: the mapping-sheet processor is handed to the
# Airflow processor, which builds the target task spec. Judging by the cell
# output, this call also writes a `<dag_name>_tasks_config.json` file.
gsheet_processor = GsheetProcessor(dag_name, f"{gsheet_path}")
airflow_processor = AirflowProcessor(
    dag_name,
    gsheet_processor,
    f"{input_dag_path}",
)
target_tasks = airflow_processor.build_save_target_tasks()

Info: No params found for task_id data_validation_task of operator validate_input_data
Info: No params found for task_id data_processing_task of operator process_data
Info: No params found for task_id data_export_task of operator export_processed_data
Info: No params found for task_id report_generation_task of operator generate_report
Info: No params found for task_id downstream_trigger_task of operator trigger_downstream_dag
Writing /Users/afaque.ahmad/Documents/Projects/Masan/Repos/wfx/inputs/dbx_workflows/data_processing_tasks_config.json


In [7]:
# Display the "old" task names known to the mapping-sheet processor. These
# appear to be the Airflow operator names reported in the "No params found"
# messages of the previous cell — verify against the mapping CSV.
gsheet_processor.get_old_task_name_list()

['validate_input_data',
 'process_data',
 'export_processed_data',
 'generate_report',
 'trigger_downstream_dag']

In [8]:
# Pretty-print the generated task spec as indented JSON for a quick review
# before it is converted into a DABs job.
print(
    json.dumps(
        target_tasks,
        indent=2,
    )
)

{
  "tasks": [
    {
      "task_key": "data_validation_task",
      "depends_on": [],
      "notebook_task": {
        "notebook_path": "/repos/data_team/etl/validation",
        "source": "WORKSPACE",
        "base_parameters": {}
      }
    },
    {
      "task_key": "data_processing_task",
      "depends_on": [
        {
          "task_key": "data_validation_task"
        }
      ],
      "notebook_task": {
        "notebook_path": "/repos/data_team/etl/processing",
        "source": "WORKSPACE",
        "base_parameters": {}
      }
    },
    {
      "task_key": "data_export_task",
      "depends_on": [
        {
          "task_key": "data_processing_task"
        }
      ],
      "notebook_task": {
        "notebook_path": "/repos/data_team/etl/export",
        "source": "WORKSPACE",
        "base_parameters": {}
      }
    },
    {
      "task_key": "report_generation_task",
      "depends_on": [
        {
          "task_key": "data_processing_task"
        }
      ],
    

In [9]:
# Rebuild the task spec with `build_target_tasks` (no "Writing ..." line is
# emitted, unlike `build_save_target_tasks` earlier) and display it.
# NOTE(review): this overwrites the `target_tasks` produced earlier, and the
# DABs conversion cell consumes whatever this cell returns — confirm both
# builders yield the same structure, or keep the results under separate names.
target_tasks = airflow_processor.build_target_tasks()
target_tasks

Info: No params found for task_id data_validation_task of operator validate_input_data
Info: No params found for task_id data_processing_task of operator process_data
Info: No params found for task_id data_export_task of operator export_processed_data
Info: No params found for task_id report_generation_task of operator generate_report
Info: No params found for task_id downstream_trigger_task of operator trigger_downstream_dag


{'tasks': [{'task_key': 'data_validation_task',
   'depends_on': [],
   'notebook_task': {'notebook_path': '/repos/data_team/etl/validation',
    'source': 'WORKSPACE',
    'base_parameters': {}}},
  {'task_key': 'data_processing_task',
   'depends_on': [{'task_key': 'data_validation_task'}],
   'notebook_task': {'notebook_path': '/repos/data_team/etl/processing',
    'source': 'WORKSPACE',
    'base_parameters': {}}},
  {'task_key': 'data_export_task',
   'depends_on': [{'task_key': 'data_processing_task'}],
   'notebook_task': {'notebook_path': '/repos/data_team/etl/export',
    'source': 'WORKSPACE',
    'base_parameters': {}}},
  {'task_key': 'report_generation_task',
   'depends_on': [{'task_key': 'data_processing_task'}],
   'notebook_task': {'notebook_path': '/repos/data_team/etl/reporting',
    'source': 'WORKSPACE',
    'base_parameters': {}}},
  {'task_key': 'downstream_trigger_task',
   'depends_on': [{'task_key': 'data_export_task'},
    {'task_key': 'report_generation_task

In [None]:
# Check the converter input instead of eyeballing `type(target_tasks)`. The
# build output shows a dict with a "tasks" list whose entries are keyed by
# "task_key" and reference each other through "depends_on"; fail fast with a
# readable message if that shape is violated.
assert isinstance(target_tasks, dict), f"expected a dict, got {type(target_tasks).__name__}"
assert isinstance(target_tasks.get("tasks"), list), "target_tasks has no 'tasks' list"
assert all("task_key" in task for task in target_tasks["tasks"]), "every task needs a 'task_key'"

# Every depends_on entry must point at a task that exists in the same spec.
known_task_keys = {task["task_key"] for task in target_tasks["tasks"]}
dangling_task_keys = {
    dependency["task_key"]
    for task in target_tasks["tasks"]
    for dependency in task.get("depends_on", [])
} - known_task_keys
assert not dangling_task_keys, f"depends_on references unknown task_key(s): {sorted(dangling_task_keys)}"

In [13]:
from src.wfx.core.converter import DBXDabConverter

# Reuse the configuration from the top of the notebook rather than re-deriving
# it here: the project root is the directory put on sys.path (`parent_dir`),
# and the job name / output folder were set in the config cell. Re-declaring
# them as separate literals let the two copies drift apart silently.
PROJECT_ROOT = parent_dir
print(f"PROJECT_ROOT: {PROJECT_ROOT}")

target_workflow_name = output_workflow_name

converter = DBXDabConverter()

dabs_job = converter.convert_job_config(target_tasks, target_workflow_name)

# Make sure the destination folder exists (no-op if it already does) so the
# save does not depend on resources/jobs having been created beforehand.
output_workflow_path.mkdir(parents=True, exist_ok=True)
# Join with pathlib instead of string concatenation; keep `write_path` a str
# as before.
write_path = str(output_workflow_path / f"{target_workflow_name}.yml")

converter.save_dabs_yaml(dabs_job, write_path)
print(f"Successfully created DABs job configuration: {write_path}")

PROJECT_ROOT: /Users/afaque.ahmad/Documents/Projects/Masan/Repos/wfx
Successfully created DABs job configuration: /Users/afaque.ahmad/Documents/Projects/Masan/Repos/wfx/resources/jobs/sample_databricks_etl.yml
