Skip to content

Commit

Permalink
./dev/scalafmt + copyright header
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamuel-bs committed Oct 27, 2021
1 parent 279c063 commit 76f4fa6
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import org.apache.spark.internal.config._
import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}

private[spark] class JavaSerializationStream(
out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
extends SerializationStream {
out: OutputStream,
counterReset: Int,
extraDebugInfo: Boolean)
extends SerializationStream {
private val objOut = new ObjectOutputStream(out)
private var counter = 0

Expand Down Expand Up @@ -59,9 +61,10 @@ private[spark] class JavaSerializationStream(
}

private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
extends DeserializationStream {
extends DeserializationStream {

private val objIn = new ObjectInputStream(in) {

override def resolveClass(desc: ObjectStreamClass): Class[_] =
try {
// scalastyle:off classforname
Expand All @@ -73,18 +76,20 @@ private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoa
}

override def resolveProxyClass(ifaces: Array[String]): Class[_] = {
// scalastyle:off classforname
val resolved = ifaces.map(iface => Class.forName(iface, false, loader))
// scalastyle:on classforname
java.lang.reflect.Proxy.getProxyClass(loader, resolved: _*)
}
// scalastyle:off classforname
val resolved = ifaces.map(iface => Class.forName(iface, false, loader))
// scalastyle:on classforname
java.lang.reflect.Proxy.getProxyClass(loader, resolved: _*)
}

}

def readObject[T: ClassTag](): T = objIn.readObject().asInstanceOf[T]
def close(): Unit = { objIn.close() }
}

private object JavaDeserializationStream {

val primitiveMappings = Map[String, Class[_]](
"boolean" -> classOf[Boolean],
"byte" -> classOf[Byte],
Expand All @@ -94,13 +99,15 @@ private object JavaDeserializationStream {
"long" -> classOf[Long],
"float" -> classOf[Float],
"double" -> classOf[Double],
"void" -> classOf[Void]
)
"void" -> classOf[Void])

}

private[spark] class JavaSerializerInstance(
counterReset: Int, extraDebugInfo: Boolean, defaultClassLoader: ClassLoader)
extends SerializerInstance {
counterReset: Int,
extraDebugInfo: Boolean,
defaultClassLoader: ClassLoader)
extends SerializerInstance {

override def serialize[T: ClassTag](t: T): ByteBuffer = {
val bos = new ByteBufferOutputStream()
Expand Down Expand Up @@ -133,6 +140,7 @@ private[spark] class JavaSerializerInstance(
def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
new JavaDeserializationStream(s, loader)
}

}

/**
Expand All @@ -148,20 +156,23 @@ class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
private var counterReset = conf.get(SERIALIZER_OBJECT_STREAM_RESET)
private var extraDebugInfo = conf.get(SERIALIZER_EXTRA_DEBUG_INFO)

protected def this() = this(new SparkConf()) // For deserialization only
protected def this() = this(new SparkConf()) // For deserialization only

override def newInstance(): SerializerInstance = {
val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
new JavaSerializerInstance(counterReset, extraDebugInfo, classLoader)
}

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeInt(counterReset)
out.writeBoolean(extraDebugInfo)
}
override def writeExternal(out: ObjectOutput): Unit =
Utils.tryOrIOException {
out.writeInt(counterReset)
out.writeBoolean(extraDebugInfo)
}

override def readExternal(in: ObjectInput): Unit =
Utils.tryOrIOException {
counterReset = in.readInt()
extraDebugInfo = in.readBoolean()
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
counterReset = in.readInt()
extraDebugInfo = in.readBoolean()
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.serializer;

import java.io.Serializable;
Expand All @@ -6,28 +23,27 @@
import java.lang.reflect.Proxy;

class ContainsProxyClass implements Serializable {
final MyInterface proxy = (MyInterface) Proxy.newProxyInstance(
MyInterface.class.getClassLoader(),
new Class[]{MyInterface.class},
new MyInvocationHandler());
final MyInterface proxy = (MyInterface) Proxy.newProxyInstance(
MyInterface.class.getClassLoader(),
new Class[]{MyInterface.class},
new MyInvocationHandler());

// Interface needs to be public as classloaders will mismatch - see ObjectInputStream#resolveProxyClass for details.
public interface MyInterface {
void myMethod();
}
// Interface needs to be public as classloaders will mismatch - see ObjectInputStream#resolveProxyClass for details.
public interface MyInterface {
void myMethod();
}

static class MyClass implements MyInterface, Serializable {
@Override
public void myMethod() {
}
}
static class MyClass implements MyInterface, Serializable {
@Override
public void myMethod() {}
}

class MyInvocationHandler implements InvocationHandler, Serializable {
private final MyClass real = new MyClass();
class MyInvocationHandler implements InvocationHandler, Serializable {
private final MyClass real = new MyClass();

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
return method.invoke(real, args);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
return method.invoke(real, args);
}
}
}

0 comments on commit 76f4fa6

Please sign in to comment.