-
Notifications
You must be signed in to change notification settings - Fork 4
/
backfill.py
122 lines (102 loc) · 4 KB
/
backfill.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import logging, itertools
from mapreduce import control
from mapreduce import operation as op
import request_handler
import models
import facebook_util
from nicknames import get_nickname_for
from google.appengine.ext import db
def cache_user_nickname(user_data):
    """Refresh the nickname stored on ``user_data`` when it is out of date.

    Generator that yields a single mapreduce ``op.db.Put`` for ``user_data``
    when its ``user_nickname`` differs from what ``get_nickname_for``
    returns. It yields nothing when ``user_data`` is falsy, has no ``user``,
    or already stores the current nickname.
    """
    if user_data and user_data.user:
        fresh_nickname = get_nickname_for(user_data)
        is_stale = user_data.user_nickname != fresh_nickname
        if is_stale:
            # Update the stored copy and let the mapreduce framework save it.
            user_data.user_nickname = fresh_nickname
            yield op.db.Put(user_data)
def check_user_properties(user_data):
    """Log inconsistencies among a user entity's identity properties.

    Nothing is modified and nothing is returned; every problem found is only
    reported through ``logging``. Entities that are falsy or have no ``user``
    are skipped entirely.

    Checks performed, in order:
      * ``current_user``, ``user_id`` and ``user_email`` are all present;
      * ``current_user.email()`` equals ``user_email``;
      * for facebook users (per ``facebook_util.is_facebook_user_id``),
        ``user_id`` equals ``user_email``.
    """
    if not user_data or not user_data.user:
        return

    current_user = user_data.current_user

    if not current_user:
        logging.critical("Missing current_user: %s", user_data.user)

    if not user_data.user_id:
        logging.critical("Missing user_id: %s", user_data.user)

    if not user_data.user_email:
        logging.critical("Missing user_email: %s", user_data.user)

    # Only compare e-mails when current_user exists: calling .email() on a
    # missing current_user would raise right after that problem was logged
    # and abort the remaining checks.
    if current_user and current_user.email() != user_data.user_email:
        logging.warning("current_user does not match user_email: %s", user_data.user)

    is_facebook_user = (
        facebook_util.is_facebook_user_id(user_data.user_id)
        or facebook_util.is_facebook_user_id(user_data.user_email)
    )
    if is_facebook_user and user_data.user_id != user_data.user_email:
        logging.critical("facebook user's user_id does not match user_email: %s", user_data.user)
def remove_deleted_studentlists(studentlist):
    """Clean up a studentlist that still carries a ``deleted`` attribute.

    Generator yielding at most one mapreduce datastore operation:
      * ``op.db.Delete(studentlist)`` when ``deleted`` was truthy;
      * ``op.db.Put(studentlist)`` when it was falsy, saving the entity
        after the attribute has been removed from it.
    Studentlists without a ``deleted`` attribute yield nothing.
    """
    # Keep the try body limited to the attribute access, so an AttributeError
    # raised while building or consuming the operations below is not
    # silently swallowed.
    try:
        was_deleted = studentlist.deleted
        del studentlist.deleted
    except AttributeError:
        # do nothing, as this studentlist is fine.
        return

    if was_deleted:
        yield op.db.Delete(studentlist)
    else:
        yield op.db.Put(studentlist)
def dedupe_related_videos(exercise):
    """Yield deletes for an exercise's duplicate related-video links.

    Walks up to 100 results of ``exercise.related_videos_query()``. The
    first link seen for each video key is kept; every later link pointing at
    the same video key is logged and yielded as a mapreduce
    ``op.db.Delete``.
    """
    seen_video_keys = set()
    for exercise_video in exercise.related_videos_query().fetch(100):
        key = exercise_video.video.key()
        if key not in seen_video_keys:
            # First link to this video: remember it and keep the link.
            seen_video_keys.add(key)
            continue

        logging.critical("Deleting ExerciseVideo for %s, %s",
                         exercise.name,
                         key.id_or_name())
        yield op.db.Delete(exercise_video)
def migrate_userdata(key):
    """Strip blank ('') entries from a user entity's exercise and badge lists.

    Inside a datastore transaction, loads the entity stored under ``key``,
    removes every '' entry from ``all_proficient_exercises``,
    ``proficient_exercises`` and ``badges``, and saves the entity back.

    Args:
        key: datastore key of the entity to clean up. The mapreduce job
            started in this file passes models.UserData keys.
    """
    def tn(key):
        user_data = db.get(key)
        if user_data is None:
            # No entity is stored under this key, so there is nothing
            # to migrate.
            return

        # remove blank entries if present. list.remove('') would raise
        # ValueError when a list holds no blank entry and would drop only
        # the first one otherwise, so filter each list in place instead.
        blank_prone_lists = (
            user_data.all_proficient_exercises,
            user_data.proficient_exercises,
            user_data.badges,
        )
        for entries in blank_prone_lists:
            entries[:] = [entry for entry in entries if entry != '']

        user_data.put()

    db.run_in_transaction(tn, key)
class StartNewBackfillMapReduce(request_handler.RequestHandler):
    """Request handler that starts the ``migrate_userdata`` mapreduce job."""

    def get(self):
        """Start a mapper over models.UserData keys and respond with its id."""
        # Admin-only restriction is handled by /admin/* URL pattern
        # so this can be called by a cron job.
        reader_settings = {
            "entity_kind": "models.UserData",
            "processing_rate": 200,
        }

        # Start a new Mapper task.
        mapreduce_id = control.start_map(
            name="migrate_userdata",
            handler_spec="backfill.migrate_userdata",
            reader_spec="mapreduce.input_readers.DatastoreKeyInputReader",
            reader_parameters=reader_settings,
            shard_count=64,
            queue_name="backfill-mapreduce-queue",
        )

        reply = "OK: " + str(mapreduce_id)
        self.response.out.write(reply)
def transactional_entity_put(entity_key):
    """Re-save the entity stored under ``entity_key`` inside a transaction.

    The entity is loaded and immediately put back without modification.
    Keys that no longer resolve to an entity are skipped with a warning
    instead of failing the call.

    Args:
        entity_key: datastore key of the entity to re-save.
    """
    def entity_put(entity_key):
        entity = db.get(entity_key)
        if entity is None:
            # db.get returns None when no entity exists for the key;
            # calling .put() on that would raise AttributeError.
            logging.warning("Skipping put, no entity found for key: %s", entity_key)
            return
        entity.put()

    db.run_in_transaction(entity_put, entity_key)
class BackfillEntity(request_handler.RequestHandler):
    """Request handler that starts a job re-putting every entity of a kind.

    The kind is read from the ``kind`` request parameter and handed to the
    mapreduce key reader; each key is processed by
    ``backfill.transactional_entity_put``.
    """

    def get(self):
        """Start the re-put mapper for the requested kind, or report its absence."""
        kind = self.request_string("kind")
        if not kind:
            self.response.out.write("Must provide kind")
            return

        mapreduce_id = control.start_map(
            # Name the job after the kind actually requested; the previous
            # fixed "UserData" label was wrong for every other kind.
            name="Put all %s entities" % kind,
            handler_spec="backfill.transactional_entity_put",
            reader_spec="mapreduce.input_readers.DatastoreKeyInputReader",
            reader_parameters={
                "entity_kind": kind,
                "processing_rate": 200,
            },
            shard_count=64,
            queue_name="backfill-mapreduce-queue",
        )
        self.response.out.write("OK: " + str(mapreduce_id))