In [7]:
import sys
sys.path.append('../code/python')

import findspark
import pyspark

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time

In [8]:
! rm -r metastore_db

rm: cannot remove ‘metastore_db’: No such file or directory


In [9]:
# Tell findspark which Spark distribution to use for this kernel.
spark_home = '/users/snakanda/vista/spark-2.2.0-bin-hadoop2.7'
findspark.init(spark_home)

# SparkContext used by every later cell in the notebook.
sc = pyspark.SparkContext(appName="vista")

In [10]:
from vista import Vista

  from ._conv import register_converters as _register_converters
  from . import h5a, h5d, h5ds, h5f, h5fd, h5g, h5r, h5s, h5t, h5p, h5z
  from .. import h5, h5g, h5i, h5o, h5r, h5t, h5l, h5p, h5s, h5d


In [11]:
# Display the SparkContext created above as the cell's output.
sc

In [69]:
def downstream_ml_func(features_df, results_dict, layer_index):
    """Fit logistic regression on ``features_df`` and record its test accuracy.

    The DataFrame is split 80/20 (seeded with 2019) into train and test parts.
    A LogisticRegression model (maxIter=50, regParam=0.5) is fitted on the
    train part using the "features" and "label" columns, and the accuracy of
    its predictions on the test part is stored in ``results_dict``.

    :param features_df: DataFrame with "features" and "label" columns.
    :param results_dict: dict that receives the score; it is mutated in place.
    :param layer_index: key under which the accuracy is stored.
    :return: the same ``results_dict`` object, now containing the new entry.
    """
    training_part, held_out_part = features_df.randomSplit([0.8, 0.2], seed=2019)

    classifier = LogisticRegression(labelCol="label", featuresCol="features",
                                    maxIter=50, regParam=0.5)
    fitted = classifier.fit(training_part)
    scored = fitted.transform(held_out_part)

    accuracy_evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = accuracy_evaluator.evaluate(scored)

    results_dict[layer_index] = accuracy
    return results_dict

# Start the wall-clock timer before Vista is constructed, so the reported
# runtime covers construction, the run, and printing the results.
prev_time = time.time()

# mem_sys_rsv is an optional parameter; if it is not set, a default value of 3 is used.
vista = Vista(
    "vista-example", 150, 8, 1, 'alexnet', 3, 0, downstream_ml_func,
    '/users/snakanda/vista/data/foods/foods.csv',
    '/users/snakanda/vista/data/foods/images',
    20129, 130, mem_sys_rsv=3)

run_results = vista.run()
print(run_results)

elapsed_minutes = (time.time() - prev_time) / 60.0
print("Runtime (min): " + str(elapsed_minutes))

Vista Configs(join, cpu, np, heap, f_core, pers): b, 7, 238, 133, 0.972117558402, deser
{-1: 0.821356783919598, -3: 0.8329145728643216, -2: 0.8261306532663316}
Runtime (min): 22.1466980338


### Structured Features Only

In [70]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import array, col
from pyspark.sql.types import StringType, FloatType, IntegerType
from pyspark.sql import functions as F

def _as_dense_vector(vec):
    """Rebuild ``vec`` as a dense ML vector from its array form."""
    return Vectors.dense(vec.toArray())


# Column UDF: converts each vector value to a dense vector (VectorUDT result).
toDense = F.udf(_as_dense_vector, VectorUDT())

def get_struct_df(sc, data_file_path):
    """Load a header-less CSV into an (id, features, label) DataFrame.

    The first CSV column becomes the string "id", the last column becomes the
    integer "label", and every column in between is cast to float and
    assembled into a single dense "features" vector.

    :param sc: SparkContext used to create the SQLContext that reads the file.
    :param data_file_path: path of the CSV file to load (read with header='false').
    :return: DataFrame with exactly the columns "id", "features", "label".
    """
    csv_reader = pyspark.SQLContext(sc).read.format('csv').options(header='false')
    raw_df = csv_reader.load(data_file_path)

    names = raw_df.schema.names
    feature_names = names[1:-1]

    # Cast every feature column to float, one column at a time.
    casted_df = raw_df
    for feature_name in feature_names:
        float_col = casted_df[feature_name].cast(FloatType())
        casted_df = casted_df.withColumn(feature_name, float_col)

    # Pack the feature columns into one vector column, then force it dense.
    assembler = VectorAssembler(inputCols=feature_names, outputCol="features")
    assembled_df = assembler.transform(casted_df)
    assembled_df = assembled_df.withColumn("features", toDense("features"))

    id_col = assembled_df[names[0]].cast(StringType())
    label_col = assembled_df[names[-1]].cast(IntegerType())
    with_id_df = assembled_df.withColumn("id", id_col)
    with_label_df = with_id_df.withColumn("label", label_col)
    return with_label_df.select("id", "features", "label")

In [71]:
# Build the id/features/label DataFrame from the structured CSV (the same
# foods.csv path that is passed to Vista earlier in the notebook).
features_df = get_struct_df(sc, '/users/snakanda/vista/data/foods/foods.csv')

In [72]:
# Run the same downstream model on the structured features alone; the accuracy
# is stored under key 0 of a fresh dict, which is then printed.
print(downstream_ml_func(features_df, {}, 0))

{0: 0.8067371526924023}


In [5]:
# Shut down the SparkContext created at the top of the notebook.
sc.stop()