In [1]:
# Make the local Spark installation's `pyspark` package importable.
# findspark.init() locates Spark (e.g. via SPARK_HOME) and adds it to sys.path,
# so it must run before any `pyspark` import in the following cells.
import findspark
findspark.init()

In [2]:
%matplotlib inline

import os
import time

import matplotlib.pyplot as plt

from pyspark import SparkConf, SparkContext
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField,
    StringType, LongType, IntegerType, DoubleType)

In [3]:
# Cluster / storage settings. The defaults reproduce the original run; override
# them through environment variables to point the notebook at another cluster
# or DAOS pool/container without editing code.
SPARK_MASTER = os.environ.get("SPARK_MASTER", "spark://wolf-130:7077")
DAOS_POOL_UUID = os.environ.get("DAOS_POOL_UUID", "110b0404-c5bf-4c1c-adf5-a39cd7deb8d0")
# NOTE(review): the default container UUID is identical to the pool UUID --
# confirm this really is the container holding test.txt / plasticc.model.
DAOS_CONTAINER_UUID = os.environ.get("DAOS_CONTAINER_UUID", "110b0404-c5bf-4c1c-adf5-a39cd7deb8d0")

# The DAOS Hadoop connector (org.apache.hadoop.fs.daos.DaosFileSystem, registered
# for the daos:// scheme below) must be on the driver and executor JVM classpaths.
# On the original cluster the jars were:
#   /home/daos/demo/install/daos-spark/hadoop-daos-1.0-SNAPSHOT.jar
#   /home/daos/demo/install/daos-spark/daos_jni-1.0-SNAPSHOT.jar
# If they are not installed cluster-wide, ship them with a
# .config('spark.jars', 'file:///<jar1>,file:///<jar2>') line in the builder.
# (sc.addPyFile only ships Python dependencies; it does not add jars to the JVM.)
spark = (
    SparkSession.builder
    .master(SPARK_MASTER)
    .appName('PLAsTiCC')
    .config('spark.executor.instances', '36')
    .config('spark.executor.cores', '5')
    .config('spark.executor.memory', '15g')
    .config('spark.driver.memory', '5g')
    .config('spark.sql.shuffle.partitions', '180')
    .config('spark.default.parallelism', '180')
    .config('spark.files.maxPartitionBytes', '256m')
    # DataFrame file sources (spark.read.csv / spark.read.text) size their input
    # splits from the spark.sql.* key, not the core key above.
    .config('spark.sql.files.maxPartitionBytes', '256m')
    .config('spark.sql.repl.eagerEval.enabled', 'true')
    .config('spark.sql.execution.arrow.enabled', 'true')
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.hadoop.fs.daos.pool.uuid', DAOS_POOL_UUID)
    .config('spark.hadoop.fs.daos.container.uuid', DAOS_CONTAINER_UUID)
    .config('spark.hadoop.fs.daos.impl', 'org.apache.hadoop.fs.daos.DaosFileSystem')
    .getOrCreate()
)

sc = spark.sparkContext

In [4]:
# Schema of the test_feats_label CSV input.
# - 66 DoubleType columns named <metric>_<suffix>: eleven statistics, each for
#   the six suffixes u, g, r, i, z, Y (presumably photometric bands), in
#   metric-major order (amplitude_u .. amplitude_Y, percent_beyond_1_std_u, ...).
# - Three further DoubleType columns: mwebv, z, zerr.
# - Two LongType columns: object_id, label.
# Every field uses StructField's default nullable=True.
feature_schema = StructType(
    [
        StructField(f"{metric}_{band}", DoubleType())
        for metric in (
            "amplitude",
            "percent_beyond_1_std",
            "maximum",
            "max_slope",
            "median",
            "median_absolute_deviation",
            "percent_close_to_median",
            "minimum",
            "skew",
            "std",
            "weighted_average",
        )
        for band in ("u", "g", "r", "i", "z", "Y")
    ]
    + [StructField(name, DoubleType()) for name in ("mwebv", "z", "zerr")]
    + [StructField(name, LongType()) for name in ("object_id", "label")]
)


In [5]:
# Smoke test for the DAOS Hadoop connector: read a small text file through the
# daos:// scheme and print it before running the real workload.
textFile = spark.read.text(paths="daos:///test.txt")
textFile.show(n=20, truncate=True)

+-----+
|value|
+-----+
|Hello|
| DAOS|
+-----+



