In [2]:
import datetime
import time
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator , BranchPythonOperator
from airflow.operators.dummy import DummyOperator
import pandas

import psycopg2

In [12]:
# The Airflow Variable 'DAG_STARTDATE' is parsed as YYYYMMDD and the time of
# day is pinned to 09:15:00.
start_date = datetime.datetime.strptime(
    Variable.get('DAG_STARTDATE') + ' 09:15:00',
    '%Y%m%d %H:%M:%S',
)

datetime.datetime(2021, 9, 17, 9, 15)

# Settings

## Arguments

In [2]:
# Default arguments handed to every task of the DAG.
args = {
    'owner' : 'Buneo' ,
    'depends_on_past' : False ,
    'start_date' : start_date ,
    # Recipient list for alert mails. email_on_failure / email_on_retry are
    # boolean switches and send nothing unless 'email' is set.
    'email' : ['buneostock@gmail.com'] ,
    'email_on_failure' : True ,
    'email_on_retry' : True ,
    # The key must be spelled exactly 'retries'; unknown keys in default_args
    # are silently ignored by the operators.
    'retries' : 2 ,
    'retry_delay' : datetime.timedelta(minutes = 5)
        }

# Cron '15 9 * * 1-5': 09:15 on Monday through Friday.
dag = DAG('stock_thedaybefore_data',
          description = '' ,
          schedule_interval = '15 9 * * 1-5',
          default_args = args ,
          tags = ['stock'])

## Variables

In [3]:
# Password for the PostgreSQL connection, read from the Airflow Variable "Postgres_password".
postgres_password = Variable.get("Postgres_password")

## Database Connection

In [4]:
# Module-level PostgreSQL connection and cursor, opened when this file is
# executed. The task callables defined below read through `conn` and write
# through `cur`.
conn = psycopg2.connect(host = '127.0.0.1' , dbname = 'postgres' , user = 'postgres' , password = postgres_password)
cur = conn.cursor()

# CHECK_WORK_DATE

## 【 Function 】

In [5]:
def CHECK_WORK_DATE():
    """Pick the branch to run, based on whether today is the newest date in ``work_date``.

    Reads the newest ``date`` from the ``work_date`` table through the
    module-level ``conn``.

    Returns:
        str: 'task_GET_STOCK_CREDIT' when that date equals today's date,
        otherwise 'task_NOT_TRADED_DATE' (also when the table has no rows).
    """
    latest = pandas.read_sql("select date from work_date order by date desc limit 1" , con = conn)['date']
    if latest.empty:
        # No calendar rows at all: nothing marks today as a work date.
        return 'task_NOT_TRADED_DATE'

    work_D = latest.iloc[0]
    # A datetime (or pandas Timestamp) never compares equal to a plain date,
    # so reduce such values to their calendar date before comparing.
    if isinstance(work_D, datetime.datetime):
        work_D = work_D.date()

    if work_D == datetime.date.today():
        return 'task_GET_STOCK_CREDIT'
    return 'task_NOT_TRADED_DATE'

## 【 BranchPythonOperator 】

In [6]:
# Branching task: follows whichever task_id CHECK_WORK_DATE returns.
task_CHECK_WORK_DATE = BranchPythonOperator(
    task_id = 'task_CHECK_WORK_DATE' ,
    python_callable = CHECK_WORK_DATE ,
    dag = dag ,
)

# NOT_TRADED_DATE

## 【 DummyOperator 】

In [7]:
# No-op task; CHECK_WORK_DATE returns its task_id when today is not the
# newest date in work_date.
task_NOT_TRADED_DATE = DummyOperator(
    task_id = 'task_NOT_TRADED_DATE' ,
    dag = dag ,
)

# GET_STOCK_CREDIT

## 【 Function 】

