# In [None]:
import io
from datetime import datetime

import pandas as pd
import psycopg2
import psycopg2.extras
import requests

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# --- Source feeds consumed by the loader tasks below -----------------------

# Orders, read with pandas.read_json(..., lines=True) (one JSON record per line).
orders_url = "https://tmp.mosyag.in/rb-airflow-hw2/orders.json"

# Transaction statuses; the JSON document is transposed so that each
# top-level key becomes one row.
tranzaction_url = "https://api.jsonbin.io/b/5ed7391379382f568bd22822"

# Goods catalogue, a comma-separated file (id, goods_info, price).
# NOTE(review): this URL embeds a GitHub access token; such tokens expire and
# should not be committed to source control -- confirm and move it out.
goods_url = (
    "https://raw.githubusercontent.com/cra/airflow-rosbank-2021q3-materials/"
    "master/hw2/goods.csv?token=AKFW6FJRQZLKK73IZGE7OP3BB2URE"
)

# Customers, a comma-separated file (id, customer, birthday, sex, email).
customer_url = (
    "https://raw.githubusercontent.com/Rmango77/DVC/"
    "master/customers.csv"
)

# Connection settings for the target PostgreSQL database.
# NOTE(review): credentials are still hard-coded; move them to an Airflow
# Connection or environment variables before sharing this file.
_DB_PARAMS = {
    "database": "db5",
    "user": "user5",
    "password": "shauweVie0oo",
    "host": "abc.ijklmn.xyz",
    "port": "9876",
}

# Seconds to wait for an HTTP source; without a timeout `requests` can block
# forever on an unresponsive server.
_HTTP_TIMEOUT = 60


def _connect():
    """Open a new connection to the target PostgreSQL database.

    Every task opens (and closes) its own connection: a live psycopg2
    connection cannot be serialised into XCom, and Airflow tasks may run in
    different processes or on different workers.
    """
    return psycopg2.connect(**_DB_PARAMS)


def _to_sql_value(value):
    """Map pandas missing markers (None, NaN, NaT) to None, i.e. SQL NULL."""
    # is_scalar guards pd.isna against list values (e.g. JSON arrays).
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    return value


def _store(frame, table, columns):
    """Create ``table`` if it is missing and append ``frame[columns]`` to it.

    Every column is created as ``varchar``, the convention used throughout
    this DAG.

    Args:
        frame: DataFrame holding the rows to insert.
        table: Target table name (an internal constant, not user input).
        columns: Column names, used for both the DataFrame and the table.
    """
    # Object dtype turns numpy scalars into Python objects psycopg2 can adapt.
    rows = [
        tuple(_to_sql_value(value) for value in row)
        for row in frame[columns].astype(object).itertuples(index=False, name=None)
    ]
    column_defs = ", ".join(f"{name} varchar" for name in columns)
    column_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))

    conn = _connect()
    try:
        # `with conn` commits on success and rolls back on error, but does not
        # close the connection -- hence the surrounding try/finally.
        with conn, conn.cursor() as cur:
            # IF NOT EXISTS instead of a bare try/except, which also hid
            # errors other than "table already exists".
            cur.execute(f"CREATE TABLE IF NOT EXISTS {table} ({column_defs})")
            psycopg2.extras.execute_batch(
                cur,
                f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})",
                rows,
            )
    finally:
        conn.close()


def _fetch_csv(url, columns):
    """Download a comma-separated file and return it as a DataFrame of strings.

    Args:
        url: Address of the file.
        columns: Names for the columns, in file order.

    Raises:
        requests.HTTPError: if the server answers with an error status, so an
            error page (e.g. for an expired token) is not loaded as data.
    """
    response = requests.get(url, timeout=_HTTP_TIMEOUT)
    response.raise_for_status()
    # read_csv handles quoted fields, "\r\n" line ends and blank lines, which
    # a manual split on "\n" and "," would turn into broken or bogus rows.
    # skipinitialspace drops the blanks after each comma.
    # NOTE(review): every line is read as a record; if the file starts with a
    # header row it is stored as data -- confirm, and use header=0 if so.
    return pd.read_csv(
        io.StringIO(response.text),
        header=None,
        names=columns,
        index_col=False,
        dtype=str,
        skipinitialspace=True,
    )


def _read_table(cur, table):
    """Return all rows of ``table`` as a DataFrame labelled by column name."""
    cur.execute(f"SELECT * FROM {table}")
    columns = [column[0] for column in cur.description]
    return pd.DataFrame(cur.fetchall(), columns=columns)


def _age_on(birthday, today):
    """Return the age in completed years on ``today`` of someone born ``birthday``."""
    before_birthday = (today.month, today.day) < (birthday.month, birthday.day)
    return today.year - birthday.year - int(before_birthday)


