<a href="https://colab.research.google.com/github/arthurtibame/cryptcurrency/blob/main/cryptcurrency.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pandasql
!wget http://prdownloads.sourceforge.net/ta-lib/ta-lib-0.4.0-src.tar.gz
!tar -xzvf ta-lib-0.4.0-src.tar.gz
%cd ta-lib
!./configure --prefix=/usr
!make
!make install
!pip install Ta-Lib

In [None]:
import requests
from datetime import datetime
import pandas as pd
import pandasql as psql
import talib
import numpy as np
import math
from talib import abstract
from pprint import pprint

# Default query window used by HistoryData, expressed as Unix-epoch seconds in string form.
# datetime(2021,2,1) is a naive datetime, so .timestamp() interprets it in the machine's local timezone.
DEFAULT_START_TIMESTAMP=str(int(datetime(2021,2,1).timestamp()))
# Evaluated once, when this cell/module is executed: "now" is frozen at that moment
# and is not refreshed by later calls that fall back on this default.
DEFAULT_END_TIMESTAMP=str(int(datetime.now().timestamp()))
# pd.options.display.max_columns = None
class Error(Exception):
    """Root of the custom exception hierarchy defined in this file."""

class BitoProApiNoData(Error):
    """Intended for a BitoPro response whose ``data`` payload is empty."""

class BitoProApiError(Error):
    """Raised when a BitoPro response has no usable ``data`` field."""

class ParamsError(Error):
    """Raised when back-test parameters are inconsistent (e.g. ``ma2 < ma1``)."""

class HistoryData(object):
    """Download trading history for a ``<currency>_twd`` pair from the BitoPro v3 API.

    Args:
        resolution: Candle-size token, forwarded verbatim as the ``resolution``
            query parameter (e.g. ``"1m"``, ``"5m"``, ``"1h"``).
        startTimestamp: Start of the window, forwarded verbatim as ``from``.
        endTimestamp: End of the window, forwarded verbatim as ``to``.
        currency: Base-currency symbol; the pair queried is ``<currency>_twd``.
    """

    # Seconds to wait for BitoPro before giving up; without a timeout a stalled
    # connection would block the notebook indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self,
                 resolution="1m",
                 startTimestamp=DEFAULT_START_TIMESTAMP,
                 endTimestamp=DEFAULT_END_TIMESTAMP,
                 currency="bnb"
                 ):
        # Fully-built request URL for this query.
        self.url = f"https://api.bitopro.com/v3/trading-history/{currency}_twd?resolution={resolution}&from={startTimestamp}&to={endTimestamp}"
        # Rows returned by the last successful parse(); None until then.
        self.data = None

    def parse(self):
        """Fetch ``self.url`` and cache/return the ``data`` entry of the JSON body.

        Returns:
            The non-empty ``data`` value of the response (also stored on ``self.data``).

        Raises:
            BitoProApiError: The body is not JSON, or it has no ``data`` entry.
            BitoProApiNoData: ``data`` is present but empty.
        """
        response = requests.get(self.url, timeout=self.REQUEST_TIMEOUT)
        try:
            payload = response.json()
        except ValueError as err:
            raise BitoProApiError(f"non-JSON response from {self.url}") from err

        data = payload.get("data") if isinstance(payload, dict) else None
        # Distinguish "field missing" from "field present but empty"; a plain
        # truthiness test cannot tell the two apart because [] is falsy.
        if data is None:
            raise BitoProApiError(f"no 'data' field in response from {self.url}")
        if not data:
            raise BitoProApiNoData(f"empty 'data' in response from {self.url}")

        self.data = data
        return data

    def toDF(self):
        """Return the fetched rows as a DataFrame with a leading ``datetime`` column.

        ``datetime`` is derived from each row's ``timestamp`` (divided by 1000
        before conversion, i.e. treated as milliseconds) using the local
        timezone. If nothing has been fetched yet, ``parse()`` is called first.
        """
        if self.data is None:
            self.parse()
        df = pd.DataFrame(self.data)
        df["datetime"] = df["timestamp"].apply(
            lambda ts: datetime.fromtimestamp(int(ts) / 1000)
        )
        # The new column was appended last; rotate it to the front.
        columns = list(df.columns)
        return df[columns[-1:] + columns[:-1]]

