In [1]:
"""
Updated: 15 Nov 2020
Tasks: Live data processing, weather data addition,
post processing and profiles addition, Regression run
Schedule: At the end of every 30 minutes
Description: Scheduled DAG to process OMS data arriving at 30 min interval,
adding the weather data and predicting using RDS model objects
Environment: Composer-0002
Run-time environments: PySpark, SparkR and Python callable
"""

'\nUpdated: 15 Nov 2020\nTasks: Live data processing, weather data addition,\npost processing and profiles addition, Regression run\nSchedule: At the end of every 30 minutes\nDescription: Scheduled DAG to process OMS data arriving at 30 min interval,\nadding the weather data and predicting using RDS model objects\nEnvironment: Composer-0002\nRun-time environments: Pyspark,SparkR and python callable\n'

In [1]:
import logging
logging.basicConfig(level=logging.INFO)
import subprocess
import datetime
from airflow.models import Variable
from airflow.contrib.operators.dataproc_operator import (DataProcPySparkOperator)
from airflow.models import DAG

ModuleNotFoundError: No module named 'airflow'

In [7]:
# Copy the config file from GCS into /root/ on the host that executes this
# file. This runs at module import time (i.e. whenever this DAG file is
# parsed), not inside an Airflow task.
# NOTE(review): nothing in this file reads /root/confignew0002.ini — confirm
# which script depends on it and whether the copy belongs inside a task.
BASHCOMMAND = 'gsutil cp gs://aes-analytics-0002-curated/Outage_Restoration/Live_Data_Curation/Config/confignew0002.ini /root/'
# stderr is piped as well; without stderr=PIPE, communicate() always returns
# None for it, so ERROR could never carry gsutil's error message.
PROCESS = subprocess.Popen(BASHCOMMAND.split(), stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE)
OUTPUT, ERROR = PROCESS.communicate()
if PROCESS.returncode == 0:
    logging.info("Config file loaded to cluster")
else:
    # Still best-effort (the import is not aborted), but the failure is now
    # reported instead of being logged as a success.
    logging.error("Config file copy failed (gsutil exit code %s): %s",
                  PROCESS.returncode,
                  ERROR.decode('utf-8', 'replace').strip())
logging.info('\n')

INFO:root:Config file loaded to cluster
INFO:root:



In [None]:
# =================== Variables =================================
# Deployment environment suffix, read from the Airflow Variable "env" and
# interpolated into the project / Composer / raw-bucket names below.
ENV = Variable.get("env")
logging.info(ENV)

JOB_NAME = 'outage_end_end'

# --- Environment-derived names ---------------------------------------------
PROJECT = 'aes-datahub-{}'.format(ENV)
COMPOSER_NAME = 'composer-{}'.format(ENV)
RAW_BUCKET = 'aes-datahub-{}-raw'.format(ENV)

# --- Storage (hard-coded to the 0002 environment) --------------------------
# NOTE(review): these do not use ENV — confirm that is intended if the DAG is
# deployed to another environment.
BUCKET = 'aes-analytics-0002-curated'
COMPOSER_BUCKET = 'us-east4-composer-0002-8d07c42c-bucket'
DATAPROC_BUCKET = 'aes-datahub-0002-temp'

# --- Dataproc clusters ------------------------------------------------------
# NOTE(review): the DAG's tasks in this file submit to 'dp-outage-python-0002',
# which matches neither name below — confirm whether these are still used.
CLUSTER_NAME = 'outage-python-cluster-0002'
CLUSTER_NAME_R = 'outage-r-cluster-0002'

# --- BigQuery ---------------------------------------------------------------
BQ_PROJECT = "aes-analytics-0002"
BQ_DATASET = "mds_outage_restoration"
BQ_DATASET_NAME = BQ_DATASET
BQ_TABLE_CHVG = "IPL_Live_Input_Master_Dataset"
BQ_TABLE_FINAL = "IPL_LIVE_PREDICTIONS"
BQ_TABLE_REPO = "IPL_PREDICTIONS"

EMAIL = ['musigma.bkumar.c@aes.com']

# --- Dates (evaluated once, when this file is executed) ---------------------
# Today's local date as YYYYMMDD.
OUTPUT_DATE = '{:%Y%m%d}'.format(datetime.datetime.now())
# Naive datetime for 00:00:00 at the start of the previous local day.
YESTERDAY = datetime.datetime.combine(
    (datetime.datetime.today() - datetime.timedelta(days=1)).date(),
    datetime.time.min)
# Fixed DAG start date.
START_TIME = datetime.datetime(2020, 10, 27, 14, 0, 0)

