In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
SWOB_DIR = ''
SWOB_FILE = ''

In [None]:
import dask
import dask.bag as db
import dask.dataframe as dd
import dask.distributed
import dask_jobqueue
import itertools
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import pathlib
import re
import xml.dom.minidom
import seaborn as sns

In [None]:
# Base data directory (the Dask workers' local directory lives under it, see the cluster cell).
# os.environ[...] fails fast with a KeyError naming the variable when it is unset;
# os.getenv would return None and Path(None) raises an unhelpful TypeError.
DATA_DIR = pathlib.Path(os.environ['DATA_DIR'])

# Boot up the SLURM cluster

In [None]:
# Dask cluster whose workers are launched as SLURM batch jobs.
# NOTE(review): the resource settings look inconsistent. `cores`/`memory` tell each
# job's Dask workers what they may use (12 cores / 128G, shared by 6 worker
# processes), while `job_cpu`/`job_mem` override what is actually requested from
# SLURM per job (4 CPUs / 12G). Workers sized for 128G inside a 12G allocation risk
# being killed by SLURM — confirm the intended values.
cluster = dask_jobqueue.SLURMCluster(
    cores=12,
    processes=6,  # worker processes per job; threads per worker = cores / processes
    memory='128G',
    # Shell lines run before each worker starts. NOTE(review): newer dask_jobqueue
    # releases deprecate `env_extra` in favour of `job_script_prologue`.
    env_extra=['source ~/.bash_profile','conda activate smc01'],
    name='smc01-dask',
    local_directory=DATA_DIR / 'dask',  # Dask worker local directory
    walltime='9:00:00',
    job_mem='12G',
    job_cpu=4,
)

In [None]:
cluster.scale(jobs=6)  # Submit 6 SLURM jobs; each runs `processes`=6 Dask worker processes.

In [None]:
client = dask.distributed.Client(cluster)
client

# Load observation files into a dask bag

In [None]:
def _children_to_dict(parent):
    """Map the ``name`` attribute to the ``value`` attribute for each element child of ``parent``.

    Only element nodes are read: minidom also lists whitespace text nodes (from
    pretty-printed XML) and comments in ``childNodes``, and those have no
    ``attributes``, so reading them would raise.
    """
    return {
        child.attributes['name'].value: child.attributes['value'].value
        for child in parent.childNodes
        if child.nodeType == child.ELEMENT_NODE
    }


def string_to_dict(obs_xml_string):
    """Flatten one SWOB XML observation into a single ``{name: value}`` dict.

    Reads the ``name``/``value`` attributes of the element children of the first
    ``identification-elements`` node and, when present, of the first ``elements``
    node. Values are kept as strings. On a name clash, the ``elements`` value wins.

    NOTE(review): this is mapped over a ``db.read_text`` bag, which yields one item
    per line, so it presumes each SWOB file holds its XML on a single line — confirm.

    Raises:
        IndexError: if there is no ``identification-elements`` node.
        KeyError: if a child element lacks a ``name`` or ``value`` attribute.
        xml.parsers.expat.ExpatError: if the string is not well-formed XML.
    """
    obs_data = xml.dom.minidom.parseString(obs_xml_string)

    metadata = obs_data.getElementsByTagName('identification-elements')[0]
    metadata_dict = _children_to_dict(metadata)

    elements = obs_data.getElementsByTagName('elements')
    obs_dict = _children_to_dict(elements[0]) if elements else {}

    return {**metadata_dict, **obs_dict}

In [None]:
SWOB_PATH = pathlib.Path(SWOB_DIR)

In [None]:
swob_files = list(SWOB_PATH.glob('**/*.xml'))

In [None]:
len(swob_files)

In [None]:
hourly_swob_files = [x for x in swob_files if 'minute' not in x.stem] # Keep only hourly swob files.

In [None]:
len(hourly_swob_files) / len(swob_files)

In [None]:
N_OBS = len(hourly_swob_files)
N_OBS

In [None]:
bag = db.read_text(hourly_swob_files, files_per_partition=20)

In [None]:
obs_dicts = bag.map(string_to_dict)

