In [None]:
%%capture

import requests
import numpy as np
import xarray as xr
import dask
import dask.dataframe as dd
import pandas as pd
from dask.distributed import progress
from distributed import Client
from datetime import datetime, timedelta


# url = "http://smartmet.fmi.fi/timeseries?producer={}&tz=gmt&precision=auto&starttime={}&endtime={}&param=fmisid,longitude,latitude,utctime,elevation,{}&format=json".format(

def start_client(address="dask-development-scheduler:8786"):
    """Connect to a Dask scheduler and call ``restart()`` on the connection.

    Args:
        address: Scheduler address handed to ``Client``. The default is the
            development scheduler that used to be hard-coded here, so a bare
            ``start_client()`` call behaves exactly as before.

    Returns:
        The connected ``Client`` after ``Client.restart()`` has been invoked.
    """
    client = Client(address)
    # Restart before handing the client out so the session does not inherit
    # state left behind by earlier runs.
    client.restart()
    return client
    

def download_obs_data_flash(time_interval, format: str = "json", timeout=120):
    """Download flash-producer data from the SmartMet timeseries endpoint.

    Args:
        time_interval: Mapping with ``"starttime"`` and ``"endtime"`` entries;
            both are forwarded unchanged as query parameters.
        format: Value of the ``format`` query parameter. The response body is
            always decoded with ``Response.json()``, so only a JSON response
            can actually be parsed.
        timeout: Seconds handed to ``requests.get`` so that a stalled server
            cannot block the caller forever. Pass ``None`` to wait
            indefinitely (the previous behaviour).

    Returns:
        An ``xarray.Dataset`` created from a ``pandas.DataFrame`` of the
        decoded JSON, with the columns ``flash_id``, ``peak_current`` and
        ``altitude`` renamed to ``station_id``, ``flash`` and ``elevation``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    parameters = ["flash_id", "longitude", "latitude", "utctime", "altitude", "peak_current"]
    url = "http://smartmet.fmi.fi/timeseries"
    payload = {
        "producer": "flash",
        "tz": "gmt",
        "format": format,
        "param": ','.join(parameters),
        "starttime": time_interval["starttime"],
        "endtime": time_interval["endtime"],
    }
    response = requests.get(url, params=payload, timeout=timeout)
    # Fail loudly on HTTP errors instead of trying to turn an error body
    # into a DataFrame.
    response.raise_for_status()
    data_df = pd.DataFrame(response.json())
    # Rename to the column names used for the other observation producers.
    # NOTE(review): presumably so the flash dataset can be merged with them; confirm.
    data_df = data_df.rename(
        columns={
            "flash_id": "station_id",
            "peak_current": "flash",
            "altitude": "elevation",
        }
    )
    return xr.Dataset.from_dataframe(data_df)
    

def get_obs_data_delay(producer, time_range, observation=None, parameters=None):
    """Build a lazy Dask task that downloads and merges observation data.

    Nothing is downloaded by this function itself: every download call and
    the final merge are wrapped in ``dask.delayed``.

    Args:
        producer: ``'flash'``, ``'observation_fmi'`` and ``'wind'`` select a
            dedicated download function; any other value is forwarded to
            ``download_obs_selected`` as its first argument.
        time_range: Integer; ``time_range + 1`` consecutive one-day intervals
            are generated.
        observation: Not used by this function; kept so existing callers
            keep working.
        parameters: Iterable of parameter names. One download task is created
            per (parameter, interval) pair. Required for every producer
            except ``'flash'``, which ignores it.

    Returns:
        A ``dask.delayed`` object wrapping ``xr.merge(..., compat='override')``
        over all download tasks.

    Raises:
        TypeError: If ``parameters`` is ``None`` for a producer that needs it.
    """
    # Fail early with a clear message rather than with "'NoneType' object is
    # not iterable" from inside a comprehension.
    if producer != 'flash' and parameters is None:
        raise TypeError(
            "parameters must be an iterable of parameter names "
            "for producer {!r}".format(producer)
        )

    def generate_time_interval(time_range):
        """Return ``time_range + 1`` one-day intervals as start/end dicts."""
        # NOTE(review): this is naive local time, while the flash request in
        # this file sends tz=gmt; confirm that the local calendar date is
        # what is wanted here.
        now = datetime.now()
        # Only the date part is kept; the time of day is pinned to midnight.
        midnight_format = "%Y-%m-%dT00:00:00"
        time_intervals = []
        for i in range(time_range + 1):
            # i == 0 ends yesterday, i == 1 ends today.
            # NOTE(review): for i >= 2 the interval lies in the future;
            # confirm this is intended.
            endtime = now + timedelta(days=i - 1)
            starttime = endtime - timedelta(days=1)
            time_intervals.append({
                "starttime": starttime.strftime(midnight_format),
                "endtime": endtime.strftime(midnight_format),
            })
        return time_intervals

    def merge_delayed(d):
        # Merging data from different stations. This is test-phase code:
        # compat='override' is used to get things working for now and is
        # meant to be fixed properly later.
        return xr.merge(d, compat='override')

    time_intervals = generate_time_interval(time_range)
    if producer == 'flash':
        lots_of_data = [dask.delayed(download_obs_data_flash)(t) for t in time_intervals]
    elif producer == 'observation_fmi':
        lots_of_data = [dask.delayed(download_obs_data_temp)(p, t) for p in parameters for t in time_intervals]
    elif producer == 'wind':
        lots_of_data = [dask.delayed(download_obs_data_wind)(p, t) for p in parameters for t in time_intervals]
    else:
        lots_of_data = [dask.delayed(download_obs_selected)(producer, p, t) for p in parameters for t in time_intervals]
    # Pass the list itself: dask.delayed only looks inside lists, tuples,
    # sets and dicts for nested Delayed objects. A generator would be passed
    # through as an opaque value, leaving the downloads unresolved.
    even_more_data = dask.delayed(merge_delayed)(lots_of_data)
    return even_more_data


def merge_obs_delayed(client, even_more_data):
    """Persist the delayed merge through ``client`` and display its progress.

    Args:
        client: Object whose ``persist`` method is called with the task
            (a Dask distributed ``Client`` elsewhere in this file).
        even_more_data: The delayed merge task to persist.

    Returns:
        Whatever ``client.persist`` returned for ``even_more_data``.
    """
    persisted = client.persist(even_more_data)
    progress(persisted)
    # NOTE(review): whether progress() blocks until the work has finished
    # depends on the environment, so "Done!" may be printed early; confirm.
    print("Done!")
    return persisted