This repository has been archived by the owner on Nov 12, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
fql.py
127 lines (91 loc) · 3.32 KB
/
fql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# -*- coding: utf-8 -*-
import urllib2
import bunch
import ujson as json
from graph import GraphException
from url_operations import add_path, update_query_params
class FQL(object):

    """
    A maker of single and multiple FQL queries.

    Usage
    =====

    Single queries:

        >>> q = FQL('access_token')
        >>> result = q("SELECT post_id FROM stream WHERE source_id = ...")
        >>> result
        [Bunch(post_id='XXXYYYZZZ'), ...]

        >>> result[0]
        Bunch(post_id='XXXYYYZZZ')

        >>> result[0].post_id
        'XXXYYYZZZ'

    Multiple queries:

        >>> q = FQL('access_token')
        >>> result = q.multi(dict(query1="SELECT...", query2="SELECT..."))

        >>> result[0].name
        'query1'
        >>> result[0].fql_result_set
        [...]

        >>> result[1].name
        'query2'
        >>> result[1].fql_result_set
        [...]

    The access token is optional.  When the instance was built without one
    (``FQL()``), no ``access_token`` parameter is added to the request.
    """

    # Base URL that the method name ('fql.query' / 'fql.multiquery') is
    # appended to.
    ENDPOINT = 'https://api.facebook.com/method/'

    def __init__(self, access_token=None, err_handler=None):
        """
        Remember the credentials used for subsequent queries.

        :param access_token: token sent with every request, or ``None`` to
            send requests without an ``access_token`` parameter.
        :param err_handler: stored on the instance as ``err_handler``.
            Nothing in this class calls it; it is kept only so existing
            callers that pass or read it keep working.
        """
        self.access_token = access_token
        self.err_handler = err_handler

    def __call__(self, query, **params):
        """
        Execute a single FQL query (using `fql.query`).

        Any extra keyword arguments are sent as additional request
        parameters; ``query`` and ``format`` are always set by this method.

        Example:

            >>> q = FQL('access_token')
            >>> result = q("SELECT post_id FROM stream WHERE source_id = ...")
            >>> result
            [Bunch(post_id='XXXYYYZZZ'), ...]

            >>> result[0]
            Bunch(post_id='XXXYYYZZZ')

            >>> result[0].post_id
            'XXXYYYZZZ'

        Raises `GraphException` when the response is an error object (see
        `fetch_json`).
        """
        url = add_path(self.ENDPOINT, 'fql.query')
        params = self._request_params(params, query=query)
        url = update_query_params(url, params)
        return self.fetch_json(url)

    def multi(self, queries, **params):
        """
        Execute multiple FQL queries (using `fql.multiquery`).

        ``queries`` is serialised with ``json.dumps`` and sent as the
        ``queries`` request parameter.  Any extra keyword arguments are sent
        as additional request parameters.

        Example:

            >>> q = FQL('access_token')
            >>> result = q.multi(dict(query1="SELECT...", query2="SELECT..."))

            >>> result[0].name
            'query1'
            >>> result[0].fql_result_set
            [...]

            >>> result[1].name
            'query2'
            >>> result[1].fql_result_set
            [...]

        Raises `GraphException` when the response is an error object (see
        `fetch_json`).
        """
        url = add_path(self.ENDPOINT, 'fql.multiquery')
        params = self._request_params(params, queries=json.dumps(queries))
        url = update_query_params(url, params)
        return self.fetch_json(url)

    def _request_params(self, params, **required):
        """
        Merge the parameters every request needs into ``params``.

        ``required`` (the method-specific parameters) and ``format='json'``
        always override caller-supplied values of the same name.  The
        instance's access token overrides a caller-supplied ``access_token``
        only when a token was actually configured.

        Returns the same ``params`` dict, updated in place.
        """
        params.update(required)
        params['format'] = 'json'
        # Only send a token when there is one.  Passing ``None`` through
        # would hand a None value to the URL helper -- presumably serialised
        # as the literal text "None"; the helper is not visible here -- and
        # would also clobber a token the caller supplied explicitly.
        if self.access_token is not None:
            params['access_token'] = self.access_token
        return params

    @classmethod
    def fetch_json(cls, url, data=None):
        """
        Fetch ``url`` and decode the body as JSON.

        If the decoded value is a dict with a truthy ``error_msg`` entry, a
        `GraphException` is raised with the response's ``error_code``,
        ``error_msg`` and ``request_args``.  Otherwise the decoded value is
        returned after conversion with ``bunch.bunchify``.
        """
        response = json.loads(cls.fetch(url, data=data))
        # Only dict responses are inspected for errors; any other decoded
        # value is passed straight through.
        if isinstance(response, dict):
            if response.get("error_msg"):
                code = response.get("error_code")
                msg = response.get("error_msg")
                args = response.get("request_args")
                raise GraphException(code, msg, args=args)
        return bunch.bunchify(response)

    @staticmethod
    def fetch(url, data=None):
        """
        Open ``url`` with ``urllib2.urlopen`` and return the raw body.

        ``data`` is forwarded to ``urlopen`` unchanged.  The connection is
        closed even if reading fails.
        """
        conn = urllib2.urlopen(url, data=data)
        try:
            return conn.read()
        finally:
            conn.close()