In [8]:
def STOCK_CREDIT():
    """Insert the previous work date's credit-trade rows from twse.com.tw into ``credit_trade``.

    The day processed is the second-newest ``date`` in ``work_date``. Table
    index 1 of the MI_MARGN HTML report is read; columns 0, 6, 12 and 13 are
    kept as ``no``, ``margin``, ``short`` and ``total``; only rows whose
    ``no`` is exactly four characters long are stored. Codes that already
    have a ``credit_trade`` row for that date are skipped, so re-running the
    task does not insert duplicates.

    Uses the module-level ``conn`` / ``cur``. All inserts for a day are
    committed together; if one fails the transaction is rolled back and the
    error is re-raised.

    Raises:
        IndexError: if ``work_date`` holds fewer than two rows, or the report
            page does not have the expected tables/columns.
    """
    insert_sql = "insert into credit_trade (date,no,margin,short,total) values (%s,%s,%s,%s,%s)"

    work_D = [pandas.read_sql("select date from work_date order by date desc limit 2" , con = conn)['date'].iloc[1]]

    for D in work_D:

        process_day = '{:0>4}-{:0>2}-{:0>2}'.format(D.year,D.month,D.day)
        url = "https://www.twse.com.tw/exchangeReport/MI_MARGN?response=html&date={:0>4}{:0>2}{:0>2}&selectType=ALL".format(D.year,D.month,D.day)

        # Only the download is guarded: read_html raises ValueError when the
        # page contains no table. Errors in later steps must not be reported
        # as "No data".
        try :
            tables = pandas.read_html(url)
        except ValueError :
            print('【 (STOCK) Credit Trade 】{} No data'.format(process_day))
            continue

        main = tables[1].iloc[:,[0,6,12,13]].copy()
        if main.empty:
            print('【 (STOCK) Credit Trade 】{} No data'.format(process_day))
            continue

        main.columns = ['no','margin','short','total']
        main['date'] = process_day
        # Compare code lengths on string values so a non-string code cannot
        # raise inside the filter.
        main['no'] = main['no'].astype(str)
        main = main[main['no'].str.len() == 4][['date','no','margin','short','total']]

        # Codes already stored for this day are left untouched.
        stored = pandas.read_sql("select no from credit_trade where date = %s" , con = conn , params = (process_day,))
        i_table = main[~main['no'].isin(stored['no'])]

        try :
            cur.executemany(insert_sql , list(i_table.itertuples(index = False , name = None)))
            conn.commit()
        except Exception :
            # Leave the shared connection usable instead of stuck in an
            # aborted transaction.
            conn.rollback()
            raise

        print('【 (STOCK) Credit Trade 】{} data inserted.'.format(process_day))
        # Pause after a successful load (presumably to space out requests to
        # the site -- TODO confirm).
        time.sleep(20)

## 【 PythonOperator 】

In [9]:
# Task wrapper that runs STOCK_CREDIT.
task_GET_STOCK_CREDIT = PythonOperator(
    task_id = 'task_GET_STOCK_CREDIT' ,
    python_callable = STOCK_CREDIT ,
    dag = dag ,
)

# GET_OTC_CREDIT

## 【 Function 】

