<img src="https://upload.wikimedia.org/wikipedia/commons/d/de/AirflowLogo.png" width="150">

## ✅ สร้าง DAG บนเครื่อง _Active 2_ โดยให้ Task แรกตรวจสอบ (Check) ว่าเครื่อง _Active 1_ ยังทำงานอยู่หรือไม่ ดังนี้

---
__Package ที่ต้องติดตั้งไว้บนเครื่อง Active 2__
* Ubuntu: `apt-get install -y netcat`
* CentOS: `yum install -y nc`

In [None]:
def checkActive(ipaddr="192.168.10.23", port=8080, timeout=10):
    """Skip this DAG run when the Active 1 node is reachable.

    Meant to be the first task of the DAG deployed on Active 2. It tries a
    plain TCP connection to Active 1; if the connection succeeds, it raises
    ``AirflowSkipException`` so this task is marked skipped (with Airflow's
    default ``all_success`` trigger rule the downstream tasks are skipped
    too). The probe uses the standard-library ``socket`` module, so it does
    not depend on which netcat flavor is installed.

    Args:
        ipaddr: Host name or IP address of the Active 1 node.
        port: TCP port to probe on Active 1 (int or numeric string).
        timeout: Seconds to wait for the connection before giving up.

    Returns:
        True when Active 1 could not be reached, so the pipeline should run
        on this node.

    Raises:
        AirflowSkipException: when Active 1 accepted the connection.
    """
    import socket

    try:
        # Any OSError (connection refused, host unreachable, timeout, DNS
        # failure) means Active 1 is not serving on that port.
        with socket.create_connection((ipaddr, int(port)), timeout=timeout):
            pass
    except OSError as exc:
        print(f"Active 1 ({ipaddr}:{port}) is unreachable: {exc}")
        return True
    raise AirflowSkipException(f"Active 1 ({ipaddr}:{port}) is reachable; skipping")

## ✅ เรียกใช้ Function `checkActive` ใน Task แรก

In [None]:
# Register checkActive as the first task of the DAG; `dag` must already be
# defined before this cell runs.
t1 = PythonOperator(dag=dag, task_id='checkActive', python_callable=checkActive)

## ✅ Full example code

In [None]:
import pandas as pd
import os
import duckdb as db
from sqlalchemy import create_engine
from urllib.parse import quote
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowSkipException
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pyspark

# Environment for PySpark: Hadoop/Hive client config dir, the JDK, the user
# name presented to HDFS, and the Python interpreter Spark should use. Set
# before the session is built below so the launched JVM picks them up.
os.environ.update({
    'HADOOP_CONF_DIR': '/etc/hadoop/conf',
    'JAVA_HOME': '/usr/local/jdk8u222-b10',
    'HADOOP_USER_NAME': 'hive',
    'PYSPARK_PYTHON': '/HDFS01/anaconda3/envs/main/bin/python',
})

# Spark / Hive metastore settings. A maxResultSize of '0' means "unlimited"
# in Spark's configuration.
_SPARK_SETTINGS = (
    ('spark.driver.maxResultSize', '0'),
    ('spark.driver.memory', '4g'),
    ('spark.sql.repl.eagerEval.enabled', 'true'),
    ('hive.strict.managed.tables', 'false'),
    ('hive.metastore.uris', 'thrift://nn01.bigdata:9083'),
    ('metastore.client.capability.check', 'false'),
)
conf = pyspark.SparkConf().setAll(_SPARK_SETTINGS)

# NOTE(review): this session is created at module import time, i.e. every
# time Airflow parses this DAG file. Consider building it inside the task.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("myApp")
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)

# Timestamps captured once when this module is imported, not each time a
# task function runs. (Not referenced elsewhere in this file's visible code.)
now = datetime.now()
update_dt = f"{now:%Y-%m-%d %H:%M:%S}"
today = f"{now:%Y-%m-%d}"

# In-memory DuckDB connection (nothing is persisted to disk), opened read/write.
mydb = db.connect(':memory:', read_only=False)

