In [1]:
1+1

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.195:4040
SparkContext available as 'sc' (version = 3.2.3, master = local[*], app id = local-1670117534683)
SparkSession available as 'spark'


res0: Int = 2


In [2]:
import breeze.linalg._
import breeze.stats.distributions._
import scala.math
import java.io._

import breeze.linalg._
import breeze.stats.distributions._
import scala.math
import java.io._


In [3]:
// global parameters
val h: Double = 0.02
val k: Double = math.pow(h,0.75)
val M: Int = (math.Pi/math.pow(k,1.5)).toInt
val grid = k*linspace(-M,M,2*M+1)
val irange = convert(linspace(0,2*M,2*M+1),Int) 
val gamma: Int = 25

def drift(theta: DenseVector[Double], x: Double) = {
    val f = theta(0)*(theta(1) - x)
    f
}

def driftV(theta: DenseVector[Double], x: DenseVector[Double]) = {
    val f = theta(0)*(theta(1) - x)
    f
}

def diffusion(theta: DenseVector[Double], x: Double) = {
    val g = theta(2)
    g
}

def diffusionV(theta: DenseVector[Double], x: DenseVector[Double]) = {
    val g = (DenseVector.ones[Double](x.size))*theta(2)
    g
}

def firststep(theta: DenseVector[Double], y: Double) = {
    val mu = y + h*drift(theta,y)
    val sigma = math.abs(diffusion(theta,y))*math.sqrt(h)
    val g = Gaussian(mu,sigma)
    g.pdf(grid)
}

// pdf with vectors of means and variances, evaluated at one grid pt
def ourgauss(theta: DenseVector[Double], x: Double, y: DenseVector[Double]) = {
    val mu = y + h*driftV(theta,y)
    val thisdiff = diffusionV(theta,y)
    val sigma2 = (thisdiff :*= thisdiff)*h 
    val xmu = x - mu
    val xmu2 = xmu :*= xmu 
    val denom = math.sqrt(2.0*math.Pi*h)*thisdiff.map(math.abs)
    val exparg = -(xmu2 /:/ (2.0*sigma2))
    val pdf = exparg.map(math.exp) /:/ denom
    pdf
}

// generate one row of the "K" matrix
def genonerow(theta: DenseVector[Double], i: Int, gamma: Int) = {
    val j = i - M
    val x = j*k
    val y = k*linspace(j-gamma,j+gamma,(2*gamma+1))
    k*ourgauss(theta,x,y)
}

// intelligent subsetting of a vector
def mysubset(x: DenseVector[Double], i: Int) = {
    if (i < 0) 0.0
    else if (i >= x.size) 0.0
    else x(i)
}

// given a vector of length N, create i-th vector of length 2*gamma+1
def gammawindow(x: DenseVector[Double], i: Int, gamma: Int) = {
    val grange = convert(linspace(i-gamma,i+gamma,(2*gamma+1)),Int)
    val outvec = grange.map(mysubset(x,_:Int))
    outvec
}

// given the parameter vector \theta and propagator
// given two data points, (t_j,x_j) and (t_{j+1},x_{j+1}), this function
// starts off with a delta function centered at x_j at time t_j, and
// steps the density forward in time until t_{j+1}, where it is evaluated
// at the point x_{j+1}
def denevolve(propagator: DenseVector[DenseVector[Double]], theta: DenseVector[Double], txvec: DenseVector[(Double,Double)]) = {
    var px = firststep(theta,txvec(0)._2)
    val T: Double = txvec(1)._1 - txvec(0)._1
    val numsteps: Int = (math.ceil(T/h).toInt) - 2
    for (i <- 1 to numsteps) {
      val allwins = irange.map(gammawindow(px,_:Int,gamma))
      px = propagator dot allwins
    }
    val finalrow = k*ourgauss(theta,txvec(1)._2,grid)
    finalrow dot px
}

