#!/usr/bin/env ruby
#
# $Source: /var/cvs/sysvipc/sysvipc/test_sysvipc,v $
#
# $Revision: 1.21 $
# $Date: 2007/09/02 19:20:46 $
#
# Copyright (c) 2006 James Steven Jenkins.
#
# Make the extension in the current working directory loadable. Dir.pwd is
# used rather than ENV['PWD'] because the environment variable may be unset
# or stale when the script is not started from a shell.
$:.unshift(Dir.pwd)
require 'sysvipc'
require 'test/unit'
include SystemVIPC

# IPC key shared by all three tests, derived from this file's path. The
# project id must be non-zero: POSIX leaves ftok() unspecified when the
# low-order 8 bits of the id are 0.
KEY = ftok(__FILE__, 1)

# Number of semaphores in the set created by test_sem.
NSEMS = 16

# Number of semaphores (positions 0...NSEMOPS) covered by the acquire and
# release operation lists in test_sem.
NSEMOPS = 5

# Number of messages sent and received in the first part of test_msg.
NMSGS = 16

# Size passed to SharedMemory.new in test_shm (presumably bytes; confirm
# against the sysvipc documentation).
SHMSIZE = 1024
# Functional tests for the SystemVIPC bindings: one test each for
# MessageQueue, Semaphore and SharedMemory. Every test creates its IPC object
# under KEY and removes it again in an ensure clause, so a failing assertion
# does not leave the object behind.
class TestCradle < Test::Unit::TestCase
  # No shared fixtures: each test builds the IPC object it needs.
  def setup
  end

  # Exercises MessageQueue: creation, ownership, send/recv within one
  # process, and recv of messages sent by a forked child and by a thread.
  def test_msg
    msg = MessageQueue.new(KEY, IPC_CREAT | 0660)
    assert_instance_of(MessageQueue, msg, 'MessageQueue.new')
    assert_owned_by_current_process(msg)

    # Send NMSGS messages numbered 1..NMSGS, then receive them by number in
    # descending order. (The first argument is presumably the SysV message
    # type and recv's second a maximum length; confirm against the library.)
    1.upto(NMSGS) do |i|
      assert_equal(msg, msg.send(i, "message #{i}"), 'MessageQueue#send')
    end
    NMSGS.downto(1) do |i|
      assert_equal("message #{i}", msg.recv(i, 100), 'MessageQueue#recv')
    end

    # The child sends one second after the fork, so the parent's recv is
    # normally issued before the message exists.
    Process.fork do
      sleep 1
      msg.send(1, 'message 1')
    end
    assert_equal("message 1", msg.recv(1, 100), 'MessageQueue#recv')
    Process.wait

    # Same exchange with a sender thread inside this process.
    sender = Thread.new do
      sleep 1
      msg.send(2, 'message 2')
    end
    assert_equal('message 2', msg.recv(2, 100), 'MessageQueue#recv')
    # Join so that an exception raised in the sender surfaces in the test.
    sender.join
  ensure
    # msg is nil when MessageQueue.new itself raised.
    msg.remove if msg
  end

  # Exercises Semaphore and SemaphoreOperation: creation, ownership, value
  # accessors, operation lists, ordering between a process/thread pair that
  # contend for the same semaphores, and IPC_NOWAIT.
  def test_sem
    sem = Semaphore.new(KEY, NSEMS, IPC_CREAT | 0660)
    assert_kind_of(Semaphore, sem, 'Semaphore.new')
    assert_owned_by_current_process(sem)
    assert_equal(NSEMS, sem.size, 'Semaphore#size')

    # Individual set/get on every semaphore in the set.
    NSEMS.times do |i|
      assert_equal(sem, sem.set_value(i, 2), 'Semaphore#set_value')
      assert_equal(2, sem.value(i), 'Semaphore#value')
    end

    # Bulk set/get; every semaphore is left at 1 for the steps below.
    values = Array.new(NSEMS, 1)
    assert_equal(sem, sem.set_all(values), 'Semaphore#set_all')
    assert_equal(values, sem.to_a, 'Semaphore#to_a')
    NSEMS.times do |i|
      assert_equal(0, sem.n_count(i), 'Semaphore#n_count')
      assert_equal(0, sem.z_count(i), 'Semaphore#z_count')
    end

    # Two operation lists over positions 0...NSEMOPS: "acquire" applies -1 to
    # each position, "release" applies +1.
    acquire = []
    release = []
    NSEMOPS.times do |i|
      op = SemaphoreOperation.new(i, -1)
      assert_kind_of(SemaphoreOperation, op, 'SemaphoreOperation.new')
      assert_equal(i, op.pos, 'SemaphoreOperation#pos')
      assert_equal(-1, op.value, 'SemaphoreOperation#value')
      assert_equal(0, op.flags, 'SemaphoreOperation#flags')
      acquire << op

      op = SemaphoreOperation.new(i, 1)
      assert_equal(i, op.pos, 'SemaphoreOperation#pos')
      assert_equal(1, op.value, 'SemaphoreOperation#value')
      assert_equal(0, op.flags, 'SemaphoreOperation#flags')
      release << op
    end

    # Uncontended acquire/release; afterwards each touched semaphore reports
    # this process as the last one to operate on it.
    assert_equal(sem, sem.apply(acquire), 'Semaphore#apply')
    assert_equal(sem, sem.apply(release), 'Semaphore#apply')
    pid = Process.pid
    NSEMOPS.times do |i|
      assert_equal(pid, sem.pid(i), 'Semaphore#pid')
    end

    # Contention with a child process. Both sides log progress digits to a
    # shared pipe. The expected '12345' can only result if the child's
    # acquire ('2' -> '4') does not return until the parent has written '3'
    # and released.
    rd, wr = IO.pipe
    wr.write '1'
    assert_equal(sem, sem.apply(acquire), 'Semaphore#apply')
    Process.fork do
      rd.close
      wr.write '2'
      sem.apply(acquire)
      wr.write '4'
      sem.apply(release)
      wr.write '5'
      wr.close
    end
    sleep 1
    wr.write '3'
    assert_equal(sem, sem.apply(release), 'Semaphore#apply')
    # Close our write end so that rd.read sees EOF once the child is done.
    wr.close
    Process.wait
    assert_equal('12345', rd.read, 'Semaphore')
    rd.close

    # Same contention scenario against a thread, logging to a string.
    seq = '1'
    assert_equal(sem, sem.apply(acquire), 'Semaphore#apply')
    worker = Thread.new do
      seq << '2'
      sem.apply(acquire)
      seq << '4'
      sem.apply(release)
      seq << '5'
    end
    sleep 1
    seq << '3'
    assert_equal(sem, sem.apply(release), 'Semaphore#apply')
    worker.join
    assert_equal('12345', seq, 'Semaphore')

    # With the semaphores already acquired, an IPC_NOWAIT decrement of
    # position 0 must raise Errno::EAGAIN.
    nowait = [SemaphoreOperation.new(0, -1, IPC_NOWAIT)]
    assert_equal(sem, sem.apply(acquire), 'Semaphore#apply')
    assert_raise(Errno::EAGAIN) do
      sem.apply(nowait)
    end
    assert_equal(sem, sem.apply(release), 'Semaphore#apply')
  ensure
    # rd/wr are nil if the test failed before the pipe was created, and
    # already closed if the pipe section ran to completion.
    [rd, wr].each { |io| io.close if io && !io.closed? }
    # sem is nil when Semaphore.new itself raised.
    sem.remove if sem
  end

  # Exercises SharedMemory: creation, ownership, size, attach/detach, and
  # read/write with optional offsets, including writes made by a forked
  # child and by a thread.
  def test_shm
    shm = SharedMemory.new(KEY, SHMSIZE, IPC_CREAT | 0660)
    assert_kind_of(SharedMemory, shm, 'SharedMemory.new')
    assert_owned_by_current_process(shm)
    assert_equal(SHMSIZE, shm.size, 'SharedMemory#size')
    assert_equal(shm, shm.attach, 'SharedMemory#attach')

    data_size = SHMSIZE - 1
    adata = 'A' * data_size
    bdata = 'B' * data_size

    # Whole-buffer write and read-back.
    assert_equal(shm, shm.write(adata), 'SharedMemory#write')
    assert_equal(adata, shm.read(data_size), 'SharedMemory#read')

    # read(3, 4) returns the three characters at offset 4 of 'test123';
    # write('test', 4) then overwrites them so offset 0 reads 'testtest'.
    assert_equal(shm, shm.write('test123'), 'SharedMemory#write')
    assert_equal('123', shm.read(3, 4), 'SharedMemory#read')
    assert_equal(shm, shm.write('test', 4), 'SharedMemory#write')
    assert_equal('testtest', shm.read(8, 0), 'SharedMemory#read')

    # A write made by a child process must be visible to the parent.
    Process.fork do
      shm.write(bdata)
    end
    Process.wait
    assert_equal(bdata, shm.read(data_size), 'SharedMemory#read')

    # Likewise for a write made by another thread of this process.
    writer = Thread.new do
      shm.write(adata)
    end
    writer.join
    assert_equal(adata, shm.read(data_size), 'SharedMemory#read')

    assert_equal(shm, shm.detach, 'SharedMemory#detach')
  ensure
    # shm is nil when SharedMemory.new itself raised.
    shm.remove if shm
  end

  # Nothing to tear down here; each test cleans up in its own ensure clause.
  def teardown
  end

  private

  # Asserts that the Permission record built from +ipc+ names the current
  # process's uid and gid as both owner (uid/gid) and creator (cuid/cgid).
  def assert_owned_by_current_process(ipc)
    uid = Process.uid
    gid = Process.gid
    perm = Permission.new(ipc)
    assert_instance_of(Permission, perm, 'Permission.new')
    assert_equal(uid, perm.uid, 'Permission#uid')
    assert_equal(gid, perm.gid, 'Permission#gid')
    assert_equal(uid, perm.cuid, 'Permission#cuid')
    assert_equal(gid, perm.cgid, 'Permission#cgid')
  end
end