In [1]:
import functools
import itertools
import collections
import typing
import re
import attr
import pprint

import xlrd

PAT_MULTIPLE_SPACES = re.compile(r'[\s\r\n]+')
PAT_DASH_WITHIN_WORD = re.compile(r'(\w)-(\w)')

def normalize_str(s: str):
    assert isinstance(s, str), repr(type(s))
    s = PAT_MULTIPLE_SPACES.sub(' ', s)
    s = PAT_DASH_WITHIN_WORD.sub(r'\1\2', s)
    s = s.strip()
    return s

# Row primitive

class VPO1Row:
    def __init__(self, cells):
        cells = list(cells)
        self.cells = cells
        self.atomic_to_full = [i if c is not None
                               else None
                               for i, c in enumerate(cells)]
        self.atomic_to_full[0] = 0
        self._n_cells = sum(map(lambda x: x is not None, self.atomic_to_full))
        for i, ref in enumerate(self.atomic_to_full):
            if ref is None:
                self.atomic_to_full[i] = self.atomic_to_full[i - 1]
        if self[0] is None:
            return
        for i in range(1, len(self.cells)):
            assert self[i] is not None, repr({'cells': self.cells, 'i': i})
    @property
    def n_full_cells(self):
        return self._n_cells
    @property
    def n_atomic_cells(self):
        return len(self.cells)
    def clone(self):
        return VPO1Row(self.cells)
    def __getitem__(self, atomic_cell_id):
        cell_id = self.atomic_to_full[atomic_cell_id]
        return self.cells[cell_id]
    def __repr__(self):
        return repr(self.cells)
    def __len__(self):
        return len(self.cells)
    def __iter__(self):
        return iter(self.cells)
    def full_cells(self):
        yield self[0]
        for i in range(1, len(self.atomic_to_full)):
            if self.atomic_to_full[i] != self.atomic_to_full[i - 1]:
                yield self[i]
                
class VPO1Error(Exception):
    pass


# First we want to transform "rows" as "arrays of cells" into typed rows

@attr.s
class MetadataRow:
    field = attr.ib()
    value = attr.ib()
    

@attr.s
class DFHeader:
    col_names = attr.ib()
    col_numbers = attr.ib()
    
@attr.s
class DFDataRow:
    name = attr.ib()
    group = attr.ib()
    cells = attr.ib()

class Stream:
    def __init__(self, iterator):
        self.it = iterator
        self.deque = collections.deque()
        
    def get(self):
        if not self.deque:
            self.deque.appendleft(next(self.it))
        # next() has already raise StopIteration by this moment
        return self.deque.popleft()
    def unget(self, c):
        self.deque.appendleft(c)
    
class StreamBranch(Stream):
    def __init__(self, stream: Stream):
        super(StreamBranch, self).__init__([])
        assert stream is not None
        self.stream = stream
        self.backup = []
    def get(self):
        value = self.stream.get()
        self.backup.append(value)
        return value
    def unget(self, c):
        if self.backup:
            self.backup.pop()
        self.stream.unget(c)
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        if exc_value is None:
            return
        while self.backup:
            self.stream.unget(self.backup.pop())


# We're going to yield the control over "rows" to the "state"
# so that when reading e.g. header
# its inner state is not visible to the global "reader"
# but only to the header's "reader".

def read_region(stream: Stream):
    row = stream.get()
    if row.n_full_cells != 1:
        stream.unget(row)
        raise VPO1Error('Expected region, got a row with {} cells'.format(row.n_full_cells))
    yield MetadataRow('region', row[0])
    yield from read_funded_by(stream)
    
def read_funded_by(stream: Stream):
    row = stream.get()
    if row.n_full_cells != 1:
        stream.unget(row)
        raise VPO1Error('Expected funded_by, got a row with {} cells'.format(row.n_full_cells))
    yield MetadataRow('funded_by', row[0])
    yield from read_time_involvement(stream)
    