In [10]:
def OTC_CREDIT():
    """Insert the previous work date's credit-trade rows from tpex.org.tw into ``credit_trade``.

    The day processed is the second-newest ``date`` in ``work_date``. Table
    index 0 of the margin-balance HTML page is read with its last four rows
    dropped; columns 0, 6, 14 and 17 are kept as ``no``, ``margin``,
    ``short`` and ``total``; only rows whose ``no`` is exactly four
    characters long are stored. Codes that already have a ``credit_trade``
    row for that date are skipped, so re-running the task does not insert
    duplicates.

    Uses the module-level ``conn`` / ``cur``. All inserts for a day are
    committed together; if one fails the transaction is rolled back and the
    error is re-raised.

    Raises:
        IndexError: if ``work_date`` holds fewer than two rows, or the page
            does not have the expected table/columns.
    """
    insert_sql = "insert into credit_trade (date,no,margin,short,total) values (%s,%s,%s,%s,%s)"

    work_D = [pandas.read_sql("select date from work_date order by date desc limit 2" , con = conn)['date'].iloc[1]]

    for D in work_D:

        process_day = '{:0>4}-{:0>2}-{:0>2}'.format(D.year,D.month,D.day)
        # The URL takes the year minus 1911 (presumably the ROC calendar
        # year expected by the site -- TODO confirm).
        url = "https://www.tpex.org.tw/web/stock/margin_trading/margin_balance/margin_bal_result.php?l=zh-tw&o=htm&d={0}/{1:0>2}/{2:0>2}&s=0,asc".format(D.year-1911,D.month,D.day)

        # Only the download is guarded: read_html raises ValueError when the
        # page contains no table. Errors in later steps must not be reported
        # as "No data".
        try :
            tables = pandas.read_html(url)
        except ValueError :
            print('【 (OTC) Credit Trade 】{} No data'.format(process_day))
            continue

        main = tables[0].iloc[:-4,[0,6,14,17]].copy()
        if main.empty:
            print('【 (OTC) Credit Trade 】{} No data'.format(process_day))
            continue

        main.columns = ['no','margin','short','total']
        main['date'] = process_day
        # Compare code lengths on string values so a non-string code cannot
        # raise inside the filter.
        main['no'] = main['no'].astype(str)
        main = main[main['no'].str.len() == 4][['date','no','margin','short','total']]

        # Codes already stored for this day are left untouched.
        stored = pandas.read_sql("select no from credit_trade where date = %s" , con = conn , params = (process_day,))
        i_table = main[~main['no'].isin(stored['no'])]

        try :
            cur.executemany(insert_sql , list(i_table.itertuples(index = False , name = None)))
            conn.commit()
        except Exception :
            # Leave the shared connection usable instead of stuck in an
            # aborted transaction.
            conn.rollback()
            raise

        print('【 (OTC) Credit Trade 】{} data inserted.'.format(process_day))
        # Pause after a successful load (presumably to space out requests to
        # the site -- TODO confirm).
        time.sleep(10)

## 【 PythonOperator 】

In [11]:
# Task wrapper that runs OTC_CREDIT.
task_GET_OTC_CREDIT = PythonOperator(
    task_id = 'task_GET_OTC_CREDIT' ,
    python_callable = OTC_CREDIT ,
    dag = dag ,
)

# GET_STOCK_FI_SHAREHOLDING

## 【 Function 】

