# Create the full test vector dataframe using Spark temp tables

## First! we set up the Spark Context

In [1]:
import matplotlib
matplotlib.use('Agg')
%matplotlib inline

import matplotlib.pyplot as plt
plt.rcParams['axes.labelsize'] = 14
plt.rcParams['xtick.labelsize'] = 12
plt.rcParams['ytick.labelsize'] = 12

In [2]:
import os
import os.path as osp
#import commands
import time
import random

import numpy as np

import numpy as np
from pyspark import SparkConf,SparkContext, StorageLevel
from pyspark.sql import Row, SQLContext, SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors


from datetime import datetime
LogFile=datetime.now().strftime('Create_vectors_%H_%M_%d_%m_%Y.log')

import logging
logger = logging.getLogger('myapp')
hdlr = logging.FileHandler(LogFile)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.INFO)

In [3]:
sqlContext = SQLContext(sc)

In [4]:
sqlContext.sql("use plasticc")

DataFrame[]

#### Registered tables

this notebooks utilised registered Spark tables - these are Spark dataframes that are registered in memory; from there, they can then be utililsed in forther dataframe creation statements.

There is an excellent discussion here -

https://towardsdatascience.com/sql-at-scale-with-apache-spark-sql-and-dataframes-concepts-architecture-and-examples-c567853a702f


## First, we create the modifiers for the flux values and the flux value to be added to the metadata table

This inplements the code from lines 282 - 287 in the original program, but as a set (not iteratively).

        flux_max = np.max(flux)
        flux_min = np.min(flux)
        flux_pow = math.log2(flux_max - flux_min)
        sample['hist'][:,1] /= math.pow(2, flux_pow)
        sample['hist'][:,2] /= math.pow(2, flux_pow)
        sample['meta'][9] = flux_pow / 10


In [9]:
modifiersSQL="""
select
object_id,
log2(max(flux)- min(flux)) flux_pow,
pow(2,log2(max(flux)- min(flux)) ) as HistModifier,
log2(max(flux)- min(flux))/10 as metaVal
from training_set
group by object_id
"""
modifiersDF=sqlContext.sql(modifiersSQL)

# Create the in memory table
modifiersDF.registerTempTable("MODIFIERS")


## Create the padded training set
We do this to create a standard set of features for all objects; in this case, the value provided by sequence_len (which in this case is 256).

We're also going to use Spark registered tables to do this.

In [10]:
training_set = "training_set"
training_metadata = "training_set_metadata"

This is the SQL equivalent of the keras PAD_SEQUENCES function. 

We create a baseline table consisting of object_ids and a 0 value for every feature as well as a rownumber. This is accomplished by a cartesian join (cross join in Hive) between the "cnt" nested table and the "objects" nested table, resulting the baseline nested table which has 256 records for each object, with all features set to zero.

Next, we create a "train_set" nested table containing all the training set information, ordered by the mjd value descending. this pads the data from the last value to the first as per PAD_SEQUENCES,

We then create the padded training set as a left join from the baseline nested table to the train_set nested table, padded to a consistent 236 values for each feature.

Finally, we include the pow modifier onto the flux and flux_err values.

In [11]:
paddedSQL="""
with
cnt
as
(
    select rownum from 
    (
        select row_number() over (ORDER BY object_id) as rownum
        from {}
    ) a
    where rownum <=256
),
objects as (select object_id, 0 padMJD, 0 padPassband,0 padFlux, 0 padFlux_err,0 padDetected from {} group by object_id),
baseline as (select * from objects CROSS JOIN cnt ), -- cartesian product with 256 values to use as the baseline
train_set as (select *, row_number() over (partition by object_id order by mjd desc) as rownum from {}),
paddedRev as (
    select baseline.object_id, --train_set.mjd, baseline.padMJD,
    case when train_set.mjd is null then baseline.padMJD else train_set.mjd end mjd,
    case when train_set.passband is null then baseline.padPassband else train_set.passband end passband,
    case when train_set.flux is null then baseline.padFlux else train_set.flux end flux,
    case when train_set.flux_err is null then baseline.padFlux_err else train_set.flux_err end flux_err,
    case when train_set.detected is null then baseline.padDetected else train_set.detected end detected
    from baseline left outer join train_set on baseline.object_id = train_set.object_id and baseline.rownum=train_set.rownum
    order by baseline.object_id, mjd desc
)
select 
    pr.object_id, 
    mjd, 
    passband, 
    flux / HistModifier as flux, 
    flux_err/ HistModifier as flux_err, 
    detected 
from paddedRev pr
    inner join {} mods
        on pr.object_id = mods.object_id
order by pr.object_id, mjd
""".format(training_metadata,training_set,training_set, "MODIFIERS")

