In [None]:
import calendar
import csv
import datetime
import itertools
import json
import sys
import time

import dateutil.parser
import msgpack
import pytz
import sqlalchemy
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker

In [None]:
# --- Input / output locations ---
project_file = "data/projects-2016-10-14.json"
map_file = "output/07_create_article_project_map/2017-03-27 14:10:49 77c76e1/articles_projects.m"
history_file = "data/final_history_output.csv"
skip_file = "skipped.csv"

# --- Indexing strategy and batching ---
# index_by_project selects per-project batch indexing instead of HistoryIndexer.index_batch.
index_by_project = False
# Number of CSV rows read per batch.
batch_size = 100000
# A project's queued revisions are inserted once the queue holds at least this many rows.
min_insert_size = 3333

# --- Throttling and status logging intervals (seconds) ---
between_sleep_sec = 20.0
sleep_sec = 0.1
log_sec = 20.0

# Column names of the revision-history CSV, in file order.
history_columns = [
    "article_name",
    "article_namespace",
    "article_id",
    "redirect",
    "revision_num",
    "revision_id",
    "timestamp",
    "contributor_name",
    "contributor_id",
    "minor",
    "comment",
    "length_bytes",
    "diff_bytes",
    "deleted",
]
exclude_columns = set()

In [None]:
import database
from database.schema import (
    article_project,
    article_project_names,
    revision_table
)

