## imports, constants, basic functions

In [1]:
from os import environ
import datetime, re
from functools import partial
from dataflows import (Flow, checkpoint, load, printer, 
                       update_resource, join, filter_rows, dump_to_path, set_type)
from datapackage_pipelines_knesset.common_flow import load_knesset_data as _load_knesset_data
from bill_dates import init_knessetdates, load_knessetdate_row, calc_knessetdates_range, to_date


BILL_ID_COLUMN = 'מזהה ייחודי של הצעות חוק'
USE_DATA = False


if USE_DATA:
    environ['KNESSET_PIPELINES_DATA_PATH'] = '../knesset-data-pipelines/data'
    load_knesset_data = partial(_load_knesset_data, use_data=True)
else:
    load_knesset_data = _load_knesset_data


def set_dates_any(package):
    for resource in package.pkg.descriptor['resources']:
        for field in resource['schema']['fields']:
            if field['type'] == 'datetime' or field['type'] == 'date':
                if field['format'].startswith('%'):
                    field['format'] = 'any'
    yield package.pkg
    yield from package
    

## Load source data

In [None]:
SOURCE_DATA_CHECKPOINT_NAME = 'hesderim_bill_splits_source_data'

!{'rm -rf .checkpoints/%s' % SOURCE_DATA_CHECKPOINT_NAME}

Flow(
    load('טבלת חוקי הסדרים - 10.10.18 (1).csv', name='hesderim_bill_ids'),
    load_knesset_data('knesset/kns_knessetdates/datapackage.json', resources=['kns_knessetdates']),
    load_knesset_data('bills/kns_bill/datapackage.json', resources=['kns_bill']),
    update_resource('kns_bill', name='kns_bill_0', path='kns_bill_0.csv'),
    load_knesset_data('bills/kns_billsplit/datapackage.json', resources=['kns_billsplit']),
    load_knesset_data('knesset/kns_status/datapackage.json', resources=['kns_status']),
    load_knesset_data('committees/kns_committeesession/datapackage.json', resources=['kns_committeesession']),
    load_knesset_data('committees/kns_cmtsessionitem/datapackage.json', resources=['kns_cmtsessionitem']),
    filter_rows(equals=[{'ItemTypeID': 2}], resources=['kns_cmtsessionitem']),
    load_knesset_data('plenum/kns_plenumsession/datapackage.json', resources=['kns_plenumsession']),
    load_knesset_data('plenum/kns_plmsessionitem/datapackage.json', resources=['kns_plmsessionitem']),
    filter_rows(equals=[{'ItemTypeID': 2}], resources=['kns_plmsessionitem']),
    load_knesset_data('bills/kns_bill/datapackage.json', resources=['kns_bill']),
    set_dates_any,
    checkpoint(SOURCE_DATA_CHECKPOINT_NAME),
).process()[1]

In [None]:
from dataflows import printer
Flow(checkpoint('hesderim_bill_splits_source_data'), printer(tablefmt='html', num_rows=1)).process()[1]

## Join Hesderim data

In [None]:
def hesderim_bill_ids():
    
    def step(rows):
        if rows.res.name == 'hesderim_bill_ids':
            yield from ({'hesderim_bill_id': int(row[BILL_ID_COLUMN])}
                        for row in rows)
        else:
            yield from rows
    
    return Flow(
        step,
        update_resource('hesderim_bill_ids', schema={'fields': [{'name': 'hesderim_bill_id', 'type': 'integer'}]})
    )
    
    
def hesderim_bill_splits():
    hesderim_bill_ids = set()
    
    def step(rows):
        if rows.res.name == 'hesderim_bill_ids':
            for row in rows:
                hesderim_bill_ids.add(row['hesderim_bill_id'])
                yield row
        elif rows.res.name == 'kns_billsplit':
            for row in rows:
                if row['MainBillID'] in hesderim_bill_ids:
                    yield {'hesderim_bill_id': int(row['MainBillID']),
                           'bill_id': int(row['SplitBillID'])}
        else:
            yield from rows
    
    return Flow(
        step,
        update_resource('kns_billsplit', name='hesderim_bill_splits', path='hesderim_bill_splits.csv',
                        schema={'fields': [{'name': 'hesderim_bill_id', 'type': 'integer'},
                                           {'name': 'bill_id', 'type': 'integer'}]})
    )


def hesderim_bills_committee_sessions():    
    return Flow(
        join(source_name='kns_committeesession', source_key=['CommitteeSessionID'], 
             target_name='kns_cmtsessionitem', target_key=['CommitteeSessionID'], 
             fields={'KnessetNum': {}, 'StartDate': {}, 'CommitteeID': {}}),
        join(source_name='kns_status', source_key=['StatusID'], source_delete=False,
             target_name='kns_cmtsessionitem', target_key=['StatusID'],
             fields={'StatusDesc': {'name': 'Desc'}}),
        join(source_name='hesderim_bill_splits', source_key=['bill_id'], source_delete=False,
             target_name='kns_cmtsessionitem', target_key=['ItemID'],
             fields={'hesderim_bill_id': {}}),
        filter_rows(not_equals=[{'hesderim_bill_id': None}], resources=['kns_cmtsessionitem']),
    )