# default_args applied to every task in the DAG: runs do not depend on the
# previous run's outcome, and a failed task is retried once after 5 minutes.
args = dict(
    owner='Dags_name',
    depends_on_past=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

def checkActive(ipaddr="192.168.10.23", port=8080, timeout=10):
    """Skip this DAG run when the Active 1 node is reachable.

    Meant to be the first task of the DAG deployed on Active 2. It tries a
    plain TCP connection to Active 1; if the connection succeeds, it raises
    ``AirflowSkipException`` so this task is marked skipped (with Airflow's
    default ``all_success`` trigger rule the downstream tasks are skipped
    too). The probe uses the standard-library ``socket`` module, so it does
    not depend on which netcat flavor is installed.

    Args:
        ipaddr: Host name or IP address of the Active 1 node.
        port: TCP port to probe on Active 1 (int or numeric string).
        timeout: Seconds to wait for the connection before giving up.

    Returns:
        True when Active 1 could not be reached, so the pipeline should run
        on this node.

    Raises:
        AirflowSkipException: when Active 1 accepted the connection.
    """
    import socket

    try:
        # Any OSError (connection refused, host unreachable, timeout, DNS
        # failure) means Active 1 is not serving on that port.
        with socket.create_connection((ipaddr, int(port)), timeout=timeout):
            pass
    except OSError as exc:
        print(f"Active 1 ({ipaddr}:{port}) is unreachable: {exc}")
        return True
    raise AirflowSkipException(f"Active 1 ({ipaddr}:{port}) is reachable; skipping")
    
def getData():
    """Download the AER CPS1988 CSV with DuckDB and return a cleaned DataFrame.

    The query reads the CSV straight from its HTTPS URL, renames ``column0``
    to ``idx``, maps ``smsa`` / ``parttime`` from 'yes' to 'Yes' (anything
    else to 'No'), and upper-cases the first letter of ``region``.

    Returns:
        The query result converted with DuckDB's ``.df()`` (a pandas
        DataFrame).

    NOTE(review): when run as the ``getData`` PythonOperator, the returned
    DataFrame is pushed to XCom; whether that serializes depends on the
    Airflow version / XCom backend — confirm. ``toHdfs`` also calls this
    function itself, so the CSV is downloaded twice per DAG run.
    """
    # Load httpfs and run the query on the SAME connection: the module-level
    # ``db.execute`` would use DuckDB's default connection instead of ``mydb``.
    mydb.execute("INSTALL httpfs;")
    mydb.execute("LOAD httpfs;")
    df = mydb.execute("""SELECT
                    column0 idx, wage, education, experience, ethnicity,
                    CASE WHEN smsa = 'yes' THEN 'Yes' ELSE 'No' END AS smsa,
                    CASE WHEN parttime = 'yes' THEN 'Yes' ELSE 'No' END AS parttime,
                    CONCAT(UPPER(SUBSTRING(region, 1, 1)), SUBSTRING(region, 2)) AS region
                FROM
                    read_csv_auto
                ('https://vincentarelbundock.github.io/Rdatasets/csv/AER/CPS1988.csv');""").df()
    return df

def toHdfs():
    """Fetch the dataset and save it to the Spark catalog table ``test_db.nook``.

    If ``test_db`` already lists a table named ``nook`` the rows are appended;
    otherwise the table is written with ``overwrite`` mode.
    """
    pandas_df = getData()
    spark_df = spark.createDataFrame(pandas_df)

    matching_tables = spark.sql('show tables in test_db').filter("tableName == 'nook'")
    write_mode = 'append' if matching_tables.count() > 0 else 'overwrite'

    # NOTE(review): getData() reads the same fixed URL on every run, so the
    # append branch re-inserts the whole dataset each scheduled run — confirm
    # duplicates are intended.
    spark_df.write.mode(write_mode).saveAsTable('test_db.nook')

# Daily schedule at 15:15 (cron "15 15 * * *"); catchup=False means missed
# intervals since start_date are not backfilled.
dag = DAG(
    dag_id='Dags_name',
    description='Dags_name',
    schedule_interval='15 15 * * *',
    start_date=datetime(2023, 5, 10),
    catchup=False,
    default_args=args,
)
# Pipeline tasks. checkActive decides whether this node should run at all;
# with the default trigger rule, a skip there also skips the later tasks.
t1 = PythonOperator(dag=dag, task_id='checkActive', python_callable=checkActive)
t2 = PythonOperator(dag=dag, task_id='getData', python_callable=getData)
t3 = PythonOperator(dag=dag, task_id='toHdfs', python_callable=toHdfs)

# Ordering: checkActive -> getData -> toHdfs
t1.set_downstream(t2)
t2.set_downstream(t3)