## Carbon Monitoring Project

[FluxNet](http://fluxnet.fluxdata.org/) is a worldwide collection of sensor stations that record a number of local variables relating to atmospheric conditions, solar flux and soil moisture. This notebook aims to visualize the data used in the carbon monitoring project [nee_data_fusion](https://github.com/greyNearing/nee_data_fusion/) using Python tools.

The goals of this notebook are to:

* examine the carbon flux measurements from each site
* determine the feasibility of using a model to predict the carbon flux at one site from every other site
* generate and explain a model

In [None]:
import numpy as np
import holoviews as hv
import pandas as pd

import hvplot.pandas
import geoviews.tile_sources as gts

hv.extension('bokeh')

## Open the `intake` catalog
This notebook uses [`intake`](https://intake.readthedocs.io/) to set up a data catalog with instructions for loading data for various projects. Before we read in any data, we'll open that catalog file and inspect the various data sources:

In [None]:
import intake

cat = intake.open_catalog('../catalog.yml')
list(cat)

## Load metadata
First we will load the `fluxnet_metadata` source, which contains some site information for each of the FluxNet sites. Included in these data are the latitude (`lat`) and longitude (`lon`) of each site and the vegetation encoding (more on this below). In the next cell we will read in these data and take a look at a few random rows:

In [None]:
metadata = cat.fluxnet_metadata().read()
metadata.sample(5)

The vegetation type is classified according to the categories set out by the International Geosphere–Biosphere Programme (**IGBP**), with several additional categories defined on the [fluxdata website](http://www.fluxdata.org/DataInfo/Dataset%20Doc%20Lib/VegTypeIGBP.aspx).

In [None]:
igbp_vegetation = {
    'WAT': '00 - Water',
    'ENF': '01 - Evergreen Needleleaf Forest',
    'EBF': '02 - Evergreen Broadleaf Forest',
    'DNF': '03 - Deciduous Needleleaf Forest',
    'DBF': '04 - Deciduous Broadleaf Forest',
    'MF' : '05 - Mixed Forest',
    'CSH': '06 - Closed Shrublands',
    'OSH': '07 - Open Shrublands',
    'WSA': '08 - Woody Savannas',
    'SAV': '09 - Savannas',
    'GRA': '10 - Grasslands',
    'WET': '11 - Permanent Wetlands',
    'CRO': '12 - Croplands',
    'URB': '13 - Urban and Built-up',
    'CNV': '14 - Cropland/Natural Vegetation Mosaics',
    'SNO': '15 - Snow and Ice',
    'BSV': '16 - Barren or Sparsely Vegetated'
}

We can use the dictionary above to map from IGBP codes to longer labels, creating a new column on our metadata. We will make this column an ordered categorical so that plots and counts follow the IGBP numbering.

In [None]:
from pandas.api.types import CategoricalDtype

dtype = CategoricalDtype(ordered=True, categories=sorted(igbp_vegetation.values()))
metadata['vegetation'] = (metadata['igbp']
                          .apply(lambda x: igbp_vegetation[x])
                          .astype(dtype))
metadata.sample(5)
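
To illustrate why the ordered categorical matters, here is a minimal sketch on a toy frame. The three-entry `labels` lookup and the `toy` frame are made up for illustration and are not part of the FluxNet metadata.

```python
import pandas as pd
from pandas.api.types import CategoricalDtype

# Hypothetical three-code subset of the IGBP lookup, for illustration only.
labels = {'ENF': '01 - Evergreen Needleleaf Forest',
          'GRA': '10 - Grasslands',
          'CRO': '12 - Croplands'}

toy = pd.DataFrame({'site': ['a', 'b', 'c', 'd'],
                    'igbp': ['CRO', 'ENF', 'GRA', 'ENF']})

# Same pattern as above: map codes to labels, then cast to an ordered categorical.
dtype = CategoricalDtype(categories=sorted(labels.values()), ordered=True)
toy['vegetation'] = toy['igbp'].map(labels).astype(dtype)

# Sorting the counts by index now follows the category order.
counts = toy['vegetation'].value_counts().sort_index()
print(counts)
```

Because the labels start with a zero-padded number, sorting the label strings gives the same order as the IGBP numbering.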

### Visualize the fluxdata sites
The PyViz ecosystem aims to make visualizing your data straightforward. Here we will use OpenStreetMap tiles from `geoviews` to make a quick map of where the different sites are located and the vegetation at each site.

In [None]:
metadata.hvplot.points('lon', 'lat', geo=True, color='vegetation',
                       height=420, width=800, cmap='Category20') * gts.OSM

## Loading FluxNet data ``extract_fluxnet.m``

The data in the [nee_data_fusion](https://github.com/greyNearing/nee_data_fusion/) repository are stored as a collection of CSV files, with the site name encoded in each filename.

This cell defines a function to:

* read in the data from all sites
* discard columns that we don't need
* calculate day of year

It also defines a helper that adds a season column based on each site's hemisphere, and another that prints progress.

In [None]:
necessary_columns = ['P_ERA', 'TA_ERA', 'PA_ERA', 'SW_IN_ERA', 'LW_IN_ERA', 'WS_ERA',
                     'VPD_ERA', 'TIMESTAMP', 'site', 'NEE_CUT_USTAR50']
not_necessary_columns = ['SWC_F_MDS_1', 'SWC_F_MDS_2', 'SWC_F_MDS_3',
                         'TS_F_MDS_1', 'TS_F_MDS_2', 'TS_F_MDS_3']

keep_from_csv = necessary_columns + not_necessary_columns

y = 'NEE_CUT_USTAR50'

def season(df, metadata):
    """Add season column based on lat and month
    """
    site = df['site'].cat.categories.item()
    lat = metadata[metadata['site'] == site]['lat'].item()
    if lat > 0:
        seasons = {3: 'spring',  4: 'spring',  5: 'spring',
                   6: 'summer',  7: 'summer',  8: 'summer',
                   9: 'fall',   10: 'fall',   11: 'fall',
                  12: 'winter',  1: 'winter',  2: 'winter'}
    else:
        seasons = {3: 'fall',    4: 'fall',    5: 'fall',
                   6: 'winter',  7: 'winter',  8: 'winter',
                   9: 'spring', 10: 'spring', 11: 'spring',
                  12: 'summer',  1: 'summer',  2: 'summer'}
    return df.assign(season=df.TIMESTAMP.dt.month.map(seasons))

def clean_data(df):
    """Clean data columns:
    
     * adds nan col for missing columns
     * throws away un-needed columns
     * adds day of year
    """
    df = df.assign(**{col: np.nan for col in keep_from_csv if col not in df.columns})
    df = df[keep_from_csv]
    
    df = df.assign(DOY=df.TIMESTAMP.dt.dayofyear)
    df = df.assign(year=df.TIMESTAMP.dt.year)
    df = season(df, metadata)
    
    return df

def print_progress(i, new_line_at=60):
    """Print a dot for each i creating a new line every `new_line_at`
    """
    if (i + 1) % new_line_at != 0:
        print('.', end='')
    else: 
        print('.')
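
The hemisphere-dependent season lookup used in `season` can be sketched on its own. This is an illustrative variant, not the notebook's function: `NORTHERN`, `FLIP` and `season_for` are hypothetical names, and the sketch flips the northern seasons instead of keeping two separate dictionaries.

```python
import pandas as pd

# Month -> season for the northern hemisphere.
NORTHERN = {12: 'winter', 1: 'winter', 2: 'winter',
            3: 'spring', 4: 'spring', 5: 'spring',
            6: 'summer', 7: 'summer', 8: 'summer',
            9: 'fall', 10: 'fall', 11: 'fall'}

# Seasons are offset by half a year south of the equator.
FLIP = {'winter': 'summer', 'summer': 'winter',
        'spring': 'fall', 'fall': 'spring'}

def season_for(timestamps, lat):
    """Return a season label for each timestamp at a site with latitude `lat`."""
    seasons = timestamps.dt.month.map(NORTHERN)
    return seasons if lat > 0 else seasons.map(FLIP)

dates = pd.Series(pd.to_datetime(['2010-01-15', '2010-07-15']))
north = season_for(dates, lat=45.0)
south = season_for(dates, lat=-30.0)
print(list(north), list(south))
```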

## Read and clean data
This will take a few minutes if the data is not cached yet. First we will get a list of all the files in the S3 bucket, then we will iterate over those files and cache, read, and munge the data in each one. The cleaning has to happen before we concatenate across sites, since the columns in each file don't necessarily match the columns in the other files.

In [None]:
from s3fs import S3FileSystem
s3 = S3FileSystem(anon=True)
s3_paths = s3.glob('earth-data/carbon_flux/nee_data_fusion/FLX*')

In [None]:
datasets = []
skipped = []
used = []

for i, s3_path in enumerate(s3_paths):
    print_progress(i)
    
    dd = cat.fluxnet_daily(s3_path=s3_path).to_dask()
    site = dd['site'].cat.categories.item()
    
    if not set(dd.columns) >= set(necessary_columns):
        skipped.append(site)
        continue

    datasets.append(clean_data(dd))
    used.append(site)

print()
print('Found {} fluxnet sites with enough data to use - skipped {}'.format(len(used), len(skipped)))
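
The two checks in the loop above (skip a site that lacks a required column, otherwise pad missing optional columns with NaN) can be sketched on toy frames. The column lists, the `harmonize` helper and the two toy sites below are assumptions made for illustration.

```python
import numpy as np
import pandas as pd

required = ['TIMESTAMP', 'TA_ERA']   # hypothetical stand-ins for necessary_columns
optional = ['SWC_F_MDS_1']           # hypothetical stand-in for not_necessary_columns

def harmonize(df, required, optional):
    """Return df restricted to required + optional columns, or None if unusable."""
    if not set(required) <= set(df.columns):
        return None                  # a required column is missing: skip the site
    keep = required + optional
    # Add an all-NaN column for every optional column that is absent.
    df = df.assign(**{col: np.nan for col in keep if col not in df.columns})
    return df[keep]

site_a = pd.DataFrame({'TIMESTAMP': [1, 2], 'TA_ERA': [3.0, 4.0], 'EXTRA': [0, 0]})
site_b = pd.DataFrame({'TIMESTAMP': [1, 2]})   # lacks TA_ERA

clean_a = harmonize(site_a, required, optional)
clean_b = harmonize(site_b, required, optional)
print(clean_a)
print(clean_b)
```

After this step every usable site has the same columns in the same order, which is what makes the later concatenation safe.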

Now that we have a list of datasets, we will concatenate across all rows.

In [None]:
import dask.dataframe  # importing the submodule explicitly makes dask.dataframe available
X = dask.dataframe.concat(datasets).compute()
X.columns

We'll also set the dtype of site to category. This will come in handy later.

In [None]:
X['site'] = X['site'].astype('category')

### Visualize Data Availability

We can look at the sites for which we have data. We'll plot the sites on a world map again - this time using a custom colormap to denote sites with valid data, sites where data exist but were not loaded because too many fields were missing, and sites where no data was available. In addition to this map we'll get the count of different vegetation types at the sites.

In [None]:
def mapper(x):
    if x in used:
        return 'valid'
    elif x in skipped:
        return 'skipped'
    else:
        return 'no data'
    
cmap = {'valid': 'green', 'skipped': 'red', 'no data': 'gray'}

QA = metadata.copy()
QA['quality'] = QA['site'].map(mapper)


world = QA.hvplot.points('lon', 'lat', geo=True, color='quality', cmap=cmap, hover_cols=['site', 'vegetation'],
                         height=420, width=600).options(legend_position='top', tools=['hover', 'tap'])

def veg_count(data):
    veg_count = data['vegetation'].value_counts().sort_index(ascending=False)
    return veg_count.hvplot.barh(height=420, width=500)

hist = veg_count(QA[QA.quality=='valid']).relabel('Vegetation counts for valid sites')

world * gts.OSM + hist

We'll make a couple of functions that generate plots on the full set of data or a subset of the data. We will use these in our dashboard.

In [None]:
def one_timeseries(data):
    """Make a timeseries plot showing the mean carbon flux at each DOY as well as the min and max
    """
    return hv.Overlay([
        data.groupby(['DOY', 'year'])[y].mean().groupby('DOY').agg([np.min, np.max]).hvplot.area('DOY', 'amin', 'amax', alpha=0.2, fields={'amin': y}),
        data.groupby('DOY')[y].mean().hvplot()
    ]).options(width=800)

def one_count_plot(data):
    """Make a plot of the number of observations of each of the non-mandatory variables. 
    """
    return data[not_necessary_columns + ['site']].count().hvplot.bar(rot=90, width=300, height=300)

timeseries = one_timeseries(X)
count_plot = one_count_plot(X)
timeseries + count_plot
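
The aggregation inside `one_timeseries` happens in two stages: first a mean per day of year and year, then the minimum and maximum of those yearly means for each day of year. A minimal sketch on made-up numbers (the `toy` frame and its `nee` column are hypothetical) shows what each stage produces:

```python
import pandas as pd

toy = pd.DataFrame({
    'DOY':  [1, 1, 1, 1, 2, 2, 2, 2],
    'year': [2000, 2000, 2001, 2001, 2000, 2000, 2001, 2001],
    'nee':  [1.0, 3.0, 5.0, 7.0, 0.0, 2.0, 2.0, 4.0],
})

# Stage 1: mean flux for each (day of year, year) pair.
yearly_means = toy.groupby(['DOY', 'year'])['nee'].mean()

# Stage 2: spread of those yearly means across years, per day of year.
envelope = yearly_means.groupby(level='DOY').agg(['min', 'max'])

# The line drawn on top of the envelope: overall mean per day of year.
overall = toy.groupby('DOY')['nee'].mean()

print(envelope)
print(overall)
```

The shaded band therefore shows year-to-year variation in the mean, not the spread of individual observations.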

### Dashboard

Using the plots and functions defined above, we can make a dashboard of sites: clicking on a site shows the timeseries and variable counts for that particular site.

In [None]:
from holoviews.streams import Selection1D
import panel as pn

In [None]:
stream = Selection1D(source=world)
empty = timeseries.relabel('No selection') + count_plot.relabel('No selection')

def on_select(index):
    if not index:
        return empty
    i = index[0]
    if i in QA[QA.quality=='valid'].index:
        site = QA.iloc[i].site
        ts = one_timeseries(X[X.site == site]).relabel(site)
        ct = one_count_plot(X[X.site == site]).relabel(site)
        return ts + ct
    else:
        return empty

one_site = hv.DynamicMap(on_select, streams=[stream])

pn.Column(pn.Row(world * gts.OSM, hist), pn.Row(one_site))

## Merge data

Now that the data are loaded in we can merge the daily data with the metadata from before.

In order to use the categorical `igbp` field, we will create a one-hot encoding where each column corresponds to one of the `igbp` types, the rows correspond to observations, and all the cells are filled with 0 or 1. This can be done using the function `pd.get_dummies`:

In [None]:
onehot_metadata = pd.get_dummies(metadata, columns=['igbp'])
onehot_metadata.sample(5)
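
For a sense of what the encoding looks like, here is a small sketch on a hypothetical three-site frame (the `toy` data is made up):

```python
import pandas as pd

toy = pd.DataFrame({'site': ['a', 'b', 'c'],
                    'igbp': ['ENF', 'GRA', 'ENF']})

# The igbp column is replaced by one indicator column per observed code.
onehot = pd.get_dummies(toy, columns=['igbp'])
print(onehot)

# Exactly one indicator is set in every row.
indicator_cols = [c for c in onehot.columns if c.startswith('igbp_')]
row_sums = onehot[indicator_cols].astype(int).sum(axis=1)
```

Only codes that actually occur in the data get a column, so the number of `igbp_*` columns depends on which vegetation types are present.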

We'll merge the metadata with all our daily observations - creating a tidy dataframe. 

In [None]:
df = pd.merge(X, onehot_metadata, on='site')
df.sample(5)

## Explore Data Availability
Now that all of our observations are merged with the site metadata, we can take a look at which sites have non-mandatory fields:

In [None]:
have_some_extra_vars = df[df[not_necessary_columns].notnull().any(axis=1)]
have_some = metadata[metadata.site.isin(have_some_extra_vars.site.unique())]

In [None]:
have_all_extra_vars = df[df[not_necessary_columns].notnull().all(axis=1)]
have_all = metadata[metadata.site.isin(have_all_extra_vars.site.unique())]

In [None]:
have_some.hvplot.points('lon', 'lat', geo=True, hover_cols=['site', 'vegetation'], height=420, width=600).options(legend_position='top').relabel('have some extra vars') * \
have_all.hvplot.points('lon', 'lat', geo=True, hover_cols=['site', 'vegetation']).relabel('have all extra vars') * gts.OSM + \
veg_count(have_some) * veg_count(have_all)

Since there seems to be a strong geographic pattern in the availability of soil moisture and soil temperature data, we won't use those columns in our model.

In [None]:
df = df.drop(columns=not_necessary_columns)

Now we will keep only the rows that contain no null values:

In [None]:
df = df[df.notnull().all(axis=1)]

## Sample the data
Now we need to split the data into columns that we'll use in the regression and columns that we'll use to explain our model.

In [None]:
explanatory_cols = ['DOY', 'lat', 'lon', 'season', 'site', 'vegetation', 'year']
data_cols = ['P_ERA', 'TA_ERA', 'PA_ERA', 'SW_IN_ERA', 'LW_IN_ERA', 'WS_ERA', 'VPD_ERA']
igbp_cols = [col for col in df.columns if col.startswith('igbp')]
regression_cols = data_cols + igbp_cols + [y]

To speed up the rest of the computations, we'll take a sample (10%) of the observations. We'll also remove some variables that we don't want to use in the linear regression.

In [None]:
df_sample = df.sample(frac=0.10)

explanatory_df = df_sample[explanatory_cols]
regression_df = df_sample[regression_cols]
regression_df = regression_df.rename(columns={y:'y'})

print("{} observations and {} variables".format(*regression_df.shape))
print("Generating a prediction with these variables: \n  {}".format(
    "\n  ".join(list(
        regression_df.columns
    ))
))

These variables are sufficient to create the linear models at every site. Note, however, that the site information itself is hidden from the visualization algorithm.

A good sanity check: latitude encodes some structure, while longitude does not.

## Fluxnet Data Analysis

Linear models work well *at one site*, but comparing across sites is confounded by:

* lat/lon
* day of year
* environment type

We want to generate a visualization that accounts for these four variables (latitude, longitude, day of year, and environment type) and helps build some understanding.

That is, these observations lie on some manifold. We want to learn the structure of that manifold, and visualize each observation on that manifold.

This work attempts to find similar observations - observations that have a similar structure between the independent variables (e.g., `P_ERA`) and dependent variables (the carbon flux measurement `y`).

UMAP is a tool for this, and has firm mathematical grounding (plus, it's nice to use).

In [None]:
import umap
reduct = umap.UMAP(verbose=True, n_epochs=None)  # optionally also pass n_neighbors=30

In [None]:
reduct.fit(regression_df.values)

In [None]:
embedding = reduct.embedding_
embedding

In [None]:
umapped_df = explanatory_df.join(pd.DataFrame(embedding, index=df_sample.index, columns=['x0', 'x1']))

We can explore this manifold by coloring a scatter plot according to different variables that we believe should have structure in this space.

In [None]:
%opts Scatter [height=500 width=500, colorbar=True]

color_by = 'lat'
title = 'Observations colored by {}'.format(color_by)

hv.Scatter(umapped_df, kdims=['x0', 'x1'], extents=(-15,-15,15,15)).relabel(title).options(color_index=color_by)

In [None]:
import colorcet as cc

color_by = 'DOY'
title = 'Observations colored by {}'.format(color_by)

doy = hv.Scatter(umapped_df, kdims=['x0', 'x1'], extents=(-15,-15,15,15)).relabel(title)\
    .options(color_index=color_by, cmap=cc.cm['cyclic_mrybm_35_75_c68'])

color_by = 'season'
title = 'Observations colored by {}'.format(color_by)

season = hv.Scatter(umapped_df, kdims=['x0', 'x1'], extents=(-15,-15,15,15)).relabel(title).options(color_index=color_by, cmap='Category10')

doy + season

### Taking a closer look at vegetation

We can isolate each vegetation type in its own panel so that any site eccentricities are made clear. In this plot the points are **colored by latitude**, so observations from the same site share a color.

In [None]:
umapped_df.hvplot.scatter('x0', 'x1', by='vegetation', subplots=True, width=300, size=1, color='lat', legend=False)

Next we will train a model to predict carbon flux globally. 

## Setup Dask
With dask, we can distribute tasks over cores and do parallel computation.

In [None]:
import dask
import dask.array as da
import dask.dataframe as dd
from distributed import Client

client = Client()
client

## Prediction

In [None]:
import numpy as np
from sklearn.preprocessing import StandardScaler

# Features only: regression_cols also contains the target column, which must stay out of X
X = df[data_cols + igbp_cols].values
y = df['NEE_CUT_USTAR50'].values

# transform data matrix so 0 mean, unit variance for each feature
X = StandardScaler().fit_transform(X)

Since the dataset is not too small, we'll use classical validation: a single split into training and test sites, applied to the full dataset rather than the 10% sample. Leave-one-out validation, which holds out one site at a time, is an alternative.
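
The important property of both splitters is that they split by group, so no site contributes rows to both the training and the test set. Here is a minimal sketch on synthetic data; the ten "sites", the array shapes and the `toy_*` names are assumptions made for illustration.

```python
import numpy as np
from sklearn.model_selection import GroupShuffleSplit, LeaveOneGroupOut

rng = np.random.default_rng(0)
groups = np.repeat(np.arange(10), 5)            # 10 hypothetical sites, 5 rows each
X_toy = rng.normal(size=(len(groups), 3))
y_toy = rng.normal(size=len(groups))

# Classical validation: one random split, with whole sites held out.
splitter = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=0)
toy_train, toy_test = next(splitter.split(X_toy, y_toy, groups))
shared_sites = set(groups[toy_train]) & set(groups[toy_test])
n_test_sites = len(set(groups[toy_test]))

# Leave-one-group-out: one fold per site.
n_folds = LeaveOneGroupOut().get_n_splits(X_toy, y_toy, groups)

print(shared_sites, n_test_sites, n_folds)
```

A plain row-wise shuffle would instead mix observations from the same site into both sets, which tends to overstate how well a model transfers to an unseen site.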

In [None]:
from sklearn.model_selection import GroupShuffleSplit

In [None]:
sep = GroupShuffleSplit()
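# Cast to category first so this works even if the merge turned `site` into plain strings
train_idx, test_idx = next(sep.split(X, y, df['site'].astype('category').cat.codes))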

In [None]:
train_sites = df.iloc[train_idx].site.unique()
test_sites = df.iloc[test_idx].site.unique()

train_site_metadata = metadata[metadata.site.isin(train_sites)]
test_site_metadata = metadata[metadata.site.isin(test_sites)]

Let's make a world map showing the sites that will be used in training and those that will be used in testing:

In [None]:
train_site_metadata.hvplot.points('lon', 'lat', geo=True, hover_cols=['site', 'vegetation'], height=420, width=600).options(legend_position='top').relabel('training sites') * \
test_site_metadata.hvplot.points('lon', 'lat', geo=True, hover_cols=['site', 'vegetation']).relabel('testing sites') * gts.OSM + \
veg_count(metadata[metadata.site.isin(train_sites)]) * veg_count(metadata[metadata.site.isin(test_sites)])

In [None]:
from sklearn.model_selection import LeaveOneGroupOut
from sklearn.linear_model import LinearRegression
from dateutil import rrule
from datetime import datetime, timedelta

def fit_and_predict(X_train, y_train, X_test):
    indices = np.arange(len(X_train), dtype=int)
    
    X_train_filtered = X_train[indices.flat[:]] 
    y_train_filtered = y_train[indices.flat[:]] 
        
    model = LinearRegression()
    model.fit(X_train_filtered, y_train_filtered)
    
    return model.predict(X_test)

In [None]:
def prediction_stats(train_idx, test_idx, X, y, doy=None, predict_each='season'):
    # Use the timestamp column and move this up.
    start = datetime(2000, 1, 1)  # a leap year, so day 366 is covered
    end = start + timedelta(days=365)
    
    if predict_each == 'month':
        get_time_id = lambda dt: dt.month
    elif predict_each == 'year':
        get_time_id = lambda dt: 1
    elif predict_each == 'season':
        # do this earlier and do a split as northern and southern hemisphere
        seasons = {'spring': [3, 4, 5],
                   'summer': [6, 7, 8],
                   'fall': [9, 10, 11],
                   'winter': [12, 1, 2]}
        seasons = {month: season_id
                   for season_id, months in enumerate(seasons.values())
                   for month in months}
        get_time_id = lambda dt: seasons[dt.month] 
    else:
        # Doubled braces stop str.format from treating the set literal as a field
        msg = "predict_each should be in {{'year', 'month', 'season'}}, got '{}'"
        raise ValueError(msg.format(predict_each))
    
    # Map day of year (1..366, matching pandas' dayofyear) to a partition id
    # from https://stackoverflow.com/questions/153584/how-to-iterate-over-a-timespan-after-days-hours-weeks-and-months-in-python
    time_partitions = {(dt - start).days + 1: get_time_id(dt)
                       for dt in rrule.rrule(rrule.DAILY, dtstart=start, until=end)}
    
    # Partition id of every observation, computed once instead of once per loop
    obs_partitions = np.array([time_partitions[day] for day in np.asarray(doy)])
    # Iterate over each distinct partition once, not over all 366 days
    unique_partitions = sorted(set(time_partitions.values()))
    
    preds = []
    for time_partition in unique_partitions:
        if len(unique_partitions) > 1:
            # get the test set specific to this time instance
            time_idx = np.flatnonzero(obs_partitions == time_partition)
            time_test_idx = np.intersect1d(test_idx, time_idx)
        else:
            time_test_idx = test_idx 

        if len(time_test_idx) == 0:
            continue
            
        y_hat = fit_and_predict(X[train_idx], y[train_idx], X[time_test_idx])
        y_test = y[time_test_idx]
        preds += [{'predicted': y_hat,
                   'actual': y_test,
                   'time_partition': time_partition,
                   'corrcoef': np.corrcoef(y_hat, y_test)[0][1]}]
    actual = np.concatenate([p['actual'] for p in preds])
    predicted = np.concatenate([p['predicted'] for p in preds])
    return {'time_partitions': preds,
            'actual': actual,
            'predicted': predicted,
            'corrcoef': np.corrcoef(actual, predicted)[0][1]}


# One integer group label per row of X, and the matching day of year.
# `df` is the merged, null-free frame that X and y were built from.
sites = df['site'].astype('category').cat.codes.values
doy = df['DOY'].values

# Try a single leave-one-site-out fold before distributing the work
sep = LeaveOneGroupOut()
train_idx, test_idx = next(sep.split(X, y, sites))
_ = prediction_stats(train_idx, test_idx, X, y, doy=doy)

In [None]:
sep = LeaveOneGroupOut()

# Send the large arrays to the workers once and pass futures to each task
futures = []
X_future = client.scatter(X)
y_future = client.scatter(y)
doy_future = client.scatter(doy)
for i, (train_index, test_index) in enumerate(sep.split(X, y, sites)):
    futures += [{'site_id': i,
                 'train_index': train_index,
                 'test_index': test_index,
                 'stats': client.submit(prediction_stats,
                                        train_index,
                                        test_index,
                                        X_future,
                                        y_future,
                                        doy=doy_future)}]
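
The core of `prediction_stats` is a lookup from day of year to a time partition, which is then used to pick out the rows that belong to each partition. A standalone sketch of that lookup for the seasonal case is shown below; `SEASON_ID`, `partition_of_day` and the sample `toy_doy` values are illustrative names and data, and the season ids follow the northern-hemisphere convention used in the function.

```python
from datetime import datetime, timedelta

import numpy as np

# Month -> season id (0 spring, 1 summer, 2 fall, 3 winter).
SEASON_ID = {3: 0, 4: 0, 5: 0,
             6: 1, 7: 1, 8: 1,
             9: 2, 10: 2, 11: 2,
             12: 3, 1: 3, 2: 3}

start = datetime(2000, 1, 1)  # a leap year, so day 366 exists

# Day of year (1..366) -> season id.
partition_of_day = {day: SEASON_ID[(start + timedelta(days=day - 1)).month]
                    for day in range(1, 367)}

toy_doy = np.array([1, 80, 200, 300, 366])
partitions = np.array([partition_of_day[day] for day in toy_doy])

# Row positions that fall in summer (season id 1).
summer_rows = np.flatnonzero(partitions == 1)
print(partitions, summer_rows)
```

Computing the partition of every row once and comparing against it with NumPy avoids rescanning the full day-of-year array in pure Python for each partition.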

In [None]:
results = client.gather(futures)

In [None]:
out = [{'site_id': result['site_id'], **result['stats']}
       for result in results]

In [None]:
df = pd.DataFrame(out)
df.head()

In [None]:
%%opts VLine [show_legend=False] VLine (color='red')
corrs = df.corrcoef[~np.isnan(df.corrcoef.values)]
frequencies, edges = np.histogram(corrs, 20)

c1 = hv.Histogram((frequencies, edges), extents=(-1, None, 1, None))
c2 = hv.VLine(np.mean(corrs), label='mean')
c3 = hv.VLine(np.median(corrs), label='median')
c1 * c2# * c3

Possible next steps:

* compare performance on the training data with performance on the test data
* check whether some vegetation types can be predicted better than others
* color the sites by correlation and add a hover showing the actual and predicted timeseries
* report the fraction of explained variance and plot timeseries of actual and predicted values
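
On the fraction of explained variance: correlation ignores any constant offset or scaling between predicted and actual values, so it can look perfect even when the predictions are far off. One common definition of the explained fraction is the coefficient of determination, sketched below with a hypothetical helper and made-up numbers.

```python
import numpy as np

def explained_variance_fraction(actual, predicted):
    """Coefficient of determination: 1 - SS_residual / SS_total."""
    actual = np.asarray(actual, dtype=float)
    predicted = np.asarray(predicted, dtype=float)
    ss_res = np.sum((actual - predicted) ** 2)
    ss_tot = np.sum((actual - actual.mean()) ** 2)
    return 1.0 - ss_res / ss_tot

actual = np.array([1.0, 2.0, 3.0, 4.0])
biased = actual + 10.0   # perfectly correlated, but offset by a constant

corr = np.corrcoef(actual, biased)[0, 1]
r2_biased = explained_variance_fraction(actual, biased)
r2_perfect = explained_variance_fraction(actual, actual)
print(corr, r2_biased, r2_perfect)
```

Reporting both numbers per held-out site would separate sites where the model captures the seasonal shape but not the magnitude from sites where it captures both.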

In [None]:
np.mean(corrs)