In [12]:
def STOCK_FI_SHAREHOLDING():
    """Insert the previous work date's MI_QFIIS holding figures from twse.com.tw into ``FI_HOLD``.

    The day processed is the second-newest ``date`` in ``work_date``. Table
    index 0 of the MI_QFIIS HTML report is read; columns 0, 7 and 3 are kept
    as ``no``, ``hold_percent`` and ``issued_amount``; ``hold_percent`` is
    divided by 100; only rows whose ``no`` is exactly four characters long
    are stored. Codes that already have an ``FI_HOLD`` row for that date are
    skipped, so re-running the task does not insert duplicates.

    Uses the module-level ``conn`` / ``cur``. All inserts for a day are
    committed together; if one fails the transaction is rolled back and the
    error is re-raised.

    Raises:
        IndexError: if ``work_date`` holds fewer than two rows, or the report
            page does not have the expected columns.
    """
    insert_sql = "insert into FI_HOLD (date , no , hold_percent,issued_amount) values (%s,%s,%s,%s)"

    work_D = [pandas.read_sql("select date from work_date order by date desc limit 2" , con = conn)['date'].iloc[1]]

    for D in work_D:

        insert_D = '{:0>4}-{:0>2}-{:0>2}'.format(D.year,D.month,D.day)
        html = 'https://www.twse.com.tw/fund/MI_QFIIS?response=html&date={:0>4}{:0>2}{:0>2}&selectType=ALLBUT0999'.format(D.year,D.month,D.day)

        # Only the download is guarded: read_html raises ValueError when the
        # page contains no table. Errors in later steps must not be reported
        # as "no data".
        try :
            tables = pandas.read_html(html)
        except ValueError :
            print("【 Stock Share Holding 】{} no data. ".format(insert_D))
            continue

        main = tables[0].iloc[:,[0,7,3]].copy()
        if main.empty:
            print("【 Stock Share Holding 】{} no data. ".format(insert_D))
            continue

        main.columns = ['no','hold_percent','issued_amount']
        main['date'] = insert_D
        # Stored as a fraction rather than the percentage shown in the report.
        main['hold_percent'] = main['hold_percent']/100
        # Compare code lengths on string values so a non-string code cannot
        # raise inside the filter.
        main['no'] = main['no'].astype(str)
        main = main[main['no'].str.len() == 4][['date','no','hold_percent','issued_amount']]

        # Codes already stored for this day are left untouched.
        stored = pandas.read_sql("select no from FI_HOLD where date = %s" , con = conn , params = (insert_D,))
        i_table = main[~main['no'].isin(stored['no'])]

        try :
            cur.executemany(insert_sql , list(i_table.itertuples(index = False , name = None)))
            conn.commit()
        except Exception :
            # Leave the shared connection usable instead of stuck in an
            # aborted transaction.
            conn.rollback()
            raise

        print("【 Stock Share Holding 】{} data inserted. ".format(insert_D))
        # Pause after a successful load (presumably to space out requests to
        # the site -- TODO confirm).
        time.sleep(15)

## 【 PythonOperator 】

In [13]:
# Task wrapper that runs STOCK_FI_SHAREHOLDING.
task_GET_STOCK_FI_SHAREHOLDING = PythonOperator(
    task_id = 'task_GET_STOCK_FI_SHAREHOLDING' ,
    python_callable = STOCK_FI_SHAREHOLDING ,
    dag = dag ,
)

# GET_STOCK_ISSUED_AMOUNTS

## 【 Function 】

In [14]:
def STOCK_ISSUED_AMOUNTS():
    """Insert the previous work date's issued amounts from twse.com.tw into ``issued_amounts``.

    The day processed is the second-newest ``date`` in ``work_date``. Table
    index 0 of the MI_QFIIS HTML report is read; columns 0 and 3 are kept as
    ``no`` and ``amounts``; every row is tagged ``type = 'STOCK'``; only rows
    whose ``no`` is exactly four characters long are stored. Codes that
    already have an ``issued_amounts`` row for that date (of any type) are
    skipped, so re-running the task does not insert duplicates.

    Uses the module-level ``conn`` / ``cur``. All inserts for a day are
    committed together; if one fails the transaction is rolled back and the
    error is re-raised.

    Raises:
        IndexError: if ``work_date`` holds fewer than two rows, or the report
            page does not have the expected columns.
    """
    insert_sql = "insert into issued_amounts (date , no , amounts,type) values (%s,%s,%s,%s)"

    work_D = [pandas.read_sql("select date from work_date order by date desc limit 2" , con = conn)['date'].iloc[1]]

    for D in work_D:

        # Compact form goes into the URL and the log lines; the dashed form
        # is the value written to the date column.
        insert_D = '{:0>4}{:0>2}{:0>2}'.format(D.year,D.month,D.day)
        store_day = '{:0>4}-{:0>2}-{:0>2}'.format(D.year,D.month,D.day)
        html = 'https://www.twse.com.tw/fund/MI_QFIIS?response=html&date={0}&selectType=ALLBUT0999'.format(insert_D)

        # Only the download is guarded: read_html raises ValueError when the
        # page contains no table. Errors in later steps must not be reported
        # as "No data".
        try :
            tables = pandas.read_html(html)
        except ValueError :
            print('【 Stock issued amounts 】{} No data'.format(insert_D))
            continue

        main = tables[0].iloc[:,[0,3]].copy()
        if main.empty:
            print('【 Stock issued amounts 】{} No data'.format(insert_D))
            continue

        main.columns = ['no','amounts']
        main['date'] = store_day
        main['type'] = 'STOCK'
        # Compare code lengths on string values so a non-string code cannot
        # raise inside the filter.
        main['no'] = main['no'].astype(str)
        main = main[main['no'].str.len() == 4][['date','no','amounts','type']]

        # Codes already stored for this day are left untouched.
        stored = pandas.read_sql("select no from issued_amounts where date = %s" , con = conn , params = (store_day,))
        i_table = main[~main['no'].isin(stored['no'])]

        try :
            cur.executemany(insert_sql , list(i_table.itertuples(index = False , name = None)))
            conn.commit()
        except Exception :
            # Leave the shared connection usable instead of stuck in an
            # aborted transaction.
            conn.rollback()
            raise

        print('【 Stock issued amounts 】{} data inserted . '.format(insert_D))
        # Pause after a successful load (presumably to space out requests to
        # the site -- TODO confirm).
        time.sleep(10)


