Permalink
Browse files

Initial import of Stackpy, a Python library for the StackExchange v2 API

  • Loading branch information...
0 parents commit e2ba49c807eed62636e1c86dd674198e67d2efd1 @doismellburning committed May 1, 2012
Showing with 199 additions and 0 deletions.
  1. +2 −0 .gitignore
  2. +3 −0 README.md
  3. +12 −0 setup.py
  4. +148 −0 stackpy.py
  5. +30 −0 test.py
  6. +4 −0 test_docs.py
@@ -0,0 +1,2 @@
+*.pyc
+*.egg-info
@@ -0,0 +1,3 @@
+stackpy is a simple Python wrapper for the [StackExchange API v2](http://api.stackexchange.com/docs).
+
+Currently very pre-alpha. Use at your peril.
@@ -0,0 +1,12 @@
+from distutils.core import setup
+
+setup(name='stackpy',
+ description='Python library for the StackExchange API v2',
+ long_description=open('README.md').read(),
+ author='Kristian Glass',
+ author_email='stackpy@doismellburning.co.uk',
+ url='https://github.com/doismellburning/stackpy',
+ license='MIT',
+ version='0.1',
+ py_modules=['stackpy'],
+ )
@@ -0,0 +1,148 @@
+import httplib
+import urllib
+from StringIO import StringIO
+import gzip
+import json
+
+DEBUG = True
+DEBUG = False
+
+class Stackpy:
+ host = 'api.stackexchange.com'
+ version = '2.0'
+ site = None
+ key = None
+ access_token = None
+
+ def __init__(self):
+ self.connection = httplib.HTTPSConnection(self.host, strict=True)
+
+ def _api_fetch(self, endpoint, site=None, siteless=False, item_type=None,
+ **kwargs):
+ params = {}
+
+ if not siteless:
+ if not site:
+ site = self.site
+ if not site:
+ site = 'stackoverflow'
+ params['site'] = site
+
+ params.update(kwargs)
+
+ if self.key:
+ params['key'] = self.key
+ if self.access_token:
+ params['access_token'] = self.access_token
+ #TODO Barf if no key
+
+ url = "/%s/%s?%s" % (self.version, endpoint, urllib.urlencode(params))
+ self.connection.request('GET', url)
+ response = self.connection.getresponse()
+ #TODO Handle various forms of failure...
+ data = response.read()
+ data = self._decompress(data)
+ data = json.loads(data)
+
+ # If error_id we assume it's all dead Jim
+ # TODO It may not be the case that error => total failure
+ if 'error_id' in data:
+ raise StackpyError(data['error_id'], data['error_name'],
+ data['error_message'])
+
+ data = Response(data, item_type)
+ if data.backoff:
+ #TODO Handle backoff
+ print 'Got backoff of %d - currently unhandled...' % data.backoff
+
+ return data
+
+ def _decompress(self, data):
+ #TODO Handle cases other than "blind gunzip"
+ buf = StringIO(data)
+ gzipfile = gzip.GzipFile(fileobj=buf)
+ data = gzipfile.read()
+ return data
+
+ def sites(self):
+ return self._api_fetch('/sites', siteless=True, item_type=Site)
+
+ def users(self, ids=None):
+ if ids:
+ ids = ';'.join([str(user_id) for user_id in ids])
+ users = self._api_fetch('/users/%s' % ids, item_type=User)
+ else:
+ users = self._api_fetch("/users", item_type=User)
+
+ return users
+
+ def me(self):
+ #TODO Assert access_token
+ return self._api_fetch('/me', item_type=User)
+
+class Base(object):
+ def __init__(self, data):
+ if DEBUG:
+ self.data = data
+ print data
+ for key, value in data.iteritems():
+ if key.startswith('_'):
+ continue
+ setattr(self, key, self._coerce(key, value))
+
+ def _coerce(self, key, value):
+ return value
+
+class Response(Base):
+ """ http://api.stackexchange.com/docs/wrapper """
+ backoff = None
+
+ def __init__(self, data, item_type=None):
+ super(Response, self).__init__(data)
+ # TODO Handle item_type being None / a type being included
+ item_objs = [item_type(item) for item in self.items]
+ self.items = item_objs
+
+class StackpyError(Exception):
+ def __init__(self, error_id, name, description):
+ self.error_id = error_id
+ self.name = name
+ self.description = description
+
+ def __str__(self):
+ return '(%d) %s: %s' % (self.error_id, self.name, self.description)
+
+
+class User(Base):
+ def _coerce(self, key, value):
+ if key == 'badge_counts':
+ return BadgeCount(value)
+ else:
+ return value
+
+class BadgeCount(Base):
+ pass
+
+class Site(Base):
+ pass
+
+def oauth_explicit_one(client_id, redirect_uri, scope=None, state=None):
+ url = 'https://stackexchange.com/oauth'
+ params = {'client_id': client_id, 'redirect_uri': redirect_uri}
+ if scope is not None:
+ params['scope'] = scope
+ if state is not None:
+ params['state'] = state
+ return '%s?%s' % (url, urllib.urlencode(params))
+
+def oauth_explicit_two(client_id, client_secret, code, redirect_uri):
+ params = {'client_id': client_id, 'client_secret': client_secret,
+ 'code': code, 'redirect_uri': redirect_uri}
+ headers = {'Content-Type': 'application/x-www-form-urlencoded'}
+ connection = httplib.HTTPSConnection('stackexchange.com', strict=True)
+ connection.request('POST', '/oauth/access_token', urllib.urlencode(params), headers)
+ # Charles
+ #connection = httplib.HTTPConnection('localhost:8888')
+ #connection.request('POST', 'https://stackexchange.com/oauth/access_token', urllib.urlencode(params))
+ response = connection.getresponse()
+ return response.read()
30 test.py
@@ -0,0 +1,30 @@
+import unittest
+import stackpy
+
+class TestStackpy(unittest.TestCase):
+ """
+ These tests all depend on being able to reach the StackExchange API.
+
+ TODO Sort some mocks such that this dependency is removed? Running on Travis would be nice
+ """
+ def setUp(self):
+ self.stackpy = stackpy.Stackpy()
+
+ def test_users(self):
+ self.stackpy.users()
+
+ def test_users_by_id(self):
+ ME = 928098
+ users = self.stackpy.users([ME])
+ self.assertEqual(len(users.items), 1)
+ user = users.items[0]
+ #print user.__dict__
+ self.assertEqual(user.user_id, ME)
+ self.assertIn('Kristian', user.display_name)
+
+ def test_sites(self):
+ sites = self.stackpy.sites().items
+ self.assertGreater(len(sites), 0)
+
+if __name__ == '__main__':
+ unittest.main()
@@ -0,0 +1,4 @@
+import doctest
+
+doctest.testfile('./README.md')
+doctest.testfile('./stackpy.py')

0 comments on commit e2ba49c

Please sign in to comment.