In [None]:
%Truncation on
%AddDeps org.scalanlp breeze_2.12 1.0-RC2 --transitive --trace --verbose
%AddDeps org.scalanlp breeze-natives_2.12 1.0-RC2 --transitive --trace --verbose
%AddDeps org.scalanlp breeze-viz_2.12 1.0-RC2 --transitive --trace --verbose
%ShowTypes on

In [None]:
import kernel_lib._

In [None]:
import com.github.fommil.netlib.LAPACK.{getInstance => lapack}

## kernel func

In [None]:
// Linear kernel: evaluates to the inner product of the two vectors.
def kernel_func (x:breeze.linalg.DenseVector[Double], y:breeze.linalg.DenseVector[Double]):Double = {
    x dot y
}

# Spark Data Load

In [None]:
import org.apache.spark.{sql => s_sql}

In [None]:
// UDF turning one comma-separated text line into a (label, features) pair.
// The first field is parsed as a Double and truncated to a Long label; every
// remaining field is parsed as a Double feature value.
val csv_to_label_features_udf = s_sql.functions.udf (
    (row:String) => {
        val fields = row.split (',')
        
        // drop (1) keeps every field after the label. The previous slice used
        // the character length of the line as its upper bound, which only
        // worked because slice clamps the bound to the number of fields.
        // The features are parsed eagerly instead of through a lazy iterator.
        (fields (0).toDouble.toLong, fields.drop (1).map (_.toDouble).toSeq)
    }:(Long, Seq[Double])
)

In [None]:
// Divisor applied to the row count when choosing how many partitions the raw
// frames are repartitioned into, i.e. roughly the rows per partition.
val max_per_P:Int = 2000

// Base HDFS directory the input files are loaded from.
val path:String = "hdfs://thesis-tiny-python3-anaconda-m/user/<your username>/"

In [None]:
// Reads SUSY.csv.gz as raw text lines, drops every line containing "MET_rel"
// (presumably the header row - confirm), parses the remaining lines into
// "label" and "features" columns and adds an "id" column.
var df_susy_raw = spark.read.format (
    "text"
).load (path + "SUSY.csv.gz")
.filter (
    !$"value".contains ("MET_rel")
).withColumn (
    "label_features", csv_to_label_features_udf ($"value")
).select (
    $"label_features._1".as ("label"),
    $"label_features._2".as ("features")
).withColumn (
    "id", s_sql.functions.monotonically_increasing_id
)

// About max_per_P rows per partition. At least one partition is requested,
// because the integer division yields 0 for frames with fewer than max_per_P
// rows and repartition rejects a non-positive partition count. The division
// is done on the Long count before narrowing to Int.
df_susy_raw = df_susy_raw.repartition (
    scala.math.max (1L, df_susy_raw.count () / max_per_P).toInt
)

df_susy_raw.rdd.getNumPartitions

In [None]:
// Reads HIGGS.csv.gz as raw text lines, drops every line containing "m_jj"
// (presumably the header row - confirm), parses the remaining lines into
// "label" and "features" columns and adds an "id" column.
// The frame is named df_higgs_raw: the statements below already referred to
// that name, while the definition used df_higgs, so the cell could not compile.
var df_higgs_raw = spark.read.format (
    "text"
).load (path + "HIGGS.csv.gz")
.filter (
    !$"value".contains ("m_jj")
).withColumn (
    "label_features", csv_to_label_features_udf ($"value")
).select (
    $"label_features._1".as ("label"),
    $"label_features._2".as ("features")
).withColumn (
    "id", s_sql.functions.monotonically_increasing_id
)

// About max_per_P rows per partition. At least one partition is requested,
// because the integer division yields 0 for frames with fewer than max_per_P
// rows and repartition rejects a non-positive partition count. The division
// is done on the Long count before narrowing to Int.
df_higgs_raw = df_higgs_raw.repartition (
    scala.math.max (1L, df_higgs_raw.count () / max_per_P).toInt
)

df_higgs_raw.rdd.getNumPartitions

In [None]:
// Reads digits.snappy.parquet, casts the "features" column to an array of
// doubles and adds an "id" column.
var df_digits_raw = spark.read.format (
    "parquet"
).load (path + "digits.snappy.parquet").withColumn (
    "features", $"features".cast ("ARRAY<DOUBLE>")
).withColumn (
    "id", s_sql.functions.monotonically_increasing_id
)