Now we create the padded training data dataframe and create a Spark temporary table

In [12]:
paddedTrainingSet_DF = sqlContext.sql(paddedSQL)
paddedTrainingSet_DF.registerTempTable("PADDED_TRAINING_SET")

#### Create the augmented data

We take the padded data set, and create the augmented values as per the Augment function in the original program.

In [13]:
augmentSQL="""
with New_Training_set as(
    select ts.object_id, ts.mjd, ts.passband,ts.flux,
   rand()*((ts.flux+ts.flux_err)-((ts.flux-ts.flux_err)/1.5))+((ts.flux-ts.flux_err)/1.5)  newFlux,
    ts.flux_err,ts.detected,
    (1+tsm.hostgal_photoz)/( 1+ (rand()*((tsm.hostgal_photoz+tsm.hostgal_photoz_err)-((tsm.hostgal_photoz-tsm.hostgal_photoz_err)/1.5))+((tsm.hostgal_photoz-tsm.hostgal_photoz_err)/1.5))) dt
    from {} ts
        inner join training_set_metadata tsm
            on ts.object_id = tsm.object_id
)
select new_training_set.object_id,
new_training_set.mjd*new_training_set.dt as mjd,  new_training_set.passband, new_training_set.newflux as flux, 
new_training_set.flux_err, new_training_set.detected
from New_Training_set
""".format("PADDED_TRAINING_SET")

augmented_DF = sqlContext.sql(augmentSQL)
augmented_DF.registerTempTable("AUGMENTED_TRAINING")

#### Get the training set metadata
 And add in the modifier field metaVal

In [14]:
metadataSQL="""
select ts.*,
    mod.metaVal
from {} ts
    INNER JOIN {} mod
        ON ts.object_id=mod.object_id
        """.format(training_metadata, "MODIFIERS")
metadata_DF = sqlContext.sql(metadataSQL)
metadata_DF.registerTempTable("TRAINING_SET_METADATA")

### So, now we have the padded, augmented training set and the augmented training set metadata

#### Create the training set padded feature vectors dataframe and instantiate it as a hive table.

Points to note.

- The following SQL could all be incorporated into one statement, but in the interests of clarity, we have broken it down into relevent component parts
- Calculating the MJD intervals utilises SQL WINDOW functionality. This needs to be used carefully, because window functionality will cause Spark to perform a hash sort which is potentially a very expensive operation and on larger datasets can cause a spill to disk which is to be avoided if at all possible. 
- Two separate tables need to be utilised for these sorts, because Hive doesn not support a WINDOW statement on the same field (mjd) with different ORDER BY clauses in the OVER (PARTITION BY ... ORDER BY ...) WINDOW. Hence, we have CTE1 and CTE2 tables for this.




How these modifiers are applied - we join the modifiers table to the padded training (lines 15-17) set and run the calculation in the select statement. (lines 11 and 12)

In [15]:
# Get the basic data from the padded training_set we created earlier, "PADDED_TRAINING_SET"
CTE1_sql="""
        select ts.object_id, mjd,
        mjd - first_value(mjd) over w as mjdInt,
        case when lag(mjd) OVER w is null then
            0
        else
            mjd - lag(mjd) over w 
        end as deltaMjd,
        passband,
        flux,
        flux_err,
        detected,
        row_number() OVER w as rownum
        from {} ts
        WINDOW w AS (PARTITION BY ts.object_id ORDER BY mjd)
""".format("AUGMENTED_TRAINING")

CTE1_df=sqlContext.sql(CTE1_sql)

In [16]:
# Create an in memory table
CTE1_df.registerTempTable("CTE1")

