In [1]:
import sys
# Put the local crocket checkout on the import path so the `crocket.*` imports
# below resolve. NOTE(review): this is an absolute, machine-specific path.
sys.path.insert(1, '/home/b3arjuden/crocket')

In [2]:
from datetime import datetime, timedelta
from decimal import Decimal
from copy import deepcopy
from json import load as json_load
from numpy import mean
from pprint import pprint
from random import randint, shuffle
from requests import get
from time import sleep, time
from requests import get
from requests_futures.sessions import FuturesSession
from requests.exceptions import ConnectTimeout, ConnectionError, ProxyError, ReadTimeout
from concurrent.futures import as_completed, Future

from crocket.bittrex.bittrex2 import Bittrex, format_bittrex_entry, return_request_input
from crocket.utilities.time import format_time, convert_bittrex_timestamp_to_datetime, utc_to_local

In [3]:
def process_data(input_data, working_data, market_datetime, interval=60):
    """Merge freshly fetched trades into the working set and emit interval metrics.

    For every market in ``working_data`` the new entries from ``input_data`` are
    prepended to the market's working list (index 0 is treated as the most recent
    entry). Once the most recent entry is more than ``interval`` seconds past the
    market's current datetime, metrics are computed for the elapsed interval(s),
    the market datetime is advanced, and the processed entries are dropped from
    the working list.

    Relies on names that must already exist in the notebook namespace and are not
    defined in this cell: ``get_interval_index``, ``calculate_metrics`` and the
    global dict ``last_price`` (which is updated in place).

    Args:
        input_data: dict mapping market name -> list of entry dicts; the 'Id' and
            'TimeStamp' keys of each entry are read.
        working_data: dict with the same layout holding unprocessed entries. When
            empty/falsy it is seeded with a deep copy of ``input_data``.
        market_datetime: dict mapping market name -> datetime marking the start of
            the next interval to process. Mutated in place.
        interval: interval length in seconds.

    Returns:
        Tuple ``(working_data, market_datetime)``.
    """

    # Collected rows; only consumed by the (currently disabled) database insert below.
    entries = []

    if not working_data:
        working_data = deepcopy(input_data)

    for market in working_data:

        # Treat a missing/None list as empty so one failed request cannot crash the run.
        working_list = working_data.get(market) or []
        input_list = input_data.get(market) or []
        current_datetime = market_datetime.get(market)

        if working_list:
            last_id = working_list[0].get('Id')
            id_list = [x.get('Id') for x in input_list]

            if last_id in id_list:
                # Keep only the entries that precede the already-known latest entry.
                overlap_index = id_list.index(last_id)
                working_list = input_list[:overlap_index] + working_list
            else:
                working_list = input_list + working_list
                print('Latest ID in {} working list not found in input data. '
                      'Adding all input data to working list.'.format(market))
        else:
            # Nothing stored yet for this market: start from whatever was fetched.
            working_list = list(input_list)

        working_data[market] = working_list

        if not working_list:
            # No entries at all for this market; nothing to evaluate.
            continue

        latest_datetime = utc_to_local(convert_bittrex_timestamp_to_datetime(working_list[0].get('TimeStamp')))

        if (latest_datetime - current_datetime).total_seconds() > interval:

            timestamp_list = [utc_to_local(convert_bittrex_timestamp_to_datetime(x.get('TimeStamp')))
                              for x in working_list]

            start, stop = get_interval_index(timestamp_list, current_datetime, interval)

            # Stays None when no metrics are computed for this market, so a value
            # left over from a previous market is never written to last_price.
            metrics = None

            if start == stop:
                # Empty slice for this interval: emit one entry per elapsed interval,
                # carrying the last known price forward.
                while (current_datetime + timedelta(seconds=interval)) < timestamp_list[start - 1]:
                    metrics = calculate_metrics(working_list[start:stop], current_datetime)

                    metrics['price'] = last_price.get(market)

                    fields, values = format_bittrex_entry(metrics)
                    entries.append((market, fields, values))

                    current_datetime = current_datetime + timedelta(seconds=interval)

                market_datetime[market] = current_datetime
            else:
                metrics = calculate_metrics(working_list[start:stop], current_datetime)

                fields, values = format_bittrex_entry(metrics)
                entries.append((market, fields, values))

                market_datetime[market] = current_datetime + timedelta(seconds=interval)

            # Drop everything from `start` onwards; only the entries before it remain.
            working_data[market] = working_list[:start]

            if metrics is not None:
                last_price[market] = metrics.get('price')

    # db.insert_transaction_query(entries)

    return working_data, market_datetime