In [None]:
class HistoryIndexer(object):
    """Load revision-history CSV rows into per-project revision tables.

    A history row is kept only when all of the following hold:

    * it is in namespace 0 or 1;
    * it has a non-zero integer ``article_id``;
    * its article has, in the map loaded by ``load_projects``, at least one
      project whose membership window contains the revision timestamp.

    Kept rows are cleaned, queued per project, and bulk-inserted once a queue
    reaches ``min_insert_size`` rows. ``flush`` inserts whatever remains.

    Module-level settings read at run time: ``batch_size``,
    ``min_insert_size``, ``between_sleep_sec``, ``sleep_sec``, ``log_sec``
    and ``index_by_project``.
    """

    # Field order of a history CSV row; rows with fewer fields are skipped.
    _ROW_FIELDS = (
        "article_name",
        "article_namespace",
        "article_id",
        "redirect",
        "revision_num",
        "revision_id",
        "timestamp",
        "contributor_name",
        "contributor_id",
        "minor",
        "comment",
        "length_bytes",
        "diff_bytes",
        "deleted",
    )

    def __init__(self, project_file, map_file, history_file, engine, log, skipped):
        """
        Args:
            project_file: path to a file with one JSON project record per line;
                each record must contain a ``project_id`` key.
            map_file: path to a msgpack file mapping article_id to
                ``{project_id: membership}``. ``membership[0]`` and
                ``membership[1]`` are compared as an inclusive unix-time window.
            history_file: path to the revision-history CSV (first line is a header).
            engine: SQLAlchemy engine used for connections and inserts.
            log: logger exposing ``info`` and ``error``.
            skipped: writable file object that receives unusable rows as CSV.
        """
        self.engine = engine
        self.history_file = history_file
        self.project_file = project_file
        self.map_file = map_file
        self.log = log
        # Cumulative wall-clock seconds spent in each phase.
        self.profile = {
            "db": 0,
            "search": 0,
            "io": 0,
            "parse_text": 0,
            "parse_row": 0,
            "parse_time": 0,
            "load": 0,
            "log": 0,
            "sleep": 0}
        self.article_ids = None
        self.project_tables = {}
        self.project_table = None
        self.timestamps = {}
        self.skipped = skipped
        # Initialised here too, so flush()/log_profile() work before index_all().
        self.rows_completed = 0
        self.rows_inserted = 0
        self.to_insert_project = {}
        self.begin = time.time()
        self.last_sleep = time.time()
        self.last_log = time.time()

    def log_profile(self, project_id=None):
        """Log the completed-row count together with the cumulative phase timings."""
        elapsed = time.time() - self.begin
        self.log.info("Status\t%d:%s\ttot:%.0f\tlog:%.0f\tload:%.0f\tio:%.0f\tp_row:%.0f\tp_time:%.0f\tsearch:%.0f\tdb:%.0f\tsleep:%.0f" % (
            self.rows_completed,
            project_id or '',
            elapsed,
            self.profile["log"],
            self.profile["load"],
            self.profile["io"],
            self.profile["parse_row"],
            self.profile["parse_time"],
            self.profile["search"],
            self.profile["db"],
            self.profile["sleep"]))

    def load_projects(self):
        """Read the project records and the article -> project membership map."""
        self.projects = {}
        self.projects_by_name = {}
        with open(self.project_file, 'rb') as f:
            for line in f:
                if not line.strip():
                    continue  # tolerate blank / trailing lines
                data = json.loads(line)
                self.projects[data['project_id']] = data
        with open(self.map_file, 'rb') as f:
            self.article_projects = msgpack.unpackb(f.read())

    def get_project_table(self, project_id):
        """Return the cached INSERT construct for ``revision_table(project_id)``.

        The construct is built on first use and memoised in
        ``self.project_tables``. Build time is added to ``profile["load"]``.
        """
        try:
            return self.project_tables[project_id]
        except KeyError:
            pass
        load_start = time.time()
        insert = revision_table(project_id).__table__.insert()
        self.project_tables[project_id] = insert
        self.profile["load"] += time.time() - load_start
        return insert

    @staticmethod
    def _int_or_zero(value):
        """Parse an integer CSV cell, mapping the empty string to 0."""
        return int(value) if value != '' else 0

    def _skip_row(self, row):
        """Append an unusable row to the skip file and count it as completed."""
        # csv.writer quotes fields containing commas/quotes; a plain ",".join
        # produced unparseable lines for e.g. comments containing commas.
        csv.writer(self.skipped, lineterminator="\n").writerow(row)
        self.rows_completed += 1

    def _insert_revisions(self, project_id):
        """Bulk-insert and clear the queued revisions of one project."""
        revisions = self.to_insert_project.get(project_id)
        if not revisions:
            return
        insert = self.get_project_table(project_id)
        db_start = time.time()
        self.conn.execute(insert, revisions)
        self.to_insert_project[project_id] = []
        self.profile["db"] += time.time() - db_start
        self.rows_inserted += len(revisions)

    def _maybe_sleep_and_log(self):
        """Sleep briefly every ``between_sleep_sec`` and log status every ``log_sec``."""
        now = time.time()
        if now - self.last_sleep > between_sleep_sec:
            time.sleep(sleep_sec)
            self.profile["sleep"] += time.time() - now
            self.last_sleep = now
        if now - self.last_log > log_sec:
            log_start = time.time()
            self.log_profile()
            self.last_log = time.time()
            self.profile["log"] += self.last_log - log_start

    def index_batch(self, data):
        """Filter, clean and queue a batch of history rows, then insert full queues.

        Args:
            data: list of CSV rows (lists of strings) in ``_ROW_FIELDS`` order.
        """
        profile = self.profile
        n_fields = len(self._ROW_FIELDS)
        for row in data:
            if len(row) < n_fields:
                self._skip_row(row)
                continue
            # Extra trailing fields, if any, are ignored.
            datum = dict(zip(self._ROW_FIELDS, row))
            # Only index main and talk namespaces
            try:
                namespace = int(datum["article_namespace"])
            except ValueError:
                self._skip_row(row)
                continue
            if namespace not in (0, 1):
                self.rows_completed += 1
                continue
            # Look up the projects the article belongs to
            search_start = time.time()
            try:
                article_id = int(datum["article_id"])
            except ValueError:
                article_id = 0
            if article_id == 0:
                profile["search"] += time.time() - search_start
                self._skip_row(row)
                continue
            projects = self.article_projects.get(article_id)
            profile["search"] += time.time() - search_start
            if projects is None:
                # Article not in any projects
                self.rows_completed += 1
                continue
            # Parse time, convert to naive UTC, then to a unix timestamp
            parse_start = time.time()
            dt = dateutil.parser.parse(datum["timestamp"])
            if dt.tzinfo is not None:
                dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
            unixtime = calendar.timegm(dt.timetuple())
            datum["timestamp"] = unixtime
            profile["parse_time"] += time.time() - parse_start
            # Keep projects whose inclusive membership window covers the revision
            current_projects = [
                project_id
                for project_id, membership in projects.items()
                if membership[0] <= unixtime <= membership[1]
            ]
            if not current_projects:
                self.rows_completed += 1
                continue
            # Clean datum cells
            datum["contributor_id"] = self._int_or_zero(datum["contributor_id"])
            datum["minor"] = datum["minor"] == '1'
            datum["length_bytes"] = self._int_or_zero(datum["length_bytes"])
            datum["diff_bytes"] = self._int_or_zero(datum["diff_bytes"])
            datum["deleted"] = datum["deleted"] == '1'
            # Queue the revision for each matching project (the dict is shared
            # between queues but is not modified after this point).
            for project_id in current_projects:
                self.to_insert_project.setdefault(project_id, []).append(datum)
            self.rows_completed += 1
        # Insert revisions for project queues that reached min_insert_size
        ready_for_insert = [
            project_id
            for project_id, revisions in self.to_insert_project.items()
            if len(revisions) >= min_insert_size
        ]
        self.log.info("Executing inserts for %d tables" % len(ready_for_insert))
        for project_id in ready_for_insert:
            self._insert_revisions(project_id)
        self._maybe_sleep_and_log()

    def flush(self):
        """Insert every queued revision regardless of queue size."""
        self.log.info("Flushing queued revisions")
        for project_id, revisions in list(self.to_insert_project.items()):
            if revisions:
                self.log.info("  Flushing %d revisions for project %d" %
                    (len(revisions), project_id))
                self._insert_revisions(project_id)
            self.to_insert_project[project_id] = []
        self.log.info("Flush complete. %d rows completed" % self.rows_completed)

    def _process_batch(self, data):
        """Dispatch one batch of rows according to the ``index_by_project`` setting."""
        if index_by_project:
            # NOTE(review): index_project_batch is not defined in this notebook;
            # this branch raises AttributeError unless it is provided elsewhere.
            for project_id in self.projects.keys():
                self.index_project_batch(project_id, data)
            self.rows_completed += len(data)
        else:
            self.index_batch(data)

    def index_all(self, resume=False, to_skip=0):
        """Stream the history CSV in ``batch_size`` chunks and load it into the DB.

        Args:
            resume: continue a previous run. Counters and queues are kept, and
                the ``rows_completed`` rows already processed are skipped.
            to_skip: number of data rows to skip first. Ignored when ``resume``
                is true.

        A KeyboardInterrupt flushes the queues and returns, leaving
        ``rows_completed`` ready for ``index_all(resume=True)``. Any other
        exception is logged and re-raised.
        """
        if not resume:
            self.rows_completed = 0
            self.rows_inserted = 0
            self.begin = time.time()
            self.to_insert_project = {}
        try:
            self.conn = self.engine.connect()
            Session = sessionmaker()
            Session.configure(bind=self.engine)
            self.session = Session()
            with open(self.history_file, 'rb') as f:
                next(f)  # skip header line
                reader = csv.reader(f)
                if resume:
                    to_skip = self.rows_completed
                n_skipped = sum(1 for _ in itertools.islice(reader, max(0, to_skip)))
                self.rows_completed = n_skipped
                while True:
                    self.log.info("Beginning batch\t%d" % self.rows_completed)
                    batch_start = time.time()
                    io_start = time.time()
                    data = list(itertools.islice(reader, batch_size))
                    self.profile["io"] += time.time() - io_start
                    self._process_batch(data)
                    if len(data) < batch_size:
                        break  # reader exhausted: that was the final (possibly empty) batch
                    if self.rows_completed > 0:
                        elapsed = time.time() - batch_start
                        self.log.info("Batch complete\t%d\t%d\t%f\t%f" % (
                            self.rows_completed,
                            batch_size,
                            elapsed,
                            elapsed / float(batch_size)))
                        self.log_profile()
                self.flush()
                self.log.info("Successful completion. Cleaning up...")
        except KeyboardInterrupt:
            self.log.error("Exception: %s" % str(sys.exc_info()[1]))
            self.flush()
        except Exception:
            self.log.error("Exception: %s" % str(sys.exc_info()[1]))
            raise
        finally:
            for resource in (getattr(self, "conn", None), getattr(self, "session", None)):
                if resource is not None:
                    resource.close()

In [None]:
import logbook

# Experiment-scoped logger and skip file for this load step.
exp = logbook.Experiment("09_load_revisions")
log = exp.get_logger()
skipped = open(exp.get_filename(skip_file), "wb")

# Build the indexer and read project metadata plus the article -> project map.
hi = HistoryIndexer(
    project_file=project_file,
    map_file=map_file,
    history_file=history_file,
    engine=database.engine,
    log=log,
    skipped=skipped,
)
hi.load_projects()

In [None]:
# Number of history CSV data rows to skip before indexing (0 = start at the top).
# To continue an interrupted run in this kernel, call hi.index_all(resume=True).
to_skip = 0
try:
    hi.index_all(to_skip=to_skip)
finally:
    # The skip file stays open for possible resumes; flush so its rows reach disk.
    skipped.flush()