def modifiedSignalDF(df):
    """Trim a signal-annotated frame to the span from its first buy to its last sell.

    Args:
        df: DataFrame with at least ``timestamp`` and ``signals`` columns, where
            ``signals`` is 1 for buy, 2 for sell and 0 for no action.

    Returns:
        The pandasql result holding every row with ``signals != 0`` whose
        ``timestamp`` lies between the earliest buy and the latest sell
        (both inclusive).

    Raises:
        ValueError: ``df`` contains no buy signal or no sell signal.
    """
    # Expose only the frame the queries need instead of every local name.
    env = {"df": df}

    # MIN/MAX state the row choice explicitly. Selecting a bare column under
    # GROUP BY leaves the choice of row to SQLite, which may pick any row.
    first_buy = psql.sqldf(
        "SELECT MIN(timestamp) AS ts FROM df WHERE signals = 1", env
    )["ts"][0]
    last_sell = psql.sqldf(
        "SELECT MAX(timestamp) AS ts FROM df WHERE signals = 2", env
    )["ts"][0]

    # An aggregate over zero rows yields NULL, which arrives here as None/NaN.
    if pd.isna(first_buy) or pd.isna(last_sell):
        raise ValueError("signals must contain at least one buy (1) and one sell (2)")

    window_query = f"""
            SELECT * FROM df
                WHERE timestamp >= {first_buy} and
                timestamp <= {last_sell} and
                signals!=0
    """
    return psql.sqldf(window_query, env)

def backTestSMA(resolution = "1h", ma1=20, ma2=60, currency="bnb", initDeposit=60000):
    """Back-test a simple-moving-average crossover strategy on BitoPro history.

    A buy (1) is signalled when the ``ma1`` average minus the ``ma2`` average
    turns from negative to positive between consecutive candles, a sell (2)
    when it turns from positive to negative. Trades are then replayed from the
    first buy to the last sell, starting with ``initDeposit``.

    Args:
        resolution: Candle size passed to ``HistoryData``.
        ma1: Period of the first moving average.
        ma2: Period of the second moving average; must not be smaller than ``ma1``.
        currency: Base-currency symbol passed to ``HistoryData``.
        initDeposit: Starting cash.

    Returns:
        dict with the currency, initial deposit, profit, profit rate in percent
        (rounded to two decimals), and the first/last ``datetime`` of the
        traded window.

    Raises:
        ParamsError: ``ma2 < ma1``.
    """
    if ma2 < ma1:
        raise ParamsError(f"ma2 ({ma2}) must not be smaller than ma1 ({ma1})")

    history = HistoryData(currency=currency, resolution=resolution, startTimestamp=DEFAULT_START_TIMESTAMP)
    history.parse()
    df = history.toDF()

    fast_col = f"rolling_{ma1}"
    slow_col = f"rolling_{ma2}"
    spread_col = f"rolling_{ma1}-rolling_{ma2}"
    df[fast_col] = talib.SMA(df["close"], ma1)
    df[slow_col] = talib.SMA(df["close"], ma2)
    # Drop the first ma2-1 warm-up rows; copy so the column assignments below
    # act on an independent frame rather than on a slice of the original.
    df = df.iloc[ma2 - 1:].copy()
    df[spread_col] = df[fast_col] - df[slow_col]

    # A sign change of the spread between consecutive candles is a crossover.
    spread = df[spread_col].tolist()
    signals = [0]
    for previous, current in zip(spread, spread[1:]):
        if previous * current < 0:
            signals.append(1 if previous < 0 else 2)
        else:
            signals.append(0)
    df["signals"] = signals
    df = modifiedSignalDF(df)

    # NOTE(review): fees are derived from the unit price (price * 0.001 on buy,
    # price * 0.002 on sell), not from the traded amount -- kept unchanged;
    # confirm this is the intended fee model.
    deposit = initDeposit
    now_currency_unit = 0
    for price, signal in zip(df["close"], df["signals"]):
        price = float(price)
        if signal == 1:
            now_currency_unit = (float(deposit) - price * 0.001) / price
            deposit = 0
        elif signal == 2:
            deposit = float(now_currency_unit) * price - price * 0.002
            now_currency_unit = 0

    profit = deposit - initDeposit
    # Round after scaling to percent so the result carries no float artefacts
    # such as 9.139999999999999.
    profitRate = round(profit / initDeposit * 100, 2)
    dates = df["datetime"].tolist()
    return {
        "幣別": currency,
        "初始投入金額": initDeposit,
        "獲利": profit,
        "獲利率": profitRate,
        "開始日期": dates[0],
        "結束日期": dates[-1],
    }

    

