forked from ipython/ipython
/
gh_api.py
201 lines (166 loc) · 6.44 KB
/
gh_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""Functions for Github authorisation."""
from __future__ import print_function
# Python 2 compatibility: rebind ``input`` to ``raw_input`` where that name
# exists. On Python 3 the lookup raises NameError and the builtin ``input``
# is left untouched.
try:
    input = raw_input
except NameError:
    pass
import os
import requests
import getpass
import json
# Keyring looks secrets up by a (service, username) pair. We only store an
# OAuth token, not a real username and password, so this fixed placeholder is
# used as the "username" for every keyring call in this module.
fake_username = 'ipython_tools'
class Obj(dict):
    """A ``dict`` whose items can also be read and written as attributes."""

    def __getattr__(self, name):
        # Python only calls __getattr__ after normal attribute lookup has
        # failed, so regular dict methods are never shadowed by keys.
        try:
            value = self[name]
        except KeyError:
            # Report a missing key the way a missing attribute is reported.
            raise AttributeError(name)
        else:
            return value

    def __setattr__(self, name, val):
        # Attribute assignment stores a dictionary item.
        self[name] = val
# Module-level cache so the keyring/GitHub lookup happens at most once.
token = None


def get_auth_token():
    """Return a GitHub OAuth token, creating and caching one if needed.

    The token is looked up in this order:

    1. the module-level ``token`` cache;
    2. the system keyring (service ``'github'``, user ``fake_username``);
    3. an interactive prompt for GitHub credentials, which are sent to the
       ``/authorizations`` endpoint to create a new token.  The new token is
       then saved in the keyring.

    An HTTP error from the token request is raised by
    ``response.raise_for_status()``.
    """
    global token

    if token is not None:
        return token

    # Imported lazily: only needed when the in-memory cache is empty.
    import keyring
    token = keyring.get_password('github', fake_username)
    if token is not None:
        return token

    print("Please enter your github username and password. These are not "
          "stored, only used to get an oAuth token. You can revoke this at "
          "any time on Github.")
    username = input("Username: ")
    password = getpass.getpass("Password: ")

    payload = json.dumps({
        "scopes": ["public_repo", "gist"],
        "note": "IPython tools",
        "note_url": "https://github.com/ipython/ipython/tree/master/tools",
    })
    response = requests.post('https://api.github.com/authorizations',
                             auth=(username, password), data=payload)
    response.raise_for_status()

    token = json.loads(response.text)['token']
    keyring.set_password('github', fake_username, token)
    return token
def make_auth_header():
    """Build the HTTP ``Authorization`` header carrying the GitHub token."""
    auth_value = 'token ' + get_auth_token()
    return {'Authorization': auth_value}
def post_issue_comment(project, num, body):
    """Post a comment on an issue (or pull request) of a GitHub project.

    :param project: repository path interpolated into ``repos/{project}``,
        e.g. ``'ipython/ipython'``.
    :param num: number of the issue to comment on.
    :param body: text of the comment.
    :returns: the ``requests`` response of the POST.
    :raises requests.HTTPError: if GitHub answers with an error status.
    """
    url = 'https://api.github.com/repos/{project}/issues/{num}/comments'.format(project=project, num=num)
    payload = json.dumps({'body': body})
    response = requests.post(url, data=payload, headers=make_auth_header())
    # Fail loudly instead of silently dropping the comment, like the other
    # request helpers in this module do.
    response.raise_for_status()
    return response
def post_gist(content, description='', filename='file', auth=False):
    """Create a public Gist containing *content* and return its web URL.

    The Gist holds a single file called *filename*.  When *auth* is true the
    request carries the token header from ``make_auth_header()``; otherwise
    it is sent without authentication headers.
    """
    gist = {
        "description": description,
        "public": True,
        "files": {filename: {"content": content}},
    }
    post_data = json.dumps(gist).encode('utf-8')

    if auth:
        headers = make_auth_header()
    else:
        headers = {}

    response = requests.post("https://api.github.com/gists",
                             data=post_data, headers=headers)
    response.raise_for_status()
    return json.loads(response.text)['html_url']
def get_pull_request(project, num):
    """Fetch a single pull request of *project* by its number.

    The JSON reply is decoded with ``Obj`` as the object hook, so every JSON
    object in the result supports attribute access.  HTTP errors are raised
    by ``raise_for_status()``.
    """
    template = "https://api.github.com/repos/{project}/pulls/{num}"
    response = requests.get(template.format(project=project, num=num))
    response.raise_for_status()
    pull_request = json.loads(response.text, object_hook=Obj)
    return pull_request
def get_pulls_list(project):
    """Fetch the pull request listing of *project*.

    Returns the decoded JSON reply of the ``pulls`` endpoint as plain Python
    objects.  HTTP errors are raised by ``raise_for_status()``.
    """
    template = "https://api.github.com/repos/{project}/pulls"
    response = requests.get(template.format(project=project))
    response.raise_for_status()
    pulls = json.loads(response.text)
    return pulls
# encode_multipart_formdata is adapted from urllib3.filepost, written against
# the standard library only so it does not depend on urllib3 internals.
# iter_fields differs from urllib3's version: it enforces S3's required key
# ordering.
def iter_fields(fields):
    """Yield ``(name, value)`` pairs with the S3 fields in their fixed order.

    The fixed S3 fields come first, followed by any remaining fields in the
    iteration order of the input.

    :param fields: a dictionary, or an iterable of ``(name, value)`` tuples.
        The argument itself is not modified.
    :raises KeyError: if one of the fixed S3 fields is missing.
    """
    # dict() makes a private copy and also accepts a list of pairs.
    remaining = dict(fields)
    for key in ('key', 'acl', 'Filename', 'success_action_status',
                'AWSAccessKeyId', 'Policy', 'Signature', 'Content-Type',
                'file'):
        yield key, remaining.pop(key)
    for item in remaining.items():
        yield item


def encode_multipart_formdata(fields, boundary=None):
    """
    Encode ``fields`` using the multipart/form-data mime format.

    :param fields:
        Dictionary of fields or list of (key, value) field tuples.  The key is
        treated as the field name, and the value as the body of the form-data
        bytes. If the value is a tuple of two elements, then the first element
        is treated as the filename of the form-data section.

        Field names and filenames must be unicode.  Fields are emitted in the
        order produced by :func:`iter_fields`.

    :param boundary:
        If not specified, then a random boundary will be generated using
        :func:`uuid.uuid4`.

    :returns:
        A ``(body, content_type)`` tuple: the encoded request body as bytes
        and the matching ``Content-Type`` header value, also as bytes.
    """
    # Function-level imports keep this helper self-contained.
    import codecs
    import mimetypes
    import uuid
    from io import BytesIO

    text_type = type(u'')  # ``unicode`` on Python 2, ``str`` on Python 3

    def as_bytes(header):
        # Header lines are written as latin-1 bytes; byte strings pass through.
        if isinstance(header, bytes):
            return header
        return header.encode('latin-1')

    if boundary is None:
        boundary = uuid.uuid4().hex

    body = BytesIO()
    # Text (field names, filenames, unicode values) is written as UTF-8.
    write_text = codecs.getwriter('utf-8')(body).write

    for fieldname, value in iter_fields(fields):
        body.write(as_bytes('--%s\r\n' % boundary))

        if isinstance(value, tuple):
            # (filename, data): a file part with a guessed content type.
            filename, data = value
            write_text('Content-Disposition: form-data; name="%s"; '
                       'filename="%s"\r\n' % (fieldname, filename))
            part_type = (mimetypes.guess_type(filename)[0]
                         or 'application/octet-stream')
            body.write(as_bytes('Content-Type: %s\r\n\r\n' % part_type))
        else:
            data = value
            write_text('Content-Disposition: form-data; name="%s"\r\n'
                       % fieldname)
            body.write(b'Content-Type: text/plain\r\n\r\n')

        if isinstance(data, int):
            data = str(data)  # Backwards compatibility

        if isinstance(data, text_type):
            write_text(data)
        else:
            body.write(data)

        body.write(b'\r\n')

    body.write(as_bytes('--%s--\r\n' % boundary))

    content_type = as_bytes('multipart/form-data; boundary=%s' % boundary)
    return body.getvalue(), content_type
def post_download(project, filename, name=None, description=""):
    """Upload a file to the GitHub downloads area

    The upload takes two requests: the download is first registered with
    GitHub, whose reply supplies an S3 URL and signed form fields; the file
    contents are then posted to that S3 URL as multipart/form-data.

    :param project: repository path interpolated into ``repos/{project}``.
    :param filename: path of the local file to upload.
    :param name: name of the download; defaults to the basename of
        *filename*.
    :param description: description sent when registering the download.
    :returns: the ``requests`` response of the S3 upload.  Its status is not
        checked here, so callers should inspect it.
    :raises requests.HTTPError: if registering the download with GitHub
        fails.
    """
    # NOTE(review): GitHub appears to have retired the Downloads API --
    # confirm this endpoint is still available before relying on it.
    if name is None:
        name = os.path.basename(filename)
    with open(filename, 'rb') as f:
        filedata = f.read()

    url = "https://api.github.com/repos/{project}/downloads".format(project=project)

    payload = json.dumps(dict(name=name, size=len(filedata),
                              description=description))
    response = requests.post(url, data=payload, headers=make_auth_header())
    response.raise_for_status()
    # Decode from text, like the other helpers here: json.loads() rejects
    # bytes on Python 3 before 3.6.
    reply = json.loads(response.text)
    s3_url = reply['s3_url']

    fields = dict(
        key=reply['path'],
        acl=reply['acl'],
        success_action_status=201,
        Filename=reply['name'],
        AWSAccessKeyId=reply['accesskeyid'],
        Policy=reply['policy'],
        Signature=reply['signature'],
        file=(reply['name'], filedata),
    )
    # Not a valid keyword argument name, so it is added separately.
    fields['Content-Type'] = reply['mime_type']
    data, content_type = encode_multipart_formdata(fields)
    s3r = requests.post(s3_url, data=data, headers={'Content-Type': content_type})
    return s3r