// this function takes x and theta and computes the log likelihood
def loglik(theta: DenseVector[Double], txvec: DenseVector[(Double,Double)]) = {
    val trange = convert(linspace(0,txvec.size-2,txvec.size-1),Int)
    val tslices = trange.map(x => txvec(x to (x+1))).toArray
    val tslicesRDD = sc.parallelize(tslices)

    // compute the propagator
    val prop = irange.map(genonerow(theta,_:Int,gamma))

    // apply denevolve to each consecutive pair of time series points
   // val indivdens = tslicesRDD.map(denevolve(prop,theta,_))
    val indivdens = tslices.map(denevolve(prop,theta,_))

    // compute overall log likelihood
    //sum(DenseVector(indivdens.collect()).map(math.log))
   sum(DenseVector(indivdens).map(math.log))
}

// this function computes p(y | x, sigma_eps^2)
def filtlik(tyvec: DenseVector[(Double,Double)], txvec: DenseVector[(Double,Double)], sigeps2: Double) = {
    val y = tyvec.map(x => x._2)
    val x = txvec.map(x => x._2)

    val normcon = -0.5*math.log(2.0*math.Pi*sigeps2)
    val xmy = x - y
    val xmy2 = xmy :*= xmy
    val normmain = -xmy2/(2*sigeps2)
    
    normcon*normmain.size + sum(normmain)
}

// prior for theta
def thetaprior(theta: DenseVector[Double]) = {
    val prior1 = Gaussian(0.5,1)
    val prior2 = Gaussian(2.0,10.0)
    prior1.logPdf(theta(0)) + prior2.logPdf(theta(1))
}

// prior for sigeps2
def sigeps2prior(sigeps2: Double) = {
    val prior = Exponential(1.0)
    prior.logPdf(sigeps2)
}

// auxiliary function that takes two vectors and creates a vector of tuples
def vec2tuples(v1: DenseVector[Double], v2: DenseVector[Double]) = {
    val bigvec = DenseVector.vertcat(v1,v2)
    val txs = v1.size
    val myrange = convert(linspace(0,txs-1,txs),Int)
    myrange.map( i => (bigvec(i),bigvec(i+txs)) )
}

def fulllik(tyvec: DenseVector[(Double,Double)], txvec: DenseVector[(Double,Double)], theta: DenseVector[Double], sigeps2: Double) = {
    var lik = loglik(theta, txvec)
    lik += filtlik(tyvec, txvec, sigeps2)
    lik += thetaprior(theta)
    lik += sigeps2prior(sigeps2)
    lik
}

val xvecproposal = new Gaussian(0.0,0.02)
val thetaproposal = new Gaussian(0.0,0.05)
val sigeps2proposal = new Gaussian(0.0,0.02)
val metro = new Uniform(0,1)

// generate one metropolis sample
// must pass in tyvec, the data
// and also the previous iteration's values for txvec, theta, sigeps2
// and the old likelihood 
def metropolis(tyvec: DenseVector[(Double,Double)], txvec: DenseVector[(Double,Double)], theta: DenseVector[Double], sigeps2: Double, oldlik: Double) = {

    // create proposal
    val xvec = txvec.map(x => x._2)
    val xvecstar = xvec + DenseVector(xvecproposal.sample(xvec.size).toArray)
    val txvecstar = vec2tuples(txvec.map(x=>x._1),xvecstar)
    var thetastar = DenseVector[Double](theta(0),theta(1),math.log(theta(2)))
    thetastar = thetastar + DenseVector(thetaproposal.sample(theta.size).toArray)
    thetastar(2) = 0.25
    val sigeps2star = math.exp(math.log(sigeps2) + sigeps2proposal.sample(1)(0))

    // evaluate likelihood
    val likstar = fulllik(tyvec, txvecstar, thetastar, sigeps2star)

    // accept/reject step
    val u = metro.sample(1)(0)
    val ratio = math.exp(likstar - oldlik)
    if (ratio > u)
        (1,txvecstar,thetastar,sigeps2star,likstar)
    else
        (0,txvec,theta,sigeps2,oldlik)
}

// aux function to output vector to file
def outvec(v: DenseVector[Double], fname: String) = {
    val pw = new PrintWriter(new FileOutputStream(new File(fname),true))
    pw.write(v.foldLeft("")((a,b) => a+b.toString+',').toString.stripSuffix(","))
    pw.write("\n")
    pw.close
    0
}

