In [None]:
from datetime import datetime, timedelta
import boto3
from oksk.airflow.dags.base_dag import OkskDAG
from oksk.airflow.operators.spark_operator import SparkOperator
from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor
from airflow.operators.python import ShortCircuitOperator
from airflow.operators.python import BranchPythonOperator
from airflow.models import Variable
import logging
import sys


# Emit log records at INFO level and above on stdout, and route messages
# issued through the `warnings` module into the logging system as well.
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logging.captureWarnings(True)


WFL_NAME = 'test_ncp'
WFL_DESC = 'Numbers change people'
WFL_VERSION = 'v1'
WFL_OWNER = ####
TLG_USR = ####
DAG_ID = '{}_{}'.format(WFL_NAME, WFL_VERSION)
IMAGE_NAME = WFL_NAME

# S3 connection settings, each read from the Airflow Variable of the same
# name and keyed by that name.
# NOTE(review): these lookups execute at module import, i.e. whenever this
# DAG file is parsed — confirm that is acceptable for this deployment.
envs = {
    name: Variable.get(name)
    for name in ('S3_ENDPOINT', 'S3_ACCESS_KEY', 'S3_SECRET_KEY')
}

# Default arguments handed to the DAG below: owner, a fixed start date,
# four retries spaced one minute apart, and no dependency on the previous run.
# `params.tg_username` carries TLG_USR — presumably a Telegram user consumed
# by the project's DAG/operators; verify against OkskDAG.
default_args = dict(
    owner=WFL_OWNER,
    depends_on_past=False,
    start_date=datetime(2022, 1, 1),
    retries=4,
    retry_delay=timedelta(minutes=1),
    params=dict(tg_username=TLG_USR),
)


# DAG object the tasks in this file attach to: scheduled monthly, with at most
# one active run at a time.
# NOTE(review): `catchup` is not set here; whether past monthly intervals since
# the start date are backfilled depends on OkskDAG / Airflow defaults — confirm.
dag = OkskDAG(
    dag_id=DAG_ID,
    description=WFL_DESC,
    schedule_interval='@monthly',
    max_active_runs=1,
    default_args=default_args,
    tags=['test_model'],
)
    
# Sensor that waits for the `_SUCCESS` marker of the `taxonomy_first_layer`
# partition for the run's date in the `oksk` bucket. It fails (soft_fail is
# off) if the marker has not appeared within one hour.
s3_sms = S3KeySensor(
    task_id='s3_sms',
    bucket_name='oksk',
    # `ds` is the run date rendered as YYYY-MM-DD. It is used instead of the
    # deprecated `execution_date` template variable and matches the `{{ds}}`
    # argument passed to the `test` task.
    bucket_key='data/abonent_vector/taxonomy_first_layer/date={{ ds }}/_SUCCESS',
    wildcard_match=True,
    aws_conn_id='airflow-data',
    # NOTE(review): TLS certificate verification is disabled — confirm this is
    # intentional for the S3 endpoint in use.
    verify=False,
    mode='reschedule',
    # In reschedule mode every poke is a fresh scheduling round-trip; Airflow's
    # sensor documentation advises an interval of at least a minute to keep
    # scheduler load down.
    poke_interval=60,
    timeout=3600,
    soft_fail=False,
    dag=dag,
)

# Spark task: runs the workflow's `run.ipynb` application with the run date
# (`ds`) and timestamp (`ts`) as arguments and the S3 settings from `envs`
# as its environment, on two executors with 2 cores / 2g memory each.
test = SparkOperator(
    task_id='test',
    application=f'{WFL_NAME}/run.ipynb',
    application_args=['{{ds}}', '{{ts}}'],
    application_envs=envs,
    executor_instances=2,
    executor_cores=2,
    executor_memory='2g',
    dag=dag,
)

# The Spark task is downstream of the S3 marker sensor.
s3_sms >> test