In [4]:
def configure_ip(ip):
    """Return a proxies mapping that routes both 'http' and 'https' through ``ip``."""
    return dict.fromkeys(('http', 'https'), ip)

def process_response(session, response):
    """Attach the decoded JSON body to ``response`` as ``response.data``.

    Passed as ``background_callback`` to ``session.get`` in this notebook.
    ``session`` is not used; it is presumably required by the callback
    signature (TODO confirm against requests-futures).

    If the body cannot be decoded, ``response.data`` is set to a fallback
    payload with ``success=False`` and ``message='NO_API_RESPONSE'`` so callers
    can always read ``.data``.
    """

    try:
        response.data = response.json()
    except Exception:
        # Best-effort decode: any ordinary failure becomes the fallback payload,
        # but KeyboardInterrupt/SystemExit are no longer swallowed.
        response.data = {
            'success': False,
            'message': 'NO_API_RESPONSE',
            'result': None
        }

In [5]:
def get_data(markets, bittrex, session, proxies, proxy_indexes, max_api_retry=3, logger=None):
    """Request market history for every market through proxies and time the run.

    One request per market is queued on ``session``; the resulting futures are
    then consumed as they complete. A request that fails with a proxy/connection
    error, or that yields no API response, is retried one at a time through
    randomly chosen proxies, up to ``max_api_retry`` attempts.

    NOTE(review): the fetched results are collected in a local dict that is not
    returned; only the timings are. Confirm whether callers need the data.

    Args:
        markets: list of market names. Mutated in place: a market whose response
            message is "INVALID_MARKET" is removed from it.
        bittrex: object whose ``get_market_history(market)`` returns a mapping
            with 'url' and 'apisign' entries.
        session: object whose ``get(...)`` returns a future; ``process_response``
            is passed as its ``background_callback``.
        proxies: sequence of proxy addresses.
        proxy_indexes: sequence of indexes into ``proxies``; the entry at a
            market's position selects its proxy. Reused cyclically when there are
            more markets than indexes.
        max_api_retry: maximum number of retry attempts per failed market.
        logger: unused.

    Returns:
        Tuple ``(send_time, receive_time)`` in seconds: time spent queueing the
        requests, and time spent consuming responses (including retries).
    """

    futures = []
    response_dict = {}
    num_proxies = len(proxies)
    num_indexes = len(proxy_indexes)

    start1 = time()

    for index, market in enumerate(markets):
        request_input = bittrex.get_market_history(market)

        # Wrap around so having more markets than proxy indexes does not raise IndexError.
        proxy = configure_ip(proxies[proxy_indexes[index % num_indexes]])
        url = request_input.get('url')
        headers = {"apisign": request_input.get('apisign')}

        response = session.get(url,
                               background_callback=process_response,
                               headers=headers,
                               timeout=3,
                               proxies=proxy)

        # Add attributes to the future so the request can be identified and retried
        response.market = market
        response.url = url
        response.headers = headers

        futures.append(response)

    stop1 = time()
    send_time = stop1 - start1

    start2 = time()
    for future in as_completed(futures):

        try:
            response_data = future.result().data

            if not response_data.get('success'):
                message = response_data.get('message')

                if message == "INVALID_MARKET":
                    markets.remove(future.market)
                    print('Removed {}: invalid market ...'.format(future.market))
                elif message == "NO_API_RESPONSE":
                    # The fallback payload from process_response has success=False,
                    # so it must be caught here to reach the retry path at all.
                    raise ProxyError('NO API RESPONSE')
                continue

            response_dict[future.market] = response_data.get('result')
            if not response_dict[future.market]:
                if response_data.get('message') == "NO_API_RESPONSE":
                    raise ProxyError('NO API RESPONSE')

        except (ProxyError, ConnectTimeout, ConnectionError, ReadTimeout):

            api_retry = 0

            while True:

                if api_retry >= max_api_retry:
                    print('MAX API RETRY LIMIT ({}) REACHED. SKIPPING {}.'.format(str(max_api_retry),
                                                                                         future.market))
                    break

                # Retry through a randomly chosen proxy.
                r = randint(0, num_proxies - 1)
                proxy = configure_ip(proxies[r])

                try:
                    response = session.get(future.url,
                                           background_callback=process_response,
                                           headers=future.headers,
                                           timeout=2,
                                           proxies=proxy)
                    # Block on this single retry before deciding whether to try again.
                    response_dict[future.market] = response.result().data.get('result')
                    if not response_dict[future.market]:
                        print('NO API RESPONSE, RETRYING: {} ...'.format(future.market))
                        api_retry += 1
                        continue

                    break

                except (ProxyError, ConnectTimeout, ConnectionError, ReadTimeout):
                    api_retry += 1
                    print('Retried API call failed for {}.'.format(future.market))

    stop2 = time()
    receive_time = stop2 - start2

    return send_time, receive_time