// mcmc loop
def mcmc(timeseries: DenseVector[(Double,Double)], numsamples: Int) = {
    // initial value of theta
    var theta: DenseVector[Double] = DenseVector(1.0,0.1,0.25)

    // initial value of txvec
    var txvec: DenseVector[(Double,Double)] = timeseries.copy

    // initial value of sigeps2
    var sigeps2: Double = 1.0

    // compute initial likelihood
    var lik = fulllik(timeseries, txvec, theta, sigeps2)

    // initial value of accept/ratio flag
    var accept = 1

    for (i <- 1 to numsamples) {
        // concatenate everything and save to disk
        var everything = DenseVector.vertcat(theta,txvec.map(x => x._2))
        everything = DenseVector.vertcat(everything,DenseVector(sigeps2))
        everything = DenseVector.vertcat(everything,DenseVector(lik))
        everything = DenseVector.vertcat(everything,DenseVector(accept))
        val tmp = outvec(everything,"mcmc.out")

        // take a metropolis step
        val metrostep = metropolis(timeseries, txvec, theta, sigeps2, lik)
        accept = metrostep._1
        if (accept == 1) {
            txvec = metrostep._2
            theta = metrostep._3
            sigeps2 = metrostep._4
            lik = metrostep._5
        }
    }
    0
    println("Finished")
}



h: Double = 0.02
k: Double = 0.053182958969449884
M: Int = 256
grid: breeze.linalg.DenseVector[Double] = DenseVector(-13.61483749617917, -13.561654537209721, -13.50847157824027, -13.45528861927082, -13.402105660301372, -13.34892270133192, -13.295739742362471, -13.24255678339302, -13.189373824423571, -13.136190865454122, -13.08300790648467, -13.029824947515221, -12.976641988545772, -12.923459029576321, -12.870276070606872, -12.817093111637423, -12.763910152667972, -12.710727193698522, -12.657544234729073, -12.604361275759622, -12.551178316790173, -12.497995357820722, -12.444812398851273, -12.391629439881823, -12.338446480912372, -12.285263521942923, -12.232080562973474, -12.178897604004023, -12.125714645034574, -12.072531686065124, -12.019348727095673, -11.966165768126224, -11.9129828091...


In [4]:
// fake data created in R
import scala.io.Source
val tvecarr = Source.fromFile("tvec.csv").getLines.map(_.split(",")).toArray
val t = DenseVector[Double](tvecarr(0).map(_.toDouble))
val yvecarr = Source.fromFile("xvec.csv").getLines.map(_.split(",")).toArray
val y = DenseVector[Double](yvecarr(0).map(_.toDouble))

val timeseries = vec2tuples(t,y)

// run mcmc
val mcmcout = mcmc(timeseries,1000)

//System.exit(0)



import scala.io.Source
tvecarr: Array[Array[String]] = Array(Array(0, 0.2, 0.4, 0.6, 0.8, 1, 1.2, 1.4, 1.6, 1.8, 2, 2.2, 2.4, 2.6, 2.8, 3, 3.2, 3.4, 3.6, 3.8, 4, 4.2, 4.4, 4.6, 4.8, 5, 5.2, 5.4, 5.6, 5.8, 6, 6.2, 6.4, 6.6, 6.8, 7, 7.2, 7.4, 7.6, 7.8, 8, 8.2, 8.4, 8.6, 8.8, 9, 9.2, 9.4, 9.6, 9.8, 10, 10.2, 10.4, 10.6, 10.8, 11, 11.2, 11.4, 11.6, 11.8, 12, 12.2, 12.4, 12.6, 12.8, 13, 13.2, 13.4, 13.6, 13.8, 14, 14.2, 14.4, 14.6, 14.8, 15, 15.2, 15.4, 15.6, 15.8, 16, 16.2, 16.4, 16.6, 16.8, 17, 17.2, 17.4, 17.6, 17.8, 18, 18.2, 18.4, 18.6, 18.8, 19, 19.2, 19.4, 19.6, 19.8, 20, 20.2, 20.4, 20.6, 20.8, 21, 21.2, 21.4, 21.6, 21.8, 22, 22.2, 22.4, 22.6, 22.8, 23, 23.2, 23.4, 23.6, 23.8, 24, 24.2, 24.4, 24.6, 24.8, 25, 25.2, 25.4, 25.6, 25.8, 26, 26.2, 26.4, 26.6, 26.8, 27, 27.2, 27.4, 27.6, 27...
