/
process_fts_updates
executable file
·62 lines (48 loc) · 1.83 KB
/
process_fts_updates
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#!/usr/bin/env python2.7
import psycopg2
import psycopg2.extensions
import select
import time
import logging
def update_fts_columns(cursor):
cursor.execute("SELECT id, message_id FROM fts_update_log;")
ids = []
for (id, message_id) in cursor.fetchall():
cursor.execute("UPDATE zerver_message SET "
"search_tsvector = to_tsvector('zulip.english_us_search', "
"subject || rendered_content) "
"WHERE id = %s", (message_id,))
ids.append(id)
cursor.execute("DELETE FROM fts_update_log WHERE id = ANY(%s)", (ids,))
def am_master(cursor):
cursor.execute("SELECT pg_is_in_recovery()")
return not cursor.fetchall()[0][0]
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.getLogger("process_fts_updates")
logger.setLevel(logging.DEBUG)
logger.info("process_fts_updates starting")
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../../..'))
os.environ['DJANGO_SETTINGS_MODULE'] = 'zproject.settings'
from django.conf import settings
password = getattr(settings, 'LOCAL_DATABASE_PASSWORD')
conn = psycopg2.connect("user=zulip password=%s" %password)
cursor = conn.cursor()
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
first_check = True
while not am_master(cursor):
if first_check:
first_check = False
logger.info("In recovery; sleeping")
time.sleep(5)
logger.info("Not in recovery; listening for FTS updates")
cursor.execute("LISTEN fts_update_log;")
update_fts_columns(cursor)
# TODO: If we go back into recovery, we should stop processing updates
while True:
if select.select([conn], [], [], 30) != ([], [], []):
conn.poll()
while conn.notifies:
conn.notifies.pop()
update_fts_columns(cursor)