Skip to content

Commit

Permalink
fix misspelling, add coins history management, update constants that …
Browse files Browse the repository at this point in the history
…gets from DB
  • Loading branch information
korcky committed Jul 28, 2017
1 parent f47d453 commit d45bbdb
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 38 deletions.
94 changes: 67 additions & 27 deletions crawler/crawler.py
Expand Up @@ -20,32 +20,43 @@
"""This module contains main logic for crawler."""

import asyncio
from datetime import datetime
from datetime import datetime, timedelta
from traceback import format_exc

from crawler.data_loader import loader


class Crawler:
"""
Crawler that checks new prices on various coins/exchanges pairs and update this price in DB.
Crawler that checks new prices on various coins/exchanges pairs, update this price in DB, and create
price history.
Attributes:
:param coin_map: <dict> where keys is coin name (in BD) and values is <dict>
with structure like {exchange name (in BD): coin name (on exchange), ...}
:param bd: <object> to interact with BD that contain next functions:
update_exchange(coin, exchange, price), get_coin(coin), add_coin(coin),
get_exchange(coin, exchange, price) add_exchange(coin, exchange, price)
:param coin_map: <dict> where keys is coin name (in db) and values is <dict>
with structure like {exchange name (in db): coin name (on exchange), ...}
:param db: <object> to interact with DB that contain next functions:
get_coin(coin), add_coin(coin),
get_exchange(coin, exchange), add_exchange(coin, exchange, price), update_exchange(coin, exchange, price),
get_coin_h(coin), add_coin_h(coin),
get_exchange(coin, exchange), add_exchange_h(coin, exchange), get_exchange_history(coin, exchange),
add_price_to_exchange_h(coin, exchange, time, price), update_exchange_h(coin, exchange, history)
:param logger: <logging.Logger> crawler's logger
:param timeout: <int>[optional] frequency (in seconds) of requesting new data
:param timeout: <int>[optional=1] frequency (in seconds) of requesting new data
:param history: <bool>[optional=True] will write coins price history if True
:param h_update_time: <int>[optional=3600] frequency (in seconds) of history cleaning
:param h_threshold_time: <int>[optional=1] time (in days) that price will exist in history
"""
def __init__(self, coin_map, bd, logger, timeout=1):

def __init__(self, coin_map, db, logger, timeout=1, history=True, h_update_time=3600, h_threshold_time=1):
self.coin_map = coin_map
self.bd = bd
self.db = db
self.timeout = timeout
self.loop = asyncio.get_event_loop()
self.logger = logger
self.history = history
self.h_update_time = h_update_time
self.h_threshold_time = h_threshold_time