def hesderim_bills_plenum_sessions():
    return Flow(
        join(source_name='kns_plenumsession', source_key=['PlenumSessionID'], 
             target_name='kns_plmsessionitem', target_key=['PlenumSessionID'], 
             fields={'KnessetNum': {}, 'StartDate': {}}),
        join(source_name='kns_status', source_key=['StatusID'], source_delete=False,
             target_name='kns_plmsessionitem', target_key=['StatusID'],
             fields={'StatusDesc': {'name': 'Desc'}}),
        join(source_name='hesderim_bill_splits', source_key=['bill_id'], source_delete=False,
             target_name='kns_plmsessionitem', target_key=['ItemID'],
             fields={'hesderim_bill_id': {}}),
        filter_rows(not_equals=[{'hesderim_bill_id': None}], resources=['kns_plmsessionitem']),
    )


def join_hesderim_kns_bill():
    return Flow(
        join(source_name='hesderim_bill_splits', source_key=['bill_id'], source_delete=False,
             target_name='kns_bill', target_key=['BillID'],
             fields={'hesderim_bill_id': {}}),
        filter_rows(not_equals=[{'hesderim_bill_id': None}], resources=['kns_bill']),
    )

JOIN_HESDERIM_DATA_CHECKPOINT_NAME = 'hesderim_bill_splits_join_hesderim_data'

!{'rm -rf .checkpoints/%s' % JOIN_HESDERIM_DATA_CHECKPOINT_NAME}

Flow(
    checkpoint(SOURCE_DATA_CHECKPOINT_NAME),
    hesderim_bill_ids(),
    hesderim_bill_splits(),
    hesderim_bills_committee_sessions(),
    hesderim_bills_plenum_sessions(),
    join_hesderim_kns_bill(),
    checkpoint(JOIN_HESDERIM_DATA_CHECKPOINT_NAME)
).process()[1]


In [10]:
Flow(checkpoint(JOIN_HESDERIM_DATA_CHECKPOINT_NAME), printer(tablefmt='html', num_rows=1)).process()[1]

using checkpoint data from .checkpoints/hesderim_bill_splits_join_hesderim_data


#,hesderim_bill_id (integer)
1,2007975.0
2,2007976.0
...,
36,150136.0


#,KnessetDateID (integer),KnessetNum (integer),Name (string),Assembly (integer),Plenum (integer),PlenumStart (datetime),PlenumFinish (datetime),IsCurrent (boolean),LastUpdatedDate (datetime)
1,1.0,13.0,השלוש-עשרה,1.0,2.0,1992-10-26 00:00:00+00:00,1993-03-24 00:00:00+00:00,False,2016-02-24 14:56:01+00:00
2,2.0,13.0,השלוש-עשרה,2.0,2.0,1993-05-03 00:00:00+00:00,1993-08-04 00:00:00+00:00,False,2016-02-24 14:56:16+00:00
...,,,,,,,,,
150,151.0,20.0,העשרים,1.0,5.0,2018-10-14 00:00:00+00:00,,True,2018-10-15 12:06:35+00:00


#,BillID (integer),KnessetNum (integer),Name (string),SubTypeID (integer),SubTypeDesc (string),PrivateNumber (integer),CommitteeID (integer),StatusID (integer),Number (integer),PostponementReasonID (integer),PostponementReasonDesc (string),PublicationDate (datetime),MagazineNumber (integer),PageNumber (integer),IsContinuationBill (boolean),SummaryLaw (string),PublicationSeriesID (integer),PublicationSeriesDesc (string),PublicationSeriesFirstCall (string),LastUpdatedDate (datetime)
1,426521.0,18.0,"הצעת חוק התכנון והבניה (תיקון מס' 100) (חובת התקנת מערכת סולרית), התשע""ב-2012",54.0,פרטית,3840.0,656.0,113.0,471.0,,,,,,,,,,"הצ""ח הכנסת (מתשס""ג 10/2002) - 471 ,מיום 11:00:00",2013-05-23 18:37:56+00:00
2,426868.0,18.0,"הצעת חוק העזרה הרפואית הבסיסית, התשע""ב-2012",54.0,פרטית,4046.0,,104.0,,,,,,,,,,,,2013-05-23 18:37:56+00:00
...,,,,,,,,,,,,,,,,,,,,
42351,2070403.0,20.0,"הצעת חוק התכנון והבנייה (תיקון - איחוד וחלוקה בתכנית לצורכי ציבור), התשע""ח-2018",54.0,פרטית,5565.0,,104.0,,,,,,,,,,,,2018-11-06 18:05:51+00:00


#,hesderim_bill_id (integer),bill_id (integer)
1,325830.0,326098.0
2,484923.0,485362.0
...,,
234,2007976.0,2009482.0


