In [None]:
from __future__ import absolute_import, print_function, division

# Putting what you've learnt together:

### Building an mini-ANTARES (ish) Pipeline for Data Management and Discovery
========

#### Version 0.2

***
By AA Miller 2017 Apr 10

Edited G. Narayan 2017 Apr 26

As we just saw LSST will produce an unprecedented volume of time-domain information for the astronomical sky. $>37$ trillion individual photometric measurements will be recorded. While the vast, vast majority of these measurements will simply confirm the status quo, some will represent rarities that have never been seen before (e.g., LSST may be the first telescope to discover the electromagnetic counterpart to a LIGO graviational wave event), which the community will need to know about in ~real time. 

Storing, filtering, and serving this data is going to be a huge <del>nightmare</del> challenge. ANTARES is one proposed solution to this challenge. In this exercise you will build a miniature version of ANTARES, which will require the application of several of the lessons from earlier this week. 


Many of the difficult, and essential, steps necessary for ANTARES will be skipped here as they are too time consuming or beyond the scope of what we have previously covered. We will point out these challenges are we come across them.

---

## 0) There is a README.MD file along with this notebook.

Before running this notebook you will need to follow the instructions in the README to setup your notebook and download the data.

---

In [1]:
!tar -xzf training_set_for_LSST_DSFP.tar.gz
!tar -xzf test_set_for_LSST_DSFP.tar.gz

If everything worked OK up to this point, congratulations! You now have access to a (mini) parallel computing environment of your own.

In [1]:
# first we need to construct a client that will interface with our cluster
from ipyparallel import Client, require
worker = Client()

In [2]:
# once we create a client, we can decide how to allocate tasks across the cluster
# we've got however many 'engines' you started in the cluster
# lets just use all of them

lview = worker[:]

# now if you want to import packages, you can import them across all the 'engines'
with lview.sync_imports():    
    import numpy as np
    import scipy.stats as spstat
    import pandas as pd
    import os

# there's not much point in creating plots on the engines - we want to actually see them presumably
import matplotlib.pyplot as plt
%matplotlib notebook

# If everything has worked so far, you should see a list of worker IDs, and a corresponding list of process IDs.
# You can verify it with a `ps`
ar = lview.apply_async(os.getpid)
print("Engine ID:PID mapping: ", ar.get_dict())

importing numpy on engine(s)
importing scipy.stats on engine(s)
importing pandas on engine(s)
importing os on engine(s)
Engine ID:PID mapping:  {0: 27248, 1: 27261, 2: 27263, 3: 27266}


## Problem 1) Light Curve Data

We begin by ignoring the streaming aspect of the problem and instead we will work with full light curves. One way of thinking about this is that we're working in a mode where LSST has been imaging this field at least a few times, so we have some historical data on it, which we've already associated with an alert from LSST.

The collection of light curves has been curated by Gautham, and like LSST, it features objects of different types covering a large range in brightness and observations in multiple filters taken at different cadences.

As the focus of this exercise is the construction of a data management pipeline, we have already created a Python `class` to read in the data and store light curves as objects. The data are stored in flat text files with the following format:

|t               |pb   |flux        |dflux       |
|:--------------:|:---:|:----------:|-----------:|
|   56254.160000 |  i  |   6.530000 |   4.920000 |
|   56254.172000 |  z  |   4.113000 |   4.018000 |
|   56258.125000 |  g  |   5.077000 |  10.620000 |
|   56258.141000 |  r  |   6.963000 |   5.060000 |
|       .        |  .  |     .      |      .     |
|       .        |  .  |     .      |      .     |
|       .        |  .  |     .      |      .     |

and names `FAKE0XX.dat` where the `XX` is a running index from `01` to `99`. 

**Problem 1a**

Read in the data for the first light curve file and plot the $g'$ light curve for that source.

*Hint* - Use `pandas` or `numpy`.

In [4]:
!head training_set_for_LSST_DSFP/FAKE001.dat

#orig_file:/mnt/data/antares/aux/DES_SIM/DES_BLIND+HOSTZ/DES_SN152926.DAT
#ID:152926
#type:-9
#zgal:0.9080
#zgalerr:0.0199
              t   pb        flux       dflux
   56254.160000   i     6.530000    4.920000
   56254.172000   z     4.113000    4.018000
   56258.125000   g     5.077000   10.620000
   56258.141000   r     6.963000    5.060000


In [9]:
# execute this cell
from astropy import table

#lc = pd.read_csv('training_set_for_LSST_DSFP/FAKE001.dat', delim_whitespace=True, comment = '#')
lc = table.Table.read('training_set_for_LSST_DSFP/FAKE001.dat', format='ascii')

plt.errorbar(np.array(lc['t'][lc['pb'] == 'g']), 
             np.array(lc['flux'][lc['pb'] == 'g']), 
             np.array(lc['dflux'][lc['pb'] == 'g']), fmt = 'o', color = 'green')
plt.xlabel('MJD')
plt.ylabel('flux')

<IPython.core.display.Javascript object>

<matplotlib.text.Text at 0x11c51d748>

As we have many light curve files (in principle as many as 37 billion...), we will define a light curve class to ease our handling of the data.

** Problem 1b** 

Fix the `lc` class definition below.

*Hint* - the only purpose of this problem is to make sure you actually read each line of code below, it is not intended to be difficult.

