This repository was archived by the owner on May 12, 2021. It is now read-only.

TAJO-527: Upgrade to Netty 4#311

Closed
ykrips wants to merge 67 commits into apache:master from ykrips:TAJO-527

Conversation

ykrips commented Dec 19, 2014

This is a first attempt at upgrading Netty. I have not optimized the code yet. However, applying this change is not easy, and I would like to hear suggestions from anyone who is interested in this patch.

ykrips (Author) commented Dec 20, 2014

I did not catch this failure when I ran the test cases on my laptop. I will dig into it.

hyunsik (Member) commented Dec 20, 2014

No problem :)

ykrips (Author) commented Dec 21, 2014

The Travis test timed out, and this caused my test build to fail. It may take more time to figure out what is wrong with the build.

Your test run exceeded 50 minutes.

jinossy (Member) commented Dec 22, 2014

@ykrips
Don't worry about it. I will also investigate the problem.

hyunsik (Member) commented Dec 22, 2014

Thank you for the nice work. It looks awesome.

Since this change may affect the entire Tajo system, review and testing in real environments will take a longer time. So I think it will be merged into the next release instead of 0.10.

jinossy (Member) commented Dec 22, 2014

@hyunsik
I agree with you. We need more review and testing.

jinossy (Member) commented Dec 22, 2014

I ran the tests on my MacBook and got an RPC hang in TestAsyncRpc.
@ykrips
Could you check the HashedWheelTimer in TaskRunnerManager?

"main" prio=5 tid=7fabff000800 nid=0x10ff62000 waiting on condition [10ff60000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <7f3e47578> (a java.util.concurrent.Semaphore$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
        at java.util.concurrent.Semaphore.acquire(Semaphore.java:286)
        at org.apache.tajo.rpc.CallFuture.get(CallFuture.java:70)
        at org.apache.tajo.rpc.TestAsyncRpc.testStubDisconnected(TestAsyncRpc.java:263)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264)
        at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
        at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124)
        at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)

ykrips (Author) commented Dec 22, 2014

@jinossy,
Thanks for posting your test results.
I have run the test cases with JDK 1.6 and JDK 1.7, and I did not see the RPC hang in TestAsyncRpc. Can you provide details on your test environment?
By the way, I removed the HashedWheelTimer in TaskRunnerManager, because Netty 4 does not accept a Timer object when creating a ReadTimeoutHandler instance; Netty 4 uses an internal scheduler to catch timeout events.
Anyway, thank you again; I will look through these timeout handlers. In some cases, I found that Netty 4 does not fire the timeout event.
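The move away from a shared HashedWheelTimer can be pictured with a stdlib-only sketch of the schedule-on-the-event-loop pattern: arm a delayed task on the channel's own scheduler, and cancel and re-arm it whenever a read arrives. This is a conceptual illustration, not Netty's actual ReadTimeoutHandler code; the class and method names below are hypothetical.

```java
import java.util.concurrent.*;

// Conceptual sketch: instead of a caller-supplied timer, the timeout task
// lives on a single-threaded scheduler analogous to a channel's event loop.
public class ReadTimeoutSketch {
    private final ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
    private final long timeoutMillis;
    private final Runnable onTimeout;
    private ScheduledFuture<?> pending;

    public ReadTimeoutSketch(long timeoutMillis, Runnable onTimeout) {
        this.timeoutMillis = timeoutMillis;
        this.onTimeout = onTimeout;
        arm();
    }

    // Called whenever data is read: cancel the old timeout and arm a new one.
    public synchronized void onRead() {
        pending.cancel(false);
        arm();
    }

    private synchronized void arm() {
        pending = loop.schedule(onTimeout, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public void shutdown() { loop.shutdownNow(); }

    public static void main(String[] args) throws Exception {
        CountDownLatch fired = new CountDownLatch(1);
        ReadTimeoutSketch t = new ReadTimeoutSketch(100, fired::countDown);
        t.onRead();                              // a read re-arms the timer
        boolean timedOut = fired.await(1, TimeUnit.SECONDS);
        System.out.println("timeout fired: " + timedOut);  // true: no further reads arrived
        t.shutdown();
    }
}
```

Because the task is owned by the loop itself, there is no external Timer object to pass around or shut down separately, which matches the constructor change described above.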

jinossy (Member) commented Dec 22, 2014

@ykrips
Thanks for the quick response.
I think we need to handle the event after the RPC stub is disconnected.
My environment is the following:
OS X: 10.9.5
JDK: 1.7.0_67-b01

ykrips (Author) commented Dec 23, 2014

@jinossy,
Thank you for posting your environment.
I quickly set up a test environment on Mac OS X Yosemite with JDK 1.6.0_65 and got the same problem. This test case works well on Ubuntu 14.04, so I think this error might come from different thread management. I will look into this issue and post patches once I find the root cause.

ykrips (Author) commented Dec 29, 2014

Hello all,
I was away from the computer for several days; sorry for that. I have added a tricky parameter to the testStubDisconnected function. Mac OS X uses a completely different thread-management policy than Linux, so the other thread did not run at all. These test cases now pass with JDK 1.6.0_65 and 1.7.0_71 on Mac OS X Yosemite, and from this investigation I feel we need to change some of the code that uses an EventLoopGroup.

ykrips (Author) commented Dec 30, 2014

The first build test passed, but the second one did not. I will look into this error.

ykrips (Author) commented Feb 16, 2015

Hello all,
It has taken a long time to bring the Netty 4 library into the Tajo project. Finally, performance with Netty 4 has reached an acceptable level, and the errors in the Travis CI build have been resolved. Now, I think, it is time to discuss any missing points or potential issues.

jihoonson (Contributor) commented

@ykrips, thanks for your great work!
I have one question, just out of curiosity: how did you evaluate the performance with Netty 4?

ykrips (Author) commented Feb 16, 2015

Hello @jihoonson,
I did several things. First, I disabled Nagle's algorithm where possible. Enabling Nagle's algorithm reduces resource use on the network infrastructure, but it delays network transmission. Also, the Netty 4 team recommends not calling the flush() function frequently, but batching flushes also delays transmission. Second, I set the send and receive buffer sizes of servers and clients as large as possible; small buffer sizes also hurt network performance, because producers and consumers must wait until the buffer drains. Finally, I merged and refactored the source code to use a shared EventLoopGroup. Creating an object that is tightly coupled to operating-system resources is an expensive operation, and creating such objects frequently may lead to starvation of native memory and network resources.
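The first two tunings can be illustrated with plain java.net sockets. This is a stdlib-only sketch; the patch itself would set the equivalent Netty ChannelOption values (TCP_NODELAY, SO_SNDBUF, SO_RCVBUF) on its Bootstrap, and the class name SocketTuning below is hypothetical.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketTuning {
    public static void main(String[] args) throws IOException {
        // A loopback server socket on an ephemeral port, just so the client can connect.
        try (ServerSocket server = new ServerSocket(0);
             Socket client = new Socket("127.0.0.1", server.getLocalPort())) {
            // Disable Nagle's algorithm: small writes go out immediately
            // instead of being coalesced, trading bandwidth for latency.
            client.setTcpNoDelay(true);
            // Request larger send/receive buffers so writers are not
            // throttled waiting for buffer space to drain. The OS may
            // adjust the actual sizes, so we do not assert on them.
            client.setSendBufferSize(256 * 1024);
            client.setReceiveBufferSize(256 * 1024);
            System.out.println("tcpNoDelay: " + client.getTcpNoDelay());
        }
    }
}
```

The same latency-versus-throughput trade-off applies either way: TCP_NODELAY favors many small RPC messages, while larger buffers keep bulk shuffle transfers from stalling.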

ykrips (Author) commented Feb 26, 2015

@jinossy,
I would like to append additional patches for the RPC code using Netty 4. Would you please check these patches?

jinossy (Member) commented Feb 26, 2015

@ykrips
Sure, I will review them soon.

ykrips (Author) commented Feb 27, 2015

Alright. It will be fixed up soon.

hyunsik (Member) commented Mar 1, 2015

The patch looks good to me. To ensure its stability, it would be great if we could carry out some experiments with heavy queries on TB-sized data sets. Can anyone help with this kind of experiment?

ykrips (Author) commented Mar 1, 2015

@hyunsik, it would be great if we could run some stress tests on multi-node clusters. We need to find a test environment for this.

jinossy (Member) commented Mar 2, 2015

@ykrips
Could you fix the following errors? I ran TPC-H Q3.

  • Error 1
2015-03-02 11:41:19,399 WARN org.apache.tajo.rpc.RpcConnectionPool: Try to reconnect : server1/xxx.xxx.xxx.xxx:28091
2015-03-02 11:41:19,405 ERROR org.apache.tajo.rpc.AsyncRpcClient: server2/xxx.xxx.xxx.xxx:28091,class org.apache.tajo.ipc.TajoWorkerProtocol,java.nio.channels.ClosedCh
annelException
com.google.protobuf.ServiceException: java.nio.channels.ClosedChannelException
        at org.apache.tajo.rpc.AsyncRpcClient$ProxyRpcChannel$1.operationComplete(AsyncRpcClient.java:147)
        at org.apache.tajo.rpc.AsyncRpcClient$ProxyRpcChannel$1.operationComplete(AsyncRpcClient.java:142)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
        at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:754)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:655)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1113)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
        at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
        at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
        at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
        at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:893)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException
2015-03-02 11:41:19,407 ERROR org.apache.tajo.master.ContainerProxy: Connect error to server3/xxx.xxx.xxx.xxx:28091 caused by
io.netty.channel.ConnectTimeoutException: Connect error to server3/xxx.xxx.xxx.xxx:28091 caused by
        at org.apache.tajo.rpc.NettyClientBase.handleConnectionInternally(NettyClientBase.java:93)
        at org.apache.tajo.rpc.NettyClientBase.connect(NettyClientBase.java:103)
        at org.apache.tajo.rpc.RpcConnectionPool.getConnection(RpcConnectionPool.java:96)
        at org.apache.tajo.master.TajoContainerProxy.assignExecutionBlock(TajoContainerProxy.java:105)
        at org.apache.tajo.master.TajoContainerProxy.launch(TajoContainerProxy.java:75)
        at org.apache.tajo.worker.TajoResourceAllocator$LaunchRunner.run(TajoResourceAllocator.java:210)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
  • Error 2
2015-03-02 11:41:19,406 ERROR org.apache.tajo.rpc.AsyncRpcClient: null,class org.apache.tajo.ipc.TajoWorkerProtocol,java.lang.UnsupportedOperationException: unsupported message type:
 RpcProtos$RpcRequest (expected: ByteBuf, FileRegion)
com.google.protobuf.ServiceException: java.lang.UnsupportedOperationException: unsupported message type: RpcProtos$RpcRequest (expected: ByteBuf, FileRegion)
        at org.apache.tajo.rpc.AsyncRpcClient$ProxyRpcChannel$1.operationComplete(AsyncRpcClient.java:147)
        at org.apache.tajo.rpc.AsyncRpcClient$ProxyRpcChannel$1.operationComplete(AsyncRpcClient.java:142)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
        at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:754)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:669)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1113)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
        at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
        at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
        at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
        at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:893)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: unsupported message type: RpcProtos$RpcRequest (expected: ByteBuf, FileRegion)
        at io.netty.channel.nio.AbstractNioByteChannel.filterOutboundMessage(AbstractNioByteChannel.java:280)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:663)
        ... 10 more

Review comment (Member):

Can you add a file check?
if (PullServerUtil.isNativeIOPossible() && manageOsCache && count() > 0 && super.isOpen())

It will fix the "bad file descriptor" error:

2015-03-03 10:34:40,755 WARN org.apache.tajo.pullserver.PullServerUtil: Failed to manage OS cache for /data05/tajo/data/q_1425346386770_0001/output/1/hash-shuffle/3/263
java.lang.NullPointerException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.tajo.pullserver.PullServerUtil.posixFadviseIfPossible(PullServerUtil.java:56)
        at org.apache.tajo.pullserver.FadvisedFileRegion.transferSuccessful(FadvisedFileRegion.java:163)
        at org.apache.tajo.pullserver.FileCloseListener.operationComplete(FileCloseListener.java:46)

Review comment (Author):

@jinossy,
Thanks for posting test results on Netty 4. I'll commit the fix soon.

Review comment (Author):

Interesting... the Netty team added an isOpen() API to the DefaultFileRegion class in 4.0.25.Final. It would be good to use the Netty API to check whether this FileRegion has been deallocated.

jinossy (Member) commented Mar 3, 2015

I've successfully tested with real data on my company cluster.

  • ENV
    • 2 TajoMaster + 4 TajoWorker
    • JDK 1.7.0_67
    • 1G Network
JSON table
2 TB compressed with Snappy
7.3 TB actual bytes

select count(*) from (select id from table1 group by id) t1;
Progress: 100%, response time: 3546.781 sec
?count
-------------------------------
2802809536
(1 rows, 3546.781 sec, 11 B selected)


Parquet table
8.1 TB compressed with Snappy
select count(*) from table2
Progress: 100%, response time: 374.358 sec
?count
-------------------------------
16090817643
(1 rows, 374.358 sec, 12 B selected)

jinossy (Member) commented Mar 3, 2015

+1
I greatly appreciate your effort.
Thank you!

@asfgit asfgit closed this in 22876a8 Mar 3, 2015