// About max_per_P rows per partition. At least one partition is requested,
// because the integer division yields 0 for frames with fewer than max_per_P
// rows and repartition rejects a non-positive partition count. The division
// is done on the Long count before narrowing to Int.
df_digits_raw = df_digits_raw.repartition (
    scala.math.max (1L, df_digits_raw.count () / max_per_P).toInt
)

df_digits_raw.rdd.getNumPartitions

# Nystrom algorithm

## Features transform

```scala
// Square root (truncated to Int) of the number of entries in the feature
// array of the first row of df_digits_raw; used as the side length of the
// image, which assumes square images.
val image_dim = {
    val feature_count = df_digits_raw.select (
        s_sql.functions.size ($"features")
    ).map (_.getInt (0)).take (1)(0)

    scala.math.sqrt (feature_count).toInt
}

// window is centered
// NOTE(review): max_pool anchors each window at index skip * position rather
// than centering it - confirm what "centered" is meant to describe.

// pooling window extent along x (columns) and y (rows)
val window_x:Int = 7
val window_y:Int = 7
// step between consecutive windows along x and y
val skip_x:Int = 3
val skip_y:Int = 3

// Max-pools the matrix I: a window of window_y rows by window_x columns is
// moved in steps of skip_y rows and skip_x columns, and each output entry is
// the maximum of I inside the corresponding window. Only windows lying fully
// inside I are used; an axis shorter than the window yields zero positions.
def max_pool (
    I:breeze.linalg.DenseMatrix[Double]
):breeze.linalg.DenseMatrix[Double] = {
    // Number of window positions along an axis of length len:
    // (len - window) / skip + 1, or 0 when the window does not fit.
    // The previous size left out the "+ 1", which dropped the last window
    // along each axis (and pooled an input the size of the window to nothing).
    def positions (len:Int, window:Int, skip:Int):Int =
        if (len < window) 0 else (len - window) / skip + 1

    // NOTE(review): the original comment here broke off after
    // "column major implies image will be"; presumably it refers to the
    // image being transposed when filled from row-major pixels - confirm.
    val I_pooled = new breeze.linalg.DenseMatrix[Double](
        positions (I.rows, window_y, skip_y), 
        positions (I.cols, window_x, skip_x)
    )
    
    for (
        i <- 0 until I_pooled.rows;
        j <- 0 until I_pooled.cols
    ) {
        // top-left corner of the window feeding output entry (i, j)
        val im = skip_y * i
        val jm = skip_x * j

        I_pooled (i, j) = breeze.linalg.max (I (im until im + window_y, jm until jm + window_x))
    }
    
    I_pooled
}

// UDF wrapper around max_pool: builds an image_dim x image_dim matrix from
// the flat feature array, pools it and returns the pooled matrix's backing
// data array as a sequence.
val max_pool_udf = s_sql.functions.udf (
    (x:Seq[Double]) => {
        val image = new breeze.linalg.DenseMatrix (image_dim, image_dim, x.toArray)
        val pooled = max_pool (image)

        pooled.data.toSeq
    }
)
```

## Data set

In [None]:
// Working data set: the "features" column passed through
// LinalgUtils.normalize_udf (exposed as "nfeatures") together with the
// "label" and "id" columns, persisted for repeated use.
// The cell previously defined df_data twice with identical code; a repeated
// val in one cell is a duplicate definition, and even where accepted it would
// persist two identical frames, so a single definition is kept.
// To run on another data set, replace df_digits_raw (mnist) with one of the
// SUSY / HIGGS frames loaded above, which are built with the same
// label / features / id columns.
val df_data = df_digits_raw.select (
    LinalgUtils.normalize_udf (
        $"features"
    ).as ("nfeatures"), $"label", $"id"
).persist

In [None]:
// n: number of rows in df_data (narrowed to Int)
val n = df_data.count ().toInt

// rows: length of the "nfeatures" array, read from the first row returned
val rows = {
    val feature_sizes = df_data.select (s_sql.functions.size ($"nfeatures"))

    feature_sizes.map (_.getInt (0)).take (1)(0)
}

## Nystrom Loop

