In [1]:
import sys,os
sys.path.append("..")
import django
django.setup()
import pandas as pd
from io import StringIO
import requests
from crawlers.finlab.crawler_import_tools import *
from sqlalchemy import create_engine
import datetime
from crawlers.models import *

#  爬蟲測試範例

In [2]:
def crawl_stockpricetw(date):
    
    # 上市爬蟲,將 date 變成字串 舉例：'20180525' 
    datestr = date.strftime('%Y%m%d')
    
    # 從網站上依照 datestr 將指定日期的股價抓下來
    r = requests.post('http://www.twse.com.tw/exchangeReport/MI_INDEX?response=csv&date=' + datestr + '&type=ALLBUT0999')
    
    # 將抓下來的資料（r.text），其中的等號給刪除
    content = r.text.replace('=', '')
    
    # 將 column 數量小於等於 10 的行數都刪除
    lines = content.split('\n')
    lines = list(filter(lambda l:len(l.split('",')) > 10, lines))
    
    # 將每一行再合成同一行，並用肉眼看不到的換行符號'\n'分開
    content = "\n".join(lines)
    
    # 假如沒下載到，則回傳None（代表抓不到資料）
    if content == '':
        return None
    
    # 將content變成檔案：StringIO，並且用pd.read_csv將表格讀取進來
    df = pd.read_csv(StringIO(content))
      
    
    # 將表格中的元素都換成字串，並把其中的逗號刪除
    df = df.astype(str)
    df = df.apply(lambda s: s.str.replace(',', ''))
    
   # 將「證券代號」的欄位改名成「stock_id」
    df = df.rename(columns={'證券代號':'stock_id'})
    #設定date欄位
    df['date'] = pd.to_datetime(date)
    # 將 「stock_id」與「date」設定成index 
    df = df.set_index(['stock_id','date'])
    
    # 保留證券名稱,將所有的表格元素都轉換成數字，error='coerce'的意思是說，假如無法轉成數字，則用 NaN 取代
    df =pd.concat([df.iloc[:,:1],df.iloc[:,1:].apply(lambda s:pd.to_numeric(s, errors='coerce'))],axis=1)
    
    # 刪除不必要的欄位
    df = df[df.columns[df.isnull().all() == False]]
    
    #新增欄位     
    df=df.loc[:,["證券名稱",'成交股數','成交金額','開盤價','收盤價','最高價','最低價']]
    
    #上櫃爬蟲，將 date 變成字串 舉例：'1071012' 
    Y=str(int(date.strftime('%Y'))-1911)

    datestr = Y+'/'+date.strftime('%m')+'/'+date.strftime('%d')
    link = 'http://www.tpex.org.tw/web/stock/aftertrading/daily_close_quotes/stk_quote_download.php?l=zh-tw&d='+datestr+'&s=0,asc,0'
    r = requests.get(link)

    lines = r.text.replace('\r', '').split('\n')
    df2 = pd.read_csv(StringIO("\n".join(lines[3:])), header=None)
    
    #設定欄名
    df2.columns = list(map(lambda l: l.replace(' ',''), lines[2].split(',')))
    
    #資料處理
    df2 = df2.apply(lambda s: s.str.replace(',', '')).apply(lambda s: s.str.replace('+', ''))
    df2 = df2.rename(columns={'代號':'舊證券代號',"名稱":"證券名稱","收盤":"收盤價","漲跌":"漲跌價","開盤":"開盤價",
                        "最高":"最高價","最低":"最低價",'成交金額(元)':'成交金額'})
    df2['stock_id']=df2['舊證券代號']
    df2=pd.concat([df2.iloc[:,:1].apply(lambda s:pd.to_numeric(s,errors='coerce')),df2.iloc[:,1:]],axis=1)
    df2= df2[df2['舊證券代號']<9999]
    df2['date'] = pd.to_datetime(date)
    df2 = df2.set_index(['stock_id','date'])
    df2=df2.drop(['舊證券代號'],axis=1)
    df2 =pd.concat([df2.iloc[:,:1],df2.iloc[:,1:].apply(lambda s:pd.to_numeric(s, errors='coerce'))],axis=1)

    df2=df2.loc[:,["證券名稱",'成交股數','成交金額','開盤價','收盤價','最高價','最低價']]
    
    #上市與上櫃合體
    df3=pd.concat([df,df2],axis=0)
    df3 = df3.rename(columns={'證券名稱':'stock_name',"成交股數":"turnover_vol",
                              "成交金額":"turnover_price","開盤價":"open_price",
                              "收盤價":"close_price","最高價":"high_price",
                              "最低價":"low_price"})
    df3.iloc[:,3:]=df3.iloc[:,3:].apply(lambda s:pd.to_numeric(s,errors='coerce'))
    
    df3=df3.where(pd.notnull(df3),None)
    df3=df3.reset_index()
    return df3

