
MNIST with spark standalone cluster mode #37

Closed
nagamanojk opened this issue Oct 11, 2017 · 14 comments

@nagamanojk

@JoeriHermans

Hi Joeri,

I tried to run one of my experiments in PySpark standalone cluster mode, with 3 workers.
I'm getting a ConnectionRefused error on the worker.
Is this expected?

ee207437@pcg-ee207437-1:/usr/lib/spark$ ./bin/spark-submit --master spark://10.51.5.40:7077 examples/src/main/python/gtzanKeras.py gtzan.parquet
Using TensorFlow backend.
17/10/11 14:35:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root
|-- features_normalized: vector (nullable = true)
|-- label_index: double (nullable = true)
|-- label: array (nullable = true)
| |-- element: double (containsNull = true)


Layer (type)                 Output Shape              Param #
=================================================================
dense_1 (Dense)              (None, 40)                1240
activation_1 (Activation)    (None, 40)                0
dropout_1 (Dropout)          (None, 40)                0
dense_2 (Dense)              (None, 15)                615
activation_2 (Activation)    (None, 15)                0
dropout_2 (Dropout)          (None, 15)                0
dense_3 (Dense)              (None, 10)                160
activation_3 (Activation)    (None, 10)                0
=================================================================
Total params: 2,015
Trainable params: 2,015
Non-trainable params: 0


Number of training instances: 887
Number of testing instances: 113
2017-10-11 14:36:03.929908: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-11 14:36:03.929928: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-11 14:36:03.929934: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
2017-10-11 14:36:03.929938: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-11 14:36:03.929943: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations.
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 754, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/local/lib/python2.7/dist-packages/distkeras/trainers.py", line 466, in service
self.parameter_server.initialize()
File "/usr/local/lib/python2.7/dist-packages/distkeras/parameter_servers.py", line 111, in initialize
file_descriptor.bind(('0.0.0.0', self.master_port))
File "/usr/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 98] Address already in use

[Stage 9:> (0 + 3) / 3]17/10/11 14:36:10 WARN TaskSetManager: Lost task 1.0 in stage 9.0 (TID 656, 10.51.5.30, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/lib/python2.7/dist-packages/distkeras/workers.py", line 261, in train
self.connect()
File "/usr/local/lib/python2.7/dist-packages/distkeras/workers.py", line 197, in connect
self.socket = connect(self.master_host, self.master_port, self.disable_nagle)
File "/usr/local/lib/python2.7/dist-packages/distkeras/networking.py", line 97, in connect
fd.connect((host, port))
File "/usr/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Thanks,
Manoj

@JoeriHermans
Collaborator

Hi Manoj,

The error says the parameter server port is already allocated. Could you check this?

File "/usr/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 98] Address already in use
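For instance, a quick check from the Python side (a minimal sketch, assuming the default parameter server port 5000):

import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1)
# connect_ex returns 0 when something accepts the connection,
# i.e. another process is already listening on the port.
in_use = (s.connect_ex(('127.0.0.1', 5000)) == 0)
s.close()
print('port 5000 in use:', in_use)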

Joeri

@nagamanojk
Author

@JoeriHermans

Hi Joeri,
I tried to re-run; now I'm not getting error: [Errno 98] Address already in use, but it is still not able to connect to the worker machines.
Please find the attached worker1.log and master.log from the Spark UI.
It looks like the code runs only on the master, and not on the workers.
I have tried the same example with Spark's MLlib APIs and it works fine.
Please let me know if you need any additional information.
worker1.log
master.log

./bin/spark-submit --master spark://10.51.5.40:7077 examples/src/main/python/gtzanKeras.py gtzan.parquet
Using TensorFlow backend.
17/10/11 17:15:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root
|-- features_normalized: vector (nullable = true)
|-- label_index: double (nullable = true)
|-- label: array (nullable = true)
| |-- element: double (containsNull = true)


Layer (type)                 Output Shape              Param #
=================================================================
dense_1 (Dense)              (None, 40)                1240
activation_1 (Activation)    (None, 40)                0
dropout_1 (Dropout)          (None, 40)                0
dense_2 (Dense)              (None, 15)                615
activation_2 (Activation)    (None, 15)                0
dropout_2 (Dropout)          (None, 15)                0
dense_3 (Dense)              (None, 10)                160
activation_3 (Activation)    (None, 10)                0
=================================================================
Total params: 2,015
Trainable params: 2,015
Non-trainable params: 0


Number of training instances: 897
Number of testing instances: 103
2017-10-11 17:15:14.950629: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-11 17:15:14.950654: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-11 17:15:14.950660: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
2017-10-11 17:15:14.950665: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-11 17:15:14.950669: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations.
[Stage 9:> (0 + 3) / 3]17/10/11 17:15:20 WARN TaskSetManager: Lost task 2.0 in stage 9.0 (TID 669, 10.51.5.30, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/lib/python2.7/dist-packages/distkeras/workers.py", line 261, in train
self.connect()
File "/usr/local/lib/python2.7/dist-packages/distkeras/workers.py", line 197, in connect
self.socket = connect(self.master_host, self.master_port, self.disable_nagle)
File "/usr/local/lib/python2.7/dist-packages/distkeras/networking.py", line 97, in connect
fd.connect((host, port))
File "/usr/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Training time: 27.7298340797
Train Accuracy: 0.952062430323
Test Accuracy: 0.78640776699

Thanks,
Manoj

@JoeriHermans
Collaborator

Hi Manoj,

This shouldn't happen. My guess is that another process is occupying the default port (5000), or the OS has a firewall enabled. What is the output of netstat -l4pn?

Joeri

@nagamanojk
Author

Hi Joeri,
It looks like there is no process running on port 5000, and the firewall is inactive.

ee207437@pcg-ee207437-3:~$ netstat -lanp --protocol=inet
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN -
tcp 0 0 127.0.0.1:631 0.0.0.0:* LISTEN -
tcp 0 0 0.0.0.0:5900 0.0.0.0:* LISTEN 2067/vino-server
udp 0 0 0.0.0.0:33182 0.0.0.0:* -
udp 0 0 0.0.0.0:631 0.0.0.0:* -
udp 0 0 0.0.0.0:5353 0.0.0.0:* -

ee207437@pcg-ee207437-3:~$ sudo ufw status
Status: inactive

Thanks,
Manoj

@JoeriHermans
Collaborator

Hmm, strange. Could you send me a sample of how you use ADAG? It seems that, for some reason, the parameter server is not allocated.

Joeri

@nagamanojk
Author

Hi Joeri,
Please find the code details below:

if name == "main":

num_processes = 3
num_executors = 1
application_name = "Distributed Keras XXX Analysis"
num_workers = 3

spark = SparkSession\
    .builder\
    .appName(application_name)\
    .getOrCreate()


reader = spark
transFile = sys.argv[1]

#In[4]:
# Read the dataset.
raw_dataset = reader.read.parquet(transFile) \
                 .select("features_normalized", "label_index", "label")

# Finally, we create a trainingset and a testset.
(training_set, test_set) = raw_dataset.randomSplit([0.9, 0.1])

#In[5]:
# Print the schema of the dataset.
training_set.printSchema()

#In[6]:
#Multilayer Perceptron
mlp = Sequential()
mlp.add(Dense(40, input_shape=(30,)))
mlp.add(Activation('relu'))
mlp.add(Dropout(0.1))
mlp.add(Dense(15))
mlp.add(Activation('relu'))
mlp.add(Dropout(0.1))
mlp.add(Dense(10))
mlp.add(Activation('softmax'))

#In[7]:
mlp.summary()

#In[8]:
optimizer_mlp = 'adam'
loss_mlp = 'categorical_crossentropy'

#In[9]:
#Training
training_set = training_set.repartition(num_workers)
test_set = test_set.repartition(num_workers)
training_set.cache()
test_set.cache()
print("Number of training instances: " + str(training_set.count()))
print("Number of testing instances: " + str(test_set.count()))

#In[10]: Evaluation
def evaluate_accuracy(model, test_set, features="features_normalized"):
    evaluator = AccuracyEvaluator(prediction_col="prediction_index", label_col="label_index")
    predictor = ModelPredictor(keras_model=model, features_col=features)
    transformer = LabelIndexTransformer(output_dim=10)
    test_set = test_set.select(features, "label_index")
    test_set = predictor.predict(test_set)
    test_set = transformer.transform(test_set)
    score = evaluator.evaluate(test_set)

    return score

#In[11]: ADAG
trainer = ADAG(keras_model=mlp, worker_optimizer=optimizer_mlp, loss=loss_mlp, num_workers=num_workers,
           batch_size=10, communication_window=5, num_epoch=200,
           features_col="features_normalized", label_col="label")
# Modify the default parallelism factor.
trained_model = trainer.train(training_set)

#In[12]:
# View the weights of the trained model.
trained_model.get_weights()

#In[13]:
print("Training time: " + str(trainer.get_training_time()))
print("Train Accuracy: " + str(evaluate_accuracy(trained_model, training_set)))
print("Test Accuracy: " + str(evaluate_accuracy(trained_model, test_set)))

spark.stop()

--Manoj

@JoeriHermans
Collaborator

JoeriHermans commented Oct 13, 2017

Hmm, this is really strange. But the output shows that the model is trained?

Could you give the output of determine_host_address()? Maybe the port only binds to the local address for some reason. Also, after the crash, do print(trainer.parameter_server).
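For reference, something like this from the driver should be enough (a minimal sketch; determine_host_address() comes from distkeras.networking):

from distkeras.networking import determine_host_address

# On a correctly configured machine this should print the address of
# the NIC (e.g. 10.51.5.40), not a loopback address such as 127.0.1.1.
print(determine_host_address())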

Joeri

@nagamanojk
Author

Joeri

For determine_host_address() the output is 127.0.1.1.
I tried to print "host" and "port" from /usr/local/lib/python2.7/dist-packages/distkeras/networking.py line 97.

When I run with --master local[*] on PC-ee207437-1:
From ee207437-1 host: 127.0.1.1 port:5000
From ee207437-1 host: 127.0.1.1 port:5000
From ee207437-1 host: 127.0.1.1 port:5000

This is expected, as I have num_workers = 3.

When I run with --master spark://10.51.5.40 on PC-ee207437-1:
From ee207437-1 host: 127.0.1.1 port:5000 (This is master)
From ee207437-2 host: 127.0.1.1 port:5000 (This is worker-1)
From ee207437-3 host: 127.0.1.1 port:5000 (This is worker-2)

Machines ee207437-2 and ee207437-3 are each trying to connect to themselves; is this correct?
Shouldn't they connect to the master, i.e., 10.51.5.40?

/etc/hosts has
pcg-ee207437-1 10.51.5.40
pcg-ee207437-2 10.51.5.30
pcg-ee207437-3 10.51.5.50

spark/sbin/slaves
pcg-ee207437-1
pcg-ee207437-2
pcg-ee207437-3

In the Spark UI I see all workers connected to the master. I tried running other examples like wordcount.py;
it works fine, and I see a reduction in run time.

I think I'm missing some env-var setting for dist-keras with Spark standalone, for example the one below.
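For example, I wondered if I should pin the bind address with SPARK_LOCAL_IP in conf/spark-env.sh on each machine (just a guess on my side, not verified):

# conf/spark-env.sh on pcg-ee207437-1; each machine would use its own IP
export SPARK_LOCAL_IP=10.51.5.40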

Thanks,
Manoj

@JoeriHermans
Collaborator

Yes, that's the error. We need to force determine_host_address() to not pick the local address. If we fix that, it will work.

I'm on my phone right now, but I can look at it in about one hour. I'll keep you posted.

Joeri

@JoeriHermans
Collaborator

Also, did you define the hostname of the local machine as 127.0.0.1 in /etc/hosts? That would explain why determine_host_address() doesn't function properly.
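To illustrate the failure mode (a minimal sketch, assuming determine_host_address() resolves the machine's own hostname):

import socket

# With an /etc/hosts entry such as "127.0.1.1 pcg-ee207437-2" (the
# Debian/Ubuntu default), resolving our own hostname returns loopback
# instead of the NIC address, so every worker connects to itself.
print(socket.gethostbyname(socket.gethostname()))  # e.g. 127.0.1.1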

Joeri

@JoeriHermans
Collaborator

Hi Manoj,

I think I have a fix. The only downside is that this code isn't cross-platform, but I don't think a lot of people run Spark on Windows / macOS anyway.

import fcntl
import socket
import struct

def get_interface_ip(ifname):
    # Ask the kernel for the IPv4 address of the given interface
    # (0x8915 is the SIOCGIFADDR ioctl).
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    return socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s',
                            ifname[:15]))[20:24])

def get_default_iface():
    # Parse /proc/net/route and return the interface that carries the
    # default route (destination 00000000, RTF_GATEWAY flag set).
    route = "/proc/net/route"
    with open(route) as f:
        for line in f.readlines():
            try:
                iface, dest, _, flags, _, _, _, _, _, _, _ = line.strip().split()
                if dest != '00000000' or not int(flags, 16) & 2:
                    continue
                return iface
            except ValueError:
                # Header line or malformed row; skip it.
                continue

def determine_host_address():
    # Retrieve the network interface name in ASCII encoding.
    iface = get_default_iface().encode("ascii")
    # Obtain the network address from the active NIC.
    address = get_interface_ip(iface)

    return address
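Running the snippet directly is a quick sanity check; on each machine it should print the NIC address (e.g. 10.51.5.40), never 127.0.x.x:

if __name__ == '__main__':
    print(determine_host_address())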

Could you verify that determine_host_address() doesn't return the local address on your machines?

Joeri

@nagamanojk
Author

nagamanojk commented Oct 13, 2017 via email

@JoeriHermans
Collaborator

If it's OK for you, I'll close this issue now. Feel free to re-open it.

Joeri

@nagamanojk
Author

Hi Joeri,
Sorry for the delay; I was stuck with some other things on the application side.
I tried to print "host" and "port" from /usr/local/lib/python2.7/dist-packages/distkeras/networking.py line 97.

When I run with --master spark://10.51.5.40 on PC-ee207437-1:
From ee207437-1 host: 10.51.5.40 port:5000 (This is master)
From ee207437-2 host: 10.51.5.40 port:5000 (This is worker-1)
From ee207437-3 host: 10.51.5.40 port:5000 (This is worker-2)

The run completes successfully.
But at the end of the worker log, the line below is printed:
"17/10/17 08:34:24 ERROR Co"
This is not an issue, right?

Thanks a lot for the immediate responses, Joeri.
--Manoj