In [None]:
# =================== DAG Arguments =================================
# Default arguments handed to every operator in the DAG via default_args.
DEFAULT_ARGS = {
    'start_date': START_TIME,
    'email_on_failure': True,
    # Airflow's operator argument is the lower-case 'email'. The previous
    # 'EMAIL' key matched no operator parameter and was dropped, so the
    # failure alerts enabled above had no recipients.
    'email': ['musigma.bkumar.c@aes.com', 'ms.gkumar.c@aes.com', 'eric.nussbaumer@aes.com'],
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': datetime.timedelta(minutes=1),
    # Consistent network configs for all tasks
    # NOTE(review): 'subnetwork_uri', 'internal_ip_only' and 'zone' look like
    # cluster-creation settings; confirm which operators actually consume them.
    'gcp_conn_id': 'google_cloud_default',
    'subnetwork_uri': COMPOSER_NAME,
    'internal_ip_only': True,
    'region': 'us-east4',
    'zone': 'us-east4-c',
    'labels': {'resource-owner': 'datascience',
               'financial-identifier': 'digital'}}

In [None]:

# =================== DAG Definition =================================
# GCS folder holding the PySpark entry-point scripts run by this DAG.
_IPL_SCRIPTS_PREFIX = 'gs://{}/data/Outage_restoration/IPL/Python_scripts/'.format(
    COMPOSER_BUCKET)

# Dataproc cluster every task below submits to.
# NOTE(review): differs from CLUSTER_NAME ('outage-python-cluster-0002') in the
# Variables cell — confirm which one is correct.
_PYSPARK_CLUSTER = 'dp-outage-python-0002'


def _pyspark_task(dag, task_id, script_name, **overrides):
    """Build a DataProcPySparkOperator that runs one IPL script.

    All tasks in this DAG share the same cluster, region, connection and
    error-state settings; only the task id, the script and a few optional
    operator arguments differ.

    Args:
        dag: DAG the task is attached to.
        task_id: Airflow task id.
        script_name: file name of the script under _IPL_SCRIPTS_PREFIX.
        **overrides: extra operator keyword arguments (e.g. email_on_failure).

    Returns:
        The constructed DataProcPySparkOperator.
    """
    return DataProcPySparkOperator(task_id=task_id,
                                   main=_IPL_SCRIPTS_PREFIX + script_name,
                                   arguments=None,
                                   archives=None,
                                   pyfiles=None,
                                   files=None,
                                   cluster_name=_PYSPARK_CLUSTER,
                                   dataproc_pyspark_properties=None,
                                   dataproc_pyspark_jars=None,
                                   gcp_conn_id='google_cloud_default',
                                   delegate_to=None,
                                   region='us-east4',
                                   job_error_states=['ERROR'],
                                   dag=dag,
                                   **overrides)


with DAG(
        dag_id=JOB_NAME,
        default_args=DEFAULT_ARGS,
        # NOTE(review): in the cron minute field "*/32" fires at :00 and :32
        # only (alternating 32/28-minute gaps). That does not match the
        # documented "every 30 minutes" schedule. Confirm whether "*/30" (or
        # e.g. "2,32" for a fixed delay after data arrival) was intended.
        schedule_interval='*/32 * * * *'
) as dag:
    # NOTE(review): email_on_failure=None is kept from the original on this
    # task and on CURATED_DATASET_CREATION. An explicitly passed argument takes
    # precedence over default_args, so these two tasks send no failure e-mails
    # — confirm that is intended.
    OMS_LIVE_DATASET_PREPROCESSING = _pyspark_task(
        dag, 'OMS_LIVE_DATA_COLLATION',
        'live_oms_preprocessing_pylint.py',
        email_on_failure=None)

    WEATHER_DATA_COLLATION = _pyspark_task(
        dag, 'WEATHER_DATA_COLLATION',
        'weather_source_data_collation_pylint.py')

    CURATED_DATASET_CREATION = _pyspark_task(
        dag, 'CURATED_DATASET_CREATION',
        'curated_dataset_creation_pylint.py',
        email_on_failure=None)

    REGRESSION_CODE_RUN = _pyspark_task(
        dag, 'Regression_Run',
        'load_predict_pylint.py')

    # Create pipeline: the four stages run strictly in sequence. The first
    # task is bound to OMS_LIVE_DATASET_PREPROCESSING; the previous chain named
    # an undefined OMS_LIVE_DATASET_COLLATION and raised NameError on import.
    (OMS_LIVE_DATASET_PREPROCESSING
     >> WEATHER_DATA_COLLATION
     >> CURATED_DATASET_CREATION
     >> REGRESSION_CODE_RUN)