Skip to content

Commit

Permalink
few changes to streamer (should now play better)
Browse files Browse the repository at this point in the history
  • Loading branch information
primus11 committed May 22, 2012
1 parent b228c88 commit 342fc2f
Show file tree
Hide file tree
Showing 10 changed files with 303 additions and 37 deletions.
3 changes: 3 additions & 0 deletions src/Streamer/AudioFormatInfo.java
Expand Up @@ -17,6 +17,9 @@ public AudioFormatInfo(AudioFormat format) {
this.channels = format.getChannels();
this.signed = true; //??
this.bigEndian = format.isBigEndian();

//System.out.println( format.toString() );
//System.out.println( format.matches(getAudioFormat()) );
}

public AudioFormat getAudioFormat() {
Expand Down
112 changes: 99 additions & 13 deletions src/Streamer/Client.java
Expand Up @@ -4,6 +4,7 @@
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.util.ArrayList;

import javax.sound.sampled.AudioFormat;
import javax.sound.sampled.AudioSystem;
Expand All @@ -17,9 +18,15 @@ public class Client {
public static int rcvPort = 3426;
public static int sndPort = 3427;
DatagramSocket socket;
static String host = "127.0.0.1";
DatagramSocket sendSocket;

SourceDataLine auline = null;
//PlayWave.Position curPosition;
Position curPosition;

enum Position {
LEFT, RIGHT, NORMAL
};

public static void main(String[] args)
{
Expand All @@ -30,18 +37,21 @@ public static void main(String[] args)
public void start() {
try {
socket = new DatagramSocket(Client.rcvPort);
sendSocket = new DatagramSocket(Client.sndPort);
} catch (SocketException e) {
e.printStackTrace();
}

Utils.sendUDP(Codes.JOIN, "127.0.0.1", Server.recvPort, Client.sndPort); //can fail! (send until packet received back - TODO)
//Utils.sendUDP(Codes.JOIN, Client.host, Server.recvPort, Client.sndPort); //can fail! (send until packet received back - TODO)
Utils.sendUDP(Codes.JOIN, Client.host, Server.recvPort, sendSocket);

mainLoop();
}

void setFormat(Packet packet) {
if (auline != null && auline.isOpen()) {
auline.drain();
auline.flush();
auline.close();
}

Expand All @@ -54,41 +64,117 @@ void setFormat(Packet packet) {
e.printStackTrace();
}

/*if (auline.isControlSupported(FloatControl.Type.PAN)) {
this.curPosition = Position.NORMAL;
if (auline.isControlSupported(FloatControl.Type.PAN)) {
FloatControl pan = (FloatControl) auline.getControl(FloatControl.Type.PAN);
if (curPosition == PlayWave.Position.RIGHT)
if (this.curPosition == Position.RIGHT)
pan.setValue(1.0f);
else if (curPosition == PlayWave.Position.LEFT)
else if (this.curPosition == Position.LEFT)
pan.setValue(-1.0f);
}*/
}

auline.start();
}

void playBytes(Packet packet) {
//System.out.println("playBytes");
auline.write(packet.nextBytes, 0, packet.nextBytes.length);
/*while (auline.available() < packet.nextBytes.length)
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
//System.out.println(packet.nextBytes.length);
/*while(auline.isActive())
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
int offset = 0;
while (offset < packet.nextBytes.length)
offset += auline.write(packet.nextBytes, offset, packet.nextBytes.length-offset);
//offset += auline.write(packet.nextBytes, 0, packet.nextBytes.length);
}

void mainLoop() {
byte[] buffer = new byte[2048];
byte[] buffer = new byte[262144];
//byte[] buffer = new byte[1024];
int lastID = -1;

ArrayList<Packet> waitingPackets = new ArrayList<Packet>();
int oldest = 100;

DatagramPacket UDPpacket = new DatagramPacket(buffer, buffer.length);

while (true) {
DatagramPacket UDPpacket = new DatagramPacket(buffer, buffer.length);

try {
this.socket.receive(UDPpacket);
} catch (IOException e) {
e.printStackTrace();
}

Packet packet = (Packet) Utils.bytesToObj(buffer);
/*for (int i=0; i<UDPpacket.getLength(); i++)
if (UDPpacket.getData()[i] != buffer[i])
System.out.println("diff");*/


Packet packet = (Packet) Utils.bytesToObj( Utils.truncate(UDPpacket.getData(), UDPpacket.getLength()) );

if (packet.isFormat())
Utils.sendUDP(Codes.FORMAT, Client.host, Server.recvPort, sendSocket);

//System.out.println(packet.id + " " + packet.isFormat());

//if (!packet.isFormat()
// System.out.println(packet.checkSum + " " + packet.sumBytes());

if (packet.id != lastID+1) {
waitingPackets.add(packet);

//Packet minID = waitingPackets.get(0);
int minIdx = waitingPackets.size()-1;
for (int i=waitingPackets.size()-2; i>=0; i--) {
if (waitingPackets.get(i).id >= lastID + oldest || waitingPackets.get(i).id < lastID+1) {
waitingPackets.remove(i);
minIdx--;
continue;
}

if (waitingPackets.get(i).id < waitingPackets.get(minIdx).id)
minIdx = i;
}

//System.out.println("minId: " + waitingPackets.get(minIdx).id + " " + lastID);

packet = waitingPackets.get(minIdx);
waitingPackets.remove(minIdx);
if (packet.id >= lastID + oldest) {
lastID = packet.id - 1;
}
else if (packet.id == lastID + 1) {
}
}

//if not in order
/*if (packet.id != lastID+1 && !packet.isFormat()) {
//System.out.println(packet.id + " " + lastID+1);
System.out.println("not in order. id: " + packet.id + " lastid+1: " + (lastID+1));
continue;
}
else*/
lastID = packet.id;

//System.out.println(packet.id);

if ( packet.isFormat() ) {
//System.out.println("wewe");
setFormat(packet);
Utils.sendUDP(Codes.FORMAT, "127.0.0.1", Server.recvPort, Client.sndPort);
//Utils.sendUDP(Codes.FORMAT, Client.host, Server.recvPort, Client.sndPort);
//Utils.sendUDP(Codes.FORMAT, Client.host, Server.recvPort, sendSocket);
}
else {
//System.out.println("client " + packet.nextBytes + " " + new String(packet.nextBytes) + " " + packet.id);
//if (packet.id % 2 == 0)
playBytes(packet);
}

Expand Down
7 changes: 7 additions & 0 deletions src/Streamer/Codes.java
Expand Up @@ -3,4 +3,11 @@
public class Codes {
public static byte[] JOIN = {0, 0};
public static byte[] FORMAT = {0, 1};

public static boolean equal(byte[] code, byte[] cmd) {
for (int i=0; i<code.length; i++)
if (code[i] != cmd[i])
return false;
return true;
}
}
29 changes: 28 additions & 1 deletion src/Streamer/Packet.java
Expand Up @@ -15,8 +15,12 @@

public class Packet implements Serializable {
//boolean type;
public static int nextId = 0;
int id;
//String test;

byte[] nextBytes;
//int checkSum;

//AudioFormat format;
//DataLine.Info info;
Expand All @@ -28,7 +32,9 @@ public Packet(byte[] nextBytes) {
//this.nextBytes = new byte[1];
//this.nextBytes[0] = nextByte;
this.nextBytes = nextBytes;
this.bytes = Utils.ObjToBytes(this);
//this.checkSum = sumBytes();
//this.bytes = Utils.ObjToBytes(this);
init();
//byte[] by = Utils.ObjToBytes(this.format);
}

Expand All @@ -38,9 +44,22 @@ public Packet(AudioFormat format) {
//this.info = info;
this.audioFormatInfo = new AudioFormatInfo(format);

init();
}

void init() {
//this.test = "danes je lep dan";
this.id = Packet.nextId++;
this.bytes = Utils.ObjToBytes(this);
}

public int sumBytes() {
int sum = 0;
for (int i=0; i<nextBytes.length; i++)
sum += nextBytes[i];
return sum;
}

public boolean isFormat() {
return (this.audioFormatInfo != null);
}
Expand Down Expand Up @@ -72,4 +91,12 @@ public void send(String address, int port, int transmitterPort) {
public void send(Receiver receiver) {
send(receiver.ip, receiver.port, Server.sndPort);
}

public void send(Receiver receiver, DatagramSocket tsocket) {
send(receiver.ip, receiver.port, tsocket);
}

public void send(String address, int port, DatagramSocket tsocket) {
Utils.sendUDP(this.bytes, address, port, tsocket);
}
}
1 change: 1 addition & 0 deletions src/Streamer/PacketFiller.java
Expand Up @@ -14,6 +14,7 @@ public PacketFiller(BlockingQueue<Packet> queue, Playlist playlist) {

public void run() {
while (true) {
//System.out.println(queue.size());
if (queue.size() < 1000) {
try {
//Packet packet = this.playlist.getNextPacket();
Expand Down

0 comments on commit 342fc2f

Please sign in to comment.