In [15]:
class ANTARESlc():
    '''Light curve object for NOAO formatted data'''
    
    def __init__(self, filename):
        DFlc = pd.read_csv(filename, delim_whitespace=True, comment = '#')
        self.DFlc = DFlc
        self.filename = filename
        
    def plot_multicolor_lc(self):
        '''Plot the 4 band light curve'''
        fig, ax = plt.subplots()
        g = ax.errorbar(self.DFlc['t'].ix[self.DFlc['pb'] == 'g'], 
                     self.DFlc['flux'].ix[self.DFlc['pb'] == 'g'],
                     self.DFlc['dflux'].ix[self.DFlc['pb'] == 'g'],
                     fmt = 'o', color = '#78A5A3', label = r"$g'$")
        r = ax.errorbar(self.DFlc['t'].ix[self.DFlc['pb'] == 'r'], 
                     self.DFlc['flux'].ix[self.DFlc['pb'] == 'r'],
                     self.DFlc['dflux'].ix[self.DFlc['pb'] == 'r'],
                     fmt = 'o', color = '#CE5A57', label = r"$r'$")
        i = ax.errorbar(self.DFlc['t'].ix[self.DFlc['pb'] == 'i'], 
                     self.DFlc['flux'].ix[self.DFlc['pb'] == 'i'],
                     self.DFlc['dflux'].ix[self.DFlc['pb'] == 'i'],
                     fmt = 'o', color = '#E1B16A', label = r"$i'$")
        z = ax.errorbar(self.DFlc['t'].ix[self.DFlc['pb'] == 'z'], 
                     self.DFlc['flux'].ix[self.DFlc['pb'] == 'z'],
                     self.DFlc['dflux'].ix[self.DFlc['pb'] == 'z'],
                     fmt = 'o', color = '#444C5C', label = r"$z'$")
        ax.legend(fancybox = True)
        ax.set_xlabel(r"$\mathrm{MJD}$")
        ax.set_ylabel(r"$\mathrm{flux}$")

In [139]:
class ANTARESLightCurve():
    '''Light curve object for NOAO formatted data'''
    
    def __init__(self, filename):
        #DFlc = pd.read_csv(filename, delim_whitespace=True, comment = '#')
        self.DFlc = table.Table.read(filename, format='ascii')
        self.filename = filename
        
    def plot_multicolor_lc(self):
        '''Plot the 4 band light curve'''
        fig, ax = plt.subplots()
        g = ax.errorbar(self.DFlc['t'][self.DFlc['pb'] == 'g'], 
                     self.DFlc['flux'][self.DFlc['pb'] == 'g'],
                     self.DFlc['dflux'][self.DFlc['pb'] == 'g'],
                     fmt = 'o', color = '#78A5A3', label = r"$g'$")
        r = ax.errorbar(self.DFlc['t'][self.DFlc['pb'] == 'r'], 
                     self.DFlc['flux'][self.DFlc['pb'] == 'r'],
                     self.DFlc['dflux'][self.DFlc['pb'] == 'r'],
                     fmt = 'o', color = '#CE5A57', label = r"$r'$")
        i = ax.errorbar(self.DFlc['t'][self.DFlc['pb'] == 'i'], 
                     self.DFlc['flux'][self.DFlc['pb'] == 'i'],
                     self.DFlc['dflux'][self.DFlc['pb'] == 'i'],
                     fmt = 'o', color = '#E1B16A', label = r"$i'$")
        z = ax.errorbar(self.DFlc['t'][self.DFlc['pb'] == 'z'], 
                     self.DFlc['flux'][self.DFlc['pb'] == 'z'],
                     self.DFlc['dflux'][self.DFlc['pb'] == 'z'],
                     fmt = 'o', color = '#444C5C', label = r"$z'$")
        ax.legend(fancybox = True)
        ax.set_xlabel(r"$\mathrm{MJD}$")
        ax.set_ylabel(r"$\mathrm{flux}$")

**Problem 1c**

Confirm the corrections made in **1b** by plotting the multiband light curve for the source `FAKE010`.

In [137]:
%%time
lc = ANTARESlc('training_set_for_LSST_DSFP/FAKE010.dat')

lc.plot_multicolor_lc() # complete

<IPython.core.display.Javascript object>

CPU times: user 72.5 ms, sys: 3.36 ms, total: 75.9 ms
Wall time: 75.3 ms


In [144]:
%%time
lc = ANTARESLightCurve('training_set_for_LSST_DSFP/FAKE010.dat')

lc.plot_multicolor_lc() # complete

<IPython.core.display.Javascript object>

CPU times: user 73.1 ms, sys: 5.05 ms, total: 78.2 ms
Wall time: 75.4 ms


One thing that we brushed over previously is that the brightness measurements have units of flux, rather than the traditional use of magnitudes. The reason for this is that LSST will measure flux variations via image differencing, which will for some sources in some filters result in a measurement of *negative flux*. (You may have already noticed this in **1a**.) Statistically there is nothing wrong with such a measurement, but it is impossible to convert a negative flux into a magnitude. Thus we will use flux measurements throughout this exercise. 

