/
Requester.py
83 lines (65 loc) · 3.12 KB
/
Requester.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# Copyright 2012 Vincent Jacques
# vincent@vincent-jacques.net
# This file is part of PyGithub. http://vincent-jacques.net/PyGithub
# PyGithub is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
# PyGithub is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License along with PyGithub. If not, see <http://www.gnu.org/licenses/>.
import httplib
import base64
import urllib
try:
import json
except ImportError:
import simplejson as json
import GithubException
class Requester:
def __init__( self, login_or_token, password ):
if password is not None:
login = login_or_token
self.__authorizationHeader = "Basic " + base64.b64encode( login + ":" + password ).replace( '\n', '' )
elif login_or_token is not None:
token = login_or_token
self.__authorizationHeader = "token " + token
else:
self.__authorizationHeader = None
self.rate_limiting = ( 5000, 5000 )
def requestAndCheck( self, verb, url, parameters, input ):
status, headers, output = self.requestRaw( verb, url, parameters, input )
if status >= 400:
raise GithubException.GithubException( status, output )
return headers, output
def requestRaw( self, verb, url, parameters, input ):
assert verb in [ "HEAD", "GET", "POST", "PATCH", "PUT", "DELETE" ]
assert url.startswith( "https://api.github.com" )
url = url[ len( "https://api.github.com" ) : ]
headers = dict()
if self.__authorizationHeader is not None:
headers[ "Authorization" ] = self.__authorizationHeader
cnx = httplib.HTTPSConnection( "api.github.com", strict = True )
cnx.request(
verb,
self.__completeUrl( url, parameters ),
json.dumps( input ),
headers
)
response = cnx.getresponse()
status = response.status
headers = dict( response.getheaders() )
output = self.__structuredFromJson( response.read() )
cnx.close()
if "x-ratelimit-remaining" in headers and "x-ratelimit-limit" in headers:
self.rate_limiting = ( int( headers[ "x-ratelimit-remaining" ] ), int( headers[ "x-ratelimit-limit" ] ) )
# print verb, url, parameters, input, "==>", status, str( headers )[ :30 ], str( output )[ :30 ]
return status, headers, output
def __completeUrl( self, url, parameters ):
if parameters is None or len( parameters ) == 0:
return url
else:
return url + "?" + urllib.urlencode( parameters )
def __structuredFromJson( self, data ):
if len( data ) == 0:
return None
else:
return json.loads( data )