## Install pinned dataflows framework and dependencies

In [None]:
%%bash
# Pinned versions (not "latest"). Restart the kernel after this cell.
# NOTE(review): `python3 -m pip` installs for whichever python3 is first on PATH,
# which is not necessarily the kernel's interpreter; `%pip install` targets the
# kernel. A later cell also imports fuzzywuzzy, which is not installed here.
python3 -m pip install dataflows==0.0.64 sqlalchemy==1.3.11 knesset-data==2.1.5 tableschema-sql==1.3.1

Restart the kernel after the installation so that the pinned package versions are the ones imported.

## Set up DB connection

Connect as the read-only `redash_reader` database user.

In [None]:
import getpass

# Expected format (placeholders only -- never paste a real value into the notebook):
# postgresql://redash_reader:SECRET_PASSWORD@DB_HOST:5432/postgres

# Read interactively with getpass so the credential is not echoed or stored in the notebook.
redash_reader_connection_string = getpass.getpass('redash_reader connection string:')

In [None]:
from sqlalchemy import create_engine

# Engine used by stream_from_db for all database queries in this notebook.
redash_reader_db_engine = create_engine(redash_reader_connection_string)

In [None]:
def stream_from_db(sql, batch_size=100):
    """Run ``sql`` on the redash_reader engine and yield each result row as a dict.

    Rows are fetched ``batch_size`` at a time with ``stream_results=True``, so
    large result sets are not loaded into memory at once.

    The connection is closed in a ``finally`` block. It is therefore released
    even when the consumer stops iterating early (``break``, ``.close()``,
    garbage collection of the generator) or when the query raises.

    :param sql: statement accepted by ``Connection.execute`` (a plain SQL string
        with the pinned SQLAlchemy 1.3).
    :param batch_size: number of rows requested per ``fetchmany`` call.
    :yields: dict mapping column name -> value, one per result row.
    """
    conn = redash_reader_db_engine.connect()
    try:
        proxy = conn.execution_options(stream_results=True).execute(sql)
        # Column names are the same for every row; look them up once.
        keys = proxy.keys()
        while True:
            rows = proxy.fetchmany(batch_size)
            if not rows:
                break
            for row in rows:
                yield dict(zip(keys, row))
    finally:
        conn.close()

In [None]:
# Smoke test: print a few rows to confirm the connection works and the
# speaker-stats table is readable.
for row in stream_from_db('select * from people_committees_meeting_speaker_stats limit 5'):
    print(row)

## Generate the detailed package

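Output data is spread over the following directory structure: one datapackage per Knesset and committee. The committee directory name is the committee name truncated to its first 60 characters.

```
./data/detailed_speaker_parts/Knesset{KnessetNum}/{committee_name}/datapackage.json
```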

In [None]:
%%bash
# Wipe all previous output. NOTE: the generation cells below skip part groups whose
# datapackage.json is already marked with 'updated-datetime', so do NOT run this
# cell when resuming an interrupted run.
rm -rf data/detailed_speaker_parts

### Split sessions into groups (per Knesset and committee, then per start year)

In [None]:
from collections import defaultdict
from dataflows import Flow, printer


# "<KnessetNum>-<committee_name>" -> {"<start year>": {CommitteeSessionID, ...}}
SPEAKER_STATS_PART_GROUPS = {}
# CommitteeSessionID -> session row (later decorated with per-session header stats)
SESSIONS = {}


# Bucket every legislation-related committee session of Knessets 15-21, first by
# Knesset + committee and then by the year of the session's start date.
for session in stream_from_db("""
select "CommitteeSessionID", "KnessetNum", "committee_name", "StartDate" 
from committees_kns_committeesession 
where "KnessetNum" > 14 and "KnessetNum" < 22
and related_to_legislation = true
order by "KnessetNum", "committee_name", "StartDate"
"""):
    SESSIONS[session['CommitteeSessionID']] = session
    key = '{}-{}'.format(session['KnessetNum'], session['committee_name'])
    # Sub-buckets are per year only (no month granularity).
    subkey = str(session['StartDate'].year)
    subkey_buckets = SPEAKER_STATS_PART_GROUPS.setdefault(key, {})
    subkey_buckets.setdefault(subkey, set()).add(session['CommitteeSessionID'])

    
def _get_keys_num_session_ids():
    max_subkey_session_ids = 0
    for key, subkeys in SPEAKER_STATS_PART_GROUPS.items():
        num_session_ids = 0
        for subkey, session_ids in subkeys.items():
            num_session_ids += len(session_ids)
            if len(session_ids) > max_subkey_session_ids:
                max_subkey_session_ids = len(session_ids)
        yield {'key': key, 'num_session_ids': num_session_ids}
    print('max_subkey_session_ids = {}'.format(max_subkey_session_ids))
    assert max_subkey_session_ids < 500
    

# Render one sample row of per-group session counts as an HTML table. Exhausting the
# generator also runs its < 500 sessions-per-subkey assertion. The trailing [1]
# selects the second element of process()'s return value (presumably the run
# stats) as the cell's displayed output.
Flow(
    _get_keys_num_session_ids(),
    printer(tablefmt='html', num_rows=1)
).process()[1]

In [None]:
from dataflows import Flow, load, printer, add_field, dump_to_path, sort_rows, update_resource, checkpoint
from collections import defaultdict
from fuzzywuzzy import fuzz
import traceback
import json
import os
import datetime
import traceback


# Debug cap: when set, process_part_group_subkeys stops querying further subkeys of a
# part group once more than this many part rows have been loaded. None = no limit.
LIMIT_ROWS_PER_PART_GROUP=None

# NOTE(review): FILTER_PART_GROUPS is not referenced in the visible code.
# FILTER_PART_GROUPS=lambda session, stats: session['KnessetNum'] == 20 and stats['num_sessions'] <= 50
# FILTER_PART_GROUPS=None

# Requested fields (translated from the original Hebrew note): Knesset number, date,
# committee, private or government bill (if any), bill name (if any), speaker name,
# speaker affiliation, number of times the speaker was given the floor, percentage of
# the words the speaker said out of all words said by all speakers in the protocol,
# and the length of the speaker's longest turn.


def _get_part_group_target_path(part_group_key):
    tmp = part_group_key.split('-')
    KnessetNum = tmp[0]
    committee_name = '-'.join(tmp[1:])
    return 'data/detailed_speaker_parts/Knesset{}/{}/'.format(KnessetNum, committee_name[:60])


def _dump_sessions_parts_to_path(part_group_key, parts):
    print('dumping part_group_key {} with {} parts'.format(part_group_key, len(parts)))
    # first_session = SESSIONS[parts[0]['CommitteeSessionID']]
    target_path = _get_part_group_target_path(part_group_key)
    parts_session_ids = set()
    for part in parts:
        session = SESSIONS[part['CommitteeSessionID']]
        if part['CommitteeSessionID'] not in parts_session_ids:
            parts_session_ids.add(part['CommitteeSessionID'])
            session['headers'] = {}
            session['headers_stats'] = {
                'num_words': 0,
                'num_parts': 0
            }
        header = part['header']
        if header is not None and len(header.strip()) > 2:
            if not session['headers'].get(header):
                session['headers'][header] = {
                    'header': header,
                    'Note': part['Note'],
                    'topics': part['topics'],
                    'KnessetNum': part['KnessetNum'],
                    'StartDate': part['StartDate'],
                    'committee_name': part['committee_name'],
                    'bill_types': part['bill_types'],
                    'bill_names': part['bill_names'],
                    'categories': set(),
                    'num_words': 0,
                    'num_parts': 0,
                    'longest_part_num_words': 0,
                    'name_roles': set()
                }
            header = session['headers'][header]
            if part['part_categories']:
                for part_category in part['part_categories'].split(','):
                    header['categories'].add(part_category)
            if part['name_role']:
                header['name_roles'].add(part['name_role'])
            header['num_parts'] += 1
            session['headers_stats']['num_parts'] += 1
            header['num_words'] += part['body_num_words']
            session['headers_stats']['num_words'] += part['body_num_words']
            if part['body_num_words'] > header['longest_part_num_words']:
                header['longest_part_num_words'] = part['body_num_words']
    num_headers = 0
    for session_id in parts_session_ids:
        session = SESSIONS[session_id]
        for header in session['headers'].values():
            num_headers += 1
            header['categories'] = [cat for cat in header['categories'] if cat]
            header['name_roles'] = list(header['name_roles'])
            header['session_num_words'] = session['headers_stats']['num_words']
            header['session_num_parts'] = session['headers_stats']['num_parts']
            try:
                header['percent_num_words'] = round(header['num_words'] / header['session_num_words'] * 100, 2)
            except Exception:
                traceback.print_exc()
                print('error getting percent_num_words for session {} (num_words = {}, session_num_words = {})'.format(session_id, header['num_words'], header['session_num_words']))
                header['percent_num_words'] = 0
    print('Saving {} parts, {} headers to {}'.format(len(parts), num_headers, target_path))
    
    def _header_iterator():        
        for session_id in parts_session_ids:
            session = SESSIONS[session_id]
            if len(session['headers']) > 0:
                for header in session['headers'].values():
                    if (
                        (
                            len(header['categories']) == 0 
                            and header['num_parts'] == 1 
                            and header['percent_num_words'] < 0.5
                        ) or header['header'].strip() in [
                            'מוזמנים',
                            'סדר היום',
                            'מנהלת הועדה',
                            'מנהלת הוועדה'   
                        ]
                    ):
                        pass
                    else:
                        yield header

    Flow(
        iter(parts),
        _header_iterator(),
        update_resource('res_1', name='parts', path='parts.csv'),
        update_resource('res_2', name='headers', path='headers.csv'),
        dump_to_path(target_path)
    ).process()    
    for session_id in parts_session_ids:
        SESSIONS[session_id]['headers'] = None
        SESSIONS[session_id]['headers_stats'] = None


def process_part_group_subkeys(part_group_key, part_group_subkeys):
    """Load all speaker parts of one part group from the DB and dump them to disk.

    ``part_group_subkeys`` maps a subkey (the session start year, as built by the
    grouping cell) to a set of CommitteeSessionIDs. Parts are queried one subkey
    at a time to keep the SQL ``IN`` list bounded. All rows are accumulated in
    memory and then handed to _dump_sessions_parts_to_path.

    When LIMIT_ROWS_PER_PART_GROUP is set, no further subkeys are queried once
    more rows than the limit have been loaded.

    Side effect: increments the module-level ``stats['num_part_rows']`` counter,
    so ``stats`` must exist before this is called.
    """
    print('processing part_group {} with {} subkeys'.format(part_group_key, len(part_group_subkeys)))
    all_rows = []
    for subkey, session_ids in part_group_subkeys.items():
        if not session_ids:
            # "IN ()" is a SQL syntax error, and there is nothing to load anyway.
            continue
        print('processing subkey {} with {} sessions'.format(subkey, len(session_ids)))
        # int() guarantees only numeric literals are interpolated into the SQL;
        # sorting keeps the generated statement deterministic.
        session_ids_sql = ','.join(str(int(session_id)) for session_id in sorted(session_ids))
        # percent_num_words: multiplying by 100.0 first avoids PostgreSQL integer
        # division (int / int truncates). NULLIF yields NULL for sessions with no
        # words instead of a division-by-zero error that would fail the whole query.
        for row in stream_from_db("""
            select 
                sessions."CommitteeSessionID",
                sessions."KnessetNum",
                sessions.committee_name,
                parts."part_index",
                parts.header,
                parts.body_num_words,
                parts.body_num_words * 100.0 / nullif(parts_sum_words.sum_words, 0) percent_num_words,
                parts.part_categories,
                parts.name_role,
                sessions."StartDate",
                sessions."Note",
                sessions.topics,
                sessions.bill_names,
                sessions.bill_types
            from 
                people_committees_meeting_speaker_stats parts, 
                committees_kns_committeesession sessions,
                (
                    select
                        parts."CommitteeSessionID",
                        sum(parts."body_num_words") sum_words
                    from
                        people_committees_meeting_speaker_stats parts
                    where "CommitteeSessionID" in ({session_ids})
                    group by parts."CommitteeSessionID"
                ) parts_sum_words
            where 
                parts."CommitteeSessionID" in ({session_ids})
                and parts."CommitteeSessionID" = sessions."CommitteeSessionID"
                and parts_sum_words."CommitteeSessionID" = sessions."CommitteeSessionID"
            order by sessions."KnessetNum", sessions.committee_name, sessions."StartDate", parts.part_index
        """.format(session_ids=session_ids_sql)):
            all_rows.append(row)
            stats['num_part_rows'] += 1
        if LIMIT_ROWS_PER_PART_GROUP and len(all_rows) > LIMIT_ROWS_PER_PART_GROUP:
            break
    print('loaded {} part rows ({} part rows loaded in total so far)'.format(len(all_rows), stats['num_part_rows']))
    _dump_sessions_parts_to_path(part_group_key, all_rows)
        

def _is_part_group_updated(part_group_key):
    datapackage_json_path = '{}datapackage.json'.format(_get_part_group_target_path(part_group_key))
    if os.path.exists(datapackage_json_path):
        with open(datapackage_json_path) as f:
            datapackage = json.load(f)
            if datapackage.get('updated-datetime'):
                # datetime.datetime.strptime(datapackage['updated-datetime'], '%Y-%m-%d') - datetime.timedelta(days=5)
                return True
    return False
    

def _set_part_group_updated(part_group_key):
    datapackage_json_path = '{}datapackage.json'.format(_get_part_group_target_path(part_group_key))
    if os.path.exists(datapackage_json_path):
        with open(datapackage_json_path) as f:
            datapackage = json.load(f)
        datapackage['updated-datetime'] = datetime.datetime.now().strftime('%Y-%m-%d')
        with open(datapackage_json_path, 'w') as f:
            json.dump(datapackage, f)
    

def _part_groups_iterator(stats):
    for part_group_key, part_group_subkeys in SPEAKER_STATS_PART_GROUPS.items():
        stats['num_part_groups'] += 1
        if not _is_part_group_updated(part_group_key):
            yield part_group_key, part_group_subkeys

    
# Stage 2: write parts.csv / headers.csv for every part group not yet marked as
# updated. Processing is best-effort: a failing group is printed, recorded and
# skipped, and the summary below reports it instead of claiming success.
# NOTE: process_part_group_subkeys reads this module-level `stats` directly.
stats = defaultdict(int)
failed_part_group_keys = []

for part_group_key, part_group_subkeys in _part_groups_iterator(stats):
    try:
        process_part_group_subkeys(part_group_key, part_group_subkeys)
        _set_part_group_updated(part_group_key)
    except Exception:
        traceback.print_exc()
        failed_part_group_keys.append(part_group_key)
        stats['num_failed_part_groups'] += 1

if failed_part_group_keys:
    print('{} part groups failed: {}'.format(len(failed_part_group_keys), failed_part_group_keys))
else:
    print('Great Success!')
print(dict(stats))

In [None]:
from dataflows import duplicate, add_field, delete_fields, update_resource
import os

def _process_speakers(rows):
    if rows.res.name == 'speakers':
        speakers = {}
        for header in rows:
            if header['header'] and len(header['header']) > 2:
                if header['header'].strip() in speakers:
                    speaker = speakers[header['header']]
                else:
                    speaker = speakers[header['header']] = {
                        **{k: header[k] for k in ['Note', 'topics', 'KnessetNum', 'committee_name']},
                        'num_meetings': 0,
                        'header': header['header'],
                        'bill_types': set(),
                        'bill_names': set(),
                        'categories': set(),
                        'name_roles': set(),
                        **{k: 0 for k in ['num_words', 'num_parts', 'longest_part_num_words']}
                    }
                speaker['num_meetings'] += 1
                for bill_type in header['bill_types']:
                    speaker['bill_types'].add(bill_type)
                for bill_name in header['bill_names']:
                    speaker['bill_names'].add(bill_name)
                for category in header['categories']:
                    speaker['categories'].add(category)
                for name_role in header['name_roles']:
                    speaker['name_roles'].add(name_role)
                speaker['num_words'] += header['num_words']
                speaker['num_parts'] += header['num_parts']
                if speaker['longest_part_num_words'] == 0 or speaker['longest_part_num_words'] < header['longest_part_num_words']:
                    speaker['longest_part_num_words'] = header['longest_part_num_words']
        for speaker in speakers.values():
            speaker.update(bill_types=list(speaker['bill_types']),
                           bill_names=list(speaker['bill_names']),
                           categories=list(speaker['categories']),
                           name_roles=list(speaker['name_roles']),
                           )
            yield speaker
    else:
        yield from rows

        
def add_sessions(part_group_key):
    """Yield every committees_kns_committeesession row for the part group's Knesset and committee.

    ``part_group_key`` is "<KnessetNum>-<committee_name>". Only the first '-' is
    the separator. Single quotes in the committee name are doubled so it can be
    embedded in the SQL string literal.
    """
    knesset_num, _separator, raw_committee_name = part_group_key.partition('-')
    quoted_committee_name = raw_committee_name.replace("'", "''")
    sql_template = """
        select * from committees_kns_committeesession
        where "KnessetNum" = {}
        and committee_name = '{}'
    """
    yield from stream_from_db(sql_template.format(knesset_num, quoted_committee_name))
    
        
import traceback

# Stage 3: for every part group whose datapackage is marked as updated, derive a
# per-speaker summary (speakers.csv) from headers.csv, attach the committee's
# sessions (sessions.csv), and dump the result back into the same directory.
# Processing is best-effort: a failing group is printed, recorded and reported.
stats = defaultdict(int)
failed_part_group_keys = []

for part_group_key, part_group_subkeys in SPEAKER_STATS_PART_GROUPS.items():
    stats['num_part_groups'] += 1
    # _is_part_group_updated already returns False when datapackage.json is missing.
    if not _is_part_group_updated(part_group_key):
        continue
    target_path = _get_part_group_target_path(part_group_key)
    datapackage_json_path = '{}datapackage.json'.format(target_path)
    print('processing part_group_key {}'.format(part_group_key))
    try:
        with open(datapackage_json_path, encoding='utf-8') as f:
            datapackage = json.load(f)
        # When the stored package reports no rows, skip the rebuild but still re-mark it.
        if datapackage['count_of_rows'] > 0:
            Flow(
                load(datapackage_json_path, resources=['headers', 'parts']),
                duplicate('headers', 'speakers', 'speakers.csv'),
                add_field('num_meetings', resources='speakers', type='number'),
                _process_speakers,
                delete_fields([
                    'Note', 'topics', 'StartDate', 'session_num_words', 'session_num_parts', 'percent_num_words',
                    'bill_types', 'bill_names'
                ], resources='speakers'),
                # Zero-padded key so the string sort orders speakers by num_words, largest first.
                sort_rows('{num_words:09}', reverse=True, resources='speakers'),
                add_sessions(part_group_key),
                # The add_sessions generator is presumably named res_4 as the 4th resource.
                update_resource('res_4', name='sessions', path='sessions.csv'),
                dump_to_path(target_path)
            ).process()
        _set_part_group_updated(part_group_key)
        stats['num_processed_part_groups'] += 1
    except Exception:
        traceback.print_exc()
        failed_part_group_keys.append(part_group_key)
        stats['num_failed_part_groups'] += 1

if failed_part_group_keys:
    print('{} part groups failed: {}'.format(len(failed_part_group_keys), failed_part_group_keys))
else:
    print('Great Success!')
print(dict(stats))

## Create zip archive

In [None]:
%%bash
# Bundle every per-committee datapackage into a single zip archive.
set -e
# `apk` implies an Alpine-based image. Install zip only when it is not already
# available, since apk typically requires root.
command -v zip >/dev/null 2>&1 || apk add zip
# Start from a fresh archive: `zip -r` on an existing archive never removes
# entries for files deleted since the last run.
rm -f ./data/detailed_speaker_parts.zip
zip -r ./data/detailed_speaker_parts.zip ./data/detailed_speaker_parts