def read_time_involvement(stream: Stream):
    row = stream.get()
    if row.n_full_cells != 1:
        stream.unget(row)
        raise VPO1Error('Expected time_involvement, got a row with {} cells'.format(row.n_full_cells))
    yield MetadataRow('time_involvement', row[0])
    yield from read_either(stream, read_section, read_chapter)

def read_either(stream: Stream, *readers):
    errors = []
    ok = False
    for read in readers:
        try:
            yield from read(stream)
            ok = True
            break
        except VPO1Error as e:
            errors.append(e)
    if not ok:
        raise VPO1Error('``except_either``: all readers failed; errors: {}'.format([repr(e) for e in errors]))

PAT_CHAPTER = re.compile(r'^Раздел (?P<number>\d+)\. (?P<name>.*)$')
def read_chapter(stream: Stream):
    row = stream.get()
    if row.n_full_cells != 1:
        stream.unget(row)
        raise VPO1Error('Expected chapter, got a row with {} cells'.format(row.n_full_cells))
    m = PAT_CHAPTER.match(row[0])
    if not m:
        raise VPO1Error('Expected chapter. No match')
    yield MetadataRow('chapter', (m.group('number'), m.group('name')))

PAT_SECTION = re.compile(r'^(?P<number>\d(?:\.\d+)*)\. (?P<name>.*)$')
def read_section(stream: Stream):

    row = stream.get()
    if row.n_full_cells != 1:
        stream.unget(row)
        raise VPO1Error('Expected section, got a row with {} cells'.format(row.n_full_cells))
    m = PAT_SECTION.match(row[0])
    if not m:
        raise VPO1Error('Expected section. No match')
    yield MetadataRow('section', (m.group('number'), m.group('name')))
    yield from read_either(stream, read_units, read_df_header)

PAT_UNITS = re.compile(r'.*ОКЕИ[^:]*:\s?(?P<units>.*)\s*$')
def read_units(stream):
    row = stream.get()
    if row.n_full_cells != 1:
        stream.unget(row)
        raise VPO1Error('Expected units, got a row with {} cells'.format(row.n_full_cells))
    m = PAT_UNITS.match(row[0])
    if not m:
        raise VPO1Error('Expected units. No match')
    yield MetadataRow('units', m.group('units'))
    yield from read_df_header(stream)

def peek_stream_cols_no(stream: Stream):
    row = stream.get()
    n_cols = len(row.cells)
    stream.unget(row)
    return n_cols

def read_df_header(stream: Stream):
    with StreamBranch(stream) as s:
        n_cols = peek_stream_cols_no(s)
        colnames = [list() for i in range(n_cols)]
        is_colnumbers = lambda row: (
            row.n_full_cells == len(colnames)
            and row[0] == 1
            and row[1] == 2
            and all(
                isinstance(j, (int, float))
                and j == int(j)
                for j in row))
        row = s.get()
        while not is_colnumbers(row):
            for i in range(len(colnames)):
                if row[i] is None:
                    continue
                colnames[i].append(row[i])
            row = s.get()
        colnums = tuple(map(int, row.cells))
        colnames = [' '.join(col) for col in colnames]
        colnames = [normalize_str(col) for col in colnames]
        yield DFHeader(colnames, colnums)
    yield from read_df_data(colnums, stream)
    
def read_df_data(colnums, stream: Stream):
    try:
        row = stream.get()
        rowname = row[0]
        if rowname is not None:
            rowname = normalize_str(rowname)
            group = row[1]
            group = int(group) if int(group) == group else group
            data = itertools.zip_longest(colnums, row)
            data = list(itertools.islice(data, 2, None))
            yield DFDataRow(rowname, group, data)
        yield from read_df_data(colnums, stream)
    except StopIteration:
        pass
    
def read_sheet(rows: Stream):
    # that's one way:
    # we just enter initial state
    # whose generator knows what to call next
    #
    # semantically, however, it's rather meaningless
    yield from read_region(rows)
    # we'd rather want to look it like:
    # yield from read_region(rows)
    # yield from read_funded_by(rows)
    # yield from read_time_involvement(rows)
    # yield from read_chapter_and_or_section(rows)
    # yield from try_read_units(rows)
    # yield from try_read_df(rows)
    #
    # but that's for next iteration