#,StatusID (integer),Desc (string),TypeID (integer),TypeDesc (string),OrderTransition (integer),IsActive (boolean),LastUpdatedDate (datetime)
1,6.0,בטיפול המשרד הנשאל,1.0,שאילתה,,,2013-10-06 14:08:19+00:00
2,8.0,השר סירב לענות,1.0,שאילתה,,,2013-10-06 14:08:19+00:00
...,,,,,,,
79,6040.0,פורסם ברשומות,6003.0,חוק אב,,True,2016-05-09 15:30:56+00:00


#,CmtSessionItemID (integer),ItemID (integer),CommitteeSessionID (integer),Ordinal (integer),StatusID (integer),Name (string),ItemTypeID (integer),LastUpdatedDate (datetime),CommitteeID (integer),KnessetNum (integer),StartDate (datetime),StatusDesc (string),hesderim_bill_id (integer)
1,45722.0,88727.0,65530.0,1.0,113.0,"סעיף 96 להצעת חוק המדיניות הכלכלית לשנת הכספים 2004 (תיקוני חקיקה), התשס""ד-2003 - תיקון פקודת המכרות",2.0,2013-05-23 18:38:01+00:00,23.0,16.0,2003-11-19 09:00:00+00:00,בהכנה לקריאה שנייה-שלישית בוועדה,16633.0
2,45725.0,88727.0,65853.0,1.0,113.0,"סעיף 96 להצעת חוק המדיניות הכלכלית לשנת הכספים 2004 (תיקוני חקיקה), התשס""ד-2003 - תיקון פקודת המכרות",2.0,2013-05-23 18:38:01+00:00,23.0,16.0,2003-12-22 10:00:00+00:00,בהכנה לקריאה שנייה-שלישית בוועדה,16633.0
...,,,,,,,,,,,,,
754,849306.0,569078.0,569296.0,1.0,113.0,"חוק לתיקון פקודת בריאות העם (מס' 29), התשע""ו-2015",2.0,2018-10-07 09:19:44+00:00,928.0,20.0,2015-10-22 12:30:00+00:00,בהכנה לקריאה שנייה-שלישית בוועדה,568878.0


#,plmPlenumSessionID (integer),ItemID (integer),PlenumSessionID (integer),ItemTypeID (integer),ItemTypeDesc (string),Ordinal (integer),Name (string),StatusID (integer),IsDiscussion (integer),LastUpdatedDate (datetime),KnessetNum (integer),StartDate (datetime),StatusDesc (string),hesderim_bill_id (integer)
1,48354.0,88726.0,16556.0,2.0,הצעת חוק,4.0,"הצעת חוק המדיניות הכלכלית לשנת הכספים 2004 (תיקוני חקיקה), התשס""ד-2003 (מיועדת רק לצורך חלוקת הצעת החוק)",111.0,1.0,2013-05-23 18:38:01+00:00,16.0,1899-12-30 10:00:00+00:00,לדיון במליאה לקראת הקריאה הראשונה,16633.0
2,48355.0,88727.0,16556.0,2.0,הצעת חוק,5.0,"סעיף 96 להצעת חוק המדיניות הכלכלית לשנת הכספים 2004 (תיקוני חקיקה), התשס""ד-2003 - תיקון פקודת המכרות",111.0,1.0,2013-05-23 18:38:01+00:00,16.0,1899-12-30 10:00:00+00:00,לדיון במליאה לקראת הקריאה הראשונה,16633.0
...,,,,,,,,,,,,,,
102,861672.0,569078.0,572668.0,2.0,הצעת חוק,22.0,"חוק לתיקון פקודת בריאות העם (מס' 29), התשע""ו-2015",117.0,1.0,2018-10-07 09:19:44+00:00,20.0,2015-12-14 16:00:00+00:00,לדיון במליאה לקראת קריאה שלישית,568878.0