```scala
// Constants of the Nystrom loop. base_lamb is the starting regularization,
// q the factor lambda is divided by at each step and H the number of steps
// derived from the target lambda.
val kappa = 1.0
val t = .3
val base_lamb = kappa * kappa / scala.math.min (t, 1.0)
val q = 1.5
// target lambda is 1 / sqrt (n)
val H = (scala.math.log (base_lamb * scala.math.sqrt (n)) / scala.math.log (q)).toInt
val delta = .9

// 12.0 forces Double arithmetic: as an Int product, 12 * H * n overflows for
// large n (around 10^7 rows with H near 20), which makes the argument
// negative and the logarithm NaN.
val q2 = 54 * (kappa * kappa) * ((2 + t * t) / (t * t)) * scala.math.log  (12.0 * H * n / delta)

// cap on the number of rows kept in each sample
val max_rows = 5000

// sampling fraction for the first step, capped at 1
val base_beta = scala.math.min (q2 * kappa * kappa / (base_lamb * n), 1.0)

// mutable per-step state, starting from the base values
var lambda_h = base_lamb
var beta_h = base_beta

//step 0, i.e. initialization step
// Samples df_data with fraction beta_h, scores every sampled row with the
// kernel norm times q2 (capped by cap_to_one_udf), keeps the rows that pass
// the random acceptance test and retains at most max_rows of them.
// The rows are sorted by descending ph_cap before the limit, like every
// other sampling step in this notebook; the step used to sort ascending,
// which kept the lowest-scoring rows instead.
var df_sample_set_h:org.apache.spark.sql.Dataset[
    org.apache.spark.sql.Row
] = df_data.select ("nfeatures").sample (beta_h).withColumn (
    "ph_cap",
    LinalgUtils.cap_to_one_udf (
        // first round does not have a kpack
        // with most kernels, kinda meaningless...
        KernelPack.knorm_factory (kernel_func (_,_))($"nfeatures").multiply (q2)
    )
).filter (
    s_sql.functions.rand ().multiply (beta_h) <= $"ph_cap"
).orderBy (s_sql.functions.negate ($"ph_cap")).limit (max_rows)

// sample length
// runs most of the computation, lol
var cols = df_sample_set_h.count ().toInt

// Kernel pack built from the step-0 sample.
val kpack_h = {
    // X: the sampled feature arrays concatenated into one flat array that
    // fills a rows x cols matrix
    val sample_features = df_sample_set_h.select ("nfeatures").flatMap (
        (row:s_sql.Row) => row.getSeq[Double](0)
    ).collect ()

    // A: the ph_cap value of every sampled row
    val sample_weights = df_sample_set_h.select ("ph_cap").map (
        (row:s_sql.Row) => row.getDouble (0)
    ).collect ()

    KernelPack (
        new breeze.linalg.DenseMatrix (rows, cols, sample_features),
        new breeze.linalg.DenseVector (sample_weights),
        // lambda * n
        lambda_h * n,
        kernel_func (_,_)
    )
}

// Nystrom refinement loop: restarts lambda_h / beta_h from their base values,
// then for steps 1 until H lowers lambda_h, draws a new sample scored with
// the current kpack_h, updates kpack_h from that sample and prints the sum
// of the leverage scores over df_data.
lambda_h = base_lamb
beta_h = base_beta

for (i <- 1 until H) {
    println ("----------------------------------------------")
    println (f"Step: ${i}")
    println ("----------------------------------------------")
    // step i
    // lambda and beta updates
    // lambda_h shrinks by the factor q; beta_h is recomputed from it, capped at 1
    lambda_h /= q
    beta_h = scala.math.min (q2 * kappa * kappa / (lambda_h * n), 1.0)
    
    println (f"lambda_h: ${lambda_h}, beta_h: ${beta_h}")
    println ("----------------------------------------------")
    
    // new sample
    // scored with the leverage udf of the current kpack_h, filtered by the
    // random acceptance test, then the max_rows largest ph_cap rows are kept
    df_sample_set_h = df_data.select ("nfeatures").sample (beta_h).withColumn (
        "ph_cap",
        LinalgUtils.cap_to_one_udf (kpack_h.kleverage_udf ($"nfeatures").multiply (q2))
    ).filter (
        s_sql.functions.rand ().multiply (beta_h) <= $"ph_cap"
    ).orderBy (s_sql.functions.negate ($"ph_cap")).limit (max_rows)

    // sample length
    // runs most of the computation, lol
    cols = df_sample_set_h.count ().toInt

    // kernel udfs update
    // NOTE(review): the sample is not persisted here, so the count above and
    // the two collects below each evaluate it again - confirm they agree
    kpack_h.update (
        // X
        new breeze.linalg.DenseMatrix (
            rows, cols, 
            df_sample_set_h.select ("nfeatures").flatMap {
                case row: s_sql.Row => row.getSeq[Double](0)
            }.collect ().toArray
        ), 
        // A
        new breeze.linalg.DenseVector (
            df_sample_set_h.select ("ph_cap").map {
                case row: s_sql.Row => row.getDouble(0)
            }.collect ().toArray
        ), 
        // lambda * n
        lambda_h * n
    )
    
    println (f"cols: ${cols}")
    println ("----------------------------------------------")
    
    // global leverage score
    // sum of the updated leverage udf over every row of df_data
    df_data.select (
        kpack_h.kleverage_udf ($"nfeatures").as ("ph")
    ).agg (s_sql.functions.sum ($"ph")).show
}
```

