Skip to content

Commit

Permalink
Added AWS request signing.
Browse files Browse the repository at this point in the history
  • Loading branch information
dsjen committed Jan 8, 2016
1 parent e96c3b1 commit 59e78cf
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 52 deletions.
54 changes: 8 additions & 46 deletions analytics_data_api/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from collections import defaultdict
from importlib import import_module

from django.db.models import Q
from rest_framework.authtoken.models import Token

from analytics_data_api.v0.models import ProblemResponseAnswerDistribution


def delete_user_auth_token(username):
"""
Expand Down Expand Up @@ -47,49 +45,6 @@ def matching_tuple(answer):
)


def consolidate_answers(problem):
    """
    Attempt to consolidate erroneously randomized answers.

    Answers are grouped by ``value_id``. When every group agrees on a single
    tuple of matching fields (as computed by ``matching_tuple``), each group
    holding more than one answer is collapsed into its first answer, whose
    counts become the sum over the whole group.

    Arguments:
        problem: iterable of answer distribution rows. Every row gets a
            ``consolidated_variant`` flag as a side effect, and the first
            row of each collapsed group is updated in place.

    Returns:
        ``problem`` itself when any group has more than one distinct tuple of
        matching fields; otherwise a new list with one row per ``value_id``.
    """
    answer_sets = defaultdict(list)
    match_tuple_sets = defaultdict(set)

    for answer in problem:
        answer.consolidated_variant = False
        answer_sets[answer.value_id].append(answer)
        match_tuple_sets[answer.value_id].add(matching_tuple(answer))

    # If a part has more than one unique tuple of matching fields, do not consolidate.
    if any(len(match_tuples) > 1 for match_tuples in match_tuple_sets.values()):
        return problem

    consolidated_answers = []

    # Only the grouped rows are needed, so iterate the values directly;
    # ``values()`` exists on both Python 2 and Python 3 dictionaries.
    for answers in answer_sets.values():
        consolidated_answer = answers[0]

        if len(answers) > 1:
            # Which counters get summed depends on the row type of the group.
            has_single_count = isinstance(consolidated_answer, ProblemResponseAnswerDistribution)

            for answer in answers[1:]:
                if has_single_count:
                    consolidated_answer.count += answer.count
                else:
                    consolidated_answer.first_response_count += answer.first_response_count
                    consolidated_answer.last_response_count += answer.last_response_count

            # The merged row no longer describes one specific variant.
            consolidated_answer.variant = None
            consolidated_answer.consolidated_variant = True

        consolidated_answers.append(consolidated_answer)

    return consolidated_answers


def dictfetchall(cursor):
"""Returns all rows from a cursor as a dict"""

Expand All @@ -98,3 +53,10 @@ def dictfetchall(cursor):
dict(zip([col[0] for col in desc], row))
for row in cursor.fetchall()
]


def class_for_name(path):
    """
    Return the attribute (normally a class) named by a dotted path.

    Arguments:
        path: fully qualified name such as ``'package.module.ClassName'``.
            Everything before the last dot is imported as a module and the
            final component is looked up on it.

    Returns:
        The object bound to the final path component in the imported module.

    Raises:
        ValueError: if ``path`` has nothing before its last dot (or no dot at all).
        ImportError: if the module portion cannot be imported.
        AttributeError: if the module does not define the requested name.
    """
    module_name, _, class_name = path.rpartition('.')
    if not module_name:
        # Fail with a message that names the bad value instead of an opaque
        # tuple-unpacking error.
        raise ValueError(
            "Expected a dotted path of the form 'module.ClassName', got {!r}".format(path)
        )
    module = import_module(module_name)
    return getattr(module, class_name)
8 changes: 7 additions & 1 deletion analytics_data_api/v0/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from django.conf import settings
from elasticsearch_dsl import connections

from analytics_data_api.utils import class_for_name


class ApiAppConfig(AppConfig):

Expand All @@ -10,4 +12,8 @@ class ApiAppConfig(AppConfig):
def ready(self):
super(ApiAppConfig, self).ready()
if settings.ELASTICSEARCH_LEARNERS_HOST:
connections.connections.create_connection(hosts=[settings.ELASTICSEARCH_LEARNERS_HOST])

connection_params = {'hosts': [settings.ELASTICSEARCH_LEARNERS_HOST]}
if settings.ELASTICSEARCH_CONNECTION_CLASS:
connection_params['connection_class'] = class_for_name(settings.ELASTICSEARCH_CONNECTION_CLASS)
connections.connections.create_connection(**connection_params)
69 changes: 69 additions & 0 deletions analytics_data_api/v0/connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import json
import time

from boto.connection import AWSAuthConnection
from django.conf import settings
from elasticsearch import Connection


class BotoHttpConnection(Connection):
    """
    Uses AWS configured connection to sign requests before they're sent to
    elasticsearch nodes.
    """

    # Signing connection; replaced by an ESConnection instance in __init__.
    connection = None

    def __init__(self, host='localhost', port=443, **kwargs):
        """
        Build the signing connection for ``host`` and ``port``.

        Any extra keyword arguments are passed to the elasticsearch
        ``Connection`` base class only; they are not forwarded to boto.
        """
        super(BotoHttpConnection, self).__init__(host=host, port=port, **kwargs)
        connection_params = {'host': host, 'port': port}

        # Set the credentials through settings. If not provided, boto will attempt to use
        # default environment variables.
        if settings.ELASTICSEARCH_AWS_ACCESS_KEY_ID:
            connection_params['aws_access_key_id'] = settings.ELASTICSEARCH_AWS_ACCESS_KEY_ID
        if settings.ELASTICSEARCH_AWS_SECRET_ACCESS_KEY:
            connection_params['aws_secret_access_key'] = settings.ELASTICSEARCH_AWS_SECRET_ACCESS_KEY
        self.connection = ESConnection(**connection_params)

    # pylint: disable=unused-argument
    def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=()):
        """
        Called when making requests to elasticsearch. Requests are signed and
        the http status, headers, and response are returned.

        Note: the "timeout" kwarg is ignored in this case. Boto manages the timeout
        and the default is 70 seconds.
        See: https://github.com/boto/boto/blob/develop/boto/connection.py#L533

        Arguments:
            method: HTTP verb for the request.
            url: path (or URL) handed to the signing connection.
            params: optional query string parameters.
            body: request payload; ``None`` sends an empty body, strings are
                sent as-is, and anything else is serialized to JSON.
            timeout: accepted for interface compatibility and ignored.
            ignore: non-2xx status codes that should not raise.

        Returns:
            Tuple of (status code, response headers as a dict, raw response data).
        """
        if body is None:
            # No payload: send an empty body instead of the JSON literal "null".
            body = ''
        elif not isinstance(body, basestring):
            body = json.dumps(body)

        start = time.time()
        response = self.connection.make_request(method, url, params=params, data=body)
        duration = time.time() - start
        raw_data = response.read()

        # raise errors based on http status codes and let the client handle them
        if not (200 <= response.status < 300) and response.status not in ignore:
            self.log_request_fail(method, url, body, duration, response.status)
            self._raise_error(response.status, raw_data)

        self.log_request_success(method, url, url, body, response.status, raw_data, duration)

        return response.status, dict(response.getheaders()), raw_data


class ESConnection(AWSAuthConnection):
    """
    Signing connection for an AWS hosted elasticsearch cluster.

    Accepts an optional ``region`` keyword argument; when it is absent the
    region comes from ``settings.ELASTICSEARCH_CONNECTION_DEFAULT_REGION``.
    Connections are secure unless ``is_secure`` is passed explicitly.
    """

    def __init__(self, *args, **kwargs):
        default_region = settings.ELASTICSEARCH_CONNECTION_DEFAULT_REGION
        # ``region`` is consumed here and never handed to the base class.
        region_name = kwargs.pop('region', default_region)
        if 'is_secure' not in kwargs:
            kwargs['is_secure'] = True
        super(ESConnection, self).__init__(*args, **kwargs)
        self.auth_service_name = 'es'
        self.auth_region_name = region_name

    def _required_auth_capability(self):
        # Requests are signed with the "hmac-v4" capability.
        capabilities = ['hmac-v4']
        return capabilities
35 changes: 35 additions & 0 deletions analytics_data_api/v0/tests/test_connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from django.test import TestCase
from django.test.utils import override_settings
from elasticsearch.exceptions import ElasticsearchException
from mock import patch

from analytics_data_api.v0.connections import BotoHttpConnection, ESConnection


class ESConnectionTests(TestCase):
    # Covers the region handling of the AWS signing connection.

    @override_settings(ELASTICSEARCH_CONNECTION_DEFAULT_REGION='region_123')
    def test_region(self):
        # Without an explicit region, the value configured in settings is used.
        credentials = {
            'aws_access_key_id': 'access_key',
            'aws_secret_access_key': 'secret',
        }
        es_connection = ESConnection('mockservice.cc-zone-1.amazonaws.com', **credentials)
        self.assertEqual(es_connection.auth_region_name, 'region_123')


@override_settings(
    ELASTICSEARCH_AWS_ACCESS_KEY_ID='access_key',
    ELASTICSEARCH_AWS_SECRET_ACCESS_KEY='secret',
)
class BotoHttpConnectionTests(TestCase):
    # Exercises perform_request with the signed request call patched out.

    @patch('analytics_data_api.v0.connections.ESConnection.make_request')
    def test_perform_request_success(self, mock_make_request):
        # A 2xx response is handed back as (status, headers, data).
        mock_make_request.return_value.status = 200
        http_connection = BotoHttpConnection()
        outcome = http_connection.perform_request('get', 'http://example.com')
        status, _headers, _payload = outcome
        self.assertEqual(status, 200)

    @patch('analytics_data_api.v0.connections.ESConnection.make_request')
    def test_perform_request_error(self, mock_make_request):
        # A 5xx response surfaces as an elasticsearch exception.
        mock_make_request.return_value.status = 500
        http_connection = BotoHttpConnection()
        with self.assertRaises(ElasticsearchException):
            http_connection.perform_request('get', 'http://example.com')
47 changes: 45 additions & 2 deletions analytics_data_api/v0/views/problems.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
API methods for module level data.
"""

