-
Notifications
You must be signed in to change notification settings - Fork 0
/
osrcd
executable file
·264 lines (211 loc) · 9.29 KB
/
osrcd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import (division, print_function, absolute_import,
unicode_literals)
__all__ = ["fetch"]
import gevent
from gevent import monkey
monkey.patch_all()
import re
import os
import glob
import gzip
import json
import time
import shutil
import logging
import requests
from datetime import date, timedelta
from tempfile import NamedTemporaryFile
from osrc.index import rebuild_index
from osrc.database import get_pipeline
from osrc.database import format_key as _format
# The default time-to-live for every key (approx 6 months).
DEFAULT_TTL = 6 * 30 * 24 * 60 * 60
# Make sure that the directory for the local caching of the data exists.
local_data_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)),
"data")
local_data_dir = os.environ.get("OSRC_DATA_DIR", local_data_dir)
fn_template = os.path.join(local_data_dir,
"{year}-{month:02d}-{day:02d}-{n}.json.gz")
try:
os.makedirs(local_data_dir)
except os.error:
pass
# The URL template for the GitHub Archive.
archive_url = ("http://data.githubarchive.org/"
"{year}-{month:02d}-{day:02d}-{n}.json.gz")
# Regular expression for parsing filename formats.
date_re = re.compile(r"([0-9]{4})-([0-9]{2})-([0-9]{2})-([0-9]+)\.json.gz")
def _redis_execute(pipe, cmd, key, *args, **kwargs):
key = _format(key)
r = getattr(pipe, cmd)(key, *args, **kwargs)
pipe.expire(key, DEFAULT_TTL)
return r
def _fetch_one(year, month, day, n):
kwargs = {"year": year, "month": month, "day": day, "n": n}
local_fn = fn_template.format(**kwargs)
# Skip if the file exists.
if os.path.exists(local_fn):
return
# Download the remote file.
remote = archive_url.format(**kwargs)
r = requests.get(remote)
if r.status_code == requests.codes.ok:
# Atomically write to disk.
# http://stackoverflow.com/questions/2333872/ \
# atomic-writing-to-file-with-python
f = NamedTemporaryFile("wb", delete=False)
f.write(r.content)
f.flush()
os.fsync(f.fileno())
f.close()
shutil.move(f.name, local_fn)
def fetch(year, month, day):
"""
Asynchronously download all the event archives for one day of activity
from the GitHub Archive.
:param year: The 4-digit year of the date.
:param month: The integer id of the target month ``[1, 12]``.
:param day: The integer id of the target day ``[1, 31]``.
"""
jobs = [gevent.spawn(_fetch_one, year, month, day, n) for n in range(24)]
gevent.joinall(jobs)
def process(filename):
"""
Process a single gzipped archive file and push the results to the database.
:param filename: The absolute path to the archive file.
"""
# Figure out the day of the week from the filename (this is probably not
# always right but it'll do).
year, month, day, hour = map(int, date_re.findall(filename)[0])
weekday = date(year=year, month=month, day=day).strftime("%w")
# Set up a redis pipeline.
pipe = get_pipeline()
# Unzip and load the file.
strt = time.time()
count = 0
with gzip.GzipFile(filename) as f:
# One event per line.
events = [line.decode("utf-8", errors="ignore") for line in f]
count = len(events)
# One event per line.
for n, line in enumerate(events):
# Parse the JSON of this event.
try:
event = json.loads(line)
except:
logging.warn("Failed on line {0} of {1}-{2:02d}-{3:02d}-{4}"
.format(n, year, month, day, hour))
continue
# Get the user involved and skip if there isn't one.
actor = event["actor"]
attrs = event.get("actor_attributes", {})
if actor is None or attrs.get("type") != "User":
# This was probably an anonymous event (like a gist event)
# or an organization event.
continue
# Normalize the user name.
key = actor.lower()
# Get the type of event.
evttype = event["type"]
nevents = 1
# Can this be called a "contribution"?
contribution = evttype in ["IssuesEvent", "PullRequestEvent",
"PushEvent"]
# Increment the global sum histograms.
_redis_execute(pipe, "incr", "total", nevents)
_redis_execute(pipe, "hincrby", "day", weekday, nevents)
_redis_execute(pipe, "hincrby", "hour", hour, nevents)
_redis_execute(pipe, "zincrby", "user", key, nevents)
_redis_execute(pipe, "zincrby", "event", evttype, nevents)
# Event histograms.
_redis_execute(pipe, "hincrby", "event:{0}:day".format(evttype),
weekday, nevents)
_redis_execute(pipe, "hincrby", "event:{0}:hour".format(evttype),
hour, nevents)
# User schedule histograms.
_redis_execute(pipe, "hincrby", "user:{0}:day".format(key),
weekday, nevents)
_redis_execute(pipe, "hincrby", "user:{0}:hour".format(key),
hour, nevents)
# User event type histogram.
_redis_execute(pipe, "zincrby", "user:{0}:event".format(key),
evttype, nevents)
_redis_execute(pipe, "hincrby", "user:{0}:event:{1}:day"
.format(key, evttype), weekday, nevents)
_redis_execute(pipe, "hincrby", "user:{0}:event:{1}:hour"
.format(key, evttype), hour, nevents)
# Parse the name and owner of the affected repository.
repo = event.get("repository", {})
owner, name = repo.get("owner"), repo.get("name")
if owner and name:
repo_name = "{0}/{1}".format(owner, name)
_redis_execute(pipe, "zincrby", "repo", repo_name, nevents)
# Save the social graph.
_redis_execute(pipe, "zincrby", "social:user:{0}".format(key),
repo_name, nevents)
_redis_execute(pipe, "zincrby", "social:repo:{0}"
.format(repo_name), key, nevents)
# Do we know what the language of the repository is?
language = repo.get("language")
if language:
# Which are the most popular languages?
_redis_execute(pipe, "zincrby", "lang", language, nevents)
# Total number of pushes.
if evttype == "PushEvent":
_redis_execute(pipe, "zincrby", "pushes:lang",
language, nevents)
_redis_execute(pipe, "zincrby", "user:{0}:lang"
.format(key), language, nevents)
# Who are the most important users of a language?
if contribution:
_redis_execute(pipe, "zincrby", "lang:{0}:user"
.format(language), key, nevents)
pipe.execute()
logging.info("Processed {0} events in {1} [{2:.2f} seconds]"
.format(count, filename, time.time() - strt))
def fetch_and_process(year, month, day):
logging.info("Processing data for {0:04d}-{1:02d}-{2:02d}"
.format(year, month, day))
fetch(year, month, day)
kwargs = {"year": year, "month": month, "day": day, "n": "*"}
filenames = glob.glob(fn_template.format(**kwargs))
if len(filenames) != 24:
logging.warn("Missing {0} archive files for date "
"{1:04d}-{2:02d}-{3:02d}"
.format(24 - len(filenames), year, month, day))
map(process, filenames)
if __name__ == "__main__":
import argparse
from osrc import create_app
today = date.today()
# Parse the command line arguments.
parser = argparse.ArgumentParser(description="Monitor GitHub activity.")
parser.add_argument("--since", default=None, help="The starting date.")
parser.add_argument("--config", default=None,
help="The path to the local configuration file.")
parser.add_argument("--log", default=None,
help="The path to the log file.")
args = parser.parse_args()
largs = dict(level=logging.INFO,
format="[%(asctime)s] %(name)s:%(levelname)s:%(message)s")
if args.log is not None:
largs["filename"] = args.log
logging.basicConfig(**largs)
# Initialize a flask app.
app = create_app(args.config)
# Set up the app in a request context.
with app.test_request_context():
if args.since is not None:
day = date(**dict(zip(["year", "month", "day"],
map(int, args.since.split("-")))))
while day < today:
fetch_and_process(day.year, day.month, day.day)
day += timedelta(1)
else:
yesterday = today - timedelta(1)
fetch_and_process(yesterday.year, yesterday.month, yesterday.day)
logging.info("Rebuilding index.")
rebuild_index()
logging.info("Finished.")