Switch database from Postgres to DynamoDB
The dataset is growing, so let's try using DynamoDB. For now, we're
using both boto (for DynamoDB operations) and botocore (for querying
spot price data). Eventually, we'll move everything to botocore and
possibly use the pynamodb interface.
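
Roughly, the split between the two libraries looks like this (a
minimal sketch; the table name and attribute layout are hypothetical,
and error handling is omitted):

```python
# botocore: query the EC2 API for spot price history.
import botocore.session
from boto.dynamodb2.table import Table

session = botocore.session.get_session()
ec2 = session.get_service('ec2')
operation = ec2.get_operation('DescribeSpotPriceHistory')
response, data = operation.call(ec2.get_endpoint('us-east-1'))

# boto: store the results in DynamoDB.
table = Table('ec2price_spot_prices')  # hypothetical table name
with table.batch_write() as batch:
    for d in data.get('SpotPriceHistory', []):
        batch.put_item(data={
            'instance_zone_id': d['AvailabilityZone'],
            'timestamp': d['Timestamp'],
            'price': d['SpotPrice'],
        })
```

Also: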

* Start collecting data for all EC2 "products":

  * Linux/UNIX
  * Linux/UNIX (Amazon VPC)
  * SUSE Linux
  * SUSE Linux (Amazon VPC)
  * Windows
  * Windows (Amazon VPC)

  Previously, we only collected data for Linux/UNIX.

* Handle next_token properly so we can pull spot price data that
  spans more than one page (see the pagination sketch after this
  list).

* Save the "end_time" timestamp of each run, so we know where to
  start from next time.

* Update to the latest Bootstrap, jQuery, and NVD3.

* Tweak the timestamp format on the graph's X axis to include HH:mm.
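
Here is the pagination sketch referenced above. It mirrors the
botocore calls the collector makes; the region and start time are
hardcoded for brevity:

```python
# Sketch only: page through DescribeSpotPriceHistory until the
# service stops returning a NextToken.
import botocore.session

session = botocore.session.get_session()
ec2 = session.get_service('ec2')
operation = ec2.get_operation('DescribeSpotPriceHistory')
endpoint = ec2.get_endpoint('us-east-1')  # example region

prices = []
next_token = None
while True:
    # No product_descriptions filter, so every EC2 product comes back.
    kwargs = {'start_time': '2014-02-18T00:00:00.000Z'}
    if next_token:
        kwargs['next_token'] = next_token
    response, data = operation.call(endpoint, **kwargs)
    prices.extend(data.get('SpotPriceHistory', []))
    next_token = data.get('NextToken')
    if not next_token:
        break
```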

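And the checkpoint sketch: read the previous run's end_time, collect,
then record a new one. This uses boto's DynamoDB layer directly; the
table name is hypothetical, and the real code goes through a small
Model wrapper:

```python
# Sketch only: resume from the last run's end_time, then record a
# new checkpoint when this run finishes.
import arrow
from boto.dynamodb2.table import Table

progress = Table('ec2price_progress')  # hypothetical table name

row = progress.get_item(name='end_time')
start_time = arrow.get(row['timestamp'])  # where the last run stopped

# ... collect spot prices between start_time and utcnow here ...

end_time = arrow.utcnow()
progress.put_item(data={'name': 'end_time',
                        'timestamp': end_time.timestamp},
                  overwrite=True)
```
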
Performance-wise, the app still feels about the same as it did when
loading data from Postgres, possibly a little slower. This can be
tuned in a future change. We might even use memcache to reduce the
number of DynamoDB operations (a rough sketch follows).
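
If we do add memcache, the likely shape is a read-through cache in
front of the hottest DynamoDB queries, something like this (assuming
the python-memcached client; the key scheme and query helper are
made up):

```python
# Sketch only: consult memcache before hitting DynamoDB.
import json
import memcache

mc = memcache.Client(['127.0.0.1:11211'])

def cached_prices(instance_zone_id, query_dynamodb):
    key = 'prices:%s' % instance_zone_id  # hypothetical key scheme
    cached = mc.get(key)
    if cached is not None:
        return json.loads(cached)
    rows = query_dynamodb(instance_zone_id)  # hypothetical query helper
    mc.set(key, json.dumps(rows), time=600)  # expire after 10 minutes
    return rows
```
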
grosskur committed Feb 18, 2014
1 parent 6d20a5d · commit 4f5b3b6
Showing 37 changed files with 28,967 additions and 25,579 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -4,3 +4,6 @@
.venv
.vagrant
*.bak*
.webassets-cache/
.webassets-manifest
ec2price/static/gen/
27 changes: 11 additions & 16 deletions README.md
@@ -2,34 +2,29 @@

This application collects and displays prices for EC2 spot instances
over time. It's written in [Python](http://www.python.org/) using the
[Tornado](http://www.tornadoweb.org/) web framework. A periodic task
`ec2price collector` grabs spot price data from the EC2 API using
[botocore](https://github.com/boto/botocore) and stores it in a
[Postgres](http://www.postgresql.org/) database. The web interface
`ec2price web` displays graphs of the data using
[NVD3.js](http://nvd3.org/).
[Tornado](http://www.tornadoweb.org/) web framework.

* The daemon `ec2price collector` grabs spot price data from the EC2
API using [botocore](https://github.com/boto/botocore) and stores it
in [DynamoDB](http://aws.amazon.com/dynamodb/).

* The web interface `ec2price web` displays graphs of the data using
[NVD3.js](http://nvd3.org/).

## Instructions for running on Heroku

```bash
$ git clone https://github.com/grosskur/ec2price.git
$ cd ec2price
$ heroku create your-ec2price
$ heroku addons:add heroku-postgresql:dev
$ heroku pg:promote $(heroku config -s | awk -F= '$1 ~ /^HEROKU_POSTGRESQL_[A-Z]+_URL$/ {print $1}')
$ heroku config:set COOKIE_SECRET=$(python -c "import base64, uuid; print base64.b64encode(uuid.uuid4().bytes + uuid.uuid4().bytes)")
$ heroku config:set TABLE_PREFIX=$(uuidgen | cut -c 1-8 | tr 'A-Z' 'a-z')
$ heroku config:set COOKIE_SECRET=$(head /dev/urandom | base64 | cut -c 1-40)
$ heroku config:set AWS_ACCESS_KEY_ID=...
$ heroku config:set AWS_SECRET_ACCESS_KEY=...
$ heroku pg:psql < ec2price/sql/schema.sql
$ heroku pg:psql < ec2price/sql/initial.sql
$ git push heroku master
$ heroku run env HOURS=24 scripts/ec2price collector
$ heroku addons:add scheduler:standard
$ heroku ps:scale web=1 collector=1
```

Then go to the Heroku dashboard and add a scheduled job to run
`scripts/ec2price collector` every 10 minutes.

## To do

* Experiment with Rickshaw for graph drawing
89 changes: 16 additions & 73 deletions ec2price/app.py
@@ -6,13 +6,13 @@
import os
import re

import psycopg2
import psycopg2.extras
import tornado.ioloop
import tornado.web
import webassets.loaders

from .web import MainHandler
from .collector import collect
from .model import Model


PROG = 'ec2price'
@@ -26,8 +26,7 @@
':(?P<port>\d+)'
'/(?P<dbname>.+)'
)
DATABASE_URL_EXAMPLE = 'postgres://username:password@host:port/dbname'
_HOURS = 8
_HOURS = 1


class ArgumentParser(argparse.ArgumentParser):
@@ -56,9 +55,6 @@ def main(args):
p_web = subparsers.add_parser('web', help='web app')
p_web.set_defaults(cmd='web')

p_api = subparsers.add_parser('api', help='web API')
p_api.set_defaults(cmd='api')

p_collector = subparsers.add_parser('collector', help='collector')
p_collector.set_defaults(cmd='collector')

@@ -69,77 +65,33 @@ def main(args):
port = int(os.getenv('PORT', 8080))
address = os.getenv('ADDRESS', '')
cookie_secret = os.getenv('COOKIE_SECRET')
database_url = os.getenv('DATABASE_URL')
table_prefix = os.getenv('TABLE_PREFIX')
gauges_site_id = os.getenv('GAUGES_SITE_ID')

database_dsn = None
if database_url:
m = DATABASE_URL_REGEX.match(database_url)
if not m:
parser.error('must be of form %s' % DATABASE_URL_EXAMPLE)
database_dsn = ' '.join('%s=%s' % (k, v)
for k, v in m.groupdict().items())

if not database_url:
parser.error('DATABASE_URL is required')
if not table_prefix:
parser.error('TABLE_PREFIX is required')
if not cookie_secret:
parser.error('COOKIE_SECRET is required')

db_conn = _get_db_conn(database_dsn)
asset_loader = webassets.loaders.YAMLLoader('webassets.yml')
asset_env = asset_loader.load_environment()
asset_env.debug = debug

params = {
'db_conn': db_conn,
'gauges_site_id': gauges_site_id,
}
handlers = [
(r'/', MainHandler, params),
]
_start_tornado_app(debug, cookie_secret, port, address, handlers)
elif opts.cmd == 'api':
debug = bool(os.getenv('DEBUG', False))
port = int(os.getenv('PORT', 8080))
address = os.getenv('ADDRESS', '')
cookie_secret = os.getenv('COOKIE_SECRET')
database_url = os.getenv('DATABASE_URL')
gauges_site_id = os.getenv('GAUGES_SITE_ID')

database_dsn = None
if database_url:
m = DATABASE_URL_REGEX.match(database_url)
if not m:
parser.error('must be of form %s' % DATABASE_URL_EXAMPLE)
database_dsn = ' '.join('%s=%s' % (k, v)
for k, v in m.groupdict().items())

if not database_url:
parser.error('DATABASE_URL is required')
if not cookie_secret:
parser.error('COOKIE_SECRET is required')

db_conn = _get_db_conn(database_dsn)

params = {
'db_conn': db_conn,
'model': Model(table_prefix),
'asset_env': asset_env,
'gauges_site_id': gauges_site_id,
}
handlers = [
(r'/', MainHandler, params),
]
_start_tornado_app(debug, cookie_secret, port, address, handlers)
elif opts.cmd == 'collector':
database_url = os.getenv('DATABASE_URL')
table_prefix = os.getenv('TABLE_PREFIX')
hours = os.getenv('HOURS')

database_dsn = None
if database_url:
m = DATABASE_URL_REGEX.match(database_url)
if not m:
parser.error('must be of form %s' % DATABASE_URL_EXAMPLE)
database_dsn = ' '.join('%s=%s' % (k, v)
for k, v in m.groupdict().items())

if not database_url:
parser.error('DATABASE_URL is required')
if not table_prefix:
parser.error('TABLE_PREFIX is required')

if not hours:
hours = _HOURS
@@ -148,9 +100,8 @@ def main(args):
except ValueError:
parser.error('HOURS must be an integer')

db_conn = _get_db_conn(database_dsn)

collect(db_conn, hours)
model = Model(table_prefix)
collect(model, hours)
return 0


@@ -167,11 +118,3 @@ def _start_tornado_app(debug, cookie_secret, port, address, handlers):
logging.info('listening on port: %d', port)
app.listen(port, address)
tornado.ioloop.IOLoop.instance().start()


def _get_db_conn(database_dsn):
psycopg2.extras.register_uuid()
return psycopg2.connect(
database_dsn,
connection_factory=psycopg2.extras.RealDictConnection,
)
164 changes: 102 additions & 62 deletions ec2price/collector.py
@@ -1,80 +1,120 @@
"""
Data collector
"""
import botocore.session

import contextlib
import datetime
import decimal
import logging
import uuid

import arrow
import botocore.session

_FMT = '%Y-%m-%dT%H:%M:%S.000Z'

_SELECT_SPOT_PRICE = """
select price
from spot_prices, availability_zones, instance_types
where spot_prices.availability_zone_id = availability_zones.id
and availability_zones.api_name = %s
and spot_prices.instance_type_id = instance_types.id
and instance_types.api_name = %s
and spot_prices.ts = %s
limit 1
"""
_INSERT_SPOT_PRICE = """
with a as (select id from availability_zones where api_name = %s),
i as (select id from instance_types where api_name = %s)
insert into spot_prices (id, availability_zone_id, instance_type_id, ts, price)
select %s, a.id, i.id, %s, %s
from a, i
"""
_SELECT_INSTANCE_TYPES = """
select api_name
from instance_types
order by api_name
"""
_EXCLUDED_REGION_PREFIXES = ['cn-', 'us-gov-']
_FMT = 'YYYY-MM-DDTHH:mm:ss.000Z'


logging.getLogger('boto').setLevel(logging.WARN)
logging.getLogger('botocore').setLevel(logging.WARN)
logging.getLogger('requests.packages.urllib3').setLevel(logging.WARN)


def collect(db_conn, hours):
def collect(model, hours):
row = model.progress.get_item(name='end_time')
start_time = arrow.get(row['timestamp'])
#logging.debug('window: past %s hours', hours)
#start_time = arrow.utcnow().replace(hours=-hours)
logging.debug('start time: %s', start_time)

end_time = arrow.utcnow()
logging.debug('end time: %s', end_time)

all_regions = set()
all_product_descriptions = set()
all_instance_types = set()
all_instance_zones = set()

session = botocore.session.get_session()
ec2 = session.get_service('ec2')
operation = ec2.get_operation('DescribeSpotPriceHistory')

d = datetime.datetime.utcnow() - datetime.timedelta(hours=hours)
start_time = d.strftime(_FMT)
for region in ec2.region_names:
if any(region.startswith(x) for x in _EXCLUDED_REGION_PREFIXES):
continue
all_regions.add(region)

with contextlib.closing(db_conn.cursor()) as cursor:
cursor.execute(_SELECT_INSTANCE_TYPES)
rows = cursor.fetchall()
instance_types = [r['api_name'] for r in rows]
next_token = None
while True:
logging.debug('collecting spot prices from region: %s', region)
endpoint = ec2.get_endpoint(region)
if next_token:
response, data = operation.call(
endpoint,
start_time=start_time.format(_FMT),
end_time=end_time.format(_FMT),
next_token=next_token,
)
else:
response, data = operation.call(
endpoint,
start_time=start_time.format(_FMT),
)
next_token = data.get('NextToken')
logging.debug('next_token: %s', next_token)
spot_data = data.get('SpotPriceHistory', [])

for region in ec2.region_names:
logging.debug('collecting spot prices from region: %s', region)
endpoint = ec2.get_endpoint(region)
response, data = operation.call(
endpoint,
instance_types=instance_types,
product_descriptions=['Linux/UNIX'],
start_time=start_time,
)
for i in data.get('spotPriceHistorySet', []):
with contextlib.closing(db_conn.cursor()) as cursor:
cursor.execute(_SELECT_SPOT_PRICE, [
i['availabilityZone'],
i['instanceType'],
i['timestamp'],
])
row = cursor.fetchone()
if not row:
logging.debug('inserting spot price: %s', i)
cursor.execute(_INSERT_SPOT_PRICE, [
i['availabilityZone'],
i['instanceType'],
uuid.uuid4(),
i['timestamp'],
i['spotPrice'],
])
db_conn.commit()
#conn = boto.ec2.connect_to_region(r.name)
#logging.debug('getting spot prices for region: %s', r.name)
#data = conn.get_spot_price_history(start_time=start_time)

logging.debug('saving %d spot prices for region: %s',
len(spot_data), region)
with model.spot_prices.batch_write() as batch:
for d in spot_data:
all_product_descriptions.add(d['ProductDescription'])
all_instance_types.add(d['InstanceType'])
all_instance_zones.add((
d['ProductDescription'],
d['InstanceType'],
d['AvailabilityZone'],
))
batch.put_item(data={
'instance_zone_id': ':'.join([
d['ProductDescription'],
d['InstanceType'],
d['AvailabilityZone'],
]),
'timestamp': arrow.get(d['Timestamp']).timestamp,
'price': decimal.Decimal(str(d['SpotPrice'])),
})
if not next_token:
break

logging.debug('saving %d regions', len(all_regions))
with model.regions.batch_write() as batch:
for i in all_regions:
batch.put_item(data={'region': i})

logging.debug('saving %d product_descriptions',
len(all_product_descriptions))
with model.product_descriptions.batch_write() as batch:
for i in all_product_descriptions:
batch.put_item(data={'product_description': i})

logging.debug('saving %d instance_types', len(all_instance_types))
with model.instance_types.batch_write() as batch:
for i in all_instance_types:
batch.put_item(data={'instance_type': i})

logging.debug('saving %d instance_zones', len(all_instance_zones))
with model.instance_zones.batch_write() as batch:
for i in all_instance_zones:
batch.put_item(data={
'instance_id': ':'.join([i[0], i[1]]),
'zone': i[2],
})

logging.debug('saving end_time')
with model.progress.batch_write() as batch:
batch.put_item(data={
'name': 'end_time',
'timestamp': end_time.timestamp,
})
