-
Notifications
You must be signed in to change notification settings - Fork 41
/
__init__.py
161 lines (141 loc) · 4.33 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# Python 2/3 compatible imports for URL building and HTTP fetching.
from sys import version_info
if version_info[0] >= 3:
    from urllib.parse import urlencode
    from urllib.request import Request, urlopen
    import ssl
else:
    from urllib import urlencode
    from urllib2 import Request, urlopen
    import ssl
import fred.config as c
from json import loads
# pandas is optional: with it installed, responses can be returned as
# DataFrame/csv/tab/pipe/numpy; without it, only dict/json/xml work.
try:
    from pandas import DataFrame
    _has_pandas = True
except ImportError:
    DataFrame = None
    _has_pandas = False
# Feature flags for the module-level wrapping of _get_request below.
# consider putting this in ~/.fred or env var
_USE_JOBLIB_CACHE = True    # persist responses to a joblib disk cache
_THROTTLE_REQUESTS = True   # rate-limit outgoing API calls
def _fetch(url, ssl_verify = True):
    """
    Helper function to fetch content from a given url.

    Parameters
    ----------
    url : str
        Fully-built request URL.
    ssl_verify : bool
        When False, TLS certificate verification is disabled.

    Returns
    -------
    str
        The response body decoded as UTF-8.
    """
    req = Request(url)
    if ssl_verify:
        page = urlopen(req)
    else:
        # WARNING: disabling verification makes the connection vulnerable
        # to man-in-the-middle attacks; intended for troubleshooting only.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        page = urlopen(req, context=ctx)
    try:
        return page.read().decode('utf-8')
    finally:
        # Close the response even if read()/decode() raises; the original
        # leaked the connection on error.
        page.close()
def _url_builder(url_root,api_key,path,params):
"""
Helper funcation to build a parameterized url.
"""
params['api_key'] = api_key
url_end = urlencode(params)
url = "%s%s%s" % (url_root,path,url_end)
return url
def _convert(frame):
    """
    Helper function that coerces DataFrame columns: numeric where
    possible, and the known date columns (``fred.config.dates``) to
    datetime64.
    """
    # DataFrame.convert_objects() was deprecated in pandas 0.17 and
    # removed in 0.21, so the original crashes on any modern pandas.
    # Coerce each column individually instead.
    from pandas import to_datetime, to_numeric
    for column in frame.columns:
        if column in c.dates:
            frame[column] = to_datetime(frame[column])
        else:
            try:
                frame[column] = to_numeric(frame[column])
            except (ValueError, TypeError):
                pass  # leave genuinely non-numeric columns untouched
    return frame
def _dict(content):
    """
    Helper function that converts a text-based get response into a
    python dictionary (or list of record dicts) for additional
    manipulation.
    """
    if not _has_pandas:
        parsed = loads(content)
        data_key = [k for k in parsed.keys() if k in c.response_data][0]
        return parsed[data_key]
    # With pandas available, round-trip through a DataFrame so the
    # values get the same numeric/date coercion as the other formats.
    return _data_frame(content).to_dict(orient='records')
def _data_frame(content):
    """
    Helper function that converts a text-based get response into a
    pandas DataFrame for additional manipulation.
    """
    parsed = loads(content)
    # The payload lives under the single response key recognised in
    # fred.config.response_data.
    data_key = [k for k in parsed.keys() if k in c.response_data][0]
    return _convert(DataFrame(parsed[data_key]))
def _csv(content):
    """
    Helper function that converts a text-based get response into
    comma separated values for additional manipulation.
    """
    return _data_frame(content).to_csv(index=False)
def _tab(content):
    """
    Helper function that converts a text-based get response into
    tab separated values for additional manipulation.
    """
    frame = _data_frame(content)
    return frame.to_csv(index=False, sep='\t')
def _pipe(content):
    """
    Helper function that converts a text-based get response into
    pipe separated values for additional manipulation.
    """
    frame = _data_frame(content)
    return frame.to_csv(index=False, sep='|')
def _numpy(content):
    """
    Helper function that converts a text-based get response into a
    numpy array for additional manipulation.
    """
    frame = _data_frame(content)
    return frame.values
def _json(content):
"""
Pass response
"""
return content
def _xml(content):
"""
Pass response
"""
return content
def _dispatch(response_type):
    """
    Map a response_type keyword to its converter function.

    Raises KeyError for unknown types; the pandas-backed formats
    (df/csv/numpy/tab/pipe) are only offered when pandas is installed.
    """
    table = {'dict': _dict, 'json': _json, 'xml': _xml}
    if _has_pandas:
        table.update({'df': _data_frame, 'csv': _csv,
                      'numpy': _numpy, 'tab': _tab,
                      'pipe': _pipe})
    return table[response_type]
def _get_request(url_root, api_key, path, response_type, params, ssl_verify):
    """
    Helper function that requests a get response from FRED and converts
    it to the requested format.
    """
    full_url = _url_builder(url_root, api_key, path, params)
    raw = _fetch(full_url, ssl_verify)
    converter = _dispatch(response_type)
    return converter(raw)
# Optionally wrap _get_request with a rate limiter and/or a joblib disk
# cache.  The two features are applied independently: in the original
# code the throttle was nested inside the cache branch, so setting
# _THROTTLE_REQUESTS = True with _USE_JOBLIB_CACHE = False silently
# applied no throttling at all.  With both flags True (the shipped
# defaults) the resulting wrapper chain is identical to the original:
# cache outermost, so cache hits never touch the rate limiter.
if _THROTTLE_REQUESTS:
    from ratelimit import limits, sleep_and_retry
    period_seconds = 1
    calls_per_second = 20
    _get_request = sleep_and_retry(
        limits(calls=calls_per_second, period=period_seconds)(_get_request))
if _USE_JOBLIB_CACHE:
    import joblib
    one_gb = 1000000000
    # NOTE(review): hard-coded /tmp path is POSIX-only -- consider
    # tempfile.gettempdir().  Also, bytes_limit as a Memory constructor
    # argument is deprecated in recent joblib -- verify against the
    # installed version.
    location = '/tmp/joblib_cache'
    memory = joblib.Memory(location, verbose=1, bytes_limit=one_gb)
    _get_request = memory.cache(_get_request)