#,BillID (integer),KnessetNum (integer),Name (string),SubTypeID (integer),SubTypeDesc (string),PrivateNumber (integer),CommitteeID (integer),StatusID (integer),Number (integer),PostponementReasonID (integer),PostponementReasonDesc (string),PublicationDate (datetime),MagazineNumber (integer),PageNumber (integer),IsContinuationBill (boolean),SummaryLaw (string),PublicationSeriesID (integer),PublicationSeriesDesc (string),PublicationSeriesFirstCall (string),LastUpdatedDate (datetime),hesderim_bill_id (integer)
1,474507.0,18.0,"סעיף 2 (חוק מיסוי מקרקעין - הגדלת ההיצע של דירות מגורים - הוראת שעה) בהצעת חוק לצמצום הגירעון ולהתמודדות עם השפעות המשבר בכלכלה העולמית (תיקוני חקיקה), התשע""ב-2012",53.0,ממשלתית,,653.0,113.0,723.0,,,,,,,,,,,2013-05-23 18:38:01+00:00,474392.0
2,88726.0,16.0,"הצעת חוק המדיניות הכלכלית לשנת הכספים 2004 (תיקוני חקיקה), התשס""ד-2003 (מיועדת רק לצורך חלוקת הצעת החוק)",53.0,ממשלתית,,23.0,113.0,64.0,,,,,,,,,,,2013-05-23 18:38:01+00:00,16633.0
...,,,,,,,,,,,,,,,,,,,,,
234,569078.0,20.0,"חוק לתיקון פקודת בריאות העם (מס' 29), התשע""ו-2015",53.0,ממשלתית,,928.0,118.0,951.0,,,2015-12-23 00:00:00+00:00,2516.0,303.0,,"שר הבריאות, בהסכמת שר האוצר ובאישור ועדת העבודה, הרווחה והבריאות של הכנסת, יקבע בתקנות כללים שיחולו על רופא, בין שכיר ובין עצמאי, שייעץ לאדם או טיפל בו במסגרת שירות מרפאה ציבורי או שירות מרפאה קהילתי, לעניין איסור המשך טיפול באותו אדם או ייעוץ לו שלא במימון ציבורי, במשך תקופה של בין ארבעה לשמונה חודשים, כפי שייקבע בתקנות. השר רשאי לקבוע בתקנות הקלות או הגבלות במקום האיסור האמור לעיל, אם מצא שניתן להסתפק בכך לשם מניעת ניגוד עניינים בין עבודתו של הרופא במסגרת שירות מרפאה ציבורי או שירות מרפאה קהילתי ובין עיסוקו כרופא שלא במימון ציבורי. שר הבריאות ידווח לוועדה פעמיים בשנה על יישומו של ההסדר המוצע ועל השלכותיו במשך שלוש שנים מיום חקיקתו. תקנות אלו יכול שיקבעו תחולה הדרגתית, לרבות בהתחשב במיקום גיאוגרפי, ובלבד שעד תום שלוש שנים מיום תחילתו של חוק זה יחולו התקנות באופן מלא על כלל הרופאים. שר הבריאות ושא האוצר רשאים להאריך, בצו, את התקופה האמורה בשנה אחת.",6071.0,ספר החוקים,,2018-10-07 09:19:44+00:00,568878.0


{}

## Process Sessions

In [25]:
from functools import lru_cache
from datapackage import Package

def get_plenum_session_start_date(plenum_session):
    start_date = plenum_session['StartDate'].date()
    if start_date < datetime.date(1947, 1, 1):
        m = re.findall('([0-9]+)/([0-9]+)/([0-9]+)', plenum_session['Name'])
        assert m, 'failed to find date for session {}'.format(plenum_session)
        assert len(m) == 1
        start_date = datetime.date(*map(int, reversed(m[0])))
    return start_date

@lru_cache()
def get_all_joint_committee_ids():
    joint_committee_ids = {}
    def load_joint_committees(row):
        joint_committee_ids.setdefault(row['CommitteeID'], set([row['CommitteeID']])).add(row['ParticipantCommitteeID'])
        joint_committee_ids.setdefault(row['ParticipantCommitteeID'], set([row['ParticipantCommitteeID']])).add(row['CommitteeID'])
        pass
    Flow(
        load_knesset_data('committees/kns_jointcommittee/datapackage.json', resources=['kns_jointcommittee']),
        checkpoint('committees_jointcommittee.2'),
        load_joint_committees
    ).process()
    return joint_committee_ids

def get_committee_related_ids(committee_id):
    return get_all_joint_committee_ids().get(committee_id, set([committee_id]))

def is_relevant_session_for_bill_committee(session_committee_id, bill_committee_id):
    return session_committee_id in get_committee_related_ids(bill_committee_id)