from collections import defaultdict
from itertools import groupby

from django.db import OperationalError
Expand All @@ -19,7 +20,7 @@
GradeDistributionSerializer,
SequentialOpenDistributionSerializer,
)
from analytics_data_api.utils import consolidate_answers
from analytics_data_api.utils import matching_tuple


class ProblemResponseAnswerDistributionView(generics.ListAPIView):
Expand Down Expand Up @@ -55,6 +56,48 @@ class ProblemResponseAnswerDistributionView(generics.ListAPIView):
serializer_class = ConsolidatedAnswerDistributionSerializer
allow_empty = False

@classmethod
def consolidate_answers(cls, problem):
    """
    Attempt to consolidate erroneously randomized answers.

    Answers are grouped by ``value_id``. When every group agrees on a single
    tuple of matching fields (as computed by ``matching_tuple``), each group
    holding more than one answer is collapsed into its first answer, whose
    counts become the sum over the whole group.

    Arguments:
        problem: iterable of answer distribution rows. Every row gets a
            ``consolidated_variant`` flag as a side effect, and the first
            row of each collapsed group is updated in place.

    Returns:
        ``problem`` itself when any group has more than one distinct tuple of
        matching fields; otherwise a new list with one row per ``value_id``.
    """
    answer_sets = defaultdict(list)
    match_tuple_sets = defaultdict(set)

    for answer in problem:
        answer.consolidated_variant = False
        answer_sets[answer.value_id].append(answer)
        match_tuple_sets[answer.value_id].add(matching_tuple(answer))

    # If a part has more than one unique tuple of matching fields, do not consolidate.
    if any(len(match_tuples) > 1 for match_tuples in match_tuple_sets.values()):
        return problem

    consolidated_answers = []

    # Only the grouped rows are needed, so iterate the values directly;
    # ``values()`` exists on both Python 2 and Python 3 dictionaries.
    for answers in answer_sets.values():
        consolidated_answer = answers[0]

        if len(answers) > 1:
            # Which counters get summed depends on the row type of the group.
            has_single_count = isinstance(consolidated_answer, ProblemResponseAnswerDistribution)

            for answer in answers[1:]:
                if has_single_count:
                    consolidated_answer.count += answer.count
                else:
                    consolidated_answer.first_response_count += answer.first_response_count
                    consolidated_answer.last_response_count += answer.last_response_count

            # The merged row no longer describes one specific variant.
            consolidated_answer.variant = None
            consolidated_answer.consolidated_variant = True

        consolidated_answers.append(consolidated_answer)

    return consolidated_answers