#輸入日期
import datetime
df=crawl_stockpricetw(datetime.datetime(2010,12,10))


#執行主程式
D=CrawlerProcess(crawl_stockpricetw,StockPriceTW)
print(D)
D.auto_update_crawl()

  result = self._query(query)


crawlers_stockpricetw table_latest_date:2020-03-31 18:40:47
Finish Update Work


In [None]:
df['date'].iloc[0].strftime('%Y-%m-%d')

In [None]:
D.specified_date_crawl('2020-03-31','2020-03-31')

# Bulk 功能批次匯入資料庫測試

# # 檢查日期

加快寫入速度

In [None]:
table=StockPriceTW._meta.db_table
date='2005-02-14'
a="SELECT date FROM "+table+" where date ='"+date+"'"
cursor = list(engine.execute(a))
cursor

def in_date_list(conn, model_name,check_date):
    table=model_name._meta.db_table
    cursor = list(conn.execute("SELECT date FROM "+table+" where date ='"+check_date+"'"))
    if len(cursor)>0:
        return True
    else:
        return False

y=in_date_list(engine, StockPriceTW,'2005-2-14')
y

## 主程式

In [None]:
connect_info = 'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format("root","benbilly3@","localhost",3306,"finlab_django")
engine = create_engine(connect_info)

def add_to_sql(model_name, df):

    bulk_update_data=[]
    bulk_create_data=[] 
    
    # if data_date isn't in table,process bulk_create
    data_date=df['date'].iloc[0].strftime('%Y-%m-%d')
    check_date=in_date_list(engine,model_name,data_date)
    
    if check_date == False:
    # Change CSV to iterrow
        for index, item in df.iterrows():
            # Use bulk_update to update obj,PS:must include primekey column
            try:
                obj_create_data = dict((field.name,item[field.name]) for field in model_name._meta.fields if
                                    field.name != 'id')
                obj_create=model_name(**obj_create_data)
                bulk_create_data.append(obj_create)            
                print(f"create{' '}{model_name}{' '}Stock_id:{item['stock_id']}{' '}Date:{item['date']}")

            except Exception as e:
                print(f"error{' '}{e}{' '}Stock_id:{item['stock_id']}{' '}Date:{item['date']}")
                pass            
 

    else:
        # Change CSV to iterrow
        for index, item in df.iterrows():

            # Use bulk_update to update obj,PS:must include primekey column
            try:
                obj_check = model_name.objects.get(stock_id=item['stock_id'], date=item['date'])
                obj_update_data = list((field.name,item[field.name]) if field.name !='id' else (field.name,obj_check.id) for field in model_name._meta.fields)

                for attributes,update_value in obj_update_data:
                    obj_check.attribute=update_value

                bulk_update_data.append(obj_check)
                print(f"update{' '}{model_name}{' '}Stock_id:{item['stock_id']}{' '}Date:{item['date']}")

            # Use dict to bulk_create obj when get nothing ,process incomplete data
            except ObjectDoesNotExist:
                obj_create_data = dict((field.name,item[field.name]) for field in model_name._meta.fields if
                                    field.name != 'id')
                obj_create=model_name(**obj_create_data)
                bulk_create_data.append(obj_create)            
                print(f"create{' '}{model_name}{' '}Stock_id:{item['stock_id']}{' '}Date:{item['date']}")

            except Exception as e:
                print(f"error{' '}{e}{' '}Stock_id:{item['stock_id']}{' '}Date:{item['date']}")
                pass
    
    # Process bulk
    model_name.objects.bulk_create(bulk_create_data, batch_size=1000)
    print(f"Finish{' '}{model_name}{'bulk_create'}{':'}{len(bulk_create_data)}")
    update_fields_area= [field.name for field in model_name._meta.fields if field.name !='id']
    model_name.objects.bulk_update(bulk_update_data,update_fields_area, batch_size=1000)
    print(f"Finish{' '}{model_name}{'bulk_update'}{':'}{len(bulk_update_data)}")
    