def process_sessions(session_type):
    input_resource_name = {'committee': 'kns_cmtsessionitem',
                           'plenum': 'kns_plmsessionitem'}[session_type]
    output_resource_name = f'bill_{session_type}_sessions'
    first_session_field = f'{session_type}_first_session'
    last_session_field = f'{session_type}_last_session'
    num_sessions_field = f'{session_type}_num_sessions'
    bill_sessions = {}
    bill_hesderim_bill_id = {}
    
    if session_type == 'committee':
        bill_relevant_committee = {}
    elif session_type == 'plenum':
        plenum_session_dates = {}
        
    def _process_bills(rows):
        for row in rows:
            if session_type == 'committee':
                bill_relevant_committee[row['BillID']] = row['CommitteeID']
            yield row
    
    def _process_plenum_session_dates(rows):
        for row in rows:
            plenum_session_dates[row['PlenumSessionID']] = get_plenum_session_start_date(row)
            yield row
    
    def _process_sessions(rows):
        for row in rows:
            bill_sessions.setdefault(row['ItemID'], []).append(row)
            bill_hesderim_bill_id[row['ItemID']] = row['hesderim_bill_id']
            yield row
    
    def _get_bill_sessions():
        for bill_id, sessions in bill_sessions.items():
            sessions = list(sorted(sessions, key=lambda row: row['StartDate']))
            if session_type == 'plenum':
                sessions = [dict(row, StartDate=plenum_session_dates[row['PlenumSessionID']])
                            for row in sessions]
            row = {'bill_id': bill_id,
                   'hesderim_bill_id': bill_hesderim_bill_id[bill_id],
                   first_session_field: to_date(sessions[0]['StartDate']),
                   last_session_field: to_date(sessions[-1]['StartDate']),
                   num_sessions_field: len(sessions)}
            if session_type == 'committee':
                relevant_sessions = [s for s in sessions if is_relevant_session_for_bill_committee(s['CommitteeID'], bill_relevant_committee[bill_id])]
                if len(relevant_sessions) > 0:
                    row.update(relevant_committee_first_session=to_date(relevant_sessions[0]['StartDate']),
                               relevant_committee_last_session=to_date(relevant_sessions[-1]['StartDate']),
                               relevant_committee_num_sessions=len(relevant_sessions))
                else:
                    row.update(relevant_committee_first_session=None,
                               relevant_committee_last_session=None,
                               relevant_committee_num_sessions=0)
            yield row
    
    def step(package):
        package.pkg.add_resource({'name': output_resource_name,
                                  'path': f'{output_resource_name}_sessions.csv',
                                  'schema': {'fields': [
                                      {'name': 'bill_id', 'type': 'integer'},
                                      {'name': 'hesderim_bill_id', 'type': 'integer'},
                                      {'name': first_session_field, 'type': 'date'},
                                      {'name': last_session_field, 'type': 'date'},
                                      {'name': num_sessions_field, 'type': 'integer'},                                      
                                  ] + ([{'name': 'relevant_committee_first_session', 'type': 'date'},
                                        {'name': 'relevant_committee_last_session', 'type': 'date'},
                                        {'name': 'relevant_committee_num_sessions', 'type': 'integer'},] if session_type == 'committee' else [])}})
        yield package.pkg
        for rows in package:
            if rows.res.name == 'kns_bill_0':
                yield _process_bills(rows)
            elif session_type == 'plenum' and rows.res.name == 'kns_plenumsession':
                yield _process_plenum_session_dates(rows)
            elif rows.res.name == input_resource_name:
                yield _process_sessions(rows)
            else:
                yield rows
        yield _get_bill_sessions()

    return step

PROCESS_SESSIONS_CHECKPOINT_NAME = 'hesderim_bill_splits_process_sessions_data'

!{'rm -rf .checkpoints/%s' % PROCESS_SESSIONS_CHECKPOINT_NAME}
!{'rm -rf .checkpoints/committees_jointcommittee.2'}

Flow(
    checkpoint(JOIN_HESDERIM_DATA_CHECKPOINT_NAME),
    load_knesset_data('plenum/kns_plenumsession/datapackage.json', resources=['kns_plenumsession']),
    load_knesset_data('committees/kns_committee/datapackage.json'),
    set_dates_any,
    process_sessions('committee'),
    process_sessions('plenum'),
    checkpoint(PROCESS_SESSIONS_CHECKPOINT_NAME)
).process()[1]

loading from url: https://storage.googleapis.com/knesset-data-pipelines/data/plenum/kns_plenumsession/datapackage.json
loading from url: https://storage.googleapis.com/knesset-data-pipelines/data/committees/kns_committee/datapackage.json
saving checkpoint to: .checkpoints/hesderim_bill_splits_process_sessions_data
using checkpoint data from .checkpoints/hesderim_bill_splits_join_hesderim_data


DEBUG   :Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/plenum/kns_plenumsession/datapackage.json HTTP/1.1" 200 1961
DEBUG   :Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/plenum/kns_plenumsession/datapackage.json HTTP/1.1" 200 1961
DEBUG   :Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/committees/kns_committee/datapackage.json HTTP/1.1" 200 3820
DEBUG   :Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/committees/kns_committee/datapackage.json HTTP/1.1" 200 3820
DEBUG   :Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/plenum/kns_

loading from url: https://storage.googleapis.com/knesset-data-pipelines/data/committees/kns_jointcommittee/datapackage.json
saving checkpoint to: .checkpoints/committees_jointcommittee.2


DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/committees/kns_jointcommittee/datapackage.json HTTP/1.1" 200 1281
DEBUG   :Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/committees/kns_jointcommittee/datapackage.json HTTP/1.1" 200 1281
DEBUG   :Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/committees/kns_jointcommittee/kns_jointcommittee.csv HTTP/1.1" 200 10835
DEBUG   :Starting new HTTPS connection (2): storage.googleapis.com:443
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/committees/kns_jointcommittee/kns_jointcommittee.csv HTTP/1.1" 200 10835


checkpoint saved: committees_jointcommittee.2
checkpoint saved: hesderim_bill_splits_process_sessions_data


{'count_of_rows': 52980,
 'bytes': 10990475,
 'hash': 'bea59f4e30f60d1608a1bba1663d8eaf',
 'dataset_name': None}

In [26]:
Flow(checkpoint(PROCESS_SESSIONS_CHECKPOINT_NAME), 
     printer(tablefmt='html', num_rows=1, resources='bill_.*_sessions')).process()[1]

using checkpoint data from .checkpoints/hesderim_bill_splits_process_sessions_data


