This notebook creates 2020 forecast evaluation reports for reference sites in the NOAA SURFRAD, NOAA SOLRAD, DOE RTC, and NREL MIDC networks, and sets up a periodic job to recompute each of those reports.

In [None]:
import datetime as dt
import json
import logging
import os 
import random
from collections import defaultdict
from functools import partial

from flask import Flask
import pandas as pd

from solarforecastarbiter import datamodel
from solarforecastarbiter.io import api
from solarforecastarbiter.reports.main import infer_timezone
from sfa_api import jobs
from sfa_api.utils import storage_interface

Several config parameters must be set, in the file named by the `REPORT_CONFIG` environment variable, for this notebook to function. These include:
- API_USER
- API_PASSWORD
- BASE_URL
- MYSQL_USER
- MYSQL_PASSWORD
- MYSQL_HOST
- MYSQL_PORT
- MYSQL_DATABASE

In [None]:
# Minimal Flask app. It exists to load configuration from the file named by
# the REPORT_CONFIG environment variable, and to provide an application
# context for the sfa_api calls made in later cells.
app_name = 'create_report_jobs'
app = Flask(app_name)
app.config.from_envvar('REPORT_CONFIG')

In [None]:
logging.basicConfig(level=logging.INFO)
# API credentials and endpoint are read from the Flask config loaded above.
cfg = app.config
user = cfg['API_USER']
password = cfg['API_PASSWORD']
token = api.request_cli_access_token(user, password)
base_url = cfg['BASE_URL']
# Authenticated API client used by the rest of the notebook.
session = api.APISession(base_url=base_url, access_token=token)
# Site provider that marks reference sites; also the organization whose
# existing periodic jobs are inspected later.
organization = 'Reference'

In [None]:
# Networks whose sites get reports generated.
networks = ('NOAA SURFRAD', 'NOAA SOLRAD', 'DOE RTC', 'NREL MIDC')
reference_sites = []
for site in session.list_sites():
    if site.provider != organization:
        continue
    try:
        extra = json.loads(site.extra_parameters)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError (malformed or empty string);
        # TypeError covers a non-string value such as None.
        logging.warning(f'Unable to decode extra params for {site.name}')
        continue
    # extra_parameters may decode to a non-dict (list, string, number). Only a
    # dict can carry a 'network' key; indexing anything else would raise.
    if isinstance(extra, dict) and extra.get('network') in networks:
        reference_sites.append(site)

In [None]:
variables = ('ghi', 'dni', 'ac_power')
# Ids of the selected reference sites. A set lookup replaces an equality scan
# over the whole site list for every forecast/observation.
reference_site_ids = {site.site_id for site in reference_sites}

# forecasts[variable][site_id] -> list of forecasts. This is a nested
# defaultdict, so a missing key yields an empty list.
forecasts = defaultdict(partial(defaultdict, list))
for fx in session.list_forecasts():
    fx_site = fx.site
    # fx.site may be None (presumably forecasts attached to an aggregate
    # rather than a site); such forecasts never match a reference site.
    if (fx_site is not None and fx_site.site_id in reference_site_ids
            and fx.variable in variables):
        forecasts[fx.variable][fx_site.site_id].append(fx)

# observations[variable][site_id] -> observation (a later one overwrites an
# earlier one for the same site and variable).
observations = defaultdict(dict)
for obs in session.list_observations():
    obs_site = obs.site
    if (obs_site is not None and obs_site.site_id in reference_site_ids
            and obs.variable in variables):
        observations[obs.variable][obs_site.site_id] = obs

In [None]:
# Template shared by every reference-site report. `name` and `object_pairs`
# are placeholders that are swapped in per report with `.replace(...)`.
# Sequence fields are tuples throughout, matching the other fields, so the
# frozen parameter objects stay hashable.
base_report_parameters = datamodel.ReportParameters(
    name='replaceme',
    start=pd.Timestamp('2020-01-01T00:00:00Z'),
    end=pd.Timestamp('2020-12-31T23:59:59Z'),
    object_pairs=(),
    metrics=('mbe', 'mae', 'rmse'),
    categories=('total', 'month', 'hour', 'weekday', 'date'),
    # Quality-flag filter on NIGHTTIME and USER FLAGGED points (presumably
    # excluded before metrics are computed; see solarforecastarbiter docs).
    filters=(datamodel.QualityFlagFilter(('NIGHTTIME', 'USER FLAGGED')),),
)


