In [None]:
%matplotlib inline

In [None]:
import os
import collections
import datetime
import json

import numpy
import matplotlib
matplotlib.rcParams['font.size'] = 16
import matplotlib.pyplot
import pandas

import tokio
import tokio.connectors.nersc_isdct

In [None]:
# Single day to summarize; the window runs from its midnight to the next midnight.
TARGET_DATE = datetime.datetime(2016, 3, 11)

date_start, date_end = TARGET_DATE, TARGET_DATE + datetime.timedelta(days=1)

print("Returning data from %s to %s" % (date_start, date_end))

In [None]:
class DailyTraffic(dict):
    """Read/write byte totals for several storage systems on one day.

    Maps a system name to ``{'read': <bytes>, 'write': <bytes>}``.  The
    getters (get_lustre, get_isdct) record -1.0 for a total they could not
    retrieve.  Totals can optionally be cached in a per-day JSON file so a
    repeated run does not have to query the data sources again.
    """

    def __init__(self, date, use_caches=False, *args, **kwargs):
        """Create a traffic record for one day.

        Args:
            date (datetime.datetime): start of the day to summarize.
            use_caches (bool): if True, use ``dailytraffic_YYYY-MM-DD.json`` in
                the current directory as a cache and load it immediately.
            *args, **kwargs: forwarded to ``dict`` to seed initial entries.
        """
        # Forward only the caller's arguments; ``self`` is already the dict
        # being initialized and must not be passed again as a positional arg.
        super(DailyTraffic, self).__init__(*args, **kwargs)
        self.date = date
        self.cache_file = None
        if use_caches:
            self.cache_file = 'dailytraffic_%s.json' % self.date.strftime("%Y-%m-%d")
            self.load_cache()

    def __str__(self):
        """Return one human-readable line per system with read/write volumes."""
        lines = []
        for system, iovolumes in self.items():
            lines.append("%12s %s read, %s written\n" % (
                system,
                tokio.common.humanize_bytes(iovolumes['read'], fmt="%6.1f %3s"),
                tokio.common.humanize_bytes(iovolumes['write'], fmt="%6.1f %3s")))
        return "".join(lines)

    def _store_rw_bytes(self, system, read, write):
        """Record and return the read/write byte totals for ``system``.

        Values are coerced to built-in floats so the record stays
        JSON-serializable (numpy integer sums are not) for save_cache().
        """
        added = {
            'read': float(read),
            'write': float(write),
        }
        self[system] = added
        return added

    def load_cache(self):
        """Merge totals from the JSON cache file into this record, if it exists."""
        if self.cache_file and os.path.isfile(self.cache_file):
            print("Loading cache from %s" % self.cache_file)
            with open(self.cache_file, 'r') as cache_fp:
                self.update(json.load(cache_fp))

    def save_cache(self):
        """Write this record to the JSON cache file.

        Never overwrites: if the cache file already exists, nothing is written.
        """
        if self.cache_file is not None and not os.path.isfile(self.cache_file):
            # ``with`` guarantees the dump is flushed and the handle closed.
            with open(self.cache_file, 'w') as cache_fp:
                json.dump(self, cache_fp)
            print("Dumped to %s" % self.cache_file)

    def get_lustre(self, system):
        """Return (and record) read/write byte totals for a Lustre file system.

        Sums the ``datatargets/readbytes`` and ``datatargets/writebytes``
        datasets over the day using tokio's HDF5 tools.  A dataset for which
        no dataframe is returned is recorded as -1.0.  Already-recorded
        systems are returned without querying again.
        """
        if system in self:
            return self.get(system)

        totals = {}
        for rw in 'read', 'write':
            tmp_df = tokio.tools.hdf5.get_dataframe_from_time_range(
                fsname=system,
                dataset_name='datatargets/%sbytes' % rw,
                datetime_start=self.date,
                # one second before the next midnight -- presumably so the
                # range stays within this calendar day
                datetime_end=(self.date + datetime.timedelta(days=1, seconds=-1)))
            if tmp_df is not None:
                totals[rw] = tmp_df.sum().sum()
            else:
                totals[rw] = -1.0

        return self._store_rw_bytes(system, totals['read'], totals['write'])

    def get_isdct(self, system):
        """Return (and record) read/write byte totals from ISDCT snapshots.

        Expects enumerate_dated_files to yield exactly two ISDCT dumps for
        [date, date + 1 day] (presumably one per day), diffs the later
        against the earlier and sums the per-device byte counters.  If the
        number of dumps is not two, both totals are recorded as -1.0.
        """
        if system in self:
            return self.get(system)

        isdct_files = tokio.tools.common.enumerate_dated_files(
            start=self.date,
            end=(self.date + datetime.timedelta(days=1)),
            template=tokio.config.CONFIG['isdct_files'])

        # -1.0 is the "no data" sentinel used by the other getters as well.
        read_tot = -1.0
        write_tot = -1.0
        if len(isdct_files) == 2:
            earlier_isdct = tokio.connectors.nersc_isdct.NerscIsdct(isdct_files[0])
            later_isdct = tokio.connectors.nersc_isdct.NerscIsdct(isdct_files[-1])
            isdct_diff = later_isdct.diff(earlier_isdct)
            devices = isdct_diff['devices'].values()
            # Real sums start at zero so the sentinel never leaks into them;
            # a device without a counter contributes nothing.
            read_tot = sum((dev.get('data_units_read_bytes', 0.0) for dev in devices), 0.0)
            write_tot = sum((dev.get('data_units_written_bytes', 0.0) for dev in devices), 0.0)

        return self._store_rw_bytes(system, read_tot, write_tot)

    def get_hpss(self, system):
        """Return (and record) read/write byte totals for an HPSS system.

        Reads the single HPSS daily report for this day and converts its
        ``read_gb``/``write_gb`` client-application totals to bytes.
        """
        if system in self:
            return self.get(system)

        hpss_files = tokio.tools.common.enumerate_dated_files(
            start=self.date,
            end=(self.date + datetime.timedelta(days=1, seconds=-1)),
            template=tokio.config.CONFIG['hpss_report_files'])
        if len(hpss_files) == 0:
            # NOTE(review): a missing report is recorded as -0.0 (i.e. zero),
            # not the -1.0 "missing" sentinel used by the other getters.
            return self._store_rw_bytes(system, read=-0.0, write=-0.0)
        assert len(hpss_files) == 1, \
            "expected one HPSS report for %s, found %d" % (self.date, len(hpss_files))

        hpss_dict = tokio.connectors.hpss.HpssDailyReport(hpss_files[0])
        totals = hpss_dict[system]['io totals by client application']['total']

        # *_gb values are scaled by 2**30, i.e. treated as GiB
        return self._store_rw_bytes(system,
                                    read=totals['read_gb'] * 2**30,
                                    write=totals['write_gb'] * 2**30)

    def to_dataframe(self, *args, **kwargs):
        """Return the record as a DataFrame.

        Defaults to ``orient='index'`` (one row per system, read/write
        columns); any keyword, including ``orient``, may be overridden.
        """
        default_kwargs = {
            'orient': 'index'
        }
        default_kwargs.update(kwargs)
        return pandas.DataFrame.from_dict(self, *args, **default_kwargs)