async def _load(self, coin, exchange):
"""
Expand All @@ -64,42 +75,52 @@ async def _load(self, coin, exchange):

def _update(self, coin, exchange, price):
"""
Update price of specific coin on specific exchange in BD
Update price of specific coin on specific exchange in db
Args:
:param coin: <string> coin name (in BD)
:param coin: <string> coin name (in db)
:param exchange: <string> exchange name
:param price: <float> price
"""
self.bd.update_exchange(coin=coin, exchange=exchange, price=price)
self.db.update_exchange(coin=coin, exchange=exchange, price=price)
if self.history:
self.db.add_price_to_exchange_h(coin=coin, exchange=exchange, time=datetime.utcnow(), price=price)

def _check_existing(self, coin, exchange):
"""
Check existence of coin/exchange pair in BD and if not exist add them.
Check existence of coin/exchange pair in db and if not exist add them.
Args:
:param coin: <string> coin name (in BD)
:param coin: <string> coin name (in db)
:param exchange: <string> exchange name
"""
db_coin = self.bd.get_coin(coin=coin)
if not db_coin:
self.bd.add_coin(coin=coin)
db_exchange = self.bd.get_exchange(coin=coin, exchange=exchange)
if not db_exchange:
self.bd.add_exchange(coin=coin, exchange=exchange, price=None)
if not self.db.get_coin(coin=coin):
self.db.add_coin(coin=coin)
if not self.db.get_exchange(coin=coin, exchange=exchange):
self.db.add_exchange(coin=coin, exchange=exchange, price=None)

if self.history:
if not self.db.get_coin_h(coin=coin):
self.db.add_coin_h(coin=coin)
if not self.db.get_exchange_h(coin=coin, exchange=exchange):
self.db.add_exchange_h(coin=coin, exchange=exchange)

async def load_and_update(self, coin, exchange):
"""
Check availability of coin/exchange pair. If available start infinite loop
that gets new coin price from exchange site and updates this price in BD
that gets new coin price from exchange site and updates this price in db
Args:
:param coin: <string> coin name (in BD)
:param coin: <string> coin name (in db)
:param exchange: <string> exchange name
"""
if exchange == 'bitfinex':
sleep_time = len(self.coin_map)
else:
sleep_time = self.timeout
try:
coin_name = self.coin_map[coin].get(exchange)
if coin_name is not None:
Expand All @@ -109,16 +130,33 @@ async def load_and_update(self, coin, exchange):
stime = datetime.now()
value = await self._load(coin=coin_name, exchange=exchange)
self._update(coin=coin, exchange=exchange, price=value)
await asyncio.sleep(self.timeout - (datetime.now() - stime).total_seconds())
await asyncio.sleep(sleep_time - (datetime.now() - stime).total_seconds())
except Exception as e:
self.logger.warning('Exception in thread\'s loop ({}{}): {}'
self.logger.warning('Exception in thread\'s loop ({} {}): {}'
'{}'.format(exchange, coin, str(e), format_exc()))
await asyncio.sleep(self.timeout)
await asyncio.sleep(sleep_time)
except Exception as e:
self.logger.critical('Exception in crawler\'s thread ({}{}): {}'
self.logger.critical('Exception in crawler\'s thread ({} {}): {}'
'{}'.format(exchange, coin, str(e), format_exc()))
self.loop.stop()

async def history_cleaner(self):
"""Clean history from old prices"""
while True:
for coin in self.coin_map.keys():
for exchange in self.coin_map[coin].keys():
try:
new_history = []
time = datetime.utcnow() - timedelta(days=self.h_threshold_time)
for timestamp in self.db.get_exchange_history(coin=coin, exchange=exchange):
if timestamp['time'] > time:
new_history.append(timestamp)
self.db.update_exchange_h(coin=coin, exchange=exchange, history=new_history)
except Exception as e:
self.logger.warning('Exception in history_cleaner ({} {}): {}'
'{}'.format(exchange, coin, str(e), format_exc()))
await asyncio.sleep(self.h_update_time)

def launch(self):
"""
Start function of Crawler (start asynchronous load_and_update() tasks for
Expand All @@ -129,6 +167,8 @@ def launch(self):
for exchange in self.coin_map[coin].keys():
asyncio.ensure_future(self.load_and_update(coin=coin,
exchange=exchange))
if self.history:
asyncio.ensure_future(self.history_cleaner())
self.loop.run_forever()
except Exception as e:
self.logger.critical('Exception in creating crawler\'s threads: {}'
Expand Down
2 changes: 1 addition & 1 deletion crawler_launch.py
Expand Up @@ -28,5 +28,5 @@
crawler_logger = logging.getLogger(__name__)

if __name__ == '__main__':
crawler = Crawler(coin_map=mq.coin_map, bd=mq, logger=crawler_logger)
crawler = Crawler(coin_map=mq.coin_map, db=mq, logger=crawler_logger)
crawler.launch()
137 changes: 127 additions & 10 deletions mongo_queries.py
Expand Up @@ -28,16 +28,31 @@
db = mongo_client[local_config.MONGO_DB]
db.authenticate(local_config.MONGO_BOT_USER, local_config.MONGO_BOT_PASSWORD)


# Dictionary that compares coin name with coin name on specific exchange
coin_map = db.settings.find_one()['coin_map']
def _coin_map():
return db.settings.find_one()['coin_map']
coin_map = _coin_map()


# All available coins
coins = db.settings.find_one()['coin_map'].keys()
def _coins():
return db.settings.find_one()['coin_map'].keys()
coins = _coins()


# Dictionary that compares exchange name from DB with real exchange name
exchange_map = db.settings.find_one()['exchange_map']
def _exchange_map():
return db.settings.find_one()['exchange_map']
exchange_map = _exchange_map()
# All available exchanges
exchanges = list(set([exchange for coin in coins for exchange in coin_map[coin].keys()]))


# Default settings for users
default_settings = db.settings.find_one()['default_settings']
def _default_settings():
return db.settings.find_one()['default_settings']
default_settings = _default_settings()


# Users collection
Expand All @@ -50,7 +65,7 @@ def get_users():
:return: <list> of dict with user's data
"""
return db['users'].find({})
return list(db['users'].find({}))


def add_user(msg, email):
Expand Down Expand Up @@ -195,7 +210,7 @@ def add_exchange(coin, exchange, price):
"""
db['coins'].update({'name': coin},
{'$push': {'exchanges': {'name': exchange,
'price': price}}})
'price': price}}})


