# Data Streams

In [1]:
from pathlib import Path
import json
import pandas as pd
import numpy as np
import shutil
import re
import requests
import h5py

import sys
import os

# Put the parent directory on the import path (once) so that packages living
# there can be imported from this notebook.
module_path = os.path.abspath(os.pardir)
if sys.path.count(module_path) == 0:
    sys.path.append(module_path)

In [2]:
from app.utils.naming import component_to_csv_file, format_component_name

In [3]:
def create_stream(p, c, col=None):
    """Build the registration record for a component, or for one of its fields.

    Args:
        p: Product entry from the manifest; only ``p['product']`` is read.
        c: Component entry. ``name``, ``dataType`` and ``keywords`` are
            required; ``description`` is optional and defaults to ''.
        col: Optional column (field) name. When given, ``&field=<col>`` is
            appended to the endpoint and the description is prefixed with a
            label derived from the column.

    Returns:
        dict with the keys ``urlCode``, ``endpoint``, ``dataType``,
        ``keywords`` and ``description``.
    """
    description = c.get('description', '')
    stream = {
        'urlCode': 'API_PY',
        'endpoint': f'/data/?product={p["product"]}&component={c["name"]}',
        'dataType': c['dataType'],
        # Copy so that later edits to one stream's keywords can never leak
        # into the manifest or into sibling streams of the same component.
        'keywords': list(c['keywords']),
        'description': description,
    }
    if not col:
        return stream

    stream['endpoint'] += f'&field={col}'

    # For the ONS mortality product only the part of the column name before
    # the '___' separator is used as the label; other products use the whole
    # column name.
    if p['product'] == 'ons/england/mortality':
        label = col.split('___')[0]
    else:
        label = col

    # A missing or empty component description yields the bare label rather
    # than a KeyError or a dangling ", ".
    stream['description'] = f'{label}, {description}' if description else label
    return stream

In [4]:
def generate_streams(manifest, names=None, folder='../../data/live/', split=True):
    """Create stream records for the components listed in a manifest.

    Args:
        manifest: Iterable of product entries, each with a ``product`` name
            and a ``components`` list.
        names: Optional collection of component names; when non-empty, only
            components whose name is in it are processed.
        folder: Directory passed to ``component_to_csv_file`` to locate each
            component's csv file.
        split: When true, a component whose csv has more than one column also
            gets one extra stream per column.

    Returns:
        list of stream dicts: the component stream first, followed by its
        per-column streams (if any), in manifest order.
    """
    data_dir = Path(folder)
    registered = []
    for product in manifest:
        for component in product['components']:
            selected = not names or component['name'] in names
            if not selected:
                continue

            # The whole component is always registered as a stream of its own.
            csv_path = component_to_csv_file(data_dir, product['product'], component['name'])
            frame = pd.read_csv(csv_path, index_col=0)
            registered.append(create_stream(product, component))

            if not split or len(frame.columns) <= 1:
                continue

            # Every field of a multi-column csv becomes a separate stream.
            for field in frame.columns:
                field_stream = create_stream(product, component, field)
                # e.g. female___1-14 years contributes two separate keywords;
                # a name without the separator contributes itself.
                field_keywords = format_component_name(field).split('___')
                field_stream['keywords'] = field_stream['keywords'] + field_keywords
                registered.append(field_stream)
    return registered

In [5]:
def test_endpoints(streams, base_url='http://localhost:3000/stat/v1', timeout=30):
    """Check that every stream's endpoint answers with HTTP 200.

    Args:
        streams: Iterable of stream dicts; only ``s['endpoint']`` is read.
        base_url: Prefix prepended to each endpoint.
        timeout: Seconds to wait for each response before requests gives up.

    Raises:
        AssertionError: naming the first URL that did not return 200.
    """
    for s in streams:
        url = base_url + s['endpoint']
        # Without a timeout, requests waits indefinitely on a stalled server.
        response = requests.get(url, timeout=timeout)
        assert response.status_code == 200, f'{url} returned HTTP {response.status_code}'

