## Setup

Mostly importing various libraries and reading in secrets

In [None]:
import requests
from tqdm.auto import tqdm

In [None]:
# Register tqdm's pandas integration; this is what makes `progress_apply`
# (used later when hashing) available on pandas objects.
tqdm.pandas()

In [None]:
import json
import hashlib
import io

In [None]:
from db_utils import minio_utils

In [None]:
# Load credentials (object store, proxy, CKAN, hash salt) from the local
# secrets file. The context manager closes the file handle as soon as parsing
# is done, rather than leaving it open until garbage collection.
with open("/home/jovyan/secrets/secrets.json") as secrets_file:
    secrets = json.load(secrets_file)

## Worked Example (January 2017)

A lot of the values are hard-coded here to make the code easier to read.

### Loading Data

First up, downloading the detailed billing data from our internal object store. It is organised by `[year]-[month]`, which is determined by the bill's posting date.

**NB** This means it is possible for a bill to refer to billing periods many months, or even years in the past.

In [None]:
# Fetch the January 2017 detailed billing bucket into a dataframe, using the
# "edge" credentials from the secrets file.
edge_credentials = secrets["minio"]["edge"]
detailed_bill_df = minio_utils.minio_to_dataframe(
    minio_bucket="sap-r3-isu-connector.detailed-billing-2017-01",
    minio_key=edge_credentials["access"],
    minio_secret=edge_credentials["secret"],
    data_classification=minio_utils.DataClassification.EDGE,
    use_cache=True,
)

In [None]:
# Dimensions (rows, columns) of the downloaded billing data.
detailed_bill_df.shape

In [None]:
# Column labels available in the downloaded billing data.
detailed_bill_df.columns

In [None]:
# Number of distinct values in each column.
detailed_bill_df.nunique()

### Preparing for sharing

The below categories were provided by Ken Sinclair-Smith in the Water Demand Management and Strategy branch in the Water and Sanitation department. This is the *definitive* list for all water supply-related tariffs.

In [None]:
# Rate categories that identify water supply-related tariffs; billing line
# items are kept only when their `RateCategory` is a member of this set.
WATER_CATEGORIES = {
    "BULK-WT",
    "BW-WT-O",
    "DPMTCLUS",
    "DPMTSTANDP",
    "GOVT-WT",
    "MISC-WT",
    "SCH-SP-WT",
    "WA-CLU-IND",
    "WAT-COMASS",
    "WAT-DP-IND",
    "WATER-CLU",
    "WATER-COM",
    "WATER-DPMT",
    "WATERFULLD",
    "WATER-IND",
    "WATER-INDI",
    "WAT-IND135",
    "WAT-IND180",
    "WAT-IND240",
    "WAT-IND300",
    "WAT-INDASS",
    "WATR-B-DEP",
    "WAVAIL",
    "WAVAILO",
    "WCONTRACTS",
    "WHSHELTER",
    "WINFORMAL",
    "WMISC-EX",
    "WMUNICIPAL",
    "WOAGESHELT",
    "WSOCHOUASS",
    "WSPRIN-COM",
    "WSPRINGIRR",
    "WSPRINGSCH",
    "WSPRI-SCHU",
    "WSPRNGDPMT",
    "WSTANDPIPE",
    "WSUBSLIP",
    "WTFLN-COMC",
    "WTFLN-COMU",
    "WTFLN-DOMC",
    "WTFLN-DPMC",
    "WTFLN-DPMU",
    "WTFLN-GLFU",
    "WTFLN-INDU",
    "WTFLN-SCHC",
    "WTFLN-SCHU",
    "WTFLNTGF-C",
    "WTFLN-USER",
    "WTFL-STADE",
    "WTFL-STANC",
}

These are the fields that we are going to share:

