/
ModeS.java
107 lines (92 loc) · 2.9 KB
/
ModeS.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package org.dcache.ftp.data;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import org.dcache.pool.repository.RepositoryChannel;
/**
 * Implementation of MODE S.
 *
 * <p>Every new data connection is handed to a multiplexer listener chosen
 * by the role of this mode: a {@code Sender} that is registered for write
 * readiness, or a {@code Receiver} that is registered for read readiness.
 */
public class ModeS extends Mode
{
    /**
     * Byte count passed to each {@code transferFrom} call of the receiver;
     * also used to size the preallocation request made before each read.
     */
    private final int _blockSize;

    /**
     * Implements the MODE S send operation.
     *
     * <p>Tracks the current position and the number of bytes still to be
     * sent, and closes the connection once that remainder reaches zero.
     */
    private class Sender extends AbstractMultiplexerListener
    {
        private final SocketChannel _socket;
        private long _position;
        private long _count;

        /**
         * Creates a sender for the given socket. The initial position and
         * the byte count are taken from {@code getStartPosition()} and
         * {@code getSize()}.
         */
        public Sender(SocketChannel socket)
        {
            _socket = socket;
            _position = getStartPosition();
            _count = getSize();
        }

        /** Registers this listener for write readiness on the socket. */
        @Override
        public void register(Multiplexer multiplexer) throws IOException
        {
            multiplexer.register(this, SelectionKey.OP_WRITE, _socket);
        }

        /**
         * Invoked when the socket is writable. Requests a transfer of the
         * remaining bytes, reports the number actually transferred to the
         * monitor and advances the bookkeeping accordingly.
         */
        @Override
        public void write(Multiplexer multiplexer, SelectionKey key)
            throws Exception
        {
            long sent = transferTo(_position, _count, _socket);
            _monitor.sentBlock(_position, sent);
            _position += sent;
            _count -= sent;

            if (_count != 0) {
                /* More data is pending; wait for the next write event. */
                return;
            }

            /* Mode S has no dedicated end-of-file marker: closing the
             * connection is what signals the end of the data.
             */
            close(multiplexer, key, true);
        }
    }

    /**
     * Implements the MODE S receive operation.
     *
     * <p>Data is written starting at position zero; the connection is
     * closed when {@code transferFrom} reports -1.
     */
    private class Receiver extends AbstractMultiplexerListener
    {
        private final SocketChannel _socket;
        private long _position;

        /** Creates a receiver for the given socket, starting at position 0. */
        public Receiver(SocketChannel socket)
        {
            _socket = socket;
            _position = 0;
        }

        /** Registers this listener for read readiness on the socket. */
        @Override
        public void register(Multiplexer multiplexer) throws IOException
        {
            multiplexer.register(this, SelectionKey.OP_READ, _socket);
        }

        /**
         * Invoked when the socket is readable. Asks the monitor to
         * preallocate up to one block beyond the current position, then
         * requests a transfer of one block from the socket.
         */
        @Override
        public void read(Multiplexer multiplexer, SelectionKey key)
            throws Exception
        {
            _monitor.preallocate(_position + _blockSize);

            long received = transferFrom(_socket, _position, _blockSize);
            if (received == -1) {
                /* -1 from transferFrom ends the transfer. */
                close(multiplexer, key, true);
                return;
            }

            _monitor.receivedBlock(_position, received);
            _position += received;
        }
    }

    /**
     * Creates a MODE S instance.
     *
     * @param role      role of this end of the transfer
     * @param file      channel handed to the superclass
     * @param monitor   monitor handed to the superclass
     * @param blockSize byte count requested per receive operation
     * @throws IOException declared for the superclass constructor call
     */
    public ModeS(Role role, RepositoryChannel file, ConnectionMonitor monitor,
                 int blockSize)
        throws IOException
    {
        super(role, file, monitor);
        _blockSize = blockSize;
    }

    /**
     * Adds a listener for a newly established connection to the
     * multiplexer: a sender or a receiver, depending on the role. Any
     * other role value results in no listener being added.
     */
    @Override
    public void newConnection(Multiplexer multiplexer, SocketChannel socket)
        throws Exception
    {
        switch (_role) {
        case Sender:
            Sender sender = new Sender(socket);
            multiplexer.add(sender);
            break;
        case Receiver:
            Receiver receiver = new Receiver(socket);
            multiplexer.add(receiver);
            break;
        }
    }
}