In [2]:
example_path = 'Своды ВПО-1 2018/Государственные/Воронежская область_ГОС_очная.xls'
workbook = xlrd.open_workbook(example_path)
sheet = workbook.sheet_by_name('Р2_1_2 (4)')

def unpack_cell(cell):
    # TODO: apply normalization, stripping str's, etc -- right here
    return cell.value if cell.ctype != 0 else None

def unpack_row(row):
    return map(unpack_cell, row)

for evt in read_sheet(Stream(map(VPO1Row,
                                  map(unpack_row,
                                      sheet.get_rows())))):
    print(evt)

MetadataRow(field='region', value='Воронежская область')
MetadataRow(field='funded_by', value='Государственные, Муниципальные')
MetadataRow(field='time_involvement', value='очная')
MetadataRow(field='section', value=('2.1.2', 'Распределение численности студентов по курсам, направлениям подготовки и специальностям'))
MetadataRow(field='units', value='человек - 792')
DFHeader(col_names=['Наименование направления подготовки (специальности)', '№ строки', 'Код классификатора', 'Код направления подготовки (специальности)', 'Численность студентов 7 курса всего (сумма гр. 49, 5153)', 'Численность студентов 7 курса из них лица с ОВЗ, инвалиды, детиинвалиды', 'В том числе (из гр.47) обучаются за счет бюджетных ассигнований федерального бюджета всего', 'В том числе (из гр.47) обучаются за счет бюджетных ассигнований федерального бюджета из них лица с ОВЗ, инвалиды, детиинвалиды', 'В том числе (из гр.47) обучаются за счет бюджетных ассигнований бюджета субъекта Российской Федерации из них лица с О

In [3]:
class MUST_FILL:
    pass

@attr.s
class VPO1DataFrame:
    region = attr.ib()
    funded_by = attr.ib()
    time_involvement = attr.ib()
    section = attr.ib()
    columns = attr.ib(factory=collections.OrderedDict)
    alt_colnames = attr.ib(factory=lambda: collections.defaultdict(set))
    rows = attr.ib(factory=lambda: collections.defaultdict(list))
    
    def update_meta(self, row: MetadataRow):
        setattr(self, row.field, row.value)
    def update_columns(self, header: DFHeader):
        for i, name in zip(header.col_numbers, header.col_names):
            self.columns[i] = name
            self.alt_colnames[i].add(name)
    def update_row(self, row: DFDataRow):
        key = (row.group, row.name)
        #if all(colnum == cell_colnum
        #           for colnum, (cell_colnum, cell_data)
        #           in itertools.zip_longest(
        #               collections.deque(self.columns.keys(), maxlen=len(row.cells)),
        #               row.cells)):
        #    self.rows[key].extend(map(lambda c: c[1], row.cells))
        #    return
        maxcol = max((colnum for colnum, data in row.cells))
        maxcol = max(maxcol, len(self.rows[key]) - 1)
        self.rows[key].extend((None for _ in range(1 + maxcol - len(self.rows[key]))))
        for colnum, data in row.cells:
            self.rows[key][colnum - 1] = data

def read_dataframes(sheets): # sheets are expected to be iters of rows
    dataframes = dict() # maps section into df
    errors = list()
    n_sheets = 0
    for sheet in sheets:
        n_sheets += 1
        try:
            new_events = read_sheet(Stream(sheet))
            events = collections.deque()
            events.append(next(new_events))
            df = None
            def wait_for_section(evt):
                if not (isinstance(evt, MetadataRow) and evt.field == 'section'):
                    events.append(evt)
                    return
                if evt.value not in dataframes:
                    dataframes[evt.value] = VPO1DataFrame(None, None, None, section=evt.value)
                df = dataframes[evt.value]
                actions[MetadataRow] = df.update_meta
                actions[DFHeader] = df.update_columns
                actions[DFDataRow] = df.update_row
            actions = collections.defaultdict(
                lambda: (lambda evt: None),
                {
                    MetadataRow: wait_for_section
                })
            while events:
                # TODO: prevent dead loop
                evt = events.popleft()
                actions[type(evt)](evt)
                try:
                    events.appendleft(next(new_events))
                except StopIteration:
                    pass
        except VPO1Error as e:
            errors.append(e)
    if len(errors) == n_sheets:
        raise VPO1Error('No parseable sheets. Errors: {}'.format(pprint.pformat(errors)))
    return dataframes

In [4]:
def workbook_to_dataframes(wb):
    if isinstance(wb, str):
        wb = xlrd.open_workbook(wb)
    sheets = (map(VPO1Row, map(unpack_row, s.get_rows()))
             for s in wb.sheets())
    return read_dataframes(sheets)

In [5]:
pprint.pprint(workbook_to_dataframes(example_path))

{('1.2', 'Сведения об образовательных программах, реализуемых организацией'): VPO1DataFrame(region='Воронежская область', funded_by='Государственные, Муниципальные', time_involvement='очная', section=('1.2', 'Сведения об образовательных программах, реализуемых организацией'), columns=OrderedDict([(1, 'Наименование образовательных программ'), (2, '№ строки'), (3, 'Число реализуемых образовательных программ -всего, единиц'), (4, 'из них прошли профессионально - общественную аккредитацию работодателями и их объединениями'), (5, 'Численность обучающихся - всего, человек'), (6, 'Сетевая форма реализации образовательных программ число программ (из графы 3), реализуемых с использованием сетевой формы'), (7, 'Сетевая форма реализации образовательных программ численность обучающихся (из графы 5) по программам, реализуемым с использованием сетевой формы - всего'), (8, 'Сетевая форма реализации образовательных программ в том числе (из графы 7) с использованием ресурсов иностранных организаций'), 



In [6]:
import pymongo
client = pymongo.MongoClient()
db = client['uni']

In [7]:
def workbook_to_mongo(workbook, vpo1_collection):
    for section, vpo1_df in workbook_to_dataframes(workbook).items():
        vpo1df_to_mongo(vpo1_df, vpo1_collection)

def vpo1df_to_mongo(vpo1_df, vpo1_collection, verbose=False):
    section = vpo1_df.section
    df = dict()
    id_fields = ['region', 'funded_by', 'time_involvement', 'section']
    for field in id_fields:
        df[field] = getattr(vpo1_df, field)
    df['section'] = vpo1_df.section[0]
    df['section_name'] = vpo1_df.section[1]
    data = list()
    row_keys = list(vpo1_df.rows.keys())
    for colnum in vpo1_df.columns.keys():
        colname = vpo1_df.columns[colnum]
        alt_names = vpo1_df.alt_colnames[colnum]
        records = list()
        for k in row_keys:
            group, rowname = k
            value = vpo1_df.rows[k]
            if colnum - 1 >= len(value):
                raise Exception('section={}, k={}, colnum={}, real_len={}, array={}'.format(
                    section, k, colnum, len(value), value))
            value = value[colnum - 1]
            records.append(dict(name=rowname, group=group, value=value))
        data.append(dict(colnum=colnum, name=colname, alt_names=list(alt_names), records=records))
    df['data'] = data
    query = {f: df[f] for f in id_fields}
    existing = vpo1_collection.find_one(query)
    if existing:
        vpo1_collection.update_one(
            dict(_id=existing['_id']),
            {'$set': df},
            upsert=False)
        if verbose:
            print(df['section'])
    else:
        _id = vpo1_collection.insert_one(df)
        if verbose:
            print(df['section'])

In [8]:
workbook_to_mongo(workbook, db.vpo1)



In [9]:
import os


class VPO1Set:
    @staticmethod
    def split_name_country(s):
        PAT_COUNTRY = re.compile(r'СВОД_ВПО1_(?P<funded_by>[А-Я]+)_(?P<time_involvement>[А-Яа-я\s-]+)\.xls')
        m = PAT_COUNTRY.match(s)
        if not m:
            return None
        return (s, dict(region='Russia',
                    funded_by=m.group('funded_by'),
                    time_involvement=m.group('time_involvement')))
    @staticmethod
    def split_name_region(s):
        PAT_REGION = re.compile(r'(?P<region>[а-яА-Я\s-]+)_(?P<funded_by>[А-Я]+)_(?P<time_involvement>[а-яА-Я\s-]+)\.xls')
        m = PAT_REGION.match(s)
        if not m:
            return None
        return (s, dict(m.groupdict()))
    def __init__(self, path, deep=False):
        self.files = []
        self.file_to_traits = []
        for root, dirs, files in os.walk(path):
            files = ((split(f) for split in [self.split_name_country, self.split_name_region])
                     for f in files)
            files = (itertools.islice((splitted for splitted in f if splitted is not None), 0, 1)
                     for f in files)
            files = itertools.chain.from_iterable(files)
            files = list(files)
            for filename, traits in files:
                self.files.append(os.path.join(root, filename))
                self.file_to_traits.append(traits)
        VPO1SET_TRAITS = ['region', 'funded_by', 'time_involvement']
        for field in VPO1SET_TRAITS:
            setattr(self, 'unique_{}'.format(field), list(set(map(lambda x: x[field], self.file_to_traits))))

    def __iter__(self):
        workbooks = (workbook_to_dataframes(f) for f in self.files)
        workbooks = (wb.values() for wb in workbooks)
        workbooks = itertools.chain.from_iterable(workbooks)
        return workbooks

In [13]:
%%time

for vpo1_df in VPO1Set('.'):
    vpo1df_to_mongo(vpo1_df, db.vpo1)



CPU times: user 1min 54s, sys: 1.24 s, total: 1min 55s
Wall time: 3min 3s


In [14]:
workbook_to_dataframes(workbook)



{('1.2',
  'Сведения об образовательных программах, реализуемых организацией'): VPO1DataFrame(region='Воронежская область', funded_by='Государственные, Муниципальные', time_involvement='очная', section=('1.2', 'Сведения об образовательных программах, реализуемых организацией'), columns=OrderedDict([(1, 'Наименование образовательных программ'), (2, '№ строки'), (3, 'Число реализуемых образовательных программ -всего, единиц'), (4, 'из них прошли профессионально - общественную аккредитацию работодателями и их объединениями'), (5, 'Численность обучающихся - всего, человек'), (6, 'Сетевая форма реализации образовательных программ число программ (из графы 3), реализуемых с использованием сетевой формы'), (7, 'Сетевая форма реализации образовательных программ численность обучающихся (из графы 5) по программам, реализуемым с использованием сетевой формы - всего'), (8, 'Сетевая форма реализации образовательных программ в том числе (из графы 7) с использованием ресурсов иностранных организаций')

In [15]:
set(itertools.islice((vpo1_df.section for vpo1_df in VPO1Set('.')), 1000))



{('1.2', 'Сведения об образовательных программах, реализуемых организацией'),
 ('2.1.2',
  'Распределение численности студентов по курсам, направлениям подготовки и специальностям'),
 ('2.1.3',
  'Распределение выпуска бакалавров, специалистов, магистров по направлениям подготовки и специальностям'),
 ('2.10', 'Результаты приема на обучение по программам магистратуры'),
 ('2.11',
  'Направление на работу выпускников, обучавшихся по очной форме обучения за счет бюджетных ассигнований'),
 ('2.12',
  'Распределение численности студентов, приема и выпуска по гражданству'),
 ('2.13',
  'Распределение численности студентов, приема и выпуска по возрасту и полу'),
 ('2.2', 'Движение численности студентов'),
 ('2.3',
  'Обучение в рамках квоты целевого приема и по договорам о целевом обучении'),
 ('2.4', 'Обучение по договорам об оказании платных образовательных услуг'),
 ('2.5', 'Обучение лиц с ограниченными возможностями здоровья и инвалидов'),
 ('2.6',
  'Численность студентов очной формы об