#,bill_id (integer),hesderim_bill_id (integer),committee_first_session (date),committee_last_session (date),committee_num_sessions (integer),relevant_committee_first_session (date),relevant_committee_last_session (date),relevant_committee_num_sessions (integer)
1,88727.0,16633.0,2003-11-19,2003-12-22,2.0,2003-11-19,2003-12-22,2.0
2,88732.0,16633.0,2003-11-25,2003-12-23,2.0,2003-11-25,2003-12-23,2.0
...,,,,,,,,
220,569078.0,568878.0,2015-10-22,2015-10-22,1.0,2015-10-22,2015-10-22,1.0


#,bill_id (integer),hesderim_bill_id (integer),plenum_first_session (date),plenum_last_session (date),plenum_num_sessions (integer)
1,88726.0,16633.0,2003-11-05,2003-11-05,1.0
2,88727.0,16633.0,2003-11-05,2003-11-05,1.0
...,,,,,
52,569078.0,568878.0,2015-11-23,2015-12-14,3.0


{}

## main flow

In [27]:
def process_session_date_ranges():
    knessetdates = init_knessetdates()
    
    def process_kns_knessetdates(rows):
        for row in rows:
            yield row
            load_knessetdate_row(knessetdates, row)
        
    def process_bill_sessions(rows):
        for row in rows:
            row['plenum_days_with_pagra'], row['plenum_days_without_pagra'] = calc_knessetdates_range(row, 'plenum_first_session', 'plenum_last_session', knessetdates)
            row['committee_days_with_pagra'], row['committee_days_without_pagra'] = calc_knessetdates_range(row, 'committee_first_session', 'committee_last_session', knessetdates)
            row['relevant_committee_days_with_pagra'], row['relevant_committee_days_without_pagra'] = calc_knessetdates_range(row, 'relevant_committee_first_session', 'relevant_committee_last_session', knessetdates)
            all_sessions = []
            for k in ['plenum_first_session', 'committee_first_session', 'plenum_last_session', 'committee_last_session']:
                if row[k]:
                    all_sessions.append(row[k])
            all_sessions = list(sorted(all_sessions))
            if len(all_sessions) > 0:
                row['all_first_session'], row['all_last_session'] = all_sessions[0], all_sessions[-1]
                row['all_days_with_pagra'], row['all_days_without_pagra'] = calc_knessetdates_range(row, 'all_first_session', 'all_last_session', knessetdates)
            else:
                row['all_first_session'], row['all_last_session'] = None, None
                row['all_days_with_pagra'], row['all_days_without_pagra'] = 0, 0
            yield row
    
    def process_rows(rows):
        if rows.res.name == 'bill_sessions':
            yield from process_bill_sessions(rows)
        elif rows.res.name == 'kns_knessetdates':
            yield from process_kns_knessetdates(rows)
        else:
            yield from rows
            
    def add_fields(package):
        for resource in package.pkg.descriptor['resources']:
            if resource['name'] == 'bill_sessions':
                resource['schema']['fields'] += [{'name': k, 'type': v} for k, v in {
                    'plenum_days_with_pagra': 'integer',
                    'plenum_days_without_pagra': 'integer',
                    'committee_days_with_pagra': 'integer',
                    'committee_days_without_pagra': 'integer',
                    'relevant_committee_days_with_pagra': 'integer',
                    'relevant_committee_days_without_pagra': 'integer',
                    'all_first_session': 'date',
                    'all_last_session': 'date',
                    'all_days_with_pagra': 'integer',
                    'all_days_without_pagra': 'integer'
                }.items()]
        yield package.pkg
        yield from package
    
    return Flow(
        process_rows,
        add_fields
    )

FINAL_BILL_SESSIONS_CHECKPOINT_NAME = 'hesderim_bill_splits_final_bill_sessions'

!{'rm -rf .checkpoints/%s' % FINAL_BILL_SESSIONS_CHECKPOINT_NAME}

Flow(
    load(f'.checkpoints/{PROCESS_SESSIONS_CHECKPOINT_NAME}/datapackage.json', resources=['kns_committee', 'kns_knessetdates']),
    load(f'.checkpoints/{PROCESS_SESSIONS_CHECKPOINT_NAME}/datapackage.json', resources=['bill_plenum_sessions']),
    load(f'.checkpoints/{PROCESS_SESSIONS_CHECKPOINT_NAME}/datapackage.json', resources=['bill_committee_sessions']),
    load(f'.checkpoints/{JOIN_HESDERIM_DATA_CHECKPOINT_NAME}/datapackage.json', resources=['kns_bill']),
    update_resource('kns_bill', name='bill_sessions', path='bill_sessions.csv'),
    join(source_name='bill_committee_sessions', source_key=['bill_id'],
         target_name='bill_sessions', target_key=['BillID'],
         fields={'committee_first_session': {}, 
                 'committee_last_session': {},
                 'committee_num_sessions': {},
                 'relevant_committee_first_session': {}, 
                 'relevant_committee_last_session': {},
                 'relevant_committee_num_sessions': {}}),
    join(source_name='bill_plenum_sessions', source_key=['bill_id'],
         target_name='bill_sessions', target_key=['BillID'],
         fields={'plenum_first_session': {}, 
                 'plenum_last_session': {},
                 'plenum_num_sessions': {},}),
    join(source_name='kns_committee', source_key=['CommitteeID'],
         target_name='bill_sessions', target_key=['CommitteeID'],
         fields={'CommitteeName': {'name': 'Name'}}),
    process_session_date_ranges(),
    printer(num_rows=1, tablefmt='html', resources=['bill_sessions']),
    dump_to_path('data/hesderim_bill_splits')
).process()[1]