In [6]:
def test_streams(streams):
    """Sanity-check the Scottish covid-deaths age-group/gender streams.

    Expects exactly 15 streams tagged with all of 'scotland', 'covid_deaths',
    'age_group' and 'gender' (1 overall plus 14 age_group x gender). Of those,
    7 must be tagged 'male', 7 'female' and 2 '1_14_years'.

    Raises:
        AssertionError: if any of the counts differs.
    """
    required = ('scotland', 'covid_deaths', 'age_group', 'gender')
    subset = []
    for stream in streams:
        if all(tag in stream['keywords'] for tag in required):
            subset.append(stream)

    def tagged(tag):
        # Number of selected streams whose keyword list contains ``tag``.
        return sum(1 for stream in subset if tag in stream['keywords'])

    assert len(subset) == 15
    assert tagged('male') == 7
    assert tagged('female') == 7
    assert tagged('1_14_years') == 2

## Create streams

In [7]:
# with open('../manifest/manifest.json') as f:
#     manifest = json.load(f)

In [8]:
# streams = generate_streams(manifest)
# # test_endpoints(streams)
# # test_streams(streams)
# len(streams)

## Register

In [9]:
def get_token(prod=False, timeout=30):
    """Log in to the ontology API and return its bearer token.

    Args:
        prod: Use the production host (vis.scrc.uk) instead of localhost:2000.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The token string, or None when the request fails or the response
        carries no token. Failures are printed, never raised.
    """
    url = 'http://vis.scrc.uk/api/v1/auth/login' if prod else 'http://localhost:2000/api/v1/auth/login'
    # SECURITY(review): these credentials are committed in plain text and sent
    # over http. Rotate them and load them from the environment or a secret
    # store instead.
    credentials = {'password': "kCXTZR5P3BtyPgGL", 'email': "phong@admin.com"}
    try:
        res = requests.post(url, credentials, timeout=timeout)
        # A Response is falsy for 4xx/5xx statuses; do not parse those bodies.
        payload = res.json() if res else None
    except requests.exceptions.ConnectionError as e:
        # requests raises its own ConnectionError, which the builtin
        # ConnectionError does not cover.
        print("token request: error = ", e)
        return None
    except Exception as e:
        # Best effort: anything else (timeout, invalid JSON, ...) is reported
        # and turned into a None token.
        print("Something went wrong", e)
        return None

    token = payload.get('token') if isinstance(payload, dict) else None
    if not token:
        print("token request: no token in response, status =", res.status_code)
        return None
    return token

def register(data, token, prod=False, timeout=30):
    """POST one stream record to the ontology API.

    Args:
        data: Stream dict, sent as the request body.
        token: Bearer token string, as returned by ``get_token``.
        prod: Use the production host (vis.scrc.uk) instead of localhost:2000.
        timeout: Seconds to wait for the server before giving up.

    Raises:
        TypeError: if ``token`` is None.

    Request and decoding errors are printed, not raised. A response body that
    contains a 'message' key is printed as well.
    """
    if token is None:
        # Fail with an actionable message instead of the bare
        # "can only concatenate str" error from building the header.
        raise TypeError('register() needs an auth token but got None; '
                        'did get_token() fail?')

    url = 'http://vis.scrc.uk/api/v1/ontology/data' if prod else 'http://localhost:2000/api/v1/ontology/data'
    headers = {'Authorization': 'Bearer ' + token}
    try:
        # Without a timeout, requests waits indefinitely on a stalled server.
        response = requests.post(url, data, headers=headers, timeout=timeout)
        body = response.json()
        if 'message' in body:
            print(body)
    except Exception as e:
        print(e)

### 1. Agegroup/gender

In [63]:
# Tag every stream with the extra 'phong' keyword.
for stream in streams:
    stream['keywords'] = stream['keywords'] + ['phong']

In [64]:
# Keep only the streams that carry every one of the age-group/gender keywords.
required_keywords = ('scotland', 'covid_deaths', 'age_group', 'gender')
age_gen_streams = [
    stream for stream in streams
    if all(keyword in stream['keywords'] for keyword in required_keywords)
]