## Nystrom spaces loop

In [None]:
// Constants of the Nystrom spaces loop. base_lamb is the starting
// regularization, q the factor lambda is divided by at each step and H the
// number of steps derived from the target lambda.
val kappa = 1.0
val t = 1.0
val base_lamb = kappa * kappa / scala.math.min (t, 1.0)
val q = 2
// target lambda is 1 / n
val H = (scala.math.log (base_lamb * n) / scala.math.log (q)).toInt
val delta = .9

// val q2 = 54 * (kappa * kappa) * ((2 + t * t) / (t * t)) * scala.math.log  (12 * H * n / delta)
// 12.0 forces Double arithmetic: as an Int product, 12 * H * n overflows for
// large n (around 10^7 rows with H above 20), which makes the argument
// negative and the logarithm NaN.
val q2 = (kappa * kappa) * ((2 + t * t) / (t * t)) * scala.math.log  (12.0 * H * n / delta)

// cap on the number of rows kept in each sample
val max_rows = 1000
// passed to KernelSpaces as its last argument
val max_spaces = 5
// the main loop stops once this many non-improving steps are counted
val max_non_improv = 10

// sampling fraction for the first step, capped at 1
val base_beta = scala.math.min (q2 * kappa * kappa / (base_lamb), 1.0)

In [None]:
// Mutable per-step state, starting from the base values.
var lambda_h = base_lamb
var beta_h = base_beta

//step 0, i.e. initialization step
// Samples df_data with fraction beta_h, scores every sampled row with the
// kernel norm times a log factor (capped by cap_to_one_udf), keeps the rows
// passing the random acceptance test and persists the max_rows rows with the
// largest ph_cap.
var df_sample_set_h:org.apache.spark.sql.Dataset[
    org.apache.spark.sql.Row
] = df_data.select ($"nfeatures", $"id").sample (beta_h).withColumn (
    "ph_cap",
    LinalgUtils.cap_to_one_udf (
        // first round does not have a kpack
        // with most kernels, kinda meaningless...
        // 12.0 forces Double arithmetic: the Int product 12 * H * n overflows
        // for large n, which turns the logarithm into NaN
        KernelPack.knorm_factory (kernel_func (_,_))($"nfeatures").multiply (scala.math.log  (12.0 * H * n / delta))
    )
).filter (
    s_sql.functions.rand () <= LinalgUtils.cap_to_one_udf ($"ph_cap")
).orderBy (s_sql.functions.negate ($"ph_cap")).limit (max_rows).persist

// sample length
var cols = df_sample_set_h.count ().toInt

// First subspace of the kernel-spaces structure, built from the sample.
// NOTE(review): X, A and the ids are collected by three separate jobs over
// the persisted sample; they line up only if every job returns the rows in
// the same order - confirm.
val kspaces_h = KernelSpaces (
    // X
    new breeze.linalg.DenseMatrix (
        rows, cols, 
        df_sample_set_h.select ("nfeatures").flatMap {
            case row: s_sql.Row => row.getSeq[Double](0)
        }.collect ().toArray
    ), 
    // A
    new breeze.linalg.DenseVector (
        df_sample_set_h.select (
            LinalgUtils.cap_to_one_udf ($"ph_cap")
        ).map {
            case row: s_sql.Row => row.getDouble(0)
        }.collect ().toArray
    ),
    // X_ids
    df_sample_set_h.select (
        $"id"
    ).map {
        case row: s_sql.Row => row.getLong(0)
    }.collect (),
    // lambda * n
    lambda_h,// * n, 
    kernel_func (_,_),
    max_spaces
)