#,BillID (integer),KnessetNum (integer),Name (string),SubTypeID (integer),SubTypeDesc (string),PrivateNumber (integer),CommitteeID (integer),StatusID (integer),Number (integer),PostponementReasonID (integer),PostponementReasonDesc (string),PublicationDate (datetime),MagazineNumber (integer),PageNumber (integer),IsContinuationBill (boolean),SummaryLaw (string),PublicationSeriesID (integer),PublicationSeriesDesc (string),PublicationSeriesFirstCall (string),LastUpdatedDate (datetime),hesderim_bill_id (integer),committee_first_session (date),committee_last_session (date),committee_num_sessions (integer),relevant_committee_first_session (date),relevant_committee_last_session (date),relevant_committee_num_sessions (integer),plenum_first_session (date),plenum_last_session (date),plenum_num_sessions (integer),CommitteeName (string),plenum_days_with_pagra (integer),plenum_days_without_pagra (integer),committee_days_with_pagra (integer),committee_days_without_pagra (integer),relevant_committee_days_with_pagra (integer),relevant_committee_days_without_pagra (integer),all_first_session (date),all_last_session (date),all_days_with_pagra (integer),all_days_without_pagra (integer)
1,474507.0,18.0,"סעיף 2 (חוק מיסוי מקרקעין - הגדלת ההיצע של דירות מגורים - הוראת שעה) בהצעת חוק לצמצום הגירעון ולהתמודדות עם השפעות המשבר בכלכלה העולמית (תיקוני חקיקה), התשע""ב-2012",53.0,ממשלתית,,653.0,113.0,723.0,,,,,,,,,,,2013-05-23 18:38:01+00:00,474392.0,,,,,,,,,,הכספים,,,,,,,,,0.0,0.0
2,88726.0,16.0,"הצעת חוק המדיניות הכלכלית לשנת הכספים 2004 (תיקוני חקיקה), התשס""ד-2003 (מיועדת רק לצורך חלוקת הצעת החוק)",53.0,ממשלתית,,23.0,113.0,64.0,,,,,,,,,,,2013-05-23 18:38:01+00:00,16633.0,2003-11-19,2003-12-01,4.0,2003-11-19,2003-12-01,4.0,2003-11-05,2003-11-05,1.0,הכלכלה,0.0,0.0,12.0,12.0,12.0,12.0,2003-11-05,2003-12-01,26.0,26.0
...,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
234,569078.0,20.0,"חוק לתיקון פקודת בריאות העם (מס' 29), התשע""ו-2015",53.0,ממשלתית,,928.0,118.0,951.0,,,2015-12-23 00:00:00+00:00,2516.0,303.0,,"שר הבריאות, בהסכמת שר האוצר ובאישור ועדת העבודה, הרווחה והבריאות של הכנסת, יקבע בתקנות כללים שיחולו על רופא, בין שכיר ובין עצמאי, שייעץ לאדם או טיפל בו במסגרת שירות מרפאה ציבורי או שירות מרפאה קהילתי, לעניין איסור המשך טיפול באותו אדם או ייעוץ לו שלא במימון ציבורי, במשך תקופה של בין ארבעה לשמונה חודשים, כפי שייקבע בתקנות. השר רשאי לקבוע בתקנות הקלות או הגבלות במקום האיסור האמור לעיל, אם מצא שניתן להסתפק בכך לשם מניעת ניגוד עניינים בין עבודתו של הרופא במסגרת שירות מרפאה ציבורי או שירות מרפאה קהילתי ובין עיסוקו כרופא שלא במימון ציבורי. שר הבריאות ידווח לוועדה פעמיים בשנה על יישומו של ההסדר המוצע ועל השלכותיו במשך שלוש שנים מיום חקיקתו. תקנות אלו יכול שיקבעו תחולה הדרגתית, לרבות בהתחשב במיקום גיאוגרפי, ובלבד שעד תום שלוש שנים מיום תחילתו של חוק זה יחולו התקנות באופן מלא על כלל הרופאים. שר הבריאות ושא האוצר רשאים להאריך, בצו, את התקופה האמורה בשנה אחת.",6071.0,ספר החוקים,,2018-10-07 09:19:44+00:00,568878.0,2015-10-22,2015-10-22,1.0,2015-10-22,2015-10-22,1.0,2015-11-23,2015-12-14,3.0,"ועדת העבודה, הרווחה והבריאות",21.0,21.0,0.0,0.0,0.0,0.0,2015-10-22,2015-12-14,53.0,53.0


{'count_of_rows': 384,
 'bytes': 131611,
 'hash': '25369b3a6a0cf7093cd9981b651912a6',
 'dataset_name': None}

