-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.py
executable file
·88 lines (71 loc) · 2.18 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#! /usr/bin/env python
#
# Mixpanel, Inc. -- http://mixpanel.com/
#
# Python API client library to consume mixpanel.com analytics data.
import ast
import hashlib
import pickle
import time
import urllib

try:
    import json
except ImportError:
    import simplejson as json

try:  # Python 3
    from urllib.parse import urlencode
    from urllib.request import urlopen
except ImportError:  # Python 2
    from urllib import urlencode, urlopen
class Mixpanel(object):
    """Client for the Mixpanel data API.

    Builds signed request URLs of the form
    ``ENDPOINT/VERSION/<methods...>/?<query>`` and returns the raw response
    body. Requests are signed with an MD5 digest of the sorted ``key=value``
    arguments followed by the API secret (see ``hash_args``).
    """

    # NOTE(review): plain HTTP endpoint -- confirm whether HTTPS should be used.
    ENDPOINT = 'http://mixpanel.com/api'
    VERSION = '2.0'

    # Text type of the running interpreter: ``unicode`` on Python 2,
    # ``str`` on Python 3.
    _TEXT_TYPE = type(u'')

    def __init__(self, api_key, api_secret):
        """Store the credentials used to identify and sign every request."""
        self.api_key = api_key
        self.api_secret = api_secret

    def request(self, methods, params):
        """Perform a signed API request and return the raw response body.

        methods - List of methods to be joined, e.g.
                  ['events', 'properties', 'values'] will give us
                  http://mixpanel.com/api/2.0/events/properties/values/
        params  - Extra parameters associated with method. The mapping is
                  copied; the caller's object is never modified.

        Returns whatever ``read()`` on the opened URL yields (the undecoded
        response body).
        """
        # Work on a copy so the credentials, expiry and signature added below
        # do not leak into the caller's dict.
        signed = dict(params or {})
        signed['api_key'] = self.api_key
        signed['expire'] = int(time.time()) + 600  # Grant this request 10 minutes.
        # A stale signature must not take part in the new signature.
        signed.pop('sig', None)
        signed['sig'] = self.hash_args(signed)

        path = '/'.join([self.ENDPOINT, str(self.VERSION)] + list(methods))
        request_url = path + '/?' + self.unicode_urlencode(signed)

        response = urlopen(request_url)
        try:
            return response.read()
        finally:
            # Always release the connection, even if reading fails.
            response.close()

    def unicode_urlencode(self, params):
        """URL-encode params, JSON-encoding lists and UTF-8-encoding text.

        params - A dict, or any iterable of (key, value) pairs. It is not
                 modified.

        List values are serialized with ``json.dumps``; text keys and values
        are encoded as UTF-8 before being percent-quoted. Returns the encoded
        query string.
        """
        pairs = params.items() if isinstance(params, dict) else params
        encoded = []
        for key, value in pairs:
            if isinstance(value, list):
                value = json.dumps(value)
            encoded.append((self._encode_text(key), self._encode_text(value)))
        return urlencode(encoded)

    def hash_args(self, args, secret=None):
        """Return the MD5 hex digest used to sign a request.

        args   - Mapping of request arguments. It is not modified; list
                 values are JSON-encoded on the fly.
        secret - Optional secret overriding ``self.api_secret``.

        The digest covers the ``key=value`` pairs concatenated in sorted key
        order (no separator between pairs), followed by the secret when one
        is available. Text is hashed as UTF-8.
        """
        parts = []
        for key in sorted(args):
            value = args[key]
            if isinstance(value, list):
                value = json.dumps(value)
            parts.append(self._utf8(key) + b'=' + self._utf8(value))

        digest = hashlib.md5(b''.join(parts))
        effective_secret = secret or self.api_secret
        if effective_secret:
            digest.update(self._utf8(effective_secret))
        return digest.hexdigest()

    def _encode_text(self, value):
        """Return text as UTF-8 bytes; pass any other value through as is."""
        if isinstance(value, self._TEXT_TYPE):
            return value.encode('utf-8')
        return value

    def _utf8(self, value):
        """Return value as a UTF-8 byte string, stringifying non-text first."""
        if isinstance(value, bytes):
            return value
        if not isinstance(value, self._TEXT_TYPE):
            # On Python 2 ``str()`` already yields bytes; on Python 3 it
            # yields text that still has to be encoded below.
            value = str(value)
            if isinstance(value, bytes):
                return value
        return value.encode('utf-8')