In [6]:
# Batch inference benchmark with the model and test data read from DAOS.
#
# NOTE(review): the recorded run of this cell failed in PipelineModel.load: an
# executor opened the *unqualified* path /plasticc.model/metadata/part-00000
# through HDFS's DistributedFileSystem, i.e. the daos:// scheme was lost between
# listing and opening the metadata file. Presumably the DAOS connector returns
# scheme-less paths from its listing calls -- TODO confirm before re-running.
model = PipelineModel.load("daos:///plasticc.model")
testData = spark.read.schema(feature_schema).option("header", "true").csv('daos:///test_feats_label')

# Feature vector = the 66 <metric>_<suffix> columns of feature_schema, in schema
# order. Derived from the schema instead of retyped so it cannot drift from it;
# the order must match the one the model was trained with.
NON_FEATURE_COLS = {"mwebv", "z", "zerr", "object_id", "label"}
feature_cols = [name for name in feature_schema.fieldNames() if name not in NON_FEATURE_COLS]
assert len(feature_cols) == 66, f"expected 66 feature columns, got {len(feature_cols)}"

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembledTestData = assembler.transform(testData).select("label", "features")

# perf_counter() is monotonic; time.time() can jump if the wall clock is adjusted.
start_time = time.perf_counter()
predictions = model.transform(assembledTestData)
# NOTE(review): count() needs none of the model's output columns, so Spark's
# optimizer may prune them and this may time read + parse rather than
# prediction -- check with predictions.groupBy().count().explain().
numItems = predictions.count()
duration = time.perf_counter() - start_time
print(f"DAOS: Finished predicting {numItems} items in {duration} seconds")

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 10.8.1.131, executor 31): java.io.FileNotFoundException: File does not exist: /plasticc.model/metadata/part-00000
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1847)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1819)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1733)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:366)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2211)

	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
	at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
	at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
	at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
	at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:109)
	at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:267)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:266)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:224)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:95)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /plasticc.model/metadata/part-00000
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1847)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1819)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1733)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:366)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2211)

	at org.apache.hadoop.ipc.Client.call(Client.java:1475)
	at org.apache.hadoop.ipc.Client.call(Client.java:1412)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
	at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy19.getBlockLocations(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
	... 33 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File does not exist: /plasticc.model/metadata/part-00000
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1847)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1819)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1733)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:366)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2211)

	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
	at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
	at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
	at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
	at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:109)
	at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:267)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:266)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:224)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:95)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /plasticc.model/metadata/part-00000
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1847)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1819)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1733)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:366)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2211)

	at org.apache.hadoop.ipc.Client.call(Client.java:1475)
	at org.apache.hadoop.ipc.Client.call(Client.java:1412)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
	at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy19.getBlockLocations(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
	... 33 more


In [5]:
# Same benchmark with the model and test data read from unqualified paths, which
# resolve against the cluster's default filesystem (HDFS on this cluster).
model = PipelineModel.load("/PLAsTiCC/plasticc.model")
testData = spark.read.schema(feature_schema).option("header", "true").csv('/PLAsTiCC/test_feats_label')

# Feature vector = the 66 <metric>_<suffix> columns of feature_schema, in schema
# order. Derived from the schema instead of retyped so it cannot drift from it;
# the order must match the one the model was trained with.
NON_FEATURE_COLS = {"mwebv", "z", "zerr", "object_id", "label"}
feature_cols = [name for name in feature_schema.fieldNames() if name not in NON_FEATURE_COLS]
assert len(feature_cols) == 66, f"expected 66 feature columns, got {len(feature_cols)}"

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembledTestData = assembler.transform(testData).select("label", "features")

# Timing step mirrors the DAOS cell so both storage backends are measured the
# same way. It was missing from this cell even though the cell's recorded output
# reports a prediction count and duration.
# perf_counter() is monotonic; time.time() can jump if the wall clock is adjusted.
start_time = time.perf_counter()
predictions = model.transform(assembledTestData)
# NOTE(review): count() needs none of the model's output columns, so Spark's
# optimizer may prune them and this may time read + parse rather than
# prediction -- check with predictions.groupBy().count().explain().
numItems = predictions.count()
duration = time.perf_counter() - start_time
print(f"HDFS: Finished predicting {numItems} items in {duration} seconds")


Finished predicting 3492838 items in 30.789774656295776 seconds


In [49]:
# Release the cluster resources. SparkSession.stop() stops the underlying
# SparkContext (what sc.stop() did) and also clears PySpark's cached
# default/active session, so re-running the session cell starts cleanly.
spark.stop()