In [None]:
# (report_id, timezone) for every report that should get a regeneration job.
report_ids = []
# Existing reports keyed by name, so rerunning the notebook reuses them
# instead of creating duplicates: name -> (report_id, report_parameters).
current_reports = {r.report_parameters.name: (r.report_id, r.report_parameters)
                   for r in session.list_reports()}
# Report names already queued during this run. Site names are truncated to
# 50 characters, so two sites can map to the same report name. Without this
# guard that would queue the same report id twice or create a duplicate report.
handled_names = set()
for site in reference_sites:
    sitename = site.name[:50]
    for variable in variables:
        report_name = f'2020 {sitename} {datamodel.COMMON_NAMES[variable]}'
        if report_name in handled_names:
            logging.warning(
                f'Report name {report_name} already used; skipping {site.name}')
            continue
        if report_name in current_reports:
            existing_id, existing_params = current_reports[report_name]
            report_ids.append((existing_id, infer_timezone(existing_params)))
            handled_names.add(report_name)
            continue
        obs = observations.get(variable, {}).get(site.site_id)
        if obs is None:
            logging.warning(f'Missing observation for {variable} {site.name}')
            continue
        # `forecasts` is a nested defaultdict, so indexing never raises
        # KeyError. Test for emptiness explicitly instead.
        fxs = forecasts.get(variable, {}).get(site.site_id, ())
        if not fxs:
            logging.warning(f'Missing forecast for {variable} {site.name}')
            continue
        object_pairs = tuple(
            datamodel.ForecastObservation(observation=obs, forecast=fx)
            for fx in fxs
        )
        report_params = base_report_parameters.replace(
            name=report_name, object_pairs=object_pairs
        )
        logging.info(f'Creating report {report_name}')
        new_report = session.create_report(
            datamodel.Report(report_parameters=report_params)
        )
        tz = infer_timezone(report_params)
        report_ids.append((new_report.report_id, tz))
        handled_names.add(report_name)


In [None]:
# Snapshot of existing jobs, taken inside the Flask application context via a
# private storage helper. with_current_user=False presumably lists jobs for
# all users, not only the authenticated one; confirm against sfa_api.
with app.app_context():
    current_jobs = storage_interface._call_procedure(
        'list_jobs',
        with_current_user=False,
    )

In [None]:
# Id of the authenticated API user; passed to jobs.create_job when
# scheduling the regeneration jobs.
user_info = session.get_user_info()
user_id = user_info['user_id']

In [None]:
# Report ids that already have a periodic regeneration job in this
# organization. A set gives O(1) membership tests when deciding which
# reports still need a job (and drops any duplicate entries).
report_jobs = {
    job['parameters']['report_id']
    for job in current_jobs
    if job['organization_name'] == organization
    and job['job_type'] == 'periodic_report'
}


In [None]:
# Names of the jobs created in this run, verified in the final cell.
new_jobs = []
# Report ids that already have a job, extended as jobs are created. This
# prevents scheduling the same report twice if it appears more than once
# in report_ids.
scheduled = set(report_jobs)
for report_id, tz in report_ids:
    if report_id in scheduled:
        logging.warning(f'Job to recompute report {report_id} exists')
        continue
    # UTC hour of local midnight in the report's timezone, evaluated on
    # 2020-01-01 (so the offset in effect on that date is used year-round).
    hr = pd.Timestamp('20200101T0000', tz=tz).tz_convert('UTC').hour
    # Random minute in [0, 30]; presumably to spread jobs out in time.
    cron = f'{random.randint(0, 30)} {hr} * * *'
    timeout = '5m'
    job_name = f'{report_id} regen'
    with app.app_context():
        jobs.create_job('periodic_report', job_name, user_id, cron, timeout,
                        report_id=report_id, base_url=base_url)
    logging.info(f'Created report job for {report_id}')
    new_jobs.append(job_name)
    scheduled.add(report_id)
    

In [None]:
# Re-list every job after creation, so the next cell can confirm that the
# newly created jobs are present.
with app.app_context():
    alljobs = storage_interface._call_procedure(
        'list_jobs', with_current_user=False
    )

In [None]:
# Every job created in this run must now be listed. Raise explicitly rather
# than using a bare `assert`, so the check survives `python -O` and reports
# which jobs are missing.
missing_jobs = set(new_jobs) - {j['name'] for j in alljobs}
if missing_jobs:
    raise AssertionError(
        f'Jobs not found after creation: {sorted(missing_jobs)}')