In [33]:
columns = {
    "מזהה הצעת חוק ההסדרים שהצעה זו פוצלה ממנה": {"name": "hesderim_bill_id"},
    "מזהה הצעת החוק": {"name": "BillID"},
    "מספר הכנסת": {"name": "KnessetNum"},
    "שם הצעת החוק": {"name": "Name"},
    "מספר ישיבות מליאה המקושרות להצעת החוק": {"name": "plenum_num_sessions"},
    "מספר ישיבות ועדה המקושרות להצעת החוק": {"name": "committee_num_sessions"},
    "מזהה הועדה המטפלת": {"name": "CommitteeID"},
    "שם הועדה המטפלת": {"name": "CommitteeName"},
    "מספר ישיבות בועדה המטפלת": {"name": "relevant_committee_num_sessions"},
    "ישיבה ראשונה בועדה המטפלת": {"name": "relevant_committee_first_session"},
    "ישיבה אחרונה בועדה המטפלת": {"name": "relevant_committee_last_session"},
    "מספר ימים בין הישיבה הראשונה והישיבה האחרונה בועדה המטפלת (כולל פגרות)": {"name": "relevant_committee_days_with_pagra"},
    "מספר ימים בין הישיבה הראשונה והישיבה האחרונה בועדה המטפלת (לא כולל פגרות)": {"name": "relevant_committee_days_without_pagra"},
    "מספר ימים בין הישיבה הראשונה והאחרונה של מליאה או ועדות  מתוך כל הישיבות המקושרות להצעה (כולל פגרות)": {"name": "all_days_with_pagra"},
    "מספר ימים בין הישיבה הראשונה והאחרונה של מליאה או ועדות  מתוך כל הישיבות המקושרות להצעה (לא כולל פגרות)": {"name": "all_days_without_pagra"},
}

def set_columns(package):
    
    def get_fields(descriptor):
        source_fields = {field['name']: field for field in descriptor['schema']['fields']}
        for column_name, column in columns.items():
            field = source_fields[column['name']]
            yield dict(field, name=column_name)
    
    def get_row(row):
        for column_name, column in columns.items():
            yield column_name, row[column['name']]
    
    def get_resource(resource):
        for row in resource:
            yield dict(get_row(row))
    
    resource_descriptor = package.pkg.get_resource('bill_sessions').descriptor
    resource_descriptor['schema'] = {'fields': list(get_fields(resource_descriptor))}
    yield Package({'resources': [resource_descriptor]})
    for resource in package:
        yield get_resource(resource)
    
Flow(
    load('data/hesderim_bill_splits/datapackage.json', resources=['bill_sessions']),
    set_columns,
    dump_to_path('data/hesderim_bill_splits_export'),
    printer(tablefmt='html', num_rows=1)
).process()[1]

#,מזהה הצעת חוק ההסדרים שהצעה זו פוצלה ממנה (integer),מזהה הצעת החוק (integer),מספר הכנסת (integer),שם הצעת החוק (string),מספר ישיבות מליאה המקושרות להצעת החוק (integer),מספר ישיבות ועדה המקושרות להצעת החוק (integer),מזהה הועדה המטפלת (integer),שם הועדה המטפלת (string),מספר ישיבות בועדה המטפלת (integer),ישיבה ראשונה בועדה המטפלת (date),ישיבה אחרונה בועדה המטפלת (date),מספר ימים בין הישיבה הראשונה והישיבה האחרונה בועדה המטפלת (כולל פגרות) (integer),מספר ימים בין הישיבה הראשונה והישיבה האחרונה בועדה המטפלת (לא כולל פגרות) (integer),מספר ימים בין הישיבה הראשונה והאחרונה של מליאה או ועדות מתוך כל הישיבות המקושרות להצעה (כולל פגרות) (integer),מספר ימים בין הישיבה הראשונה והאחרונה של מליאה או ועדות מתוך כל הישיבות המקושרות להצעה (לא כולל פגרות) (integer)
1,474392.0,474507.0,18.0,"סעיף 2 (חוק מיסוי מקרקעין - הגדלת ההיצע של דירות מגורים - הוראת שעה) בהצעת חוק לצמצום הגירעון ולהתמודדות עם השפעות המשבר בכלכלה העולמית (תיקוני חקיקה), התשע""ב-2012",,,653.0,הכספים,,,,,,0.0,0.0
2,16633.0,88726.0,16.0,"הצעת חוק המדיניות הכלכלית לשנת הכספים 2004 (תיקוני חקיקה), התשס""ד-2003 (מיועדת רק לצורך חלוקת הצעת החוק)",1.0,4.0,23.0,הכלכלה,4.0,2003-11-19,2003-12-01,12.0,12.0,26.0,26.0
...,,,,,,,,,,,,,,,
234,568878.0,569078.0,20.0,"חוק לתיקון פקודת בריאות העם (מס' 29), התשע""ו-2015",3.0,1.0,928.0,"ועדת העבודה, הרווחה והבריאות",1.0,2015-10-22,2015-10-22,0.0,0.0,53.0,53.0


{'count_of_rows': 234,
 'bytes': 77778,
 'hash': '675fe6d1ebfef9f65e147f7157e53d93',
 'dataset_name': None}