In [17]:
CTE2_sql="""
        select object_id,
        first_value(mjd) OVER x - mjd as rval,
        row_number() OVER x as rownum
        from {}
        WINDOW x AS (PARTITION BY object_id ORDER BY mjd DESC)
""".format("AUGMENTED_TRAINING")

CTE2_df=sqlContext.sql(CTE2_sql)

In [18]:
# Create an in memory table
CTE2_df.registerTempTable("CTE2")

#### So in here we include the calculated fields for the metadata, as well as creating the augmented hostgal_photoz field.

We do need to add modifier values based on minimum and maximum flux values for each observed object - this is the metaVal field from the MODIFIERS table

In [19]:
# Create the metadata we need for the feature vectors
meta_sql="""
        select meta.object_id, gal_l, gal_b, ddf, hostgal_specz,
        rand()*((hostgal_photoz+hostgal_photoz_err)-((hostgal_photoz-hostgal_photoz_err)/1.5))+((hostgal_photoz-hostgal_photoz_err)/1.5) hostgal_photoz,
        hostgal_photoz_err, mwebv,target,
        case when hostgal_photoz > 0 
            then 1  -- CAST(1 AS BOOLEAN)
            else 0 --CAST(0 AS BOOLEAN)
            end as photoz_positive,
        --6, 15, 16, 42, 52, 53, 62, 64, 65, 67, 88, 90, 92, 95, 99
        case 
            when target= 6 then 0
            when target= 15 then 1
            when target= 16 then 2
            when target= 42 then 3
            when target= 52 then 4
            when target= 53 then 5
            when target= 62 then 6
            when target= 64 then 7
            when target= 65 then 8
            when target= 67 then 9
            when target= 88 then 10
            when target= 90 then 11
            when target= 92 then 12
            when target= 95 then 13
            when target= 99 then 14
            else 14
            end mapped_target,
            metaVal

        from {} meta
""".format("TRAINING_SET_METADATA")

meta_df=sqlContext.sql(meta_sql)
meta_df.registerTempTable("meta")

### Create the intermediate table

This table sets up the arrays for the object metadata, spaced out to ten elements and create the key value pairs for the mjd, passband, flux etc arrays that will be created in the next step.


In [20]:
struct_sql="""
        select CTE1.object_id,mapped_target as target,
        array(0,0,0,0,ddf,hostgal_specz, hostgal_photoz,mwebv,photoz_positive,metaVal) as meta,
        double(hostgal_specz) as specz,
        MAP(
            'interval', mjdInt,
            'deltaMjd', deltaMjd,
            'passband',passband,
            'rval', rval,
            'flux',flux,
            'flux_err',flux_err,
            'detected',detected,
            'received_wavelength', case 
                                    when CTE1.passband = 0 then 357/1000
                                    when CTE1.passband = 1 then 477/1000
                                    when CTE1.passband = 2 then 621/1000
                                    when CTE1.passband = 3 then 754/1000
                                    when CTE1.passband = 4 then 871/1000
                                    else  1004/1000
                                    end,
            'source_wavelength', case 
                                    when CTE1.passband = 0 then 357 / (meta.hostgal_photoz + 1)/1000
                                    when CTE1.passband = 1 then 477 / (meta.hostgal_photoz + 1)/1000
                                    when CTE1.passband = 2 then 621 / (meta.hostgal_photoz + 1)/1000
                                    when CTE1.passband = 3 then 754 / (meta.hostgal_photoz + 1)/1000
                                    when CTE1.passband = 4 then 871 / (meta.hostgal_photoz + 1)/1000
                                    else 1004 / (meta.hostgal_photoz + 1)/1000
                                    end
        
        ) AS kv
        from CTE1 
            inner join CTE2
                on CTE1.object_id=CTE2.object_id
                and CTE1. rownum=CTE2.rownum
            inner join meta
                on CTE1.object_id = meta.object_id
"""

struct_df=sqlContext.sql(struct_sql)
struct_df.registerTempTable("struct")

### And finally, we create the full training set of feature vectors, padded to 256 elements.

Next cell illustrates the original structured record with hist as an array of arrays.