In [6]:
# Input file locations read by the next cell.
# NOTE(review): both are absolute, machine-specific paths; the second file holds API credentials.
PROXY_LIST_PATH = '/home/b3arjuden/crocket/proxy_list.txt'
BITTREX_CREDENTIALS_PATH = '/home/b3arjuden/bittrex_credentials.json'

In [7]:
# Read files

# Each non-blank line becomes one proxy entry. Blank or whitespace-only lines are
# skipped so an empty string is never handed to configure_ip as a proxy address.
with open(PROXY_LIST_PATH, 'r') as f:
    PROXIES = [line.strip() for line in f.read().splitlines() if line.strip()]

# JSON object; the 'key' and 'secret' entries are read when the clients are created.
with open(BITTREX_CREDENTIALS_PATH, 'r') as f:
    credentials = json_load(f)

In [8]:
# Create bittrex objects

# Settings shared by both clients.
shared_client_kwargs = {
    'api_key': credentials.get('key'),
    'api_secret': credentials.get('secret'),
    'api_version': 'v1.1',
}

# Client constructed without an explicit dispatch argument.
b1 = Bittrex(**shared_client_kwargs)

# Client constructed with return_request_input as its dispatch.
bittrex = Bittrex(dispatch=return_request_input, **shared_client_kwargs)

In [19]:
# NOTE(review): MARKETS is assigned in a cell that appears further down in this
# notebook, so this cell raises NameError when the notebook is run top to bottom
# on a fresh kernel.
pprint(sorted(MARKETS))

