In [1]:
# Star imports stay first so the explicit imports below take precedence over
# any same-named objects they export.
from finance.finance_statemnt import *
from finance.process import *
# from finance.stock import date_range

import logging
import os
import pickle
from datetime import datetime, timedelta

import pandas as pd
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

In [2]:
# Task logger shared by the cells below (same object as getLogger("airflow.task")
# anywhere else in the process).
logger = logging.getLogger("airflow.task")

# Neither of these is referenced in the visible cells.
# NOTE(review): 'crawl_season' looks like an Airflow Variable key — TODO confirm.
var = 'crawl_season'
date_format = '%Y-%m-%d'


In [3]:

# Start of the crawl window, as YYYYMMDD.
START_DATE = '20190501'

datetime_object = datetime.strptime(START_DATE, '%Y%m%d')
# Pass values as logger arguments rather than pre-formatting with `%`: formatting
# is deferred until the record is emitted, and a tuple value cannot be
# mis-expanded into several format arguments.
logger.info("date time is %s", datetime_object)

# Season report dates between START_DATE and now, as produced by season_range.
dates = season_range(datetime_object, datetime.now())
logger.info("dates is %s", dates)


[[34m2023-04-30 12:53:20,532[0m] {[34m3773449962.py:[0m2} INFO[0m - date time is 2019-05-01 00:00:00[0m
[[34m2023-04-30 12:53:20,534[0m] {[34m3773449962.py:[0m5} INFO[0m - dates is [datetime.date(2019, 5, 15), datetime.date(2019, 8, 14), datetime.date(2019, 11, 14), datetime.date(2020, 3, 31), datetime.date(2020, 5, 15), datetime.date(2020, 8, 14), datetime.date(2020, 11, 14), datetime.date(2021, 3, 31), datetime.date(2021, 5, 15), datetime.date(2021, 8, 14), datetime.date(2021, 11, 14), datetime.date(2022, 3, 31), datetime.date(2022, 5, 15), datetime.date(2022, 8, 14), datetime.date(2022, 11, 14), datetime.date(2023, 3, 31)][0m


In [4]:
# Work on the first season in `dates`.
date = dates[0]
year, season, month = get_season(date)  # `month` is not used in the cells below
tmp_path = os.getenv('tmp_dir')
if tmp_path is None:
    # Fail fast with a clear message instead of a TypeError from os.path.join(None, ...).
    raise RuntimeError("environment variable 'tmp_dir' is not set")
# Per-season directory of downloaded statements: <tmp_dir>/financial_statement/<year><season>.
path = os.path.join(tmp_path, 'financial_statement', str(year) + str(season))


In [5]:
    
# Download this season's statements. Report a non-successful download instead of
# silently carrying on with whatever files happen to be on disk.
if download_finance_statement(year, season, tmp_path):
    print('Download success')
else:
    logger.warning('Download of financial statements for %s season %s did not report success',
                   year, season)

# Per-stock accumulators, keyed by stock_id (filled by the parsing cell).
balance_sheet = {}
income_sheet = {}
cash_flows = {}
income_sheet_cumulate = {}
# Keep only the HTML files. Sort them because os.listdir order is arbitrary, and the
# file order becomes the column order of the DataFrames built from these dicts.
all_file = sorted(f for f in os.listdir(path) if f.endswith('.html'))

Download success


In [6]:
def _indexed_table(raw_table):
    """Drop rows whose label (column 0) repeats, keeping the last one, and index by that label."""
    # drop_duplicates returns a new frame, so the caller's table is never mutated.
    return raw_table.drop_duplicates(subset=0, keep='last').set_index(0)


for fname in all_file:
    logger.info(f'Processing {fname}')
    dfs = read_html2019(os.path.join(path, fname))
    # Flatten multi-level headers to positional labels 0..N-1 so that the
    # tables can be addressed by column number below.
    for df in dfs:
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = list(range(df.values.shape[1]))

    # If the HTML is incomplete (fewer than 4 tables), skip this file.
    if len(dfs) < 4:
        print('**WARRN html file broken', year, season, fname)
        continue

    stock_id = fname.split('.')[0]

    # Balance sheet: table 1, first value column.
    balance_table = _indexed_table(dfs[1])
    balance_sheet[stock_id] = balance_table[1].dropna()

    # Income statement: table 2.
    income_table = _indexed_table(dfs[2])
    # With 4 value columns, columns 1 and 3 hold the single-quarter and the
    # cumulative income statement respectively.
    if len(income_table.columns) == 4:
        income_sheet[stock_id] = income_table[1].dropna()
        income_sheet_cumulate[stock_id] = income_table[3].dropna()
    # With 2 value columns, column 1 holds the cumulative statement and the
    # single-quarter statement is missing.
    elif len(income_table.columns) == 2:
        income_sheet_cumulate[stock_id] = income_table[1].dropna()

    # For a Q1 report, the cumulative and single-quarter figures are the same.
    if season == 1:
        income_sheet[stock_id] = income_table[1].dropna()

    # Cash-flow statement: table 3, first value column.
    cash_table = _indexed_table(dfs[3])
    cash_flows[stock_id] = cash_table[1].dropna()

# Turn the per-stock dictionaries into DataFrames (one column per stock_id).
balance_sheet = pd.DataFrame(balance_sheet)
income_sheet = pd.DataFrame(income_sheet)
income_sheet_cumulate = pd.DataFrame(income_sheet_cumulate)
cash_flows = pd.DataFrame(cash_flows)

# Clean every statement.
ret = {
    'balance_sheet': clean(year, season, balance_sheet),
    'income_sheet': clean(year, season, income_sheet),
    'income_sheet_cumulate': clean(year, season, income_sheet_cumulate),
    'cash_flows': clean(year, season, cash_flows),
}

# For Q1, the single-quarter statement is the cumulative one.
if season == 1:
    ret['income_sheet'] = ret['income_sheet_cumulate'].copy()

# Prefix every cumulative column name with '累計' ("cumulative").
ret['income_sheet_cumulate'].columns = '累計' + ret['income_sheet_cumulate'].columns

# Pack all four statements into one pickle per season. Closing the file via
# `with` ensures the data is flushed to disk.
pickle_path = os.path.join(tmp_path, 'financial_statement', 'pack' + str(year) + str(season) + '.pickle')
with open(pickle_path, 'wb') as pickle_file:
    pickle.dump(ret, pickle_file)


[[34m2023-04-30 12:53:57,011[0m] {[34m4018814794.py:[0m3} INFO[0m - Processing 1506.html[0m
[[34m2023-04-30 12:53:57,448[0m] {[34m4018814794.py:[0m3} INFO[0m - Processing 6205.html[0m
[[34m2023-04-30 12:53:57,669[0m] {[34m4018814794.py:[0m3} INFO[0m - Processing 8028.html[0m
[[34m2023-04-30 12:53:57,831[0m] {[34m4018814794.py:[0m3} INFO[0m - Processing 3508.html[0m
[[34m2023-04-30 12:53:58,074[0m] {[34m4018814794.py:[0m3} INFO[0m - Processing 1907.html[0m
[[34m2023-04-30 12:53:58,654[0m] {[34m4018814794.py:[0m3} INFO[0m - Processing 5704.html[0m
[[34m2023-04-30 12:53:58,941[0m] {[34m4018814794.py:[0m3} INFO[0m - Processing 2752.html[0m
[[34m2023-04-30 12:53:59,288[0m] {[34m4018814794.py:[0m3} INFO[0m - Processing 6277.html[0m
[[34m2023-04-30 12:53:59,767[0m] {[34m4018814794.py:[0m3} INFO[0m - Processing 5301.html[0m
[[34m2023-04-30 12:54:00,256[0m] {[34m4018814794.py:[0m3} INFO[0m - Processing 3057.html[0m
[[34m2023-04-30 12:

In [9]:

# NOTE(review): left commented out, but the loading cell references `engine`
# whenever `test` is false, so an engine must be defined before that branch runs.
# hook = PostgresHook(postgres_conn_id="_postgresql")
# engine = hook.get_sqlalchemy_engine()

tmp_path = os.getenv('tmp_dir')
if tmp_path is None:
    # Fail fast with a clear message instead of a TypeError from os.path.join(None, ...).
    raise RuntimeError("environment variable 'tmp_dir' is not set")
# NOTE: this rebinds `path` from the per-season HTML directory to the directory that
# holds the packed pickles. Re-run the setup cell before re-running the parsing cell.
path = os.path.join(tmp_path, 'financial_statement')
# Collect the packed pickle files, sorted so the load order is deterministic.
pickle_files = sorted(f for f in os.listdir(path) if f.endswith('.pickle'))
logger.info(f'Found {len(pickle_files)} pickle files.')


[[34m2023-04-30 13:11:13,967[0m] {[34m2352002157.py:[0m8} INFO[0m - Found 1 pickle files.[0m


In [10]:
# Display the packed pickle files that the next cell will load.
pickle_files

['pack20191.pickle']

In [None]:

# Load every packed season into the database, or hand it to test_database in test mode.
# NOTE(review): `test` and `engine` are not defined in any visible cell (the engine
# creation is commented out). Define both before running this cell.
for file in pickle_files:
    logger.info(f'Processing {file}')
    # These pickles are written by the parsing cell of this pipeline. pd.read_pickle
    # can execute arbitrary code, so never point `path` at untrusted files.
    sheets = pd.read_pickle(os.path.join(path, file))

    # Each pickle maps a statement name (e.g. 'balance_sheet') to its cleaned table.
    # Iterate (key, value) pairs directly: indexing the dict with an .items() tuple
    # would always fail.
    for key, df in sheets.items():
        if test:
            test_database(df, key)
        else:
            logger.info(f'Loading {key} into database.')
            # NOTE(review): all statements of a season are appended to one table named
            # after the pickle (e.g. 'pack20191') although they have different columns,
            # and index=False discards the frame's index. Confirm the intended table
            # name and index handling.
            df.to_sql(file.split('.')[0], engine, if_exists='append', index=False)