In [21]:
getVectorsSql="""
select object_id,meta,target,specz,
collect_list(int(a.kv['passband']))as band,
ARRAY(NAMED_STRUCT(
    'interval',             collect_list(float(a.kv['interval'])) ,
    'deltaMjd',             collect_list(float(a.kv['deltaMjd'])) ,
    'rval',                 collect_list(float(a.kv['rval'])) ,
    'flux',                 collect_list(float(a.kv['flux'])) ,
    'flux_err',             collect_list(float(a.kv['flux_err'])) ,
    'detected',             collect_list(int(a.kv['detected'])) ,
    'source_wavelength',    collect_list(float(a.kv['source_wavelength'])) ,
    'received_wavelength',  collect_list(float(a.kv['received_wavelength'])) 
    )
) as hist
from struct a
group by object_id, meta,target,specz
"""

vectors_df=sqlContext.sql(getVectorsSql)

#### Display the schema for the feature vectors

In [22]:
vectors_df.printSchema()

root
 |-- object_id: integer (nullable = true)
 |-- meta: array (nullable = false)
 |    |-- element: double (containsNull = true)
 |-- target: integer (nullable = false)
 |-- specz: double (nullable = true)
 |-- band: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- hist: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- interval: array (nullable = true)
 |    |    |    |-- element: float (containsNull = true)
 |    |    |-- deltaMjd: array (nullable = true)
 |    |    |    |-- element: float (containsNull = true)
 |    |    |-- rval: array (nullable = true)
 |    |    |    |-- element: float (containsNull = true)
 |    |    |-- flux: array (nullable = true)
 |    |    |    |-- element: float (containsNull = true)
 |    |    |-- flux_err: array (nullable = true)
 |    |    |    |-- element: float (containsNull = true)
 |    |    |-- detected: array (nullable = true)
 |    |    |    |-- element: integer (containsNull =

In [23]:
vectors_df.explain()

== Physical Plan ==
InMemoryTableScan [object_id#46, meta#153, target#152, specz#154, band#167, hist#168]
   +- InMemoryRelation [object_id#46, meta#153, target#152, specz#154, band#167, hist#168], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- ObjectHashAggregate(keys=[object_id#46, meta#153, target#152, specz#154], functions=[collect_list(cast(kv#155[passband] as int), 0, 0), collect_list(cast(kv#155[interval] as float), 0, 0), collect_list(cast(kv#155[deltaMjd] as float), 0, 0), collect_list(cast(kv#155[rval] as float), 0, 0), collect_list(cast(kv#155[flux] as float), 0, 0), collect_list(cast(kv#155[flux_err] as float), 0, 0), collect_list(cast(kv#155[detected] as int), 0, 0), collect_list(cast(kv#155[source_wavelength] as float), 0, 0), collect_list(cast(kv#155[received_wavelength] as float), 0, 0)])
            +- ObjectHashAggregate(keys=[object_id#46, meta#153, target#152, specz#154], functions=[partial_collect_list(cast(kv#155[passband] as int), 0

## And finally, we create the feature vector table in hive

In [24]:
MODE='append'
FORMAT='parquet'
TABLE='training_set_augmented_vectors' #- original full hist ARRAY - STRUCT - ARRAY

vectors_df.write.mode(MODE).format(FORMAT).saveAsTable(TABLE)


## Explain plan comparisons

What we can see here is the advantage of physically instantiating these vector creation scripts into a physical table. Compare the explain plan below with the plan above.

In [5]:
AugmentDF=sqlContext.sql("select * from training_set_flat_augmented_vectors")

In [6]:
AugmentDF.explain()

== Physical Plan ==
*(1) FileScan parquet plasticc.training_set_flat_augmented_vectors[object_id#0,meta#1,target#2,specz#3,band#4,interval#5,deltaMjd#6,rval#7,flux#8,flux_err#9,detected#10,source_wavelength#11,received_wavelength#12] Batched: false, Format: Parquet, Location: InMemoryFileIndex[hdfs://athena-1.nimbus.pawsey.org.au:8020/user/hive/warehouse/plasticc.db/train..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<object_id:int,meta:array<double>,target:int,specz:double,band:array<double>,interval:array...
