Skip to content

Commit

Permalink
AlphaVantage http call are thread safe (#109)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilcardella committed Dec 25, 2019
1 parent 987295a commit 7be4370
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 62 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Replaced TK user interface with GTK+ 3
- Tickers prices are fetched using `alpha-vantage` Python module
- **alpha_vantage_polling_period** configuration parameter is used to wait between each AV call
- AlphaVantage http requests are thread safe

### Added
- Status bar showing portfolio filepath
Expand Down
142 changes: 80 additions & 62 deletions src/Model/AlphaVantageInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from enum import Enum
import datetime as dt
import time
import functools
import threading
from alpha_vantage.timeseries import TimeSeries

currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
Expand All @@ -30,80 +32,96 @@ class AVInterval(Enum):
MONTHLY = "monthly"


class AlphaVantageInterface:
"""class providing interfaces to request data from AlphaVantage"""
# Mutex used for thread synchronisation
lock = threading.Lock()

# Inner class to create a Singleton pattern
class __AlphaVantageInterface:
def __init__(self, config):
self._config = config
self._last_call_ts = dt.datetime.now()
self._TS = TimeSeries(
key=config.get_alpha_vantage_api_key(),
output_format="json",
treat_info_as_error=True,
)

def daily(self, market_id):
"""
Calls AlphaVantage API and return the Daily time series for the given market
- **market_id**: string representing an AlphaVantage compatible market id
- Returns **None** if an error occurs otherwise the pandas dataframe
"""
self._wait_before_call()
try:
market_id = self._format_market_id(market_id)
data, meta_data = self._TS.get_daily(
symbol=market_id, outputsize="compact"
)
return data
except Exception as e:
logging.error("AlphaVantage wrong api call for {}".format(market_id))
logging.debug(e)
logging.debug(traceback.format_exc())
logging.debug(sys.exc_info()[0])
return None
def synchronised(lock):
""" Thread synchronization decorator """

def wrapper(f):
@functools.wraps(f)
def inner_wrapper(*args, **kw):
with lock:
return f(*args, **kw)

return inner_wrapper

return wrapper


class Singleton(type):
"""Metaclass to implement the Singleton desing pattern"""

_instances = {}

@synchronised(lock)
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]


def _format_market_id(self, market_id):
"""
Convert a standard market id to be compatible with AlphaVantage API.
Adds the market exchange prefix (i.e. London is LON:)
"""
# return "{}:{}".format("LON", market_id.split("-")[0])
if "LSE:" in market_id:
subs = market_id.split(":")
assert len(subs) == 2
return "{}:{}".format(Markets[subs[0]].value, subs[1])
return market_id

def _wait_before_call(self):
"""
Wait between API calls to not overload the server
"""
while (dt.datetime.now() - self._last_call_ts) <= dt.timedelta(
seconds=self._config.get_alpha_vantage_polling_period()
):
time.sleep(0.2)
self._last_call_ts = dt.datetime.now()

# Single instance of the inner class
_instance = None
class AlphaVantageInterface(metaclass=Singleton):
"""class providing interfaces to request data from AlphaVantage"""

def __init__(self, config):
if not AlphaVantageInterface._instance:
AlphaVantageInterface._instance = AlphaVantageInterface.__AlphaVantageInterface(
config
)
logging.info("AlphaVantageInterface initialised")
self._config = config
self._last_call_ts = dt.datetime.now()
self._TS = TimeSeries(
key=config.get_alpha_vantage_api_key(),
output_format="json",
treat_info_as_error=True,
)
logging.info("AlphaVantageInterface initialised")

def _daily(self, market_id):
"""
Calls AlphaVantage API and return the Daily time series for the given market
- **market_id**: string representing a market ticker
- Returns **None** if an error occurs otherwise the pandas dataframe
"""
try:
market_id = self._format_market_id(market_id)
data, meta_data = self._TS.get_daily(symbol=market_id, outputsize="compact")
return data
except Exception as e:
logging.error("AlphaVantage wrong api call for {}".format(market_id))
logging.debug(e)
logging.debug(traceback.format_exc())
logging.debug(sys.exc_info()[0])
return None

def _format_market_id(self, market_id):
"""
Convert a standard market id to be compatible with AlphaVantage API.
Adds the market exchange prefix (i.e. London is LON:)
"""
if "LSE:" in market_id:
subs = market_id.split(":")
assert len(subs) == 2
return "{}:{}".format(Markets[subs[0]].value, subs[1])
return market_id

def _wait_before_call(self):
"""
Wait between API calls to not overload the server
"""
while (dt.datetime.now() - self._last_call_ts) <= dt.timedelta(
seconds=self._config.get_alpha_vantage_polling_period()
):
time.sleep(0.2)
self._last_call_ts = dt.datetime.now()

def get_prices(self, market_id, interval=AVInterval.DAILY):
"""
Return the price time series of the requested market with the interval
granularity. Return None if the interval is invalid
"""
self._wait_before_call()
if interval == AVInterval.DAILY:
return self._instance.daily(market_id)
return self._daily(market_id)
else:
logging.warning(
"AlphaVantageInterface supports only DAILY interval. Requested interval: {}".format(
Expand Down

0 comments on commit 7be4370

Please sign in to comment.