## 【 PythonOperator 】

In [15]:
# Task wrapper that runs STOCK_ISSUED_AMOUNTS.
task_GET_STOCK_ISSUED_AMOUNTS = PythonOperator(
    task_id = 'task_GET_STOCK_ISSUED_AMOUNTS' ,
    python_callable = STOCK_ISSUED_AMOUNTS ,
    dag = dag ,
)

# DISTINCT_ISSUED_AMOUNTS

## 【 Function 】

In [16]:
def DISTINCT_ISSUED_AMOUNTS():
    """Delete the 'OTC' rows of every (date, no) pair that occurs more than once in ``issued_amounts``.

    Pairs with more than one row are found with a grouped count; for each
    such pair the rows whose ``type`` is 'OTC' are deleted. Uses the
    module-level ``conn`` / ``cur``. All deletes are committed together; if
    one fails the transaction is rolled back and the error is re-raised.
    """
    duplicated = pandas.read_sql("select date , no , count(*) from issued_amounts group by date , no having count(*) > 1" , con = conn)

    # One (date, no, type) parameter tuple per duplicated pair.
    targets = [(day , code , 'OTC') for day , code in duplicated[['date','no']].itertuples(index = False , name = None)]

    try :
        cur.executemany("delete from issued_amounts where date = %s and no = %s and type = %s" , targets)
        conn.commit()
    except Exception :
        # Leave the shared connection usable instead of stuck in an aborted
        # transaction.
        conn.rollback()
        raise

    print('DISTINCT_ISSUED_AMOUNTS is finished .')

## 【 PythonOperator 】

In [18]:
# Task wrapper that runs DISTINCT_ISSUED_AMOUNTS.
task_DISTINCT_ISSUED_AMOUNTS = PythonOperator(
    task_id = 'task_DISTINCT_ISSUED_AMOUNTS' ,
    python_callable = DISTINCT_ISSUED_AMOUNTS ,
    dag = dag ,
)

# DAGs

In [19]:
# Branch 1: the no-op task chosen when today is not the newest work date.
task_CHECK_WORK_DATE >> task_NOT_TRADED_DATE

# Branch 2: the load tasks, run one after another.
(
    task_CHECK_WORK_DATE
    >> task_GET_STOCK_CREDIT
    >> task_GET_OTC_CREDIT
    >> task_GET_STOCK_FI_SHAREHOLDING
    >> task_GET_STOCK_ISSUED_AMOUNTS
    >> task_DISTINCT_ISSUED_AMOUNTS
)

<Task(PythonOperator): task_DISTINCT_ISSUED_AMOUNTS>