
Fix spacing and formatting

1 parent b514bec commit ecbfb65860e4fea722537802cf036f0b505d7da9, tgravescs committed Feb 22, 2014
@@ -41,7 +41,8 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
* as well as classes created by the interpreter when the user types in code. This is just a wrapper
* around a Jetty server.
*/
-private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager) extends Logging {
+private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
+ extends Logging {
private var server: Server = null
private var port: Int = -1
@@ -1,13 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -104,10 +104,10 @@ import scala.collection.mutable.ArrayBuffer
*
* - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
* can be used. Yarn requires a specific AmIpFilter be installed for security to work
- * properly. For non-Yarn deployments, users can write a filter to go through a companies
- * normal login service. If an authentication filter is in place then the SparkUI
- * can be configured to check the logged in user against the list of users who have
- * view acls to see if that user is authorized.
+ * properly. For non-Yarn deployments, users can write a filter to go through a
+ * company's normal login service. If an authentication filter is in place, then the
+ * SparkUI can be configured to check the logged-in user against the list of users who
+ * have view acls to see if that user is authorized.
 * The filters can also be used for many different purposes. For instance, filters
 * could be used for logging, encryption, or compression.
*
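
For reference, a user-supplied UI filter of the kind this comment describes is just a standard javax.servlet Filter. A minimal sketch (ExampleAuthFilter and the X-Remote-User header are hypothetical stand-ins for a real login-service integration):

    import javax.servlet._
    import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

    class ExampleAuthFilter extends Filter {
      def init(filterConfig: FilterConfig) {}
      def destroy() {}

      def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain) {
        // In a real deployment this would consult the company's login service;
        // here we just trust a (hypothetical) header set by a front-end proxy.
        val user = req.asInstanceOf[HttpServletRequest].getHeader("X-Remote-User")
        if (user != null) {
          chain.doFilter(req, res) // authenticated: pass the request along
        } else {
          res.asInstanceOf[HttpServletResponse].sendError(401, "not authenticated")
        }
      }
    }
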
@@ -1,13 +1,12 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License") you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -1,20 +1,20 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License") you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.spark
import javax.security.auth.callback.Callback
@@ -60,7 +60,8 @@ abstract class Broadcast[T](val id: Long) extends Serializable {
}
private[spark]
-class BroadcastManager(val _isDriver: Boolean, conf: SparkConf, securityManager: SecurityManager) extends Logging with Serializable {
+class BroadcastManager(val _isDriver: Boolean, conf: SparkConf, securityManager: SecurityManager)
+ extends Logging with Serializable {
private var initialized = false
private var broadcastFactory: BroadcastFactory = null
@@ -153,7 +153,7 @@ private object HttpBroadcast extends Logging {
}
def read[T](id: Long): T = {
- logDebug("broadcast read server: " + serverUri + " id: broadcast-"+id)
+ logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id)
val url = serverUri + "/" + BroadcastBlockId(id).name
var uc: URLConnection = null
@@ -62,7 +62,8 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
val handlers = metricsHandlers ++ Seq[ServletContextHandler](
createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static/*"),
- createServletHandler("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
+ createServletHandler("/app/json",
+ (request: HttpServletRequest) => applicationPage.renderJson(request)),
createServletHandler("/app", (request: HttpServletRequest) => applicationPage.render(request)),
createServletHandler("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
createServletHandler("*", (request: HttpServletRequest) => indexPage.render(request))
@@ -447,8 +447,11 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
// Must be created within selector loop - else deadlock
-private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : Selector, id_ : ConnectionId)
- extends Connection(channel_, selector_, id_) {
+private[spark] class ReceivingConnection(
+ channel_ : SocketChannel,
+ selector_ : Selector,
+ id_ : ConnectionId)
+ extends Connection(channel_, selector_, id_) {
def isSaslComplete(): Boolean = {
if (sparkSaslServer != null) sparkSaslServer.isComplete() else false
@@ -509,7 +512,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S
val inbox = new Inbox()
val headerBuffer: ByteBuffer = ByteBuffer.allocate(MessageChunkHeader.HEADER_SIZE)
- var onReceiveCallback: (Connection , Message) => Unit = null
+ var onReceiveCallback: (Connection, Message) => Unit = null
var currentChunk: MessageChunk = null
channel.register(selector, SelectionKey.OP_READ)
@@ -584,7 +587,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S
}
}
} catch {
- case e: Exception => {
+ case e: Exception => {
logWarning("Error reading from connection to " + getRemoteConnectionManagerId(), e)
callOnExceptionCallback(e)
close()
@@ -37,7 +37,8 @@ import scala.concurrent.duration._
import org.apache.spark.util.{SystemClock, Utils}
-private[spark] class ConnectionManager(port: Int, conf: SparkConf, securityManager: SecurityManager) extends Logging {
+private[spark] class ConnectionManager(port: Int, conf: SparkConf,
+ securityManager: SecurityManager) extends Logging {
class MessageStatus(
val message: Message,
@@ -54,7 +55,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, securityManag
private val selector = SelectorProvider.provider.openSelector()
// default to 30 second timeout waiting for authentication
- private val authTimeout= System.getProperty("spark.core.connection.auth.wait.timeout","30000").toInt
+ private val authTimeout = System.getProperty("spark.core.connection.auth.wait.timeout",
+ "30000").toInt
private val handleMessageExecutor = new ThreadPoolExecutor(
conf.getInt("spark.core.connection.handler.threads.min", 20),
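
Since the authentication wait time is read from a plain system property, a deployment that needs more than the 30-second default can raise it before the ConnectionManager is created. A small sketch (the 60000 value is illustrative):

    // Illustrative only: wait up to 60 seconds for authentication
    // to complete instead of the 30-second default.
    System.setProperty("spark.core.connection.auth.wait.timeout", "60000")
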
@@ -564,7 +566,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, securityManag
logDebug("Server sasl not completed: " + connection.connectionId)
}
if (replyToken != null) {
- var securityMsgResp = SecurityMessage.fromResponse(replyToken, securityMsg.getConnectionId)
+ var securityMsgResp = SecurityMessage.fromResponse(replyToken,
+ securityMsg.getConnectionId)
var message = securityMsgResp.toBufferMessage
if (message == null) throw new Exception("Error creating security Message")
sendSecurityMessage(connection.getRemoteConnectionManagerId(), message)
@@ -689,7 +692,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, securityManag
var firstResponse: Array[Byte] = null
try {
firstResponse = conn.sparkSaslClient.firstToken()
- var securityMsg = SecurityMessage.fromResponse(firstResponse, conn.connectionId.toString())
+ var securityMsg = SecurityMessage.fromResponse(firstResponse,
+ conn.connectionId.toString())
var message = securityMsg.toBufferMessage
if (message == null) throw new Exception("Error creating security message")
connectionsAwaitingSasl += ((conn.connectionId, conn))
@@ -714,13 +718,15 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, securityManag
def startNewConnection(): SendingConnection = {
val inetSocketAddress = new InetSocketAddress(connManagerId.host, connManagerId.port)
val newConnectionId = new ConnectionId(id, idCount.getAndIncrement.intValue)
- val newConnection = new SendingConnection(inetSocketAddress, selector, connManagerId, newConnectionId)
+ val newConnection = new SendingConnection(inetSocketAddress, selector, connManagerId,
+ newConnectionId)
logInfo("creating new sending connection for security! " + newConnectionId )
registerRequests.enqueue(newConnection)
newConnection
}
- // I removed the lookupKey stuff as part of merge ... should I re-add it ? We did not find it useful in our test-env ...
+ // I removed the lookupKey stuff as part of merge ... should I re-add it ?
+ // We did not find it useful in our test-env ...
// If we do re-add it, we should consistently use it everywhere I guess ?
message.senderAddress = id.toSocketAddress()
logDebug("Sending Security [" + message + "] to [" + connManagerId + "]")
@@ -737,7 +743,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, securityManag
val inetSocketAddress = new InetSocketAddress(connectionManagerId.host,
connectionManagerId.port)
val newConnectionId = new ConnectionId(id, idCount.getAndIncrement.intValue)
- val newConnection = new SendingConnection(inetSocketAddress, selector, connectionManagerId, newConnectionId)
+ val newConnection = new SendingConnection(inetSocketAddress, selector, connectionManagerId,
+ newConnectionId)
logDebug("creating new sending connection: " + newConnectionId)
registerRequests.enqueue(newConnection)
@@ -751,7 +758,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, securityManag
checkSendAuthFirst(connectionManagerId, connection)
}
message.senderAddress = id.toSocketAddress()
- logDebug("Before Sending [" + message + "] to [" + connectionManagerId + "]" + " connectionid: " + connection.connectionId)
+ logDebug("Before Sending [" + message + "] to [" + connectionManagerId + "]" + " " +
+ "connectionid: " + connection.connectionId)
if (authEnabled) {
// if we aren't authenticated yet lets block the senders until authentication completes
@@ -74,6 +74,7 @@ private[spark] object MessageChunkHeader {
buffer.get(ipBytes)
val ip = InetAddress.getByAddress(ipBytes)
val port = buffer.getInt()
- new MessageChunkHeader(typ, id, totalSize, chunkSize, other, securityNeg, new InetSocketAddress(ip, port))
+ new MessageChunkHeader(typ, id, totalSize, chunkSize, other, securityNeg,
+ new InetSocketAddress(ip, port))
}
}
@@ -121,7 +121,7 @@ private[spark] object JettyUtils extends Logging {
}
private def addFilters(handlers: Seq[ServletContextHandler]) {
- val filters : Array[String] = System.getProperty("spark.ui.filters", "").split(',').map(_.trim())
+ val filters: Array[String] = System.getProperty("spark.ui.filters", "").split(',').map(_.trim())
filters.foreach {
case filter : String =>
if (!filter.isEmpty) {
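
Because the property is split on commas and each entry is trimmed, several filter classes can be installed together. A sketch of what a deployment might set (AmIpFilter's full class name comes from Hadoop YARN; com.example.ExampleAuthFilter is the hypothetical filter sketched earlier):

    // Comma-separated filter class names; whitespace around each entry is trimmed.
    System.setProperty("spark.ui.filters",
      "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter," +
      " com.example.ExampleAuthFilter")
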
@@ -54,8 +54,11 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
def formatDuration(ms: Long) = Utils.msDurationToString(ms)
def getHandlers = Seq[ServletContextHandler](
- createServletHandler("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
- createServletHandler("/stages/pool", (request: HttpServletRequest) => poolPage.render(request)),
- createServletHandler("/stages", (request: HttpServletRequest) => indexPage.render(request))
+ createServletHandler("/stages/stage",
+ (request: HttpServletRequest) => stagePage.render(request)),
+ createServletHandler("/stages/pool",
+ (request: HttpServletRequest) => poolPage.render(request)),
+ createServletHandler("/stages",
+ (request: HttpServletRequest) => indexPage.render(request))
)
}