def backTestEMA(resolution = "1h", ma1=20, ma2=60, currency="bnb", initDeposit=60000):
    """Back-test an exponential-moving-average crossover strategy on BitoPro history.

    A buy (1) is signalled when the ``ma1`` EMA minus the ``ma2`` EMA turns
    from negative to positive between consecutive candles, a sell (2) when it
    turns from positive to negative. Trades are then replayed from the first
    buy to the last sell, starting with ``initDeposit``.

    Args:
        resolution: Candle size passed to ``HistoryData``.
        ma1: Period of the first moving average.
        ma2: Period of the second moving average; must not be smaller than ``ma1``.
        currency: Base-currency symbol passed to ``HistoryData``.
        initDeposit: Starting cash.

    Returns:
        dict with the currency, initial deposit, profit, profit rate in percent
        (rounded to two decimals), and the first/last ``datetime`` of the
        traded window.

    Raises:
        ParamsError: ``ma2 < ma1``.
    """
    if ma2 < ma1:
        raise ParamsError(f"ma2 ({ma2}) must not be smaller than ma1 ({ma1})")

    history = HistoryData(currency=currency, resolution=resolution, startTimestamp=DEFAULT_START_TIMESTAMP)
    history.parse()
    df = history.toDF()

    fast_col = f"rolling_{ma1}"
    slow_col = f"rolling_{ma2}"
    spread_col = f"rolling_{ma1}-rolling_{ma2}"
    df[fast_col] = talib.EMA(df["close"], ma1)
    df[slow_col] = talib.EMA(df["close"], ma2)
    # Drop the first ma2-1 warm-up rows; copy so the column assignments below
    # act on an independent frame rather than on a slice of the original.
    df = df.iloc[ma2 - 1:].copy()
    df[spread_col] = df[fast_col] - df[slow_col]

    # A sign change of the spread between consecutive candles is a crossover.
    spread = df[spread_col].tolist()
    signals = [0]
    for previous, current in zip(spread, spread[1:]):
        if previous * current < 0:
            signals.append(1 if previous < 0 else 2)
        else:
            signals.append(0)
    df["signals"] = signals
    df = modifiedSignalDF(df)

    # NOTE(review): fees are derived from the unit price (price * 0.001 on buy,
    # price * 0.002 on sell), not from the traded amount -- kept unchanged;
    # confirm this is the intended fee model.
    deposit = initDeposit
    now_currency_unit = 0
    for price, signal in zip(df["close"], df["signals"]):
        price = float(price)
        if signal == 1:
            now_currency_unit = (float(deposit) - price * 0.001) / price
            deposit = 0
        elif signal == 2:
            deposit = float(now_currency_unit) * price - price * 0.002
            now_currency_unit = 0

    profit = deposit - initDeposit
    # Round after scaling to percent so the result carries no float artefacts
    # such as 8.459999999999999.
    profitRate = round(profit / initDeposit * 100, 2)
    dates = df["datetime"].tolist()
    return {
        "幣別": currency,
        "初始投入金額": initDeposit,
        "獲利": profit,
        "獲利率": profitRate,
        "開始日期": dates[0],
        "結束日期": dates[-1],
    }


def backTestMACD():
    """Placeholder for a MACD back-test; nothing is implemented, so it returns None."""
    return None