In [None]:
obs_dicts = obs_dicts.repartition(partition_size='10MB')

In [None]:
obs_dicts = obs_dicts.persist()

# Reduce to count keys

Check which keys appear in the observations, and how often.

In [None]:
def dict_to_key_count(d):
    """Return a dict mapping every key of ``d`` to a count of 1."""
    return dict.fromkeys(d, 1)

In [None]:
def merge_count_dicts(counts1, counts2):
    """Combine two ``{key: count}`` dicts by summing the counts key-wise.

    Returns a new dict; neither input is modified. Dask passes intermediate
    results (possibly held elsewhere) to this fold, so mutating them is unsafe.
    """
    counts = dict(counts1)
    for key, n in counts2.items():
        counts[key] = counts.get(key, 0) + n
    return counts

In [None]:
def count_keys(bag):
    """Count, over a dask bag of dicts, how many dicts contain each key.

    The reduction is computed eagerly and returned as a plain ``{key: count}`` dict.
    """
    per_item_counts = bag.map(dict_to_key_count)
    return per_item_counts.fold(merge_count_dicts).compute()

In [None]:
computed_counts = count_keys(obs_dicts)

In [None]:
def print_key_report(counts, filter_re='.*', total=N_OBS):
    """Print one row per key of ``counts``, most frequent first.

    Only keys matched from their start by ``filter_re`` (``re.match``) are shown.
    Each row is the key, its count, and that count as a percentage of ``total``.
    """
    pattern = re.compile(filter_re)
    ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
    for key, count in ranked:
        if not pattern.match(key):
            continue
        print('{:50}{:7} ({:6.1%})'.format(key, count, count / total))

In [None]:
print_key_report(computed_counts)

In [None]:
obs_dicts.filter(lambda x: 'tc_id' in x).pluck('tc_id').frequencies().compute()

## Corrections

Le champ "cor" semble servir à indiquer des corrections, mais comme il concerne moins de 0,5 % des observations, on l'ignore simplement.

In [None]:
obs_dicts.filter(lambda x: 'cor' in x).pluck('cor').frequencies().compute()

## Station type

Certaines stations indiquent un type de station.
Le type 18 semble correspondre à une bouée marine plutôt qu'à une station terrestre.

In [None]:
obs_dicts.filter(lambda x: ('stn_typ' in x)).pluck('stn_typ').frequencies().compute()

Les observations qui ont un `icao_stn_id` sont liées à l'aviation. Elles ont un profil de variables différent (davantage de champs sont systématiquement disponibles).

In [None]:
airport_obs = obs_dicts.filter(lambda x: 'icao_stn_id' in x)

In [None]:
n_airport_obs = airport_obs.count().compute()

In [None]:
airport_obs.pluck('stn_typ').frequencies().compute()

In [None]:
# Reuse the shared key-count reduction rather than re-spelling map/fold/compute.
airport_counts = count_keys(airport_obs)

In [None]:
print_key_report(airport_counts, total=n_airport_obs)

Les observations qui ont un `icao_stn_id` proviennent de NAV CANADA et de la Défense.

In [None]:
airport_obs.pluck('data_attrib_not').frequencies().compute()

# Extract fields from dict

In [None]:
def merge_dicts(dicts):
    """Merge an iterable of dicts into a new dict; later dicts win on key clashes.

    An empty iterable yields ``{}``. The input dicts are not modified.
    """
    merged = {}
    for d in dicts:
        merged.update(d)
    return merged

class Extractor:
    """Base class for callables that pull declared fields out of an observation dict.

    Subclasses set ``COLUMNS`` to a ``{field name: dtype string}`` mapping. The
    string ``'MSNG'`` is treated as a missing value and mapped to ``None``.
    """
    COLUMNS = {}

    def schema(self):
        """Return a copy of the ``{column: dtype}`` mapping.

        A copy is returned so callers (e.g. subclasses that add derived columns)
        cannot mutate the class-level ``COLUMNS`` shared by all instances.
        """
        return dict(self.COLUMNS)

    def __call__(self, obs):
        """Return ``{column: processed value}`` for each declared column present in ``obs``."""
        return {k: self.process_value(k, obs[k]) for k in self.COLUMNS if k in obs}

    def process_value(self, key, value):
        """Map the ``'MSNG'`` sentinel to ``None``; return any other value unchanged."""
        if value == 'MSNG':
            return None

        return value
    
    