In [None]:
# Columns of the detailed billing data that are included in the shared extract.
detailed_df_fields_to_share = [
    "InvoiceNumber",
    "AmountBilled",
    "UnitofMeasureCode",
    "QuantityBilled",
    "RateCategory",
    "RateTypeCode",
    "BillingPeriodStart",
    "BillingPeriodEnd",
]

Now, applying the filters:
1. Only selecting those fields we want to share.
2. Only selecting those line items that pertain to Water.

In [None]:
# Project down to the shareable columns, keep only rows whose RateCategory is
# a water tariff, and copy so later in-place edits don't touch the source data.
share_df = (
    detailed_bill_df[detailed_df_fields_to_share]
    .query("RateCategory.isin(@WATER_CATEGORIES)")
    .copy()
)

This is the field that we're going to hash so as to provide additional protection to residents' privacy.

In [None]:
# Columns whose raw values are replaced by a salted hash before sharing.
detailed_df_fields_to_anonymise = ["InvoiceNumber"]

**NB** I'm using the SHA256 algorithm to hash the values.

In [None]:
# For every sensitive column, add a "<name>Hashed" column holding the SHA-256
# hex digest of the value concatenated with the secret salt.
for anon_field in detailed_df_fields_to_anonymise:
    hashed_column = f"{anon_field}Hashed"
    share_df[hashed_column] = share_df[anon_field].progress_apply(
        lambda val: hashlib.sha256(
            (val + secrets['epru-uct']['hash_salt']).encode()
        ).hexdigest()
    )

# Remove the raw columns now that their hashed counterparts exist.
share_df.drop(columns=detailed_df_fields_to_anonymise, inplace=True)

### Uploading to CKAN