# Bollinger Band
def bollingerBandTest(bband_1, resolution = "1m", bbUnit=20, currency="bnb", initDeposit=5000):
    """Back-test a Bollinger-band strategy on BitoPro history.

    Steps:
        1. Compute upper / middle / lower bands from the close price.
        2. Add ``high-upper`` and ``low-lower`` columns.
        3. Flag a sell (2) when ``high-upper`` changes sign between consecutive
           candles, otherwise a buy (1) when ``low-lower`` changes sign.
        4. Collapse repeated signals: only the first buy while flat and the
           first sell while holding are kept.
        5. Replay the trades from the first buy to the last sell, starting
           with ``initDeposit``.

    Args:
        bband_1: Deviation multiplier passed to BBANDS for both bands.
        resolution: Candle size passed to ``HistoryData``.
        bbUnit: Band period; also the number of warm-up rows (minus one) dropped.
        currency: Base-currency symbol passed to ``HistoryData``.
        initDeposit: Starting cash.

    Returns:
        dict with the currency, initial deposit, profit, profit rate in percent
        (rounded to two decimals), and the first/last ``datetime`` of the
        traded window.

    Side effects:
        Prints the number of rows in the traded window.
    """
    # Fetch candles from the BitoPro API through the HistoryData helper.
    history = HistoryData(resolution=resolution, currency=currency, startTimestamp=DEFAULT_START_TIMESTAMP)
    history.parse()
    df = history.toDF()

    # Everything except the time columns is converted to numeric.
    for col in [c for c in df.columns if c not in ["datetime", "timestamp"]]:
        df[col] = pd.to_numeric(df[col])

    close_prices = df["close"].to_numpy(dtype=float)
    # NOTE(review): the fifth positional argument appears to be BBANDS' `matype`;
    # passing int(bband_1) ties the moving-average type to the deviation
    # multiplier. Kept unchanged -- confirm whether this is intended.
    upper, middle, lower = abstract.BBANDS(close_prices, bbUnit, bband_1, bband_1, int(bband_1))
    df["upper"] = upper
    df["middle"] = middle
    df["lower"] = lower
    # Drop the bbUnit-1 warm-up rows (was a hard-coded 19, correct only for
    # bbUnit=20); copy so later assignments act on an independent frame.
    df = df.iloc[bbUnit - 1:, :].copy()

    df["high-upper"] = df["high"] - df["upper"]
    df["low-lower"] = df["low"] - df["lower"]

    # Raw signals: a sign change of high-upper -> sell (2); otherwise a sign
    # change of low-lower -> buy (1); otherwise nothing (0).
    high_gap = df["high-upper"].tolist()
    low_gap = df["low-lower"].tolist()
    raw_signals = [0]
    for hu_prev, hu_cur, ll_prev, ll_cur in zip(high_gap, high_gap[1:], low_gap, low_gap[1:]):
        if hu_prev * hu_cur < 0:
            raw_signals.append(2)
        elif ll_prev * ll_cur < 0:
            raw_signals.append(1)
        else:
            raw_signals.append(0)

    # Keep only actionable signals: buy while flat, sell while holding.
    holding = False
    transactions = []
    for raw in raw_signals:
        if raw == 1 and not holding:
            transactions.append(1)
            holding = True
        elif raw == 2 and holding:
            transactions.append(2)
            holding = False
        else:
            transactions.append(0)
    df["signals"] = transactions

    traded_rows = psql.sqldf("""SELECT * FROM df WHERE signals !=0 """, {"df": df})
    df = modifiedSignalDF(traded_rows)
    print(len(df.index))

    # NOTE(review): fees are derived from the unit price (price * 0.001 on buy,
    # price * 0.002 on sell), not from the traded amount -- kept unchanged;
    # confirm this is the intended fee model.
    deposit = initDeposit
    now_currency_unit = 0
    for price, signal in zip(df["close"], df["signals"]):
        price = float(price)
        if signal == 1:
            now_currency_unit = (float(deposit) - price * 0.001) / price
            deposit = 0
        elif signal == 2:
            deposit = float(now_currency_unit) * price - price * 0.002
            now_currency_unit = 0

    profit = deposit - initDeposit
    # Round after scaling to percent so the result carries no float artefacts
    # such as 13.780000000000001.
    profitRate = round(profit / initDeposit * 100, 2)
    dates = df["datetime"].tolist()
    return {
        "幣別": currency,
        "初始投入金額": initDeposit,
        "獲利": profit,
        "獲利率": profitRate,
        "開始日期": dates[0],
        "結束日期": dates[-1],
    }


    

    # buy = [float(f) for f in psql.sqldf(q1, locals())["transactionWithFee"].tolist()]
    # initDeposit = buy[0]


    # sell = [float(f) for f in psql.sqldf(q2, locals())["transactionWithFee"].tolist()]

    # print(psql.sqldf("SELECT datetime, close, signals FROM df", locals()))
    # return sum(sell) - sum(buy)

def most_frequent(List):
    """Return the element that occurs most often in ``List``.

    Counting is done in a single pass. When several elements share the highest
    count, the one that appears first in ``List`` wins, so the result is
    deterministic.

    Args:
        List: Iterable of hashable elements.

    Returns:
        The most frequent element.

    Raises:
        ValueError: ``List`` is empty.
    """
    from collections import Counter

    counts = Counter(List)
    # Counter keeps first-seen order and max() returns the first maximal key.
    return max(counts, key=counts.get)

