## ... and so it begins

In [1]:
/** 
 *  Load a custom Lightning Scala Client for Visualizations 
 */
%AddJar https://github.com/joshisa/lightning-scala/raw/master/dist/lightning-scala_2.10-0.2.0.jar -f
val b = "SWYgc2hlIHdlaWdocyB0aGUgc2FtZSBhcyBhIGR1Y2ssIHNoZSdzIG1hZGUgb2Ygd29vZCBhbmQgdGhlcmVmb3JlIGEgd2l0Y2guIH5Nb250eSBQeXRob24="
val bo = "TGV0J3MgaG9wZSB0aGF0IG91ciBkYXRhIGFuYWx5c2lzIGlzIGEgd2VlIGJpdCBiZXR0ZXIgdGhhbiB0aGUgTW9udHkgUHl0aG9uIG1lZGlldmFsIGxvZ2ljIG9mIGFib3Zl"
val boo = "RG8gd2hhdCBJIGRvLiBIb2xkIHRpZ2h0IGFuZCBwcmV0ZW5kIGl04oCZcyBhIHBsYW4hIH5Eci4gV2hv"

Starting download from https://github.com/joshisa/lightning-scala/raw/master/dist/lightning-scala_2.10-0.2.0.jar
Finished download of lightning-scala_2.10-0.2.0.jar


In [2]:
/**
 *  Standard and custom imports
 *
 *  Imports help with data formats, time calculations and visualizations
 *
 */

import sys.process._
import scala.util.control.NonFatal
import java.util.Base64
import java.nio.charset.StandardCharsets
import play.api.libs.json.Json
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType, Metadata};

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext

import System.nanoTime
import java.util.concurrent.TimeUnit
import scala.util.Random
import scala.collection.mutable.ArrayBuffer
import java.text.DecimalFormat

import org.viz.lightning._

/**
 *  Dumb Code to obscure strings
 */
 
def createMagic(mystery: String): String = {
  Base64.getEncoder.encodeToString(mystery.getBytes(StandardCharsets.UTF_8))
}

/**
 *  Equally dumb code to make obscure strings less obscure
 */
 
def unfurl(mystery: String): String = {
  return new String(Base64.getDecoder().decode(mystery), StandardCharsets.UTF_8)
}

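/**
 *  Tuning for small datasets and joins: lower spark.sql.shuffle.partitions
 *  from its default of 200 to 1 so that shuffles on tiny DataFrames do not
 *  spawn hundreds of nearly empty tasks
 */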

val sqlctx = new SQLContext(sc)
sqlctx.setConf("spark.sql.shuffle.partitions", "1")

val profileFormat = new DecimalFormat("#.###########")

println(unfurl(b))
println(unfurl(bo))

If she weighs the same as a duck, she's made of wood and therefore a witch. ~Monty Python
Let's hope that our data analysis is a wee bit better than the Monty Python medieval logic of above
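`createMagic` and `unfurl` are meant to be exact inverses. Base64 is a reversible encoding, not encryption, so the quotes above are hidden from a casual read but not from anyone who decodes them. Below is a minimal round-trip check in plain Scala (no Spark needed). The `StringMagic` object name is hypothetical; it simply wraps copies of the two helpers.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

object StringMagic {
  // Encode a string's UTF-8 bytes as Base64 (same approach as createMagic)
  def createMagic(mystery: String): String =
    Base64.getEncoder.encodeToString(mystery.getBytes(StandardCharsets.UTF_8))

  // Decode Base64 back into a UTF-8 string (same approach as unfurl)
  def unfurl(mystery: String): String =
    new String(Base64.getDecoder.decode(mystery), StandardCharsets.UTF_8)
}
```

Because UTF-8 is used on both sides, non-ASCII characters such as the curly apostrophe in the Dr. Who quote survive the round trip.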


In [3]:
/** Reference && Inspiration 
 *
 *  http://stackoverflow.com/questions/33725500/load-data-from-bluemix-object-store-in-spark
 *  https://developer.ibm.com/clouddataservices/start-developing-with-spark-and-notebooks/
 *  https://github.com/ibm-et/spark-kernel/wiki/List-of-Current-Magics-for-the-Spark-Kernel
 *
 */
println("Spark Context Version: " + sc.version);
println("Java version: " + scala.util.Properties.javaVersion);
println("Scala Release version: " + scala.util.Properties.releaseVersion);
println("_____________________");
println("Spark Context Config:");
sc.getConf.getAll.foreach(println);
println("_____________________");
println("Spark SQL Context Config:");
sqlctx.getAllConfs.foreach(println);

Spark Context Version: 1.6.0
Java version: 1.8.0
Scala Release version: Some(2.10.5)
_____________________
Spark Context Config:
(spark.eventLog.dir,/gpfs/fs01/user/s70e-5f4bdff17b6ca7-347c2e5e0455/events)
(spark.eventLog.enabled,true)
(spark.deploy.resourceScheduler.factory,org.apache.spark.deploy.master.EGOResourceSchedulerFactory)
(spark.ui.retainedJobs,0)
(spark.shuffle.service.enabled,true)
(spark.executor.extraJavaOptions,-Djava.security.egd=file:/dev/./urandom)
(spark.executor.id,driver)
(spark.port.maxRetries,512)
(spark.sql.tungsten.enabled,false)
(spark.repl.class.uri,http://xx.xx.133.233:34107)
(spark.logConf,true)
(spark.driver.host,xx.xxx.133.233)
(spark.history.fs.logDirectory,/gpfs/fs01/user/s70e-5f4bdff17b6ca7-347c2e5e0455/events)
(spark.master,spark://xx-xxxxx-xxxxx-env5-0027:7082)
(spark.r.command,/usr/local/src/bluemix_jupyter_bundle.v19/R/bin/Rscript)
(spark.app.id,app-20160928164847-0602-0f76aa4a-b634-4723-9a8b-2fc3173aa70f)
(spark.app.name,IBM Spark Kernel)
(spark

### Stand up Scaffolding

In [4]:
/**
 *  Lightning provides API-based access to reproducible web visualizations.
 *  http://lightning-viz.org/
 *  @param host should not contain a trailing slash
 */ 
val lgn = Lightning(host="https://lightningviz.mybluemix.net");
lgn.auth = None
lgn.createSession()
println(unfurl(boo))
println("");

Do what I do. Hold tight and pretend it’s a plan! ~Dr. Who


In [5]:
/**
 *  Useful for formatted console output of test results and calculated values
 */

def printAnalysis(expected: Array[Double], calculated: Array[Double]): Unit = {
    // zipped pairs elements by position and stops at the shorter array
    (expected, calculated).zipped.foreach { (exp, result) =>
        val verdict = if (exp == result) "   PASS" else "   FAIL :-("
        println("Expected: " + exp + " Calculated: " + result + verdict)
    }
}

def createTestDF(testValues:Array[Double]): org.apache.spark.sql.DataFrame = {
    val schema = StructType(
            Seq(
              StructField("TestValues", DoubleType, false, Metadata.fromJson("""{"description":"Series of Test Input Values"}"""))
              )
            )
    val testinput:org.apache.spark.rdd.RDD[Double] = sc.parallelize(testValues)
    val rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = testinput map {
          case testvalue => org.apache.spark.sql.Row(testvalue)
        }

    return sqlctx.createDataFrame(rows,schema)
}

def metaPrint(dataframe: org.apache.spark.sql.DataFrame) = {
    println("Meta Description")
    println("================")
    dataframe.schema.foreach{s => println(s"Column: ${s.name} [${(Json.parse(s.metadata.toString)\ "description").as[String]}]")}
    println(".")
    dataframe.printSchema()
}

def display (values: Array[Array[Double]],
            lgn: org.viz.lightning.Lightning = Lightning(host="https://lightningviz.mybluemix.net"), 
            xaxis: String = "x-Axis",
            yaxis: String = "y-Axis"): Either[com.ibm.spark.magic.CellMagicOutput,com.ibm.spark.magic.LineMagicOutput] = {
                                          
    val viz = lgn.line(values, xaxis=xaxis, yaxis=yaxis)
    val viz2html = viz.getHTML
    return kernel.magics.html(s"""${viz2html}""");
}
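`printAnalysis` compares values with exact `==` and pairs them with `zipped`, which stops at the shorter array. Both choices can hide problems. For instance, `simpleMA(2)` on the ten-value test series returns ten values while `expected_1` lists nine, so the final value is never compared. The sketch below is a hypothetical stricter check (`SeriesCheck.checkSeries` is not part of the notebook). It treats a length mismatch as a failure and compares within an absolute tolerance; the default of `1e-9` is an assumption.

```scala
object SeriesCheck {
  // Compare two series element by element within an absolute tolerance.
  // Unlike (a, b).zipped, a length mismatch counts as a failure
  // instead of silently ignoring the extra elements.
  def checkSeries(expected: Array[Double],
                  calculated: Array[Double],
                  tolerance: Double = 1e-9): Boolean = {
    if (expected.length != calculated.length) {
      println(s"FAIL: expected ${expected.length} values but calculated ${calculated.length}")
      false
    } else {
      expected.zip(calculated).forall { case (e, c) => math.abs(e - c) <= tolerance }
    }
  }
}
```

A tolerance matters because floating-point sums rarely land exactly on decimal literals: `0.1 + 0.2 == 0.3` is false in Scala, yet the two values pass this check.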

In [6]:
/**
 *  Useful for smallish datasets to calculate:
 *  Simple, Central and Exponential Moving Averages as well as 1st and 2nd Differences
 *  For larger datasets, consider Cloudera's spark-ts library
 *  @constructor create a new notebookTimeSeries object from a Spark SQL DataFrame
 *  @param dataframe Spark SQL DataFrame containing 1...N numeric columns
 *  @param colnames Sequence of column names to be aggregated (summed row by row) and
 *                  treated as a single Array[Double] for analysis and visualization.
 *  @author Sanjay Joshi (joshisa@us.ibm.com)
 *  @version 1.0
 *  @todo Add more functionality.  Consider Breeze Vectors and spark-ts
 *
 *  Copyright 2016 IBM
 */

class notebookTimeSeries(dataframe: org.apache.spark.sql.DataFrame, colnames: Seq[String]) {
  import org.viz.lightning._
  import java.math.MathContext
    
  // Sum the selected columns of each row into a single value.
  // Row.get returns boxed values (java.lang.Integer, java.lang.Double, ...),
  // which all extend java.lang.Number, so Int and Double columns are handled alike.
  val values: Array[Double] = dataframe.select(colnames.head, colnames.tail: _*).rdd.map { row =>
      (0 until row.length).map { i =>
          row.get(i) match {
              case n: java.lang.Number => n.doubleValue()
              case other => throw new IllegalArgumentException(s"Non-numeric value in column $i: $other")
          }
      }.sum
  }.collect()
    
  println("Nanite data probes launched ... ")
    
  private def roundME(bignumber: Double, precision: Int = 4):Double = {
    var bd:BigDecimal = BigDecimal(bignumber)
    bd = bd.round(new MathContext(precision))
    return bd.doubleValue()
  }  
    
  def display (lgn: org.viz.lightning.Lightning = Lightning(host="https://lightningviz.mybluemix.net"), 
               xaxis: String = "x-Axis",
               yaxis: String = "y-Axis"): Either[com.ibm.spark.magic.CellMagicOutput,com.ibm.spark.magic.LineMagicOutput] = {
                                          
    val viz = lgn.line(Array(values), xaxis=xaxis, yaxis=yaxis)
    val viz2html = viz.getHTML
    return kernel.magics.html(s"""${viz2html}""");
  }
    
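  // Absolute first difference |Y(t+1) - Y(t)|; one element shorter than the input
  def firstdiffy(valueArray: Array[Double] = values): Array[Double] = {
      (valueArray drop 1, valueArray).zipped.map(_ - _) map math.abs
  }
    
  // Absolute difference of the absolute first differences; two elements shorter than the input
  def secdiffy(valueArray: Array[Double] = values): Array[Double] = {
      val first = firstdiffy(valueArray)
      (first drop 1, first).zipped.map(_ - _) map math.abs
  }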
  
  def simpleMA(period: Int, step: Int = 1, valueArray: Array[Double] = values): Array[Double] = {
      // Pad the first (period - 1) slots with 0.0, then average each sliding window
      (List.fill(period - 1)(0.0) ++ valueArray.sliding(period, step).map(_.sum / period).map(roundME(_))).toArray
  }
    
  private def cmaCalc(period: Int, valueArray: Array[Double], step: Int = 1): Array[Double] = {
      // Same windowed average as simpleMA, then drop (period - 1) / 2 leading pads
      // so that each average sits at the index of its window's center
      val buffy = (List.fill(period - 1)(0.0) ++ valueArray.sliding(period, step).map(_.sum / period).map(roundME(_))).toBuffer
      buffy.remove(0, (period - 1) / 2)
      buffy.toArray
  }

  def centralMA(period: Int, valueArray: Array[Double] = values): Array[Double] = {
      try {
          period match {
              case bad if bad < 2 =>
                  println(s"WARNING: window size of $bad provided. Central Moving Average cannot be computed")
                  Array(0.0)
              case even if even % 2 == 0 =>
                  println(s"WARNING: Even window size of $even detected. Incrementing window size from $even to ${even + 1} for Central Moving Average calculation")
                  cmaCalc(even + 1, valueArray)
              case odd =>
                  cmaCalc(odd, valueArray)
          }
      } catch {
          case NonFatal(exc) => println(exc); Array(0.0)
      }
  }
    
/** 
  Description (http://www.timestored.com/b/exponential-moving-average-ema-kdb/)
  One of the issues with the simple moving average is that it gives every day an equal weighting. 
  For many purposes it makes more sense to give the more recent days a higher weighting; 
  one method of doing this is the Exponential Moving Average, 
  which uses an exponentially decreasing weight for dates further in the past.
    
  Method (http://www.itl.nist.gov/div898/handbook/pmc/section3/pmc324.htm)
  params:
      λ (lambda) - a constant that determines the depth of memory of the EWMA
          The parameter λ determines the rate at which "older" data enter 
          into the calculation of the EWMA statistic. A value of λ=1 implies 
          that only the most recent measurement influences the EWMA (degrades to Shewhart chart). 
          Thus, a large value of λ (closer to 1) gives more weight to recent data 
          and less weight to older data; a small value of λ (closer to 0) gives 
          more weight to older data. The value of λ is usually set between 0.2 
          and 0.3 (Hunter) although this choice is somewhat arbitrary. 
          Lucas and Saccucci (1990) give tables that help the user select λ.
          
      precision - number of significant digits in the calculated value
 */

  def exponentialMA(lambda: Double = 0.5, precision: Int = 4): Array[Double] = {
      // EWMA(0) = Y(0); EWMA(t) = lambda * Y(t) + (1 - lambda) * EWMA(t-1).
      // Each value is rounded to `precision` significant digits before it feeds the next step.
      val inverseLambda = (1.0 - lambda)
      val ema = Array.fill(values.length)(0.0)
      values.zipWithIndex.map {
          case (x, 0) =>
              ema(0) = roundME(x, precision)
              ema(0)
          case (x, i) =>
              ema(i) = roundME((lambda * x) + (inverseLambda * ema(i-1)), precision)
              ema(i)
      }
  }
    
  def sum(valueArray: Array[Double] = values): Double = {
      var sum = 0.0
      var i = 0
      while (i < valueArray.length) { 
          sum += valueArray(i); 
          i += 1 
      }
      sum
  }
    
  def toArray(valueArray: Array[Double] = values): Array[Double] = {
      return valueArray
  }
    
  def length: Int = {
      return values.length
  }
      
  override def toString: String = {
      "Timeseries Object derived from a Spark SQL Dataframe for compatibility with a Lightning Visualization Server"
  }
}
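The constructor reduces each `Row` to the sum of its selected columns. Spark hands back boxed cells (`java.lang.Integer`, `java.lang.Double`, and so on), so a single match on `java.lang.Number` covers both Int and Double columns. Below is a Spark-free sketch of that per-row reduction. It assumes a row is modelled as a `Seq[Any]`, and the `RowSum` name is hypothetical.

```scala
object RowSum {
  // Sum the numeric cells of one row. Boxed Int, Long and Double values
  // all extend java.lang.Number, so one match arm handles every numeric type.
  def sumRow(cells: Seq[Any]): Double =
    cells.map {
      case n: java.lang.Number => n.doubleValue()
      case other => throw new IllegalArgumentException(s"Non-numeric cell: $other")
    }.sum
}
```

Failing loudly on a non-numeric cell is a design choice here; silently treating such cells as 0.0 would skew the aggregated series without warning.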

In [7]:
val dftest1 = createTestDF(Array(2.0,2.0,4.0,4.0,6.0,6.0,8.0,8.0,10.0,10.0))
metaPrint(dftest1)

Meta Description
Column: TestValues [Series of Test Input Values]
.
root
 |-- TestValues: double (nullable = false)



In [8]:
val dftest2 = createTestDF(Array(1.0,2.0,3.0,4.0,8.0,10.0,20.0))
metaPrint(dftest2)

Meta Description
Column: TestValues [Series of Test Input Values]
.
root
 |-- TestValues: double (nullable = false)



In [9]:
val testclass1 = new notebookTimeSeries(dftest1, Seq("TestValues"))
val testclass2 = new notebookTimeSeries(dftest2, Seq("TestValues"))
testclass1.toString()

Nanite data probes launched ... 
Nanite data probes launched ... 


Timeseries Object derived from a Spark SQL Dataframe for compatibility with a Lightning Visualization Server

In [10]:
val test1:Array[Double] = testclass1.simpleMA(2)
val test2:Array[Double] = testclass1.simpleMA(4)
val test3:Array[Double] = testclass1.centralMA(0)
val test4:Array[Double] = testclass1.centralMA(1)
val test5:Array[Double] = testclass1.centralMA(2)
val test6:Array[Double] = testclass1.centralMA(3)
val test7:Array[Double] = testclass1.centralMA(4)
val test8:Array[Double] = testclass1.centralMA(5)
val test9:Array[Double] = testclass2.exponentialMA(0.7,8)
val test10:Array[Double] = testclass1.firstdiffy()
val test11:Array[Double] = testclass1.secdiffy()
val test12:Array[Double] = testclass1.toArray()
val test13:Double = testclass1.sum()
val test14:Int = testclass1.length



In [11]:
val expected_1       = Array(0.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0)
val expected_2       = Array(0.0,0.0,0.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0)
val expected_3_4     = Array(0.0)
val expected_5_6     = Array(0.0,2.667,3.333,4.667,5.333,6.667,7.333,8.667,9.333)
val expected_7_8     = Array(0.0,0.0,3.6,4.4,5.6,6.4,7.6,8.4)
val expected_9       = Array(1.0,1.7,2.61,3.583,6.6749,9.00247,16.700741)
val expected_10      = Array(0.0,2.0,0.0,2.0,0.0,2.0,0.0,2.0,0.0)
val expected_11      = Array(2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0)
val expected_12      = Array(2.0,2.0,4.0,4.0,6.0,6.0,8.0,8.0,10.0,10.0)
val expected_13      = 60.0
val expected_14      = 10
var alpha = 0.7

println("Moving Average Test Suite ...")
println("=============================")
var window = 2
println(s"\nTest 1: Simple Moving Average for Window Size of $window")
printAnalysis(expected_1,test1)
window = 4
println(s"\nTest 2: Simple Moving Average for Window Size of $window")
printAnalysis(expected_2,test2)
window = 0
println(s"\nTest 3: Central Moving Average for Window Size of $window")
printAnalysis(expected_3_4,test3)
window = 1
println(s"\nTest 4: Central Moving Average for Window Size of $window")
printAnalysis(expected_3_4,test4)
window = 2
println(s"\nTest 5: Central Moving Average for Window Size of $window")
printAnalysis(expected_5_6,test5)
window = 3
println(s"\nTest 6: Central Moving Average for Window Size of $window")
printAnalysis(expected_5_6,test6)
window = 4
println(s"\nTest 7: Central Moving Average for Window Size of $window")
printAnalysis(expected_7_8,test7)
window = 5
println(s"\nTest 8: Central Moving Average for Window Size of $window")
printAnalysis(expected_7_8,test8)
println(s"\nTest 9: Exponential Moving Average for Smoothing Factor of $alpha")
printAnalysis(expected_9,test9)
println(s"\nTest 10: First Difference (absolute change between consecutive points)")
printAnalysis(expected_10,test10)
println(s"\nTest 11: Second Difference (absolute change between consecutive first differences)")
printAnalysis(expected_11,test11)
println(s"\nTest 12: Single Array of the original input dataframe columns summed")
printAnalysis(expected_12,test12)
println(s"\nTest 13: Sum of the single array")
printAnalysis(Array(expected_13),Array(test13))
println(s"\nTest 14: Length of the single array")
printAnalysis(Array(expected_14.toDouble),Array(test14.toDouble))
println("")

Moving Average Test Suite ...

Test 1: Simple Moving Average for Window Size of 2
Expected: 0.0 Calculated: 0.0   PASS
Expected: 2.0 Calculated: 2.0   PASS
Expected: 3.0 Calculated: 3.0   PASS
Expected: 4.0 Calculated: 4.0   PASS
Expected: 5.0 Calculated: 5.0   PASS
Expected: 6.0 Calculated: 6.0   PASS
Expected: 7.0 Calculated: 7.0   PASS
Expected: 8.0 Calculated: 8.0   PASS
Expected: 9.0 Calculated: 9.0   PASS

Test 2: Simple Moving Average for Window Size of 4
Expected: 0.0 Calculated: 0.0   PASS
Expected: 0.0 Calculated: 0.0   PASS
Expected: 0.0 Calculated: 0.0   PASS
Expected: 3.0 Calculated: 3.0   PASS
Expected: 4.0 Calculated: 4.0   PASS
Expected: 5.0 Calculated: 5.0   PASS
Expected: 6.0 Calculated: 6.0   PASS
Expected: 7.0 Calculated: 7.0   PASS
Expected: 8.0 Calculated: 8.0   PASS
Expected: 9.0 Calculated: 9.0   PASS

Test 3: Central Moving Average for Window Size of 0
Expected: 0.0 Calculated: 0.0   PASS

Test 4: Central Moving Average for Window Size of 1
Expected: 0.0 Calcul

In [12]:
testclass1.display()

<h2 style="text-align:center">Simple Moving Average</h2>
<img src="https://wikimedia.org/api/rest_v1/media/math/render/svg/3d47f20e3047f6e87f1d2b61ae86b25925c4bf23"/>
<img src="https://dal.objectstorage.open.softlayer.com/v1/AUTH_c7f5f0b1d1314eae9a8b362eda80dacd/whisk/sma.png"/>
<h4 style="text-align:center">The period (n) selected depends on the type of movement of interest, such as short, intermediate, or long-term.</h4>
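The formula above averages the most recent n points. Below is a standalone sketch of the same idea that follows the notebook's convention of padding the first n - 1 positions with 0.0, so the output matches the input length. It is a hypothetical helper (`MovingAverages.simpleMA`) that fixes the step at 1 and skips the four-significant-digit rounding the notebook's `simpleMA` applies via `roundME`. It also returns all zeros when the series is shorter than the window; in that case Scala's `sliding` would otherwise yield one partial window.

```scala
object MovingAverages {
  // Simple moving average with window `period` and a step of 1.
  // The first (period - 1) slots are padded with 0.0 so that the result
  // has the same length as the input, as in the notebook's simpleMA.
  def simpleMA(values: Array[Double], period: Int): Array[Double] = {
    require(period >= 1, "period must be at least 1")
    if (values.length < period) {
      // Not enough points for a single full window
      Array.fill(values.length)(0.0)
    } else {
      val averages = values.sliding(period).map(_.sum / period).toArray
      Array.fill(period - 1)(0.0) ++ averages
    }
  }
}
```

On the ten-value test series with a window of 2, this returns ten values ending in 10.0, one more than `expected_1` lists.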

In [13]:
test2

Array(0.0, 0.0, 0.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0)

In [14]:
display(Array(test2), yaxis="Y-Axis (Period=4)")

<h2 style="text-align:center">Central Moving Average</h2>
If the data used are not centered around the mean, a simple moving average lags behind the latest datum point by half the sample width. An SMA can also be disproportionately influenced by old datum points dropping out or new data coming in. For a number of applications, it is advantageous to avoid the shifting induced by using only 'past' data. Hence a central moving average can be computed, using data equally spaced on either side of the point in the series where the mean is calculated.<sup><a href="https://en.wikipedia.org/wiki/Moving_average">source</a></sup>
<img src="https://dal.objectstorage.open.softlayer.com/v1/AUTH_c7f5f0b1d1314eae9a8b362eda80dacd/whisk/cma.png"/>
<h4 style="text-align:center">The period (n) must be an odd integer in order to have a central value with equal sampling on both sides</h4>
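Below is a sketch of a centered moving average for an odd period n, where output index i is the mean of the n points centered on index i. The notebook's `centralMA` places its averages at the same indices, with some differences: it omits the trailing (n - 1) / 2 positions, bumps even windows to the next odd size, and rounds to four significant digits. This hypothetical `CentralMovingAverage.centralMA` instead pads both ends with 0.0 and rejects even windows outright.

```scala
object CentralMovingAverage {
  // Centered moving average for an odd window `period`.
  // Output index i holds the mean of values(i - k .. i + k), with k = (period - 1) / 2.
  // Positions without a full window on both sides are padded with 0.0,
  // so the output has the same length as the input.
  def centralMA(values: Array[Double], period: Int): Array[Double] = {
    require(period >= 1 && period % 2 == 1, "period must be a positive odd integer")
    val k = (period - 1) / 2
    values.indices.map { i =>
      if (i - k < 0 || i + k >= values.length) 0.0
      else values.slice(i - k, i + k + 1).sum / period
    }.toArray
  }
}
```

With a period of 5 on the ten-value test series, indices 2 through 7 hold 3.6, 4.4, 5.6, 6.4, 7.6 and 8.4, the same values as `expected_7_8`.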

In [15]:
test8

Array(0.0, 0.0, 3.6, 4.4, 5.6, 6.4, 7.6, 8.4)

In [16]:
display(Array(test8), yaxis="Y-Axis (Period=5)")

<h2 style="text-align:center">Exponential Weighted Moving Average</h2><br/>
<span style="display:inline-block;margin-left:25%;margin-right:25%;">One of the issues with the simple moving average is that it gives every day an equal weighting. For many purposes it makes more sense to give the more recent days a higher weighting; one method of doing this is the Exponential Moving Average, which uses an exponentially decreasing weight for dates further in the past.</span>
<img src="https://dal.objectstorage.open.softlayer.com/v1/AUTH_c7f5f0b1d1314eae9a8b362eda80dacd/whisk/ewma.png"/>
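The recurrence used by `exponentialMA` is EWMA(t) = λ·Y(t) + (1 - λ)·EWMA(t - 1), seeded with the first observation. `scanLeft` expresses this directly, because each step needs only the previous result. This is a sketch with a hypothetical name (`Ewma.ewma`). Unlike the notebook's `exponentialMA`, it does not round intermediate values, so its results can differ in the trailing digits.

```scala
object Ewma {
  // Exponentially weighted moving average:
  //   ewma(0) = y(0)
  //   ewma(t) = lambda * y(t) + (1 - lambda) * ewma(t - 1)
  // A larger lambda weights recent points more heavily; lambda = 1 returns the input.
  def ewma(values: Array[Double], lambda: Double): Array[Double] = {
    require(lambda > 0.0 && lambda <= 1.0, "lambda must be in (0, 1]")
    if (values.isEmpty) Array.empty[Double]
    else values.tail.scanLeft(values.head) { (prev, y) => lambda * y + (1.0 - lambda) * prev }
  }
}
```

With λ = 0.7 on the series 1, 2, 3, 4, 8, 10, 20, this reproduces `expected_9` (1.0, 1.7, 2.61, 3.583, 6.6749, 9.00247, 16.700741) up to floating-point error.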

In [17]:
test9

Array(1.0, 1.7, 2.61, 3.583, 6.6749, 9.00247, 16.700741)

In [18]:
display(Array(test9), yaxis="Y-Axis (lambda=0.7)")

<h2 style="text-align:center">Differential Calculation - First and Second Order</h2><br/>
<span style="display:inline-block;margin-left:25%;margin-right:25%;">Differencing helps to stabilize the mean. The first difference of a time series is the series of changes from one period to the next. If Y<sub>t</sub> denotes the value of the time series Y at period t, then the first difference of Y at period t is equal to Y<sub>t</sub>-Y<sub>t-1</sub>. If the first difference of Y is stationary and also completely random (not autocorrelated), then Y is described by a random walk model: each value is a random step away from the previous value. If the first difference of Y is stationary but not completely random--i.e., if its value at period t is autocorrelated with its value at earlier periods--then a more sophisticated forecasting model such as exponential smoothing or ARIMA may be appropriate.<sup><a href="http://people.duke.edu/~rnau/411diff.htm">source</a></sup></span>
<img src="https://dal.objectstorage.open.softlayer.com/v1/AUTH_c7f5f0b1d1314eae9a8b362eda80dacd/whisk/diffy.png"/>
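The text defines the first difference as Y<sub>t</sub>-Y<sub>t-1</sub>, a signed quantity. The notebook's `firstdiffy` and `secdiffy` take absolute values, which discards direction. On the test series 2, 2, 4, 4, ..., the signed second difference alternates between 2.0 and -2.0, whereas `secdiffy` reports 2.0 throughout. Below is a sketch of the signed version; the `Differencing` object is hypothetical.

```scala
object Differencing {
  // First difference: d(t) = y(t) - y(t - 1); one element shorter than the input
  def firstDifference(values: Array[Double]): Array[Double] =
    values.zip(values.drop(1)).map { case (prev, next) => next - prev }

  // Second difference: the first difference applied twice; two elements shorter
  def secondDifference(values: Array[Double]): Array[Double] =
    firstDifference(firstDifference(values))
}
```

Keeping the sign preserves the direction of change, which matters when the differenced series is fed to a model such as ARIMA.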

In [19]:
test10

Array(0.0, 2.0, 0.0, 2.0, 0.0, 2.0, 0.0, 2.0, 0.0)

In [20]:
display(Array(test10), yaxis="Y-Axis - First Difference")

In [21]:
test11

Array(2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0)

In [22]:
display(Array(test11), yaxis="Y-Axis - Second Difference")

## License:  Apache 2.0
<img src="https://camo.githubusercontent.com/050668fbd650757da3effc745955d16100810839/68747470733a2f2f697066732e696f2f697066732f516d5666466241534731314d6a5546757666414a6d6a4b7078524759737443417739476d5a45706241374b453741"/>