Here, we finally upload the data to CKAN, using its [`resource_create` API](https://docs.ckan.org/en/2.8/api/#ckan.logic.action.create.resource_create).

**NB** Our request goes via an internal proxy.

In [None]:
# Proxy URL with the credentials embedded in the userinfo part.
# NOTE(review): the username/password are interpolated verbatim; if either can
# contain characters such as "@" or ":" they would need URL-escaping - confirm.
proxy_user = secrets['proxy']['username']
proxy_password = secrets['proxy']['password']
city_proxy = f"http://{proxy_user}:{proxy_password}@internet05.capetown.gov.za:8080/"

# API key sent in the X-CKAN-API-Key header of each upload request.
ckan_api_key = secrets["external-ckan"]["ckan-api-key"]

In [None]:
%%time

# Create a new resource in the "water-transactional-data" CKAN package by
# uploading `share_df` as a CSV file. The CSV is rendered in memory and the
# request is routed through `city_proxy` for both http and https.
resp = requests.post(
    'https://ckan.arbidata.com/api/action/resource_create',
    data={
        "package_id": "water-transactional-data",
        "name": "Water Detailed Billing Data 2017-01",
        "resource_type": "file",
        "format": "csv",
        "mimetype": "text/csv"
    },
    headers={"X-CKAN-API-Key": ckan_api_key},
    files={
        'upload': ('water-consumption-detailed-2017-01.csv',
                   io.StringIO(share_df.to_csv(index=False)))
    },  # the separating comma was missing here, making the call a SyntaxError
    proxies={
        "http": city_proxy,
        "https": city_proxy
    }
)

In [None]:
# Decode the JSON body of the upload response so the result can be inspected.
resp.json()

## Bulk upload (February 2017 - December 2017)

Below is the code used for performing the bulk uploads. Eventually it will live in a script, which will be executed as part of a data pipeline.

It's unfortunately a little hard to read, as I make quite extensive use of generator expressions. This is because I have to go month-by-month, as each month occupies roughly 10 GB in memory.

In [None]:
# "[year]-[month]" suffixes (month zero-padded) of the buckets to process.
# NOTE(review): range(2, 4) yields only February and March, whereas the section
# heading mentions February-December 2017 - confirm the intended range.
detailed_df_suffixes = [
    f"{year}-{month:02d}"
    for year in ["2017"]
    for month in range(2, 4)
]

In [None]:
def _load_detailed_bill(suffix):
    """Fetch the detailed billing bucket for one "[year]-[month]" suffix."""
    edge_credentials = secrets["minio"]["edge"]
    return minio_utils.minio_to_dataframe(
        minio_bucket="sap-r3-isu-connector.detailed-billing-{}".format(suffix),
        minio_key=edge_credentials["access"],
        minio_secret=edge_credentials["secret"],
        data_classification=minio_utils.DataClassification.EDGE,
        use_cache=True,
    )


# Generator, so each month is only downloaded when a downstream step asks for it.
detailed_bill_dfs = (
    _load_detailed_bill(detailed_df_suffix)
    for detailed_df_suffix in tqdm(detailed_df_suffixes, desc="Data Download")
)

In [None]:
# Lazily reduce each month's data to the shareable columns and the
# water-related line items.
# The loop variable must be the name used in the expression: it was previously
# spelled `detail_bill_df`, so the expression silently picked up the
# module-level `detailed_bill_df` (the January frame) on every iteration.
share_dfs = (
    detailed_bill_df[
        detailed_df_fields_to_share
    ].query("RateCategory.isin(@WATER_CATEGORIES)").copy()
    for detailed_bill_df in tqdm(
        detailed_bill_dfs,
        desc="Data Filter"
    )
)

In [None]:
def _hash_anonymised_fields(frame):
    """Return a copy of `frame` with every sensitive column replaced by its hash.

    For each field in `detailed_df_fields_to_anonymise`, a "<field>Hashed"
    column is appended holding the SHA-256 hex digest of the value concatenated
    with the secret salt; the raw columns are then dropped.
    """
    salt = secrets['epru-uct']['hash_salt']
    hashed_columns = {
        "{}Hashed".format(anon_field): frame[anon_field].apply(
            lambda val: hashlib.sha256(str.encode(val + salt)).hexdigest()
        )
        for anon_field in detailed_df_fields_to_anonymise
    }
    return frame.assign(**hashed_columns).drop(
        detailed_df_fields_to_anonymise, axis='columns'
    )


# Lazily hash each filtered month. Exactly one frame is produced per month,
# with *all* sensitive fields hashed. The previous nested `for` produced one
# frame per (month, field) pair and relied on a `pandas` name that this
# notebook never imports.
hashed_share_dfs = (
    _hash_anonymised_fields(share_df)
    for share_df in tqdm(
        share_dfs,
        desc="Data Hash"
    )
)

In [None]:
# Drive the lazy download -> filter -> hash pipeline one month at a time and
# upload each result to CKAN as a new CSV resource, collecting the responses.
upload_resps = [
    requests.post(
        'https://ckan.arbidata.com/api/action/resource_create',
        data={
            "package_id": "water-transactional-data",
            "name": "Water Detailed Billing Data {}".format(detailed_suffix),
            "resource_type": "file",
            "format": "csv",
            "mimetype": "text/csv"
        },
        headers={"X-CKAN-API-Key": ckan_api_key},
        files={
            'upload': (
                'water-consumption-detailed-{}.csv'.format(detailed_suffix),
                io.StringIO(hashed_share_df.to_csv(index=False))
            )
        },
        proxies={
            "http": city_proxy,
            "https": city_proxy
        }
    )
    for hashed_share_df, detailed_suffix in tqdm(
        zip(hashed_share_dfs, detailed_df_suffixes),
        desc="Data Upload",
        # zip() has no length, so give tqdm the total explicitly; without it
        # the bar cannot show how many uploads remain.
        total=len(detailed_df_suffixes)
    )
]

In [None]:
# Summarise the outcome of each upload next to the month it belongs to.
for resp, detailed_suffix in zip(upload_resps, detailed_df_suffixes):
    print(f"{detailed_suffix}: {resp}")