if __name__ == "__main__":
    # Run all three strategies for every currency and record, per currency,
    # the strategy with the highest profit rate.
    resolution = "5m"
    currencies = ["btc", "eth", "bnb", "eos", "ltc", "xrp", "yfi", "trx", "bch", "bchsv","bito"]
    ma1 = 20
    ma2 = 60
    initDeposit = 2000000
    bband_1 = 2.0
    result = []

    for currency in currencies:
        try:
            b = bollingerBandTest(bband_1=bband_1,resolution=resolution, currency=currency, initDeposit=initDeposit)
            sma = backTestSMA(resolution=resolution, ma1=ma1, ma2=ma2, currency=currency, initDeposit=initDeposit)
            ema = backTestEMA(resolution=resolution, ma1=ma1, ma2=ma2, currency=currency, initDeposit=initDeposit)
        except Exception as e:
            # Best-effort loop: one failing currency must not stop the rest.
            # Include the currency and exception type, since an exception may
            # carry no message of its own.
            print(f"{currency} failed: {type(e).__name__}: {e}")
            continue

        print(b)
        print(sma)
        print(ema)

        # Named variables instead of `_` / `__`, which IPython uses for its
        # output history.
        profit_rates = [("b", b["獲利率"]), ("sma", sma["獲利率"]), ("ema", ema["獲利率"])]
        best = max(profit_rates, key=lambda item: item[1])
        print(f"{currency} : {best}")
        result.append(best)
    print(result)