In [None]:
# Register the age-group/gender streams (register defaults to the local API).
for stream in age_gen_streams:
    register(stream, token)

### 2. Mock Wales agegroup/gender

In [None]:
# Build mock streams by replacing every 'scotland' occurrence in the serialised
# age-group/gender streams with 'wales', then register them on the local API.
serialised = json.dumps(age_gen_streams)
wales_streams = json.loads(serialised.replace('scotland', 'wales'))
for stream in wales_streams:
    register(stream, token)

### 3. ONS

In [35]:
# Load the ONS manifest, generate its streams and tag each one with 'phong'.
ons_manifest = json.loads(Path('../manifest/ons-manifest.json').read_text())
ons_streams = generate_streams(ons_manifest)
for stream in ons_streams:
    stream['keywords'] = stream['keywords'] + ['phong']

In [None]:
# Check that every ONS stream endpoint returns HTTP 200 on the default local API.
test_endpoints(ons_streams)

In [None]:
# Register the ONS streams (register defaults to the local API).
for stream in ons_streams:
    register(stream, token)

---

## 1. Dashboards

```
var DATASTREAM_1 = "data/nhs_health_board_date_covid19_patients_in_hospital_confirmed_normalized.csv";
var DATASTREAM_2 = "data/nhs_health_board_date_covid19_patients_in_hospital_confirmed.csv";
var DATASTREAM_3 = "data/nhs_health_board_date_covid19_patients_in_icu_confirmed.csv";
var DATASTREAM_4 = "data/nhsboard_date_total_daily_tests_reported.csv";
var DATASTREAM_5 = "data/nhs_health_board_week_covid_related_deaths.csv";
var DATASTREAM_6 = "data/nhs_health_board_week_all_deaths.csv";
```

In [72]:
# Dashboard streams: only the two council-area death components are selected;
# the remaining component names are commented out.
manifest = json.loads(Path('../manifest/manifest.json').read_text())

names = [
#     'date-country-new_cases_reported',
#     'date-country-covid19_patients_in_hospital-confirmed',
#     'date-country-covid19_patients_in_icu-confirmed',
#     'nhsboard/date-total_daily_tests_reported',
#     'nhsboard/date-total_daily_tests_reported_normalized',
#     'nhs_health_board/date-covid19_patients_in_hospital-confirmed',
#     'nhs_health_board/date-covid19_patients_in_hospital-confirmed_normalized',
#     'nhs_health_board/date-covid19_patients_in_icu-confirmed',
#     'nhs_health_board/date-covid19_patients_in_icu-confirmed_normalized',
#     'nhs_health_board/week-covid_related_deaths',
#     'nhs_health_board/week-covid_related_deaths_normalized',
#     'nhs_health_board/week-all_deaths',
#     'nhs_health_board/week-all_deaths_normalized',
    'council_area/week-all_deaths',
    'council_area/week-covid_related_deaths'
]

db_streams = generate_streams(manifest, names)
for stream in db_streams:
    tags = ['bdb']
    # Streams without a description are labelled 'Scotland' and receive one
    # additional keyword.
    if not stream['description']:
        stream['description'] = 'Scotland'
        tags.append('all_local_authorities')
    stream['keywords'] = stream['keywords'] + tags

In [74]:
# Check that every dashboard stream endpoint returns HTTP 200 on the default local API.
test_endpoints(db_streams)

In [75]:
# Log in to production and register every dashboard stream there.
token = get_token(prod=True)
for stream in db_streams:
    register(stream, token, prod=True)

## 2. Opendata

In [11]:
# Open-data streams: one stream per component (split=False), endpoints checked
# against the default local API. The last expression displays the stream count.
manifest = json.loads(Path('../manifest/opendata-manifest.json').read_text())

od_streams = generate_streams(manifest, split=False)
test_endpoints(od_streams)
len(od_streams)

48

In [27]:
# Log in to the production API; get_token returns None if the login fails.
token = get_token(prod=True)

In [18]:
# Register every open-data stream on production.
for stream in od_streams:
    register(stream, token, prod=True)