class CompositeExtractor:
    """Run several extractors on one observation and merge their results.

    Outputs (and schemas) are combined with ``merge_dicts`` in the order of
    ``extractors``, so a later extractor overrides an earlier one on a shared key.
    """
    def __init__(self, extractors):
        self.extractors = extractors

    def schema(self):
        """Union of the child extractors' ``{column: dtype}`` schemas."""
        schemas = [extractor.schema() for extractor in self.extractors]
        return merge_dicts(schemas)

    def __call__(self, obs):
        """Union of the child extractors' outputs for ``obs``."""
        parts = [extractor(obs) for extractor in self.extractors]
        return merge_dicts(parts)


# Metadata

In [None]:
class MetadataExtractor(Extractor):
    """Extract metadata fields (time, station, position, provider ids) from an observation.

    Every key in ``OBLIGATORY_COLUMNS`` must be present, otherwise the call raises
    ``ValueError``; keys in ``OPTIONAL_COLUMNS`` are copied only when present.
    """
    # Fields every observation must carry, with their DataFrame dtypes.
    OBLIGATORY_COLUMNS = {
        'date_tm': 'datetime64[ns]',
        'stn_nam': 'object',
        'stn_elev': 'float',
        'msc_id': 'object',
        'lat': 'float',
        'long': 'float',
    }

    # Fields copied through only when available.
    OPTIONAL_COLUMNS = {
        'data_pvdr': 'object',
        'wmo_synop_id': 'object',
        'tc_id': 'object',
        'icao_stn_id': 'object',
    }

    COLUMNS = {**OBLIGATORY_COLUMNS, **OPTIONAL_COLUMNS}

    def __call__(self, obs):
        """Check the obligatory fields, then extract every declared column."""
        first_missing = next(
            (name for name in self.OBLIGATORY_COLUMNS if name not in obs), None
        )
        if first_missing is not None:
            raise ValueError(
                'Obligatory metadata {} not found in observation'.format(first_missing)
            )
        return super().__call__(obs)

In [None]:
obs_dicts.map(MetadataExtractor()).take(1)

In [None]:
MetadataExtractor().schema()

# Température

## Répartition des champs de température

In [None]:
print_key_report(computed_counts, filter_re='.*temp.*')

In [None]:
class TemperatureExtractor(Extractor):
    """Extract air-temperature fields from an observation dict.

    ``air_temp`` is taken from the first field of ``AIR_TEMP_SOURCES`` (in priority
    order) that is present and not ``'MSNG'``; the chosen field name is stored in
    ``air_temp_source``. If no source is usable, neither key is set. The max/min
    statistics in ``COLUMNS`` are copied through ``process_value``.
    """
    # Candidate fields for the instantaneous air temperature, best first.
    AIR_TEMP_SOURCES = [
        'air_temp',
        'air_temp_1',
        'avg_air_temp_pst10mts',
        'avg_air_temp_pst1hr',
    ]

    COLUMNS = {
        'max_air_temp_pst1hr': 'float',
        'max_air_temp_pst6hrs': 'float',
        'max_air_temp_pst24hrs': 'float',
        'min_air_temp_pst1hr': 'float',
        'min_air_temp_pst6hrs': 'float',
        'min_air_temp_pst24hrs': 'float',
    }

    def __call__(self, obs):
        extracted = super().__call__(obs)

        for source in self.AIR_TEMP_SOURCES:
            if source not in obs:
                continue
            value = self.process_value(source, obs[source])
            if value is not None:
                extracted['air_temp'] = value
                extracted['air_temp_source'] = source
                break  # keep the highest-priority usable source

        return extracted

    def schema(self):
        """Declared columns plus the derived ``air_temp`` and ``air_temp_source``.

        Builds a new dict so the class-level ``COLUMNS`` is never mutated.
        """
        return {
            **super().schema(),
            'air_temp': 'float',
            'air_temp_source': 'object',
        }