// the sample has been copied to the driver, so the cached copy is released
df_sample_set_h = df_sample_set_h.unpersist (true)

In [None]:
// State of the main loop: counter of non-improving steps, best leverage sum
// seen so far (initialised from the step-0 spaces) and the latest sum.
var non_improv_count = 0
var min_leverage:Double = {
    val best_ph = kspaces_h.min_kleverage_udf () ($"nfeatures").as ("best_ph")

    // sum of the minimum-leverage udf over every row of df_data
    df_data.agg (s_sql.functions.sum (best_ph)).map (
        (row:s_sql.Row) => row.getDouble (0)
    ).collect ()(0)
}

var current_leverage:Double = 0.0

// Components of kspaces_h remembered as the best configuration so far.
// NOTE(review): if these accessors hand out kspaces_h's internal state rather
// than copies, later appends/removals would show through - confirm.
var best_Xs = kspaces_h.Xs
var best_sqrt_GXs = kspaces_h.sqrt_GXs
var best_regularizer_diags = kspaces_h.regularizer_diags
var best_ids = kspaces_h.Xs_ids

// step counter of the main loop
var i = 1

In [None]:
// Resets the step counter and the regularization to their starting values
// before the main loop is run.
i = 1
lambda_h = base_lamb

TODO: devise a per-partition strategy using `mapPartitions`.

In [None]:
// Main Nystrom spaces loop. Every pass lowers lambda_h (while i < H), draws a
// new leverage-weighted sample, appends it to kspaces_h as a further
// subspace, prints the cross leverages between the subspaces and compares
// the summed minimum leverage over df_data with the best value so far.
// The loop ends once non_improv_count reaches max_non_improv.
while (non_improv_count < max_non_improv) {
    println ("----------------------------------------------")
    println (f"i: ${i}")
    println ("----------------------------------------------")
    
    // step i
    // lambda and beta updates
    // lambda_h is divided by q only while i < H; afterwards it stays fixed
    
    if (i < H) {
        lambda_h /= q
    }

    beta_h = scala.math.min (q2 * kappa * kappa / (lambda_h), 1.0)
    
    println (f"lambda_h: ${lambda_h}, beta_h: ${beta_h}")
    println ("----------------------------------------------")
    
    // removal of subspaces when array gets full
    // NOTE(review): the modulus is Xs_length - 1, which is zero when
    // Xs_length is 1 - confirm that case cannot occur
    if (kspaces_h.Xs_length <= i) {
        println (f"removed a X at i: ${(i - 1) % (kspaces_h.Xs_length - 1)}")
        println ("----------------------------------------------")
        kspaces_h.remove_id ((i - 1) % (kspaces_h.Xs_length - 1))
    }
    
    kspaces_h.update_lambda (
        // lambda * n
        lambda_h// * n
    )
    
    // new sample
    // 12.0 forces Double arithmetic: the Int product 12 * H * n overflows
    // for large n, which turns the logarithm (and every ph_cap) into NaN
    df_sample_set_h = df_data.select ($"nfeatures", $"id").sample (beta_h).withColumn (
        "ph_cap",
        kspaces_h.min_kleverage_udf () ($"nfeatures").multiply (scala.math.log  (12.0 * H * n / delta))
    ).filter (
        s_sql.functions.rand () <= LinalgUtils.cap_to_one_udf ($"ph_cap")
    ).orderBy (s_sql.functions.negate ($"ph_cap")).limit (max_rows).persist

    // sample length
    cols = df_sample_set_h.count ().toInt

    // kernel udfs update
    // the sample becomes a new subspace of kspaces_h
    kspaces_h.append (
        // X
        new breeze.linalg.DenseMatrix (
            rows, cols, 
            df_sample_set_h.select ("nfeatures").flatMap {
                case row: s_sql.Row => row.getSeq[Double](0)
            }.collect ().toArray
        ), 
        // A
        new breeze.linalg.DenseVector (
            df_sample_set_h.select (
                LinalgUtils.cap_to_one_udf ($"ph_cap")
            ).map {
                case row: s_sql.Row => row.getDouble(0)
            }.collect ().toArray
        ),
        // X_ids
        df_sample_set_h.select (
            $"id"
        ).map {
            case row: s_sql.Row => row.getLong(0)
        }.collect ()
    )
    
    println (f"cols: ${cols}")
    println ("----------------------------------------------")
    
    // the sample has been copied to the driver, so the cached copy is released
    df_sample_set_h = df_sample_set_h.unpersist (true)
    
    // one printed row per subspace k, one cell per subspace l
    println ("Cross leverages between the Xis")
    for (
        k <- 0 until kspaces_h.Xs_count
    ) {
        print (k)
        print ("    ")

        for (l <- 0 until kspaces_h.Xs_count) {
            print (f"""${
                breeze.linalg.sum (KernelUtils.kernel_leverages (
                    kspaces_h.Xs ()(k),
                    kspaces_h.Xs ()(l),
                    kspaces_h.sqrt_GXs ()(k),
                    kspaces_h.kernel_func
                ))
            }%.2f""")
            print (" | ")
        }

        print ("\n")
    }
    
    println ("----------------------------------------------")
    
    // sum of the minimum-leverage udf over every row of df_data
    current_leverage = df_data.agg (
        s_sql.functions.sum (kspaces_h.min_kleverage_udf () ($"nfeatures").as ("best_ph"))
    ).map {
        case row: s_sql.Row => row.getDouble(0)
    }.collect ()(0)
    
    println (f"current leverage sum: ${current_leverage}, current best ${min_leverage}")
    println ("----------------------------------------------")
    
    // a step counts as an improvement only when the sum drops by more than 1
    if (current_leverage - min_leverage < -1) {
        println (f"improvement happend!")
        println ("----------------------------------------------")
        min_leverage = current_leverage
        
        // makes so that it stops when 
        // no improvement happens
        // max_non_improv times
        // (an improvement takes one step back off the counter, it does not reset it)
        if (non_improv_count > 0) {
            non_improv_count -= 1
        }
        
        // remember the current spaces as the best configuration
        best_Xs = kspaces_h.Xs
        best_sqrt_GXs = kspaces_h.sqrt_GXs
        best_regularizer_diags = kspaces_h.regularizer_diags
        best_ids = kspaces_h.Xs_ids
    }
    else {
        non_improv_count += 1
        println (f"current non-improvement: ${non_improv_count}")
        println ("----------------------------------------------")
    }
    
    i += 1
}

