In [1]:
import scala.collection.mutable.ListBuffer

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateStatisticalSummary
import org.apache.spark.rdd.RDD

import org.ucsd.dse.capstone.traffic._

In [2]:
/**
 * Pivots the raw input on one field and writes four CSV artifacts: the mean
 * vector, the PCA eigenvectors, the PCA eigenvalues and a sample of the
 * pivoted vectors. The PCA results are also printed for a quick visual check.
 *
 * Every output file is named `<file_dir_prefix><kind>.<fid>.csv`, where kind is
 * one of `mean_vector`, `eigenvectors`, `eigenvalues` or `samples`.
 *
 * @param sc              Spark context handed to the pivot handler
 * @param m_string_rdd    raw input lines to pivot
 * @param fid             identifier embedded in every output filename
 * @param file_dir_prefix prefix prepended verbatim to every output filename
 * @param pivot_field     field selector passed to StandardPivotHandler
 */
def do_run(sc: SparkContext, m_string_rdd: RDD[String], fid: String, file_dir_prefix: String, pivot_field: Int) = {
  // All artifacts share one naming scheme; only the middle "kind" part varies.
  def csv_path(kind: String): String = s"${file_dir_prefix}${kind}.${fid}.csv"

  val pivot_handler: PivotHandler = new StandardPivotHandler(sc, pivot_field)
  val pivoted_rdd: RDD[Vector] = MLibUtils.pivot(m_string_rdd, pivot_handler)

  // Mean vector, written as a single row.
  val summary: MultivariateStatisticalSummary = MLibUtils.summary_stats(pivoted_rdd)
  val mean_row: Vector = Vectors.dense(summary.mean.toArray)
  MLibUtils.write_vectors(csv_path("mean_vector"), List[Vector](mean_row))

  // PCA on the same pivoted vectors, keeping the first 30 components.
  val num_components = 30
  val (eigenvectors, eigenvalues) = MLibUtils.execute_pca(pivoted_rdd, num_components)

  // Eigenvectors are written out as a column-major matrix.
  MLibUtils.write_matrix(csv_path("eigenvectors"), eigenvectors)

  // Eigenvalues are written out as one row.
  MLibUtils.write_vectors(csv_path("eigenvalues"), List[Vector](eigenvalues))

  // Sample of 10 vectors, drawn without replacement using the fixed seed 47.
  val sampled: Array[Vector] = pivoted_rdd.takeSample(false, 10, 47)
  MLibUtils.write_vectors(csv_path("samples"), sampled)

  // Print statements to verify the results by eye.
  println(s"eigenvectors= ${eigenvectors}")
  println(s"eigenvalues= ${eigenvalues}")

  // Running total of the eigenvalues: entry i is the sum of eigenvalues 0..i.
  // scanLeft also emits the 0.0 seed as its first element, hence the `.tail`.
  // NOTE(review): the label below says "perc", but these are plain cumulative
  // sums of whatever execute_pca returns - confirm the values are normalized.
  val running_totals: ListBuffer[Double] =
    ListBuffer(eigenvalues.toArray.scanLeft(0.0)(_ + _).tail: _*)
  println(s"perc variance explained= ${running_totals}")
}

In [3]:
// Input file(s) to load; currently a single hard-coded local path.
val files: List[String] =
  List("/home/conway/temp/d11_text_station_5min_2010_01_01.txt")

// NOTE(review): the trailing 4 is presumably a partition count - confirm against MLibUtils.new_rdd.
val m_string_rdd: RDD[String] = MLibUtils.new_rdd(sc, files, 4)

// One (output filename prefix, field to pivot on) pair per PCA run.
val m_fields_pca: List[(String, Int)] = List(
  ("/tmp/total_flow.", Fields.TotalFlow),
  ("/tmp/occupancy.", Fields.Occupancy),
  ("/tmp/speed.", Fields.Speed)
)

In [4]:
// Identifier embedded in every output filename; hard-coded for now.
val fid = "01_2010"

// Run the full mean/PCA/sample pipeline once per (output prefix, pivot field) pair.
m_fields_pca.foreach { case (file_dir_prefix, pivot_field) =>
  do_run(sc, m_string_rdd, fid, file_dir_prefix, pivot_field)
}

eigenvectors= -0.014179053756036675  -0.03037718619871968   ... (30 total)
-0.013934916121909321  -0.03275430438900051   ...
-0.016034237735640236  -0.055059232655306156  ...
-0.02224021609687439   -0.08981752344420363   ...
-0.024642280667812242  -0.10607302341759699   ...
-0.027795328674961227  -0.11289527138479002   ...
-0.030726817696849183  -0.12283858465522722   ...
-0.03170953796111793   -0.12525142945220372   ...
-0.034189141311152076  -0.12849180780454958   ...
-0.03290663819484187   -0.12416688963085971   ...
-0.03311054703618485   -0.1310980899138751    ...
-0.032277317026352566  -0.1265671031580873    ...
-0.03251217552606689   -0.12425059890120821   ...
-0.03171185468968779   -0.116056644114712     ...
-0.03181839933942967   -0.11452975253348241   ...
-0.03080736298438649   -0.11167488118017743   ...
-0.03142498376606977   -0.10741768894647377   ...
-0.02961351963584656   -0.10614436286162152   ...
-0.028937492493774763  -0.10563617832550512   ...
-0.028048169225921703  -0