def get_temp(obs):
    """Collect air-temperature readings from one observation dict.

    ``air_temp`` comes from the first of ``air_temp``, ``air_temp_1``,
    ``avg_air_temp_pst10mts`` and ``avg_air_temp_pst1hr`` that is present and not
    ``'MSNG'``; its key is stored under ``air_temp_source``. The ``max_``/``min_``
    statistics with suffixes ``pst1hr``, ``pst6hrs`` and ``pst24hrs`` are copied
    when present and not ``'MSNG'``. Values are returned unparsed.
    """
    def usable(key):
        return key in obs and obs[key] != 'MSNG'

    result = {}

    candidates = ('air_temp', 'air_temp_1', 'avg_air_temp_pst10mts', 'avg_air_temp_pst1hr')
    chosen = next((key for key in candidates if usable(key)), None)
    if chosen is not None:
        result['air_temp_source'] = chosen
        result['air_temp'] = obs[chosen]

    stat_keys = (
        '{}_air_temp_{}'.format(stat, window)
        for stat, window in itertools.product(('max', 'min'), ('pst1hr', 'pst6hrs', 'pst24hrs'))
    )
    result.update((key, obs[key]) for key in stat_keys if usable(key))

    return result

In [None]:
temps = obs_dicts.map(TemperatureExtractor()).take(10)

In [None]:
temps

In [None]:
# Observations with no usable air temperature from any source accepted by `get_temp`.
# NOTE(review): `no_temp`, `no_temp_n_obs` and `no_temp_counts` were not defined
# anywhere in this notebook; this reconstruction assumes that meaning — confirm.
no_temp = obs_dicts.filter(lambda x: 'air_temp' not in get_temp(x))
no_temp_n_obs = no_temp.count().compute()
no_temp_counts = count_keys(no_temp)
print_key_report(no_temp_counts, total=no_temp_n_obs)

In [None]:
no_temp.pluck('data_pvdr').frequencies().compute()

In [None]:
# All observations except those whose provider is BC-ENV (obs without a provider are kept).
no_bc = obs_dicts.filter(lambda x: x.get('data_pvdr') != 'BC-ENV')

In [None]:
no_bc.count().compute() / N_OBS

In [None]:
no_bc_count = count_keys(no_bc)

In [None]:
print_key_report(no_bc_count, filter_re='.*temp.*', total=no_bc.count().compute())

In [None]:
def take_temp_keys(d):
    """Keep every key containing ``'temp'``, followed by ``msc_id``.

    Raises ``KeyError`` if ``d`` has no ``msc_id``.
    """
    selected = {key: value for key, value in d.items() if 'temp' in key}
    selected.update(msc_id=d['msc_id'])
    return selected

def msng_to_nan(d):
    """Return a copy of ``d`` in which every ``'MSNG'`` value is replaced by ``None``."""
    cleaned = {}
    for key, value in d.items():
        cleaned[key] = None if value == 'MSNG' else value
    return cleaned

In [None]:
obs_dicts.map(take_temp_keys).take(1, npartitions=-1)

In [None]:
# Key counts restricted to the temperature fields (+ msc_id), via the shared helper.
temp_keys = count_keys(obs_dicts.map(take_temp_keys))

In [None]:
temp_meta = dd.utils.make_meta([(k, 'f8') for k in temp_keys if k != 'msc_id'] + [('msc_id', 'string')])
temp_df = obs_dicts.map(take_temp_keys).map(msng_to_nan).to_dataframe(meta=temp_meta).compute()

In [None]:
temp_df.groupby('msc_id').mean()

In [None]:
# Fraction of missing values in each column of temp_df, lowest first.
# NOTE(review): replaces the unfinished `temp_df[temp_df['']]`, which raised KeyError.
temp_df.isnull().mean().sort_values()

In [None]:
obs_dicts.map(get_temp).filter(lambda x: 'air_temp' in x).pluck('air_temp_source').frequencies().compute()