# Best's curation

In [None]:
// prints the number of columns of every matrix in best_Xs, one per line
for (aX <- best_Xs) println (aX.cols)

In [None]:
// Prints a table with one line per matrix in best_Xs: the line index, then
// for every j the summed kernel leverages computed from best_Xs (i),
// best_Xs (j) and best_sqrt_GXs (i), formatted with two decimals.
for (i <- 0 until best_Xs.length) {
    val cells = (0 until best_Xs.length).map { j =>
        val total = breeze.linalg.sum (KernelUtils.kernel_leverages (
            best_Xs (i),
            best_Xs (j),
            best_sqrt_GXs (i),
            kspaces_h.kernel_func
        ))

        f"${total}%.2f" + " | "
    }

    // the whole line is assembled first and printed in one go
    print (i.toString + "    " + cells.mkString + "\n")
}

## Label spaces, then classification via spaces

In [None]:
// set of every id appearing in any of the id collections of best_ids
val chosen_ids = best_ids.flatten.toSet

In [None]:
// UDF that is true exactly for ids contained in chosen_ids
val is_chosen_udf = s_sql.functions.udf (
    (id:Long) => chosen_ids.contains (id)
)

In [None]:
// rows of df_data whose id belongs to chosen_ids
val df_chosen = df_data.where (is_chosen_udf ($"id"))

In [None]:
// number of chosen rows per label
df_chosen.groupBy ("label").count ().show ()

Test classification on the chosen subset and evaluate the generalization error.

In [None]:
// One row per label: the list of that label's normalized feature arrays
// (column "nfeatures") followed by how many there are.
val df_split_data = df_chosen.groupBy ("label").agg (
    s_sql.functions.collect_list ($"nfeatures").as ("nfeatures"),
    s_sql.functions.count ($"nfeatures")
)

In [None]:
// Collects df_split_data to the driver as (label, features, count) triples,
// where features is the concatenation of the label's feature arrays.
// NOTE(review): the label is read with getLong, so the column is assumed to
// be a Long for the data set in use - confirm.
val labels_features = df_split_data.map (
    (row:s_sql.Row) => {
        val label = row.getLong (0).toInt
        val flat_features = row.getSeq[Seq[Double]] (1).flatten
        val feature_count = row.getLong (2).toInt

        (label, flat_features, feature_count)
    }
).collect ()