['BTC-1ST',
 'BTC-2GIVE',
 'BTC-ABY',
 'BTC-ADA',
 'BTC-ADT',
 'BTC-ADX',
 'BTC-AEON',
 'BTC-AGRS',
 'BTC-AMP',
 'BTC-ANT',
 'BTC-APX',
 'BTC-ARDR',
 'BTC-ARK',
 'BTC-AUR',
 'BTC-BAT',
 'BTC-BAY',
 'BTC-BCC',
 'BTC-BCY',
 'BTC-BITB',
 'BTC-BLITZ',
 'BTC-BLK',
 'BTC-BLOCK',
 'BTC-BNT',
 'BTC-BRK',
 'BTC-BRX',
 'BTC-BSD',
 'BTC-BTCD',
 'BTC-BTG',
 'BTC-BURST',
 'BTC-BYC',
 'BTC-CANN',
 'BTC-CFI',
 'BTC-CLAM',
 'BTC-CLOAK',
 'BTC-CLUB',
 'BTC-COVAL',
 'BTC-CPC',
 'BTC-CRB',
 'BTC-CRW',
 'BTC-CURE',
 'BTC-CVC',
 'BTC-DASH',
 'BTC-DCR',
 'BTC-DCT',
 'BTC-DGB',
 'BTC-DGD',
 'BTC-DMD',
 'BTC-DNT',
 'BTC-DOGE',
 'BTC-DOPE',
 'BTC-DTB',
 'BTC-DYN',
 'BTC-EBST',
 'BTC-EDG',
 'BTC-EFL',
 'BTC-EGC',
 'BTC-EMC',
 'BTC-EMC2',
 'BTC-ENG',
 'BTC-ENRG',
 'BTC-ERC',
 'BTC-ETC',
 'BTC-ETH',
 'BTC-EXCL',
 'BTC-EXP',
 'BTC-FAIR',
 'BTC-FCT',
 'BTC-FLDC',
 'BTC-FLO',
 'BTC-FTC',
 'BTC-FUN',
 'BTC-GAM',
 'BTC-GAME',
 'BTC-GBG',
 'BTC-GBYTE',
 'BTC-GCR',
 'BTC-GEO',
 'BTC-GLD',
 'BTC-GNO',
 'BTC-GNT',
 'BTC-G

In [16]:
# Get the names of all active markets whose base currency is BTC

response = b1.get_markets()
market_entries = response.get('result')

MARKETS = [
    entry.get('MarketName')
    for entry in market_entries
    if entry.get('BaseCurrency') == 'BTC' and entry.get('IsActive')
]

In [11]:
# Use at most 200 proxies, but never more than were actually loaded, so that no
# index in proxy_indexes points past the end of PROXIES.
num_proxies = min(200, len(PROXIES))
proxy_indexes = list(range(num_proxies))

In [14]:
# Test asynchronous requests
# Runs 10 rounds of get_data, at most one round per `sleep_time` seconds, and
# reports per-round and average send / receive / total times.
sleep_time = 10

# Per-round timings in seconds: send, receive and total.
data = {'stime': [],
        'rtime': [],
        'ttime': []}

# Per-market state; last_price is the global dict that process_data reads and updates.
current_datetime = datetime.now().astimezone(tz=None)
current_datetime = {k: current_datetime for k in MARKETS}
last_price = {k: Decimal(0) for k in MARKETS}
weighted_price = {k: Decimal(0) for k in MARKETS}

try:

    # TODO: optimize max_workers
    with FuturesSession(max_workers=20) as session:

        for ii in range(10):
            # Reshuffle so each round pairs markets with different proxies.
            shuffle(proxy_indexes)
            start = time()

            stime, rtime = get_data(MARKETS, bittrex, session, PROXIES, proxy_indexes,
                                     max_api_retry=3)

            stop = time()
            run_time = stop - start

            data['stime'].append(stime)
            data['rtime'].append(rtime)
            data['ttime'].append(run_time)

            print('Send time: {}'.format(str(stime)))
            print('Receive time: {}'.format(str(rtime)))
            print('Total elapsed time: {}'.format(str(run_time)))

            # Pad each round out to sleep_time seconds.
            if run_time < sleep_time:
                sleep(sleep_time - run_time)

    print('Average send time: {}'.format(str(mean(data.get('stime')))))
    print('Average receive time: {}'.format(str(mean(data.get('rtime')))))
    print('Average total time: {}'.format(str(mean(data.get('ttime')))))

except ConnectionError as e:
    # No `logger` is defined in the visible notebook, so report with print like
    # the rest of the notebook instead of raising NameError inside the handler.
    print('ConnectionError: {}. Exiting ...'.format(e))

Send time: 0.28432393074035645
Receive time: 6.66538667678833
Total elapsed time: 6.983477592468262
Send time: 0.2125239372253418
Receive time: 4.512909650802612
Total elapsed time: 4.725466012954712
Send time: 0.17916083335876465
Receive time: 4.9338297843933105
Total elapsed time: 5.11301326751709
Send time: 0.13524580001831055
Receive time: 4.326725006103516
Total elapsed time: 4.461992502212524
Send time: 0.11584806442260742
Receive time: 4.634593725204468
Total elapsed time: 4.75046706199646
Send time: 0.1454923152923584
Receive time: 4.930344343185425
Total elapsed time: 5.075860977172852
Send time: 0.20578455924987793
Receive time: 3.3354337215423584
Total elapsed time: 3.5412440299987793
Send time: 0.13341355323791504
Receive time: 2.239128589630127
Total elapsed time: 2.3994123935699463
Send time: 0.26300597190856934
Receive time: 2.007354497909546
Total elapsed time: 2.310774803161621
Send time: 0.37558484077453613
Receive time: 2.9384493827819824
Total elapsed time: 3.346690