In [None]:
temps = obs_dicts.map(get_temp).filter(lambda x: 'air_temp' in x).pluck('air_temp').compute()

In [None]:
# `np.float` was only an alias of the builtin float and was removed in NumPy 1.24.
temps = np.array(temps, dtype=np.float64)

In [None]:
np.min(temps)

In [None]:
obs_dicts.map(get_temp).filter(lambda x: 'air_temp' in x).filter(lambda x: float(x['air_temp']) < -600).take(1, npartitions=-1)

# Vent

In [None]:
print_key_report(computed_counts, filter_re='.*wnd.*')

In [None]:
class WindExtractor(Extractor):
    """Extract wind fields from an observation dict.

    All columns are floats except ``pk_wnd_rmk``, which is kept as ``object``.
    Key order is preserved on purpose: it becomes the DataFrame column order.
    """
    COLUMNS = {
        # Averaged direction/speed over several windows.
        **dict.fromkeys(
            [
                'avg_wnd_dir_10m_pst10mts',
                'avg_wnd_spd_10m_pst10mts',
                'avg_wnd_dir_10m_pst2mts',
                'avg_wnd_spd_10m_pst2mts',
                'avg_wnd_dir_10m_pst1hr',
                'avg_wnd_spd_10m_pst1hr',
            ],
            'float',
        ),
        'pk_wnd_rmk': 'object',
        # Maximum speeds/gusts and the associated directions.
        **dict.fromkeys(
            [
                'max_wnd_spd_10m_pst1hr',
                'wnd_dir_10m_pst1hr_max_spd',
                'max_wnd_gst_spd_10m_pst10mts',
                'wnd_dir_10m_pst10mts_max_spd',
            ],
            'float',
        ),
    }

In [None]:
# Preview the wind fields extracted from one observation
# (replaces a reference to `ddf`, which is not defined in this notebook).
obs_dicts.map(WindExtractor()).take(1)

# Pression

In [None]:
class PressureExtractor(Extractor):
    """Extract pressure fields from an observation dict as floats.

    A value becomes ``None`` when missing (``'MSNG'`` or empty) or out of range.
    Absolute pressures (``ABSOLUTE_PRESSURE_COLUMNS``) must be strictly positive.
    The 3-hour tendency fields only need to be non-negative, because 0 is a
    legitimate value for them: ``pres_tend_char_pst3hrs`` holds a WMO pressure
    tendency characteristic (code table 0200, codes 0-8), and an amount of 0.0
    presumably means no change.

    Raises:
        ValueError: if a non-missing value cannot be parsed as a float.
    """
    COLUMNS = {
        'altmetr_setng': 'float',
        'mslp': 'float',
        'pres_tend_amt_pst3hrs': 'float',
        'pres_tend_char_pst3hrs': 'float',
        'stn_pres': 'float',
    }

    # Fields holding an absolute pressure: a value <= 0 can only be bad data.
    ABSOLUTE_PRESSURE_COLUMNS = frozenset({'altmetr_setng', 'mslp', 'stn_pres'})

    def process_value(self, key, value):
        """Parse ``value`` as a float; return ``None`` if it is missing or out of range for ``key``."""
        pre_processed = super().process_value(key, value)

        # None means 'MSNG'; the truthiness test also skips empty strings,
        # which float() could not parse.
        if not pre_processed:
            return None

        number = float(pre_processed)
        if key in self.ABSOLUTE_PRESSURE_COLUMNS:
            in_range = number > 0.0
        else:
            in_range = number >= 0.0

        return number if in_range else None

# Humidité

# Précipitations

# Visibilité

# Nuages

# Complete dict

In [None]:
extractor = CompositeExtractor([WindExtractor(), MetadataExtractor(), PressureExtractor()])
df = obs_dicts.map(extractor).to_dataframe(meta=extractor.schema()).compute()

In [None]:
obs_dicts.map(extractor).random_sample(0.001).take(1, npartitions=-1)

In [None]:
sns.scatterplot(x='stn_elev', y='stn_pres', data=df[df['data_pvdr'] == 'NAV CANADA'])

In [None]:
df['stn_pres'].min()