144
{'幣別': 'btc', '初始投入金額': 2000000, '獲利': -104878.96511503682, '獲利率': -5.24, '開始日期': '2021-02-01 06:50:00.000000', '結束日期': '2021-03-01 15:30:00.000000'}
{'幣別': 'btc', '初始投入金額': 2000000, '獲利': 182861.74420642946, '獲利率': 9.139999999999999, '開始日期': '2021-02-01 18:40:00.000000', '結束日期': '2021-03-01 19:05:00.000000'}
{'幣別': 'btc', '初始投入金額': 2000000, '獲利': 169239.7072436777, '獲利率': 8.459999999999999, '開始日期': '2021-02-01 18:30:00.000000', '結束日期': '2021-03-01 06:45:00.000000'}
btc : ('sma', 9.139999999999999)
158
{'幣別': 'eth', '初始投入金額': 2000000, '獲利': 275613.4255316686, '獲利率': 13.780000000000001, '開始日期': '2021-02-01 07:00:00.000000', '結束日期': '2021-03-01 12:50:00.000000'}
{'幣別': 'eth', '初始投入金額': 2000000, '獲利': 116137.22043576045, '獲利率': 5.81, '開始日期': '2021-02-01 09:30:00.000000', '結束日期': '2021-03-01 18:25:00.000000'}
{'幣別': 'eth', '初始投入金額': 2000000, '獲利': 269347.1441526953, '獲利率': 13.469999999999999, '開始日期': '2021-02-01 08:55:00.000000', '結束日期': '2021-03-01 18:30:00.000000'}
eth : ('b', 13.780

In [None]:
# Lift pandas' column-count limit when a DataFrame is displayed.
pd.options.display.max_columns = None
# Bollinger Band
def bollingerBandTest(bband_1, resolution = "1m", bbUnit=20, currency="bnb", initDeposit=5000):
    """Back-test a Bollinger-band strategy on BitoPro history since 2021-01-01.

    Steps:
        1. Compute upper / middle / lower bands from the close price.
        2. Add ``high-upper`` and ``low-lower`` columns.
        3. Flag a sell (2) when ``high-upper`` changes sign between consecutive
           candles, otherwise a buy (1) when ``low-lower`` changes sign.
        4. Collapse repeated signals: only the first buy while flat and the
           first sell while holding are kept.
        5. Replay the trades from the first buy to the last sell, starting
           with ``initDeposit``.

    Args:
        bband_1: Deviation multiplier passed to BBANDS for both bands.
        resolution: Candle size passed to ``HistoryData``.
        bbUnit: Band period; also the number of warm-up rows (minus one) dropped.
        currency: Base-currency symbol passed to ``HistoryData``.
        initDeposit: Starting cash.

    Returns:
        dict with the currency, initial deposit, profit, profit rate in percent
        (rounded to two decimals), and the first/last ``datetime`` of the
        traded window.
    """
    # Window start: 2021-01-01 00:00 in the machine's local timezone.
    startTimestamp = str(int(datetime(2021,1,1).timestamp()))

    # Fetch candles from the BitoPro API through the HistoryData helper.
    history = HistoryData(resolution=resolution, currency=currency, startTimestamp=startTimestamp)
    history.parse()
    df = history.toDF()

    # Everything except the time columns is converted to numeric.
    for col in [c for c in df.columns if c not in ["datetime", "timestamp"]]:
        df[col] = pd.to_numeric(df[col])

    close_prices = df["close"].to_numpy(dtype=float)
    # NOTE(review): the fifth positional argument appears to be BBANDS' `matype`;
    # passing int(bband_1) ties the moving-average type to the deviation
    # multiplier. Kept unchanged -- confirm whether this is intended.
    upper, middle, lower = abstract.BBANDS(close_prices, bbUnit, bband_1, bband_1, int(bband_1))
    df["upper"] = upper
    df["middle"] = middle
    df["lower"] = lower
    # Drop the bbUnit-1 warm-up rows (was a hard-coded 19, correct only for
    # bbUnit=20); copy so later assignments act on an independent frame.
    df = df.iloc[bbUnit - 1:, :].copy()

    df["high-upper"] = df["high"] - df["upper"]
    df["low-lower"] = df["low"] - df["lower"]

    # Raw signals: a sign change of high-upper -> sell (2); otherwise a sign
    # change of low-lower -> buy (1); otherwise nothing (0).
    high_gap = df["high-upper"].tolist()
    low_gap = df["low-lower"].tolist()
    raw_signals = [0]
    for hu_prev, hu_cur, ll_prev, ll_cur in zip(high_gap, high_gap[1:], low_gap, low_gap[1:]):
        if hu_prev * hu_cur < 0:
            raw_signals.append(2)
        elif ll_prev * ll_cur < 0:
            raw_signals.append(1)
        else:
            raw_signals.append(0)

    # Keep only actionable signals: buy while flat, sell while holding.
    holding = False
    transactions = []
    for raw in raw_signals:
        if raw == 1 and not holding:
            transactions.append(1)
            holding = True
        elif raw == 2 and holding:
            transactions.append(2)
            holding = False
        else:
            transactions.append(0)
    df["signals"] = transactions

    traded_rows = psql.sqldf("""SELECT * FROM df WHERE signals !=0 """, {"df": df})
    df = modifiedSignalDF(traded_rows)

    # NOTE(review): fees are derived from the unit price (price * 0.001 on buy,
    # price * 0.002 on sell), not from the traded amount -- kept unchanged;
    # confirm this is the intended fee model.
    deposit = initDeposit
    now_currency_unit = 0
    for price, signal in zip(df["close"], df["signals"]):
        price = float(price)
        if signal == 1:
            now_currency_unit = (float(deposit) - price * 0.001) / price
            deposit = 0
        elif signal == 2:
            deposit = float(now_currency_unit) * price - price * 0.002
            now_currency_unit = 0

    profit = deposit - initDeposit
    # Round after scaling to percent so the result carries no float artefacts.
    profitRate = round(profit / initDeposit * 100, 2)
    dates = df["datetime"].tolist()
    return {
        "幣別": currency,
        "初始投入金額": initDeposit,
        "獲利": profit,
        "獲利率": profitRate,
        "開始日期": dates[0],
        "結束日期": dates[-1],
    }


    

    # buy = [float(f) for f in psql.sqldf(q1, locals())["transactionWithFee"].tolist()]
    # initDeposit = buy[0]


    # sell = [float(f) for f in psql.sqldf(q2, locals())["transactionWithFee"].tolist()]

    # print(psql.sqldf("SELECT datetime, close, signals FROM df", locals()))
    # return sum(sell) - sum(buy)

def most_frequent(List):
    """Return the element that occurs most often in ``List``.

    Counting is done in a single pass. When several elements share the highest
    count, the one that appears first in ``List`` wins, so the result is
    deterministic.

    Args:
        List: Iterable of hashable elements.

    Returns:
        The most frequent element.

    Raises:
        ValueError: ``List`` is empty.
    """
    from collections import Counter

    counts = Counter(List)
    # Counter keeps first-seen order and max() returns the first maximal key.
    return max(counts, key=counts.get)
  
     

if __name__ == "__main__":
    # Single Bollinger-band run: XRP, 1-minute candles, band multiplier 3.0,
    # starting deposit 500.
    bband_1, curr, initDeposit, resolution = 3.0, "xrp", 500, "1m"
    a = bollingerBandTest(
        bband_1=bband_1,
        resolution=resolution,
        currency=curr,
        initDeposit=initDeposit,
    )
    print(a)

{'幣別': 'xrp', '初始投入金額': 500, '獲利': 948.2254877347116, '獲利率': 189.65, '開始日期': '2021-01-07 11:14:00.000000', '結束日期': '2021-02-19 16:06:00.000000'}