[Aside - if you are bored during the next break, I'd be happy to rant about why we should have ditched the magnitude system years ago.]

Using flux measurements will allow us to make unbiased measurements of the statistical distributions of the variations of the sources we care about. 

**Problem 1d**

What is `FAKE010` the source that is plotted above?

*Hint 1* - "That's no moon!".

*Hint 2* - ask Szymon or Tomas... 

**Solution 1d**

Probably a space station? Or a cosmic ray...

**Problem 1e**

To get a better sense of the data, plot the multiband light curves for sources `FAKE060` and `FAKE073`.

In [30]:
lc60  = ANTARESLightCurve('training_set_for_LSST_DSFP/FAKE060.dat')
lc60.plot_multicolor_lc()

<IPython.core.display.Javascript object>

In [31]:
lc73  = ANTARESLightCurve('training_set_for_LSST_DSFP/FAKE073.dat')
lc73.plot_multicolor_lc()

<IPython.core.display.Javascript object>

## Problem 2) Data Preparation

While we could create a database table that includes every single photometric measurement made by LSST, this ~37 trillion row db would be enormous without providing a lot of added value beyond the raw flux measurements [while this table is necessary, alternative tables may provide more useful]. Furthermore, extracting individual light curves from such a database will be slow. Instead, we are going to develop summary statistics for every source which will make it easier to select individual sources and develop classifiers to identify objects of interest.  

Below we will redefine the `ANTARESlc` class to include additional methods so we can (eventually) store summary statistics in a database table. In the interest of time, we limit the summary statistics to a relatively small list all of which have been shown to be useful for classification (see [Richards et al. 2011](http://iopscience.iop.org/article/10.1088/0004-637X/733/1/10/meta) for further details). The statistics that we include (for now) are: 

1. `Std` -- the standard deviation of the flux measurements 
2. `Amp` -- the amplitude of flux deviations
3. `MAD` -- the median absolute deviation of the flux measurements
4. `beyond1std` -- the fraction of flux measurements beyond 1 standard deviation
5. the mean $g' - r'$, $r' - i'$, and $i' - z'$ color


**Problem 2a**

Read through the expandanded definition of the `ANTARESlc` class. Feel free to improve the doc strings.

*food for thought* - if a source is observed in different filters but the observations are not simultaneous (or quasi-simultaneous), what is the meaning of a "mean color"?

*Solution to food for thought* - in this case we simply want to take the mean flux in each filter and create a statistic that is $-2.5 \log \frac{\langle f_X \rangle}{\langle f_{Y} \rangle}$, where ${\langle f_{Y} \rangle}$ is the mean flux in band $Y$, while $\langle f_X \rangle$ is the mean flux in band $X$, which can be $g', r', i', z'$. Note that our use of image-difference flux measurements, which can be negative, means you'll need to add some form a case excpetion if $\langle f_X \rangle$ or $\langle f_Y \rangle$ is negative. In these cases set the color to -999.

In [143]:


class ANTARESLightCurve():
    '''Light curve object for NOAO formatted data'''
    
    def __init__(self, dataname):
        '''Read in light curve data'''
        if isinstance(dataname, str):
            self.filename = dataname
        else:
            self.filename = 'training_set_for_LSST_DSFP/FAKE{:03}.dat'.format(dataname)
        
        self.light_curve_data = table.Table.read(self.filename, format='ascii')
        self.compute_filter_flux()
        
    def plot_multicolor_lc(self):
        '''Plot the 4 band light curve'''
        fig, ax = plt.subplots()
        
        for band, color in zip('griz', ['#78A5A3', '#CE5A57', '#E1B16A', '#444C5C']):
            ax.errorbar(self.light_curve_data['t'][self.light_curve_data['pb'] == band],
                        self.light_curve_data['flux'][self.light_curve_data['pb'] == band],
                        self.light_curve_data['dflux'][self.light_curve_data['pb'] == band],
                        fmt='o', color=color, label="${}'$".format(band))
            
        ax.legend(fancybox = True)
        ax.set_xlabel(r"$\mathrm{MJD}$")
        ax.set_ylabel(r"$\mathrm{flux}$")
        
    def compute_filter_flux(self):
        '''Store individual passband fluxes as object attributes'''
        
        data = self.light_curve_data
        for band in 'griz':
            setattr(self, band+'time', data['t'][data['pb'] == band])
            setattr(self, band+'_flux', data['flux'][data['pb'] == band])
            setattr(self, band+'_flux_unc', data['dflux'][data['pb'] == band])
        

    def compute_weighted_mean_flux(self):
        '''Measure (SNR weighted) mean flux in griz'''
        
        for band in 'griz':
            flux = getattr(self, band + '_flux')
            unc = getattr(self, band + '_flux_unc')
            setattr(self, band + '_mean', np.sum(flux*(flux/unc)**2)/np.sum((flux/unc)**2))

    def compute_normalized_flux_std(self):
        '''Measure standard deviation of flux in griz'''

        if not hasattr(self, 'g_mean'):
            self.compute_weighted_mean_flux()
        
        for band in 'griz':
            flux = getattr(self, band + '_flux')
            mean = getattr(self, band + '_mean')
            setattr(self, band + '_std',  np.std(flux/mean, ddof = 1) if len(flux) > 1 else -999)

    def compute_normalized_amplitude(self):
        '''Measure the normalized amplitude of variations in griz'''

        if not hasattr(self, 'g_mean'):
            self.compute_weighted_mean_flux()
        
        for band in 'griz':
            flux = getattr(self, band + '_flux')
            mean = getattr(self, band + '_mean')
            setattr(self, band + '_amp', (np.max(flux) - np.min(flux))/mean)

    def compute_normalized_MAD(self):
        '''Measure normalized Median Absolute Deviation (MAD) in griz'''
        
        for band in 'griz':
            setattr(self, band + '_MAD', astropy.stats.median_absolute_deviation(getattr(self, band + '_flux')))
            

    def compute_normalized_beyond_1std(self):
        '''Measure fraction of flux measurements beyond 1 std'''

        if not hasattr(self, 'g_mean'):
            self.compute_weighted_mean_flux()
        
        for band in 'griz':
            flux = getattr(self, band + '_flux')
            mean = getattr(self, band + '_mean')
            setattr(self, band + '_beyond_1std', sum(np.abs(flux - mean) > np.std(flux, ddof = 1))/len(flux))
    
    def compute_skew(self):
        '''Measure the skew of the flux measurements'''
        for band in 'griz':
            setattr(self, band + '_skew', spstat.skew(getattr(self, band + '_flux')))
        
    def compute_mean_colors(self):
        '''Measure the mean g-r, g-i, and g-z colors'''

        if not hasattr(self, 'g_mean'):
            self.compute_weighted_mean_flux()
        
        self.g_minus_r = -2.5*np.log10(self.g_mean/self.r_mean) if self.g_mean > 0 and self.r_mean > 0 else -999
        self.r_minus_i = -2.5*np.log10(self.r_mean/self.i_mean) if self.r_mean > 0 and self.i_mean > 0 else -999
        self.i_minus_z = -2.5*np.log10(self.i_mean/self.z_mean) if self.i_mean > 0 and self.z_mean > 0 else -999

**Problem 2b**

Confirm your solution to **2a** by measuring the mean colors of source `FAKE010`. Does your measurement make sense given the plot you made in **1c**?

In [62]:
lc = ANTARESLightCurve(10)# complete

lc.compute_mean_colors() # complete

print("The g'-r', r'-i', and 'i-z' colors are: "
      "{:.3f}, {:.3f}, and {:.3f}, respectively.".format(lc.g_minus_r, lc.r_minus_i, lc.i_minus_z))

The g'-r', r'-i', and 'i-z' colors are: 1.048, -0.191, and -0.193, respectively.


## Problem 3) Store the sources in a database

Building (and managing) a database from scratch is a challenging task. For (very) small projects one solution to this problem is to use [`SQLite`](http://sqlite.org/), which is a self-contained, publicly available SQL engine. One of the primary advantages of `SQLite` is that no server setup is required, unlike other popular tools such as postgres and MySQL. In fact, `SQLite` is already integrated with python so everything we want to do (create database, add tables, load data, write queries, etc.) can be done within Python.

Without diving too deep into the details, here are situations where `SQLite` has advantages and disadvantages [according to their own documentation](http://sqlite.org/whentouse.html):

*Advantages*

1. Situations where expert human support is not needed
2. For basic data analysis (`SQLite` is easy to install and manage for new projects)
3. Education and training

*Disadvantages*

1. Client/Server applications (`SQLite` does not behave well if multiple systems need to access db at the same time)
2. Very large data sets (`SQLite` stores entire db in a single disk file, other solutions can store data across multiple files/volumes)
3. High concurrency (Only 1 writer allowed at a time for `SQLite`)

From the (limited) lists above, you can see that while `SQLite` is perfect for our application right now, if you were building an actual ANTARES-like system a more sophisticated database solution would be required.  

**Problem 3a**

Import sqlite3 into the notebook. 

*Hint* - if this doesn't work, you may need to `conda install sqlite3` or `pip install sqlite3`.

In [63]:
import sqlite3

Following the `sqlite3` import, we must first connect to the database. If we attempt a connection to a database that does not exist, then a new database is created. Here we will create a new database file, called `miniANTARES.db`.

In [64]:
conn = sqlite3.connect("miniANTARES.db")

We now have a database connection object, `conn`. To interact with the database (create tables, load data, write queries) we need a cursor object.

In [65]:
cur = conn.cursor()

Now that we have a cursor object, we can populate the database. As an example we will start by creating a table to hold all the raw photometry (though ultimately we will not use this table for analysis).

*Note* - there are many cursor methods capable of interacting with the database. The most common, [`execute`](https://docs.python.org/3/library/sqlite3.html#sqlite3.Cursor.execute), takes a single `SQL` command as its argument and executes that command. Other useful methods include [`executemany`](https://docs.python.org/3/library/sqlite3.html#sqlite3.Cursor.executemany), which is useful for inserting data into the database, and [`executescript`](https://docs.python.org/3/library/sqlite3.html#sqlite3.Cursor.executescript), which take an `SQL` script as its argument and executes the script.

In many cases, as below, it will be useful to use triple quotes in order to improve the legibility of your code.

In [66]:
cur.execute("""drop table if exists rawPhot""") # drop the table if is already exists
cur.execute("""create table rawPhot(
                id integer primary key,
                objId int,
                t float, 
                pb varchar(1),
                flux float,
                dflux float) 
""")

<sqlite3.Cursor at 0x136ac83b0>

Let's unpack everything that happened in these two commands. First - if the table `rawPhot` already exists, we drop it to start over from scratch. (this is useful here, but should not be adopted as general practice)

Second - we create the new table `rawPhot`, which has 6 columns: `id` - a running index for every row in the table, `objId` - an ID to identify which source the row belongs to, `t` - the time of observation in MJD, `pb` - the passband of the observation, `flux` the observation flux, and `dflux` the uncertainty on the flux measurement. In addition to naming the columns, we also must declare their type. We have declared `id` as the primary key, which means this value will automatically be assigned and incremented for all data inserted into the database. We have also declared `pb` as a variable character of length 1, which is more useful and restrictive than simply declaring `pb` as `text`, which allows any freeform string.

Now we need to insert the raw flux measurements into the database. To do so, we will use the `ANTARESlc` class that we created earlier. As an initial example, we will insert the first 3 observations from the source `FAKE010`.

In [72]:
filename = "training_set_for_LSST_DSFP/FAKE001.dat"
lc = ANTARESLightCurve("training_set_for_LSST_DSFP/FAKE001.dat")
objId = int(filename.split('FAKE')[1].split(".dat")[0])
"""insert into rawPhot(objId, t, pb, flux, dflux) values {}""".format((objId,) + tuple(lc.light_curve_data[0]))

"insert into rawPhot(objId, t, pb, flux, dflux) values (1, 56254.160000000003, 'i', 6.5300000000000002, 4.9199999999999999)"

In [None]:
filename = "training_set_for_LSST_DSFP/FAKE001.dat"
lc = ANTARESLightCurve(filename)

objId = int(filename.split('FAKE')[1].split(".dat")[0])

cur.execute("""insert into rawPhot(objId, t, pb, flux, dflux) values {}""".format((objId,) + tuple(lc.DFlc.ix[0])))
cur.execute("""insert into rawPhot(objId, t, pb, flux, dflux) values {}""".format((objId,) + tuple(lc.DFlc.ix[1])))
cur.execute("""insert into rawPhot(objId, t, pb, flux, dflux) values {}""".format((objId,) + tuple(lc.DFlc.ix[2])))

There are two things to highlight above: (1) we do not specify an id for the data as this is automatically generated, and (2) the data insertion happens via a tuple. In this case, we are taking advantage of the fact that a Python tuple is can be concatenated:

    (objId,) + tuple(lc10.DFlc.ix[0]))
    
While the above example demonstrates the insertion of a single row to the database, it is far more efficient to bulk load the data. To do so we will delete, i.e. `DROP`, the rawPhot table and use some `pandas` manipulation to load the contents of an entire file at once via [`executemany`](https://docs.python.org/3/library/sqlite3.html#sqlite3.Cursor.executemany).

In [77]:
cur.execute("""drop table if exists rawPhot""") # drop the table if it already exists
cur.execute("""create table rawPhot(
                id integer primary key,
                objId int,
                t float, 
                pb varchar(1),
                flux float,
                dflux float) 
""")

# next 3 lines are already in name space; repeated for clarity
filename = "training_set_for_LSST_DSFP/FAKE001.dat"
lc = ANTARESLightCurve(filename)
objId = int(filename.split('FAKE')[1].split(".dat")[0])

data = [(objId,) + tuple(x) for x in lc.light_curve_data] # array of tuples

cur.executemany("""insert into rawPhot(objId, t, pb, flux, dflux) values (?,?,?,?,?)""", data)

<sqlite3.Cursor at 0x136ac83b0>

Load all of the raw photometric observations into the `rawPhot` table in the database.
Normally, you'd load these files one by one in serial, looping over each file. This is what happens in `miniAntares_serial.ipynb`. Instead, we're going to use our cluster to parallelize this operation.

**Problem 3b**
First, build a list of filenames:  

*Hint* - you can use [`glob`](https://docs.python.org/3/library/glob.html) to select all of the files being loaded.

*Hint 2* - you have already loaded the data from `FAKE001` into the table.

In [96]:
# build your list of filenames here
import os
import glob

filenames = glob.glob("training_set_for_LSST_DSFP/FAKE???.dat")
filenames = [os.path.abspath(fn) for fn in filenames]

In [97]:
# execute this
@require(ANTARESLightCurve, 'astropy.table', 'astropy.stats')
def load_data(filename):
    lc = ANTARESLightCurve(filename)
    objId = int(filename.split('FAKE')[1].split(".dat")[0])
    data = [(objId,) + tuple(x) for x in lc.light_curve_data] # array of tuples
    return data

Now, figure out how to use the `DirectView` access to the cluster to map this function on to all the filenames, and get the results. Examine a few of them.

In [98]:
result = lview.map(load_data, filenames[1:])
all_data = result.get()

And now that you have results, load them in the database as before.

In [99]:
for data in all_data: 
    cur.executemany("""insert into rawPhot(objId, t, pb, flux, dflux) values (?,?,?,?,?)""", data)

**Problem 3c**

To ensure the data have been loaded properly, select the $g'$ light curve for source `FAKE061` from the `rawPhot` table and plot the results. Does it match the plot from **1c**?

In [145]:
cur.execute('SELECT t, flux, dflux from rawPhot where pb == "g" and objId==61') # complete

data = cur.fetchall()
t, flux, dflux = np.array(data).T

fig, ax = plt.subplots()
ax.errorbar(t, flux, dflux, fmt='.') # complete
ax.set_xlabel(r"$\mathrm{MJD}$")
ax.set_ylabel(r"$\mathrm{flux}$")

<IPython.core.display.Javascript object>

<matplotlib.text.Text at 0x120c81fd0>

Now that we have loaded the raw observations, we need to create a new table to store summary statistics for each object. This table will include everything we've added to the `ANTARESlc` class. 

In [112]:
cur.execute("""drop table if exists lcFeats""") # drop the table if it already exists
cur.execute("""create table lcFeats(
                id integer primary key,
                objId int,
                gStd float,
                rStd float,
                iStd float,
                zStd float,
                gAmp float, 
                rAmp float, 
                iAmp float, 
                zAmp float, 
                gMAD float,
                rMAD float,
                iMAD float,                
                zMAD float,                
                gBeyond float,
                rBeyond float,
                iBeyond float,
                zBeyond float,
                gSkew float,
                rSkew float,
                iSkew float,
                zSkew float,
                gMinusR float,
                rMinusI float,
                iMinusZ float,
                FOREIGN KEY(objId) REFERENCES rawPhot(objId)
                ) 
""")

<sqlite3.Cursor at 0x136ac83b0>

This procedure should look familiar to above, with one exception: the addition of the `foreign key` in the `lcFeats` table. The inclusion of the `foreign key` ensures a connected relationship between `rawPhot` and `lcFeats`. In brief, a row cannot be inserted into `lcFeats` unless a corresponding row, i.e. `objId`, exists in `rawPhot`. Additionally, rows in `rawPhot` cannot be deleted if there are dependent rows in `lcFeats`. 

**Problem 3d**

Calculate features for every source in `rawPhot` and insert those features into the `lcFeats` table. You'll first need a function to calculate the features.

In [121]:
# just like last time, we'll define a function to calculate the features for one lightcurve
# execute this

@require(ANTARESLightCurve, 'numpy as np', 'scipy.stats as spstat', 'astropy.table', 'astropy.stats')
def calculate_features(filename):
    lc = ANTARESLightCurve(filename)
    objId = int(filename.split('FAKE')[1].split(".dat")[0])
    lc.compute_weighted_mean_flux()
    lc.compute_normalized_flux_std()
    lc.compute_normalized_amplitude()
    lc.compute_normalized_MAD()
    lc.compute_normalized_beyond_1std()
    lc.compute_skew()
    lc.compute_mean_colors()
    feats = (objId, lc.g_std, lc.r_std, lc.i_std, lc.z_std, 
            lc.g_amp,  lc.r_amp,  lc.i_amp,  lc.z_amp,  
            lc.g_MAD, lc.r_MAD, lc.i_MAD, lc.z_MAD, 
            lc.g_beyond_1std, lc.r_beyond_1std, lc.i_beyond_1std, lc.z_beyond_1std,
            lc.g_skew, lc.r_skew, lc.i_skew, lc.z_skew, 
            lc.g_minus_r, lc.r_minus_i, lc.i_minus_z)
    return feats

And again, map the function onto the filenames, and load the results into the table.

In [122]:
# and then lets map the function onto all the data
result = lview.map(calculate_features, filenames)

# and get the result
all_feats = result.get()

# and now load it all into our table
for feats in all_feats:
    cur.execute("""insert into lcFeats(objId, 
                                       gStd, rStd, iStd, zStd, 
                                       gAmp,  rAmp,  iAmp,  zAmp,  
                                       gMAD, rMAD, iMAD, zMAD, 
                                       gBeyond, rBeyond, iBeyond, zBeyond,
                                       gSkew, rSkew, iSkew, zSkew,
                                       gMinusR, rMinusI, iMinusZ) values {}""".format(feats))

**Problem 3e**

Confirm that the data loaded correctly by counting the number of sources with `gAmp` > 2.

How many sources have `gMinusR` = -999?

*Hint* - you should find 9 and 2, respectively.

In [134]:
cur.execute('SELECT COUNT(*) from lcFeats where gAmp > 2')
nAmp2 =  cur.fetchall()[0][0]

cur.execute('SELECT COUNT(*) from lcFeats where gMinusR == -999') 
nNoColor =  cur.fetchall()[0][0]

print("There are {:d} sources with gAmp > 2".format(nAmp2))
print("There are {:d} sources with no measured i' - z' color".format(nNoColor))

There are 9 sources with gAmp > 2
There are 2 sources with no measured i' - z' color


Finally, we close by commiting the changes we made to the database.

Note that strictly speaking this is not needed, however, were we to update any values in the database then we would need to commit those changes.

In [135]:
conn.commit()

**mini Challenge Problem**

If there is less than 45 min to go, please skip this part. 

Earlier it was claimed that bulk loading the data is faster than loading it line by line. For this problem - prove this assertion, use `%%timeit` to "profile" the two different options (bulk load with `executemany` and loading one photometric measurement at a time via for loop).

Compare both serial solutions, to load the data in parallel.

*Hint* - to avoid corruption of your current working database, `miniANTARES.db`, create a new temporary database for the pupose of running this test. Also be careful with the names of your connection and cursor variables.

In [None]:
%%timeit
# bulk load solution

tmp_conn = # complete
tmp_cur = # complete

tmp_cur.execute( # complete

for filename in filenames: 
    # complete
    # complete
    # complete
    # complete
    # complete

In [None]:
%%timeit
# bulk load solution

tmp_conn = # complete
tmp_cur = # complete

tmp_cur.execute( # complete

for filename in filenames: 
    # complete
    # complete
    # complete
    # complete
    # complete

## Problem 4) Build a Classification Model

One of the primary goals for ANTARES is to separate the Wheat from the Chaff, in other words, given that ~10 million alerts will be issued by LSST on a nightly basis, what is the single (or 10, or 100) most interesting alert.

Here we will build on the skills developed during the DSFP Session 2 to construct a machine-learning model to classify new light curves. 

Fortunately - the data that has already been loaded to miniANTARES.db is a suitable training set for the classifier (we simply haven't provided you with labels just yet). Execute the cell below to add a new table to the database which includes the appropriate labels.

In [136]:
cur.execute("""drop table if exists lcLabels""") # drop the table if it already exists
cur.execute("""create table lcLabels(
               objId int,
               label int, 
               foreign key(objId) references rawPhot(objId)
               )""")

labels = np.zeros(100)
labels[20:60] = 1
labels[60:] = 2

data = np.append(np.arange(1,101)[np.newaxis].T, labels[np.newaxis].T, axis = 1)
tup_data = [tuple(x) for x in data]

cur.executemany("""insert into lcLabels(objId, label) values (?,?)""", tup_data)

<sqlite3.Cursor at 0x136ac83b0>

For now - don't worry about what the labels mean (though if you inspect the light curves you may be able to figure this out...)

**Problem 4a**

Query the database to select features and labels for the light curves in your training set. Store the results of these queries in `numpy` arrays, `X` and `y`, respectively, which are suitable for the various `scikit-learn` machine learning algorithms.

*Hint* - recall that databases do not store ordered results.

*Hint 2* - recall that `scikit-learn` expects `y` to be a 1d array. You will likely need to convert a 2d array to 1d. 

In [None]:
cur.execute( # complete
y = # complete

cur.execute(# complete
    
X = # complete

**Problem 4b**

Train a SVM model ([`SVC`](http://scikit-learn.org/stable/modules/generated/sklearn.svm.SVC.html#sklearn.svm.SVC) in `scikit-learn`) using a radial basis function (RBF) kernel with penalty parameter, $C = 1$, and kernel coefficient, $\gamma = 0.1$.

Evaluate the accuracy of the model via $k = 5$ fold cross validation. 

*Hint* - you may find the [`model_selection`](http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.cross_val_score.html#sklearn.model_selection.cross_val_score) module helpful.

In [None]:
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score

cv_scores = # complete

print("The SVM model produces a CV accuracy of {:.4f}".format( # complete

The SVM model does a decent job of classifying the data. However - we are going to have 10 million alerts every night. Therefore, we need something that runs quickly. For most ML models the training step is slow, while predictions (relatively) are fast. 

**Problem 4c**

Pick any other [classification model from `scikit-learn`](http://scikit-learn.org/stable/supervised_learning.html), and "profile" the time it takes to train that model vs. the time it takes to train an SVM model.

Is the model that you have selected faster than SVM?

*Hint* - you should import the model outside your timing loop as we only care about the training step in this case.

In [None]:
from sklearn.ensemble import RandomForestClassifier
rf_clf = # complete
svm_clf = # complete

In [None]:
%%timeit
# timing solution for RF model
# complete

In [None]:
%%timeit
# timing solution for SVM model
# complete

**Problem 4d**

Does the model you selected perform better than the SVM model? Perform a $k = 5$ fold cross validation to determine which model provides superior accuracy.

In [None]:
cv_scores = cross_val_score( # complete
    
print("The RF model produces a CV accuracy of {:.4f}".format( # complete

**Problem 4e**

Which model are you going to use in your miniANTARES? Justify your answer. 

*Write solution to **4e** here*


## Problem 5) Class Predictions for New Sources

Now that we have developed a basic infrastructure for dealing with streaming data, we may reap the rewards of our efforts. We will use our ANTARES-like software to classify newly observed sources.

**Problem 5a**

Load the light curves and features for the new observations (found in `full_testset_for_LSST_DSP`) into new tables, called `testPhot` and `testFeats`, respectively, in the database. 

In [None]:
cur.execute("""drop table if exists testPhot""") # drop the table if is already exists
cur.execute( # complete
cur.execute("""drop table if exists testFeats""") # drop the table if it already exists
cur.execute( # complete

In [None]:
new_obs_filenames = glob.glob("test_set_for_LSST_DSFP/FAKE*.dat")

result = lview.map(load_data, new_obs_filenames)
all_data = result.get()

In [None]:
for data in all_data: 
    cur.executemany("""insert into rawPhot(objId, t, pb, flux, dflux) values (?,?,?,?,?)""", data)

**Problem 5b**

Calculate features for the new observations and insert those features into a database table.

In [None]:
result = lview.map( # complete
all_feats = result.get()

In [None]:
for feats in all_feats: 
    cur.execute("""insert into testFeats(objId, 
                                       gStd, rStd, iStd, zStd, 
                                       gAmp,  rAmp,  iAmp,  zAmp,  
                                       gMAD, rMAD, iMAD, zMAD, 
                                       gBeyond, rBeyond, iBeyond, zBeyond,
                                       gSkew, rSkew, iSkew, zSkew,
                                       gMinusR, rMinusI, iMinusZ) values {}""".format(feats))

**Problem 5c**

Train the model that you adopted in **4e** on the training set, and produce predictions for the newly observed sources.

What is the class distribution for the newly detected sources?

*Hint* - the training set was constructed to have a nearly uniform class distribution, that may not be the case for the actual observed distribution of sources.

In [None]:
svm_clf = SVC(C=1.0, gamma = 0.1, kernel = 'rbf').fit( # complete

cur.execute( # complete
X_new = # complete

y_preds = # complete

print("""There are {:d}, {:d}, and {:d} sources 
         in classes 1, 2, 3, respectively""".format(*list(np.bincount(y_preds)))) # be careful using bincount

**Problem 5d**

This classification performance isn't great... Lets take a look at the actual class labels, since we sneakily left them in the file headers

In [None]:
 ! grep label test_set_for_LSST_DSFP/*dat | sed -e 's/label:/ /' | awk '{print $2+1}' | sort | uniq -c

One thing you've probably noticed is that there are observations that have very large uncertainties, and our features (which are really just moments of the flux distribution) aren't really all that robust to outliers.

Let's try a more complex description of the shape of the light curves with a Gaussian process regression.
We'll need the `sklearn.gaussian_process` module

In [None]:
import sklearn.gaussian_process as gp

In [None]:
# we'll create a function that just fits the g band lightcurve
def calculate_gpfit(filename):
    """
    Do a quick gaussian process regression of just g-band lightcurve to get a smooth representation
    This should be more robust to outliers
    """
    lc = ANTARESlc(filename)
    t = lc.gtime.values
    y = lc.gFlux.values
    dy = lc.gFluxUnc.values
    t = t.reshape(-1, 1)
    objId = int(filename.split('FAKE')[1].split(".dat")[0])
    
    # a Gaussian Process is defined by a correlation function that relates each point to each other
    # I've chosen a simple common kernel - the Matern kernel. 
    # It has a nice property in that it is 'stationary' - the covariance between points 
    # only depends on their separation
    
    base_kernel = gp.kernels.Matern(length_scale=10.,nu=2.5, length_scale_bounds=(1.,20.))
    gkernel = 0.5*(np.median(dy)**2.)*base_kernel
    
    # the second thing you need to define a Gaussian Process is a mean function, which in our case is 
    # implictly zero (this is fine for transients - they spend most of eternity at the background!)
    gband = gp.GaussianProcessRegressor(kernel=gkernel, alpha=dy**2.).fit(t, y)
    
    # now lets use the Gaussian process to predict the lightcurve on an evenly sampled grid
    # this is appropriate for Wavelets, or generally robust feature extraction
    
    newtime = np.linspace(t.min(), t.max(), 200, endpoint=True)
    newtime = newtime.reshape(-1, 1)
    gnew = gband.predict(newtime, return_cov=False)
    return (t, y , dy, newtime, gnew)

Lets look at one fit to see what's happening:

In [None]:
ftest = filenames[0:20]
gtime, gflux, gdflux, gnewtime, gnewflux = calculate_gpfit(ftest[3])

fig, ax = plt.subplots()
ax.errorbar(gtime, gflux, gdflux, fmt = 'o', color = '#78A5A3')
ax.plot(gnewtime, gnewflux, 'k-')
ax.set_xlabel(r"$\mathrm{MJD}$")
ax.set_ylabel(r"$\mathrm{flux}$")

OH YAY! That's much better a more robust to noisy ratty data, the likes of which you will get all the darn time in astronomy!

Except, is this feasible to run?? Can we practically do it on 10^4--5 alerts every 37 seconds for `ANTARES`??

Well you have the tools to examine this! Let's profile the Gaussian Processes vs all the features

In [None]:
import cProfile
def dummy_func():
    
    all_fits = []
    all_feats = []
    for filename in ftest: 
        result  = calculate_gpfit(filename)
        result2 = calculate_features(filename)
        all_feats.append(result2)
        all_fits.append(result)
    
! rm -f gpstats.cprof    
cProfile.run('dummy_func()','gpstats.cprof')

Now let's see how long that took

In [None]:
%load_ext snakeviz
%snakeviz dummy_func()

Eeek. That's way slower than the regular feature computation!
It actually turns out that this is OK for us, because the implementation of Gaussian Process Regression in sklearn is pretty slow, and there are various better packages and approximations we can make to speed things up dramatically.

For the purposes of this exercise though, we'll stop here, since it's no fun to run these GPs on 5000 lightcurves (and our implementation isn't really robust either).

## Problem 6) Anomaly Detection

As we learned earlier - one of the primary goals of ANTARES is to reduce the stream of 10 million alerts on any given night to the single (or 10, or 100) most interesting objects. One possible definition of "interesting" is rarity - in which case it would be useful to add some form of anomaly detection to the pipeline. `scikit-learn` has [several different algorithms](http://scikit-learn.org/stable/auto_examples/covariance/plot_outlier_detection.html#sphx-glr-auto-examples-covariance-plot-outlier-detection-py) that can be used for anomaly detection. Here we will employ [isolation forest](http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html) which has many parallels to random forests, which we have previously learned about.

In brief, isolation forest builds an ensemble of decision trees where the splitting parameter in each node of the tree is selected randomly. In each tree the number of branches necessary to isolate each source is measured - outlier sources will, on average, require fewer splittings to be isolated than sources in high-density regions of the feature space. Averaging the number of branchings over many trees results in a relative ranking of the anomalousness (*yes, I just made up a word*) of each source.

**Problem 6a**

Using [`IsolationForest`](http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html) in `sklearn.ensemble` - determine the 10 most isolated sources in the data set.

*Hint* - for `IsolationForest` you will want to use the `decision_function()` method rather than `predict_proba()`, which is what we have previously used with `sklearn.ensemble` models to get relative rankings from the model.

In [None]:
from sklearn.ensemble import IsolationForest

isoF_clf = # complete
isoF_clf.fit( # complete
anomaly_score = isoF_clf.decision_function(X_new)

print("The 10 most anomalous sources are: {}".format( # complete

**Problem 6b**

Plot the light curves of the 2 most anomalous sources. 

Can you identify why these sources have been selected as outliers?

In [None]:
# complete

*Write solution to **6b** here*


## Challenge Problem) Simulate a Real ANTARES

The problem that we just completed features a key difference from the true ANTARES system - namely, all the light curves analyzed had a complete set of observations loaded into the database. One of the key challenges for LSST (and by extension ANTARES) is that the data will be *streaming* - new observations will be available every night, but the full light curves for all sources won't be available until the 10 yr survey is complete. In this problem, you will use the same data to simulate an LSST-like classification problem.

Assume that your training set (i.e. the first 100 sources loaded into the database) were observed prior to LSST, thus, these light curves can still be used in their entirety to train your classification models. For the test set of observations, simulate LSST by determining the min and max observation date and take 1-d quantized steps through these light curves. On each day when there are new observations, update the feature calculations for every source that has been newly observed. Classify those sources and identify possible anomalies.

Here are some things you should think about as you build this software:

1. Should you use the entire light curves for training-set objects when classifying sources with only a few data points?
2. How are you going to handle objects on the first epoch when they are detected?
3. What threshold (if any) are you going to set to notify the community about rarities that you have discovered

*Hint* - Since you will be reading these light curves from the database (and not from text files) the `ANTARESlc` class that we previously developed will not be useful. You will (likely) either need to re-write this class to interact with the database or figure out how to massage the query results to comply with the class definitions.