In [None]:
// labels, in the order returned by the collect
val labels_arr = labels_features.map (triple => triple._1).toSeq

// one rows x count matrix per label, filled from the flattened features
val features_arr = labels_features.map (
    triple => new breeze.linalg.DenseMatrix (rows, triple._3, triple._2.toArray)
).toSeq

// KernelUtils.eval_chol applied to every label matrix with unit weights and
// 1 / sqrt (n) as the third argument
val sqrt_Gfeatures_arr = features_arr.map { aX =>
    val unit_weights = breeze.linalg.DenseVector.ones[Double] (aX.cols)
    val inv_sqrt_n = 1.0 / scala.math.sqrt (n)

    KernelUtils.eval_chol (aX, unit_weights, inv_sqrt_n, kernel_func)
}

// ((label, matrix), eval_chol result) for every label
val labeled_spaces = labels_arr.zip (features_arr).zip (sqrt_Gfeatures_arr)

In [None]:
// largest column count among the label matrices; sizes the scratch buffer
val buffer_len = features_arr.map (_.cols).max

// UDF classifying a feature array: for every labeled space it fills a
// projection buffer through KernelUtils.buff_kernel_proj, takes the squared
// length of that buffer and keeps the label with the largest value (the
// later space wins ties). Returns that label together with the kernel norm
// of the input minus the winning value.
val max_proj_classifier_udf = org.apache.spark.sql.functions.udf (
    (y:Seq[Double]) => {
        val point = new breeze.linalg.DenseVector (y.toArray)
        val scratch = new breeze.linalg.DenseVector[Double] (buffer_len)
        val point_norm = KernelUtils.kernel_norm (point, kernel_func)

        // (label, value) of the best space so far
        var winner:(Long, Double) = (0L, 0.0)

        labeled_spaces.foreach {
            case ((label, aX), sqrt_GX) =>
                // slice of the scratch buffer matching this space's column count
                val proj = scratch (0 until aX.cols)

                KernelUtils.buff_kernel_proj (aX, point, sqrt_GX, kernel_func, proj)

                val energy = proj.t * proj

                if (energy >= winner._2) {
                    winner = (label.toLong, energy)
                }
        }

        (winner._1, point_norm - winner._2)
    }:(Long, Double)
)

In [None]:
// Classifies every row of df_data and shows, separately for correctly and
// incorrectly predicted rows, the row count and the sum of the second
// classifier output.
// The grouping column is true when label and prediction agree, so it is
// named "is_match" (it used to be labelled "mismatch"), and the two
// aggregates get distinct names (both used to be called "group_sum").
df_data.withColumn (
    "pred_pair", max_proj_classifier_udf ($"nfeatures")
).select (
    $"label", $"pred_pair._1".as ("pred_label"), 
    $"pred_pair._2".as ("pred_iscore")
).groupBy (
    $"label".equalTo ($"pred_label").as ("is_match")
).agg (
    s_sql.functions.count ("*").as ("group_count"),
    s_sql.functions.sum ($"pred_iscore").as ("group_iscore_sum")
).show

## Develop ridge classifier for each subspace, then take majority vote

Retrieves the labels of each subspace, in the right order.

In [None]:
import scala.collection.mutable.Map

import org.apache.spark.{ml => s_ml}

In [None]:
// For every id collection in best_ids: the labels of the matching df_data
// rows, ordered like the ids inside that collection. Each entry is converted
// with toSeq; the outer collection keeps the shape of best_ids.
val best_labels = best_ids.map { some_ids =>
    val id_set = some_ids.toSet
    // position of every id inside some_ids
    val id_map = some_ids.zipWithIndex.toMap
    
    val chosen_udf = s_sql.functions.udf (
        (id:Long) => id_set.contains (id)
    )
    
    val label_id_pairs = df_data.filter (
        chosen_udf ($"id")
    ).select ($"label", $"id").map (
        (row:s_sql.Row) => (row.getLong (0).toInt, row.getLong (1))
    ).collect ()

    // restore the order of some_ids, then keep only the labels
    label_id_pairs.sortBy (pair => id_map (pair._2)).map (_._1).toSeq
}

## Ridge regression based on subspace