In [None]:
io_traffic = DailyTraffic(date=TARGET_DATE, use_caches=True)  # loads the per-day JSON cache if present

## Collect data from storage systems

In [None]:
# Each call records the file system's read/write totals in io_traffic.
for lustre_fs in ('cscratch', 'scratch1', 'scratch2', 'scratch3', 'coribb'):
    io_traffic.get_lustre(lustre_fs)

In [None]:
io_traffic.get_isdct(system='coribb')

In [None]:
io_traffic.get_hpss(system='archive')

## Summarize data

In [None]:
print(str(io_traffic))  # print() so DailyTraffic.__str__ formatting is used

In [None]:
# One row per system, read/write columns, converted from bytes to TiB.
summary_tibs = io_traffic.to_dataframe().div(2 ** 40)

In [None]:
fig, ax = matplotlib.pyplot.subplots(figsize=(8, 6))

# summary_tibs has one row per system; transposing yields one bar per
# direction (read, write) with the systems stacked inside it.
summary_tibs.T.plot(ax=ax, kind='bar', width=0.9, stacked=True)

ax.set_axisbelow(True)
ax.yaxis.grid()
ax.set_ylabel("Data Moved (TiB)")
title_date = TARGET_DATE.strftime("%b %d, %Y")
ax.set_title("Storage Activity at NERSC on %s" % title_date)

## Plot multiple dates

In [None]:
# Long-format view of the single day: one row per direction ('rw'),
# one column per system, tagged with the date.
summary_df = (io_traffic.to_dataframe(orient='columns')
              .assign(date=TARGET_DATE)
              .assign(rw=lambda frame: frame.index.values))
summary_df.index = list(range(len(summary_df)))

summary_df

In [None]:
# Sweep window for the multi-day plot: start of 2016 through the current time.
DATE_START = datetime.datetime(2016, 1, 1)
DATE_END = datetime.datetime.now()
# For a quick 30-day run instead:
# DATE_END = DATE_START + datetime.timedelta(days=30)

In [None]:
# Sweep day by day over [DATE_START, DATE_END), building one frame per day
# (rows: read/write; columns: one per system, plus 'date' and 'rw').
# A day that fails is reported and skipped instead of aborting the sweep.
LUSTRE_FILE_SYSTEMS = ('cscratch', 'scratch1', 'scratch2', 'scratch3', 'coribb')