def create_connect(**kwargs):
    """Check that the target database is reachable before loading starts.

    Raises:
        psycopg2.Error: when the connection cannot be opened, so the task
            fails instead of continuing with an unbound connection.
    """
    try:
        conn = _connect()
    except psycopg2.Error:
        print("I am unable to connect to the database")
        raise
    conn.close()


def orders_operator(**kwargs):
    """Load the orders feed (JSON lines) into the ``orders`` table."""
    orders = pd.read_json(orders_url, lines=True)
    _store(
        orders,
        "orders",
        ["order_id", "order_uuid", "sku_name", "order_date", "qty", "full_name", "email"],
    )


def tranzaction_operator(**kwargs):
    """Load successful transactions into the ``transaction_status`` table.

    The fetched JSON document is transposed so that each top-level key
    becomes one row; that key is stored in the ``id`` column.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    response = requests.get(tranzaction_url, timeout=_HTTP_TIMEOUT)
    response.raise_for_status()
    transactions = pd.DataFrame(response.json()).T
    # Element-wise comparison on an object column (`is True` would not work).
    transactions = transactions[transactions["success"] == True]  # noqa: E712
    transactions = transactions.reset_index().rename(columns={"index": "id"})
    _store(transactions, "transaction_status", ["id", "success", "errors"])


def goods_operator(**kwargs):
    """Load the goods CSV into the ``goods_information`` table."""
    columns = ["id", "goods_info", "price"]
    _store(_fetch_csv(goods_url, columns), "goods_information", columns)


def customer_operator(**kwargs):
    """Load the customers CSV into the ``customer_info`` table."""
    columns = ["id", "customer", "birthday", "sex", "email"]
    _store(_fetch_csv(customer_url, columns), "customer_info", columns)


def concut_tables(**kwargs):
    """Join the staging tables and append the result to ``final_table``.

    Orders are inner-joined to successful transactions (``order_uuid`` =
    ``id``), then left-joined to customers on ``email`` and to goods on
    ``sku_name`` = ``goods_info``. One row per order is written with the
    customer's age and ``qty * price`` as ``total_price``.
    """
    conn = _connect()
    try:
        with conn, conn.cursor() as cur:
            orders = _read_table(cur, "orders")
            transactions = _read_table(cur, "transaction_status")
            goods = _read_table(cur, "goods_information")
            customers = _read_table(cur, "customer_info")
    finally:
        conn.close()

    # `success` is a varchar column, so compare its text, not the bool True.
    success = transactions["success"].astype(str).str.strip().str.lower()
    orders = orders.merge(
        transactions[success == "true"], left_on="order_uuid", right_on="id", how="inner"
    )
    orders = orders.merge(customers, on="email", how="left")
    orders = orders.merge(goods, left_on="sku_name", right_on="goods_info", how="left")

    # Orders without a matching customer get the placeholder birthday
    # 1800-12-01. NOTE(review): that yields ages over 200; NULL may be a
    # better "unknown" marker.
    today = datetime.today().date()
    birthdays = orders["birthday"].fillna("1800-12-01").astype(str).str.strip()
    orders["age"] = [
        _age_on(datetime.strptime(text, "%Y-%m-%d").date(), today)
        for text in birthdays
    ]

    # qty and price are read back from varchar columns, i.e. as text.
    orders["total_price"] = orders["qty"].astype("float64") * orders["price"].astype("float64")

    # NOTE(review): rows are appended on every run; truncate first if re-runs
    # should be idempotent.
    _store(
        orders,
        "final_table",
        ["full_name", "age", "sku_name", "order_date", "success", "total_price"],
    )
    

# DAG: `create_table` runs create_connect, then the four loaders, then `concat`
# joins the loaded tables. Operators created inside the `with` block attach
# to the DAG automatically, so no `dag=` argument is needed.
with DAG(
    dag_id='customers_orders',
    schedule_interval=None,
    start_date=days_ago(2),
) as dag:

    create_table = PythonOperator(
        task_id='create_table',
        python_callable=create_connect,
    )

    # Each loader reads its own source URL and writes its own table.
    orders = PythonOperator(
        task_id='orders',
        python_callable=orders_operator,
    )

    tranzaction = PythonOperator(
        task_id='tranzaction',
        python_callable=tranzaction_operator,
    )

    goods = PythonOperator(
        task_id='goods',
        python_callable=goods_operator,
    )

    customer = PythonOperator(
        task_id='customer',
        python_callable=customer_operator,
    )

    concat = PythonOperator(
        task_id='concat',
        python_callable=concut_tables,
    )

    # The loaders do not depend on each other, so they can run in parallel;
    # `concat` still waits for all four.
    create_table >> [orders, tranzaction, goods, customer] >> concat