def get_queryset(self):
"""Select all the answer distribution response having to do with this usage of the problem."""
problem_id = self.kwargs.get('problem_id')
Expand All @@ -69,7 +112,7 @@ def get_queryset(self):
consolidated_rows = []

for _, part in groupby(queryset, lambda x: x.part_id):
consolidated_rows += consolidate_answers(list(part))
consolidated_rows += self.consolidate_answers(list(part))

return consolidated_rows

Expand Down
10 changes: 10 additions & 0 deletions analyticsdataserver/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@
ELASTICSEARCH_LEARNERS_HOST = environ.get('ELASTICSEARCH_LEARNERS_HOST', None)
ELASTICSEARCH_LEARNERS_INDEX = environ.get('ELASTICSEARCH_LEARNERS_INDEX', None)
ELASTICSEARCH_LEARNERS_UPDATE_INDEX = environ.get('ELASTICSEARCH_LEARNERS_UPDATE_INDEX', None)

# access credentials for signing requests to AWS.
# For more information see http://docs.aws.amazon.com/general/latest/gr/signing_aws_api_requests.html
ELASTICSEARCH_AWS_ACCESS_KEY_ID = None
ELASTICSEARCH_AWS_SECRET_ACCESS_KEY = None
# overrides the default elasticsearch connection class; useful for signing requests,
# e.g. 'analytics_data_api.v0.connections.BotoHttpConnection'
ELASTICSEARCH_CONNECTION_CLASS = None
# only needed with BotoHttpConnection, e.g. 'us-east-1'
ELASTICSEARCH_CONNECTION_DEFAULT_REGION = None
########## END ELASTICSEARCH CONFIGURATION

########## GENERAL CONFIGURATION
Expand Down
8 changes: 5 additions & 3 deletions requirements/base.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
boto==2.22.1 # MIT
Django==1.7.5 # BSD License
Markdown==2.6 # BSD
django-model-utils==2.2 # BSD
djangorestframework==2.4.4 # BSD
ipython==2.4.1 # BSD
django-rest-swagger==0.2.8 # BSD
djangorestframework-csv==1.3.3 # BSD
django-countries==3.2 # MIT
elasticsearch-dsl==0.0.9 # Apache 2.0
elasticsearch-dsl==0.0.9 # Apache 2.0
ipython==2.4.1 # BSD
Markdown==2.6 # BSD

-e git+https://github.com/edx/opaque-keys.git@d45d0bd8d64c69531be69178b9505b5d38806ce0#egg=opaque-keys

0 comments on commit 59e78cf

Please sign in to comment.