def update_exchange(coin, exchange, price):
Expand All @@ -209,7 +224,109 @@ def update_exchange(coin, exchange, price):
"""
db['coins'].find_and_modify(query={'name': coin, 'exchanges.name': exchange},
update={"$set": {'exchanges.$.price': price}})
update={'$set': {'exchanges.$.price': price}})


# Coins history collection

def get_coin_h(coin):
"""
Return coin's data
Args:
:param coin: <string> coin name
Returns:
:return: <dict> with coin's data or None if coin doesn't exist
"""
return db['coins_history'].find_one({'name': coin})


def add_coin_h(coin):
"""
Add new coin with emtpy exchange list to collection
Args:
:param coin: <string> coin name
"""
db['coins_history'].insert_one({'name': coin, 'exchanges': []})


def get_exchange_h(coin, exchange):
"""
Return coin's data where coin contains specific exchange
Args:
:param coin: <string> coin name
:param exchange: <string> exchange name
Returns:
:return: <dict> with coin's data or None if that coin doesn't exist
"""
return db['coins_history'].find_one({'name': coin, 'exchanges.name': exchange})


def add_exchange_h(coin, exchange):
"""
Add new exchange with empty coin's price history to coin's list
Args:
:param coin: <string> coin name
:param exchange: <string> exchange name
"""
db['coins_history'].update({'name': coin},
{'$push': {'exchanges': {'name': exchange,
'history': []}}})


def add_price_to_exchange_h(coin, exchange, time, price):
"""
Add coin's price on exchange and time when this price get to history
Args:
:param coin: <string> coin name
:param exchange: <string> exchange name
:param time: <datetime.datetime> UTC time
:param price: <float> coin's price on exchange
"""
db['coins_history'].find_and_modify(query={'name': coin, 'exchanges.name': exchange},
update={'$push': {'exchanges.$.history': {'time': time,
'price': price}}})


def get_exchange_history(coin, exchange):
"""
Return coin's price history on exchange
Args:
:param coin: <string> coin name
:param exchange: <string> exchange name
Returns:
:return: <list> of coin's price history
"""
return db['coins_history'].find_one({'name': coin, 'exchanges.name': exchange},
{'exchanges.$': 1})['exchanges'][0]['history']


def update_exchange_h(coin, exchange, history):
"""
Update coin's history on exchange
Args:
:param coin: <string> coin name
:param exchange: <string> exchange name
:param history: <list> of dictionaries with next structure: {'time': <datetime.datetime>, 'price': <float>}
"""
db['coins_history'].find_and_modify(query={'name': coin, 'exchanges.name': exchange},
update={'$set': {'exchanges.$.history': history}})


# Subscribers collection
Expand All @@ -225,8 +342,8 @@ def get_user_email(key):
:return: <string> user's e-mail or None if subscriber doesn't exist
"""
user = db.subscribers.find_one({"key": str(key)})
user = db.subscribers.find_one({'key': str(key)})
if user:
return user["email"]
return user['email']
else:
return None
return None

0 comments on commit d45bbdb

Please sign in to comment.