In [29]:
# Subset of the open-data manifest: the two daily components, one stream each
# (split=False). The last expression displays the stream count.
with open('../manifest/opendata-manifest.json') as manifest_file:
    manifest = json.load(manifest_file)

names = ['daily_health_boards', 'daily_local_authorities']

streams = generate_streams(manifest, names, split=False)
test_endpoints(streams)
len(streams)

2

In [30]:
# Register the daily open-data streams on production.
for stream in streams:
    register(stream, token, prod=True)

## 3. Cumulative

In [23]:
# Cumulative components: generate their streams (split per column by default)
# and check the endpoints. The last expression displays the stream count.
manifest = json.loads(Path('../manifest/manifest.json').read_text())

names = [
    'test_result/date-people_tested_for_covid19-cumulative',
    'testing_location/date-covid19_tests_carried_out-cumulative',
    'date-country-covid19_confirmed_deaths_registered-cumulative',
]

streams = generate_streams(manifest, names)
test_endpoints(streams)
len(streams)

8

In [24]:
# Log in to the production API; get_token returns None if the login fails.
token = get_token(prod=True)

In [25]:
# Register the cumulative streams on production.
for stream in streams:
    register(stream, token, prod=True)

## 4. Models

In [58]:
def generate_model_streams(manifest):
    """Build one stream record per component of every model in the manifest.

    Args:
        manifest: Iterable of model entries, each with a ``model`` name and a
            ``components`` list. Components need ``name``, ``dataType`` and
            ``keywords``; ``description`` is optional and defaults to ''.

    Returns:
        list of stream dicts in manifest order, each with the endpoint
        ``/<model>/<component name>``.
    """
    return [
        {
            'urlCode': 'API_PY',
            'endpoint': f'/{p["model"]}/{c["name"]}',
            'dataType': c['dataType'],
            # Copy so that editing a stream's keywords cannot alter the manifest.
            'keywords': list(c['keywords']),
            # Optional, mirroring how create_stream treats the description.
            'description': c.get('description', ''),
        }
        for p in manifest
        for c in p['components']
    ]
            
# Load the model manifest, derive its streams and check that each endpoint
# returns HTTP 200 on the default local API.
manifest = json.loads(Path('../manifest/model-manifest.json').read_text())
streams = generate_model_streams(manifest)
test_endpoints(streams)

In [60]:
# Register the model streams on production.
for stream in streams:
    register(stream, token, prod=True)

## 5. Correlations

In [17]:
def generate_corr_streams(manifest):
    """Build one stream record per entry of the correlation manifest.

    Args:
        manifest: Iterable of entries with ``name``, ``dataType`` and
            ``keywords``; ``description`` is optional and defaults to ''.

    Returns:
        list of stream dicts in manifest order. The entry's ``name`` is used
        verbatim as the query string of the ``/correlation`` endpoint.
    """
    return [
        {
            'urlCode': 'API_PY',
            'endpoint': f'/correlation?{d["name"]}',
            'dataType': d['dataType'],
            # Copy so that editing a stream's keywords cannot alter the manifest.
            'keywords': list(d['keywords']),
            # Optional, mirroring how create_stream treats the description.
            'description': d.get('description', ''),
        }
        for d in manifest
    ]
            
# Load the correlation manifest, derive its streams and check that each
# endpoint returns HTTP 200 on the default local API.
manifest = json.loads(Path('../manifest/corr-manifest.json').read_text())
streams = generate_corr_streams(manifest)
test_endpoints(streams)

In [23]:
# Log in to production and register every correlation stream there.
token = get_token(prod=True)
for stream in streams:
    register(stream, token, prod=True)

## 6. Location

In [11]:
# Location-type death components: generate their streams and check that each
# endpoint returns HTTP 200 on the default local API.
manifest = json.loads(Path('../manifest/manifest.json').read_text())

names = [
    'location_type/week-all_deaths',
    'location_type/week-covid_related_deaths',
]

streams = generate_streams(manifest, names)
test_endpoints(streams)

In [14]:
# Log in to production and register every location-type stream there.
token = get_token(prod=True)
for stream in streams:
    register(stream, token, prod=True)