daily_dfs = []
now = DATE_START
while now < DATE_END:
    print("Processing %s" % now)
    io_traffic = DailyTraffic(now, use_caches=True)

    try:
        for lustre_fs in LUSTRE_FILE_SYSTEMS:
            try:
                io_traffic.get_lustre(lustre_fs)
            except OSError:
                # best effort: a file system with unreadable data for this day is left out
                pass
        # ISDCT is not collected in the multi-day sweep:
        # io_traffic.get_isdct('coribb')
        io_traffic.get_hpss('archive')

        now_df = io_traffic.to_dataframe(orient='columns')
        now_df['date'] = now.date()
        now_df['rw'] = now_df.index.values
        daily_dfs.append(now_df)

        io_traffic.save_cache()
    except Exception as error:
        # Catch Exception (not everything) so Ctrl-C still stops the sweep,
        # and say why a day is missing rather than dropping it silently.
        print("Skipping %s (%s: %s)" % (now.date(), type(error).__name__, error))
    now += datetime.timedelta(days=1)

if not daily_dfs:
    raise RuntimeError("No traffic data collected between %s and %s" % (DATE_START, DATE_END))

# Concatenate once (growing a frame inside the loop is quadratic) with a fresh 0..n-1 index.
summary_df = pandas.concat(daily_dfs, ignore_index=True)

In [None]:
# Keep only complete days: today's totals are still accumulating.
filt_date = summary_df['date'] < datetime.datetime.now().date()
plot_df = summary_df[filt_date]

# Systems drawn in the stacked plot (coribb is excluded).
data_cols = [x for x in summary_df.columns if x not in ('rw', 'coribb', 'date')]

# Split into per-direction frames indexed by date, keep only the per-system
# byte columns, treat missing totals (-1.0 sentinel or NaN) as zero.
plot_dfs = {}
for rw in ('read', 'write'):
    rw_df = plot_df[plot_df['rw'] == rw]
    rw_df = (rw_df
             .set_index(pandas.DatetimeIndex(rw_df['date']))
             # the object-dtype bookkeeping columns cannot be summed by
             # resample() on recent pandas, so drop them first
             .drop(columns=['date', 'rw'])
             .replace(to_replace=-1.0, value=0.0)
             .fillna(value=0.0))

    # optional - resample to a different time interval
    # ('M' = month end; pandas >= 2.2 prefers the alias 'ME')
    plot_dfs[rw] = rw_df.resample('M').sum()

In [None]:
# Display units for the y axis; YTICKWIDTH is expressed in these units.
# For a TiB-scale plot use: UNIT = 2**40; UNIT_LABEL = "TiB"; YTICKWIDTH = 256
UNIT = 2**50
UNIT_LABEL = "PiB"
YTICKWIDTH = 10

fig, ax = matplotlib.pyplot.subplots(figsize=(12, 8))

# Both direction frames are built from the same days and presumably share one
# index; use the first frame's dates as the x axis.
x = next(iter(plot_dfs.values())).index.values

# Running edge of each stack: reads stack upward from zero, writes downward.
y = {
    'read': numpy.zeros(len(x)),
    'write': numpy.zeros(len(x)),
}

# Direction labels drawn left of the y axis, positioned in axes fractions.
ydirections = [
    {'y': 0.25, 's': '%s Written' % UNIT_LABEL},
    {'y': 0.75, 's': '%s Read' % UNIT_LABEL},
]

for rw, rw_df in plot_dfs.items():
    sign = -1.0 if rw == 'write' else 1.0
    for isys, system in enumerate(sorted(data_cols)):
        this = y[rw] + sign * rw_df[system].values / UNIT
        ax.fill_between(x, y[rw], this,
                        # label only the read bands so each system appears once in the legend
                        label=system if rw == 'read' else None,
                        color='C%d' % (isys % len(rw_df.columns)))
        y[rw] = this

# Make the y axis symmetric about zero
ymin, ymax = ax.get_ylim()
ymax = max(abs(ymin), ymax)
ax.set_ylim(-ymax, ymax)

# Zero line across the full width; clamp x to the data range
ax.axhline(0, ls='-', color='black')
ax.set_xlim(min(x), max(x))

# Mirrored ticks every YTICKWIDTH units, labelled with magnitudes only
ymax = int(min(ymax, max(ax.get_yticks())))
positive_ticks = list(range(0, ymax + YTICKWIDTH, YTICKWIDTH))
new_yticks = [-tick for tick in positive_ticks[:0:-1]] + positive_ticks
ax.set_yticks(new_yticks)
ax.set_yticklabels([abs(tick) for tick in new_yticks])
ax.grid()
ax.set_axisbelow(True)

for kwargs in ydirections:
    ax.text(x=-0.1, ha='center', va='center', rotation=90,
            transform=ax.transAxes, **kwargs)

ax.set_title("Storage Activity at NERSC, %s to %s"
             % (DATE_START.strftime("%b %Y"), DATE_END.strftime("%b %Y")))
ax.legend(ncol=2, loc='lower right')  # use 'upper right' if writes exceed reads

fig.autofmt_xdate()