# 爬蟲執行檔Class測試

In [4]:
class CrawlerProcess:

    def __init__(self, func, model_name):
        self.crawler_func_name = func
        self.model_name = model_name
        self.table_latest_date = table_latest_date(engine, self.model_name._meta.db_table)

    def __repr__(self):
        return str(self.model_name._meta.db_table) + ' ' + "table_latest_date:" + str(self.table_latest_date)

    def crawl_process(self, date_list: list):
        for d in date_list:
            df = self.crawler_func_name(d)
            try:
                ret = df.drop_duplicates(['stock_id', 'date'], keep='last')
                add_to_sql(self.model_name, ret)
                print(f'Finish {d} Data')

            # holiday is blank
            except AttributeError:
                print(f'fail, check if {d} is a holiday')
            time.sleep(12)

    # 指定區間，主要為測試用
    def specified_date_crawl(self, start_date: str, end_date: str, range_date=date_range):

        start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d")
        end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d")
        try:
            if (start_date - end_date).days <= 0:
                date_list = range_date(start_date, end_date)
                self.crawl_process(date_list)
            else:
                print(f"The start_date > your end_date,please modify your start_date <={end_date} .")
                return None
        except ValueError:
            print('Error:last_day form is %Y-%m-%d,please modify. ')
            return None

            # 自動爬取結尾後日期的資料

    def working_process(self):
        recent = datetime.datetime.now()
        day_num = (recent - self.table_latest_date).days
        if day_num > 0:
            return 0
        elif day_num == 0:
            return 1
        else:
            return 2

    def auto_update_crawl(self, last_day='Now', range_date=date_range):

        try:
            if last_day == 'Now':
                end_date = datetime.datetime.now()
            else:
                end_date = datetime.datetime.strptime(last_day, "%Y-%m-%d")

            if self.working_process() == 0:
                start_date = self.table_latest_date
                date_list = range_date(start_date, end_date)
                self.crawl_process(date_list)

            elif self.working_process() == 1:
                print(f"Finish Update Work")
                return None
            else:
                print(f"The table_latest_date > your setting date,please modify your setting date >{last_day} .")
                return None
        except ValueError:
            print('Error:last_day form is %Y-%m-%d,please modify. ')
            return None

In [None]:
D=CrawlerProcess(crawl_stockpricetw,StockPriceTW)
print(D)
D.auto_update_crawl()

# 日期控制測試

In [None]:
#日期控制
import datetime
connect_info = 'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format("root","benbilly3@","localhost",3306,"finlab_django")
engine = create_engine(connect_info)

def date_control(model_name,last_day='Now'):   
    try:
        if last_day=='Now':
            recent=datetime.datetime.now()
        else:   
            recent= datetime.datetime.strptime(last_day, "%Y-%m-%d")
        end= table_latest_date(engine, model_name._meta.db_table)
        if (recent-end).days>=0:
            dateList = [(recent - datetime.timedelta(days=x)) for x in range((recent-end).days,-1,-1)]
        else:
            print(f"The table_latest_date > your setting date,please modify your setting date >{last_day} .")
            return None
    except ValueError :
        print('Error:last_day form is %Y-%m-%d,please modify. ')
        return None
    return dateList

date_control(StockPriceTW)

In [None]:
date_control(StockPriceTW,'2008-09-01')

In [None]:
def specified_date_crawl(start_date:str,end_date:str):
    
    start_date= datetime.datetime.strptime(start_date, "%Y-%m-%d")
    end_date= datetime.datetime.strptime(end_date, "%Y-%m-%d")
    try:
        if (start_date-end_date).days <= 0:
            dateList = date_range(start_date, end_date)
        else:
            print(f"The start_date > your end_date,please modify your start_date <={end_date} .")
            return None
    except ValueError :
        print('Error:last_day form is %Y-%m-%d,please modify. ')
        return None       
    return dateList


specified_date_crawl('2020-01-01','2020-01-05')


In [None]:
from dateutil.rrule import rrule, DAILY, MONTHLY



def date_range(start_date, end_date):
    return [dt.date() for dt in rrule(DAILY, dtstart=start_date, until=end_date)]

In [None]:
date_range(datetime.datetime(2010,12,10), datetime.datetime(2010,12,15))