-
Notifications
You must be signed in to change notification settings - Fork 31
Muraveva_A_B_Synchronizer #151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
The head ref may contain hidden characters: "Muraveva_A_B_pra\u0441_6_sync"
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,25 +1,46 @@ | ||
| package hse.java.lectures.lecture6.tasks.queue; | ||
|
|
||
| import java.util.LinkedList; | ||
| import java.util.Queue; | ||
|
|
||
| public class BoundedBlockingQueue<T> { | ||
|
|
||
| private final int capacity; | ||
| private final Queue<T> queue; | ||
|
|
||
| public BoundedBlockingQueue(int capacity) { | ||
|
|
||
| if (capacity <= 0) { | ||
| throw new IllegalArgumentException(); | ||
| } | ||
| this.capacity = capacity; | ||
| this.queue = new LinkedList<>(); | ||
| } | ||
|
|
||
| public void put(T item) throws InterruptedException { | ||
|
|
||
| public synchronized void put(T item) throws InterruptedException { | ||
| if (item == null) { | ||
| throw new NullPointerException(); | ||
| } | ||
| while (queue.size() == capacity) { | ||
| wait(); | ||
| } | ||
| queue.add(item); | ||
| notifyAll(); | ||
| } | ||
|
|
||
| public T take() throws InterruptedException { | ||
| return null; | ||
| public synchronized T take() throws InterruptedException { | ||
| while (queue.isEmpty()) { | ||
| wait(); | ||
| } | ||
| T item = queue.poll(); | ||
| notifyAll(); | ||
| return item; | ||
| } | ||
|
|
||
| public int size() { | ||
| return 0; | ||
| public synchronized int size() { | ||
| return queue.size(); | ||
| } | ||
|
|
||
| public int capacity() { | ||
| return 0; | ||
| return capacity; | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -26,11 +26,13 @@ public void attachMonitor(StreamingMonitor monitor) { | |||||||||||||||||||
|
|
||||||||||||||||||||
| @Override | ||||||||||||||||||||
| public void run() { | ||||||||||||||||||||
| // Writer threads are intentionally infinite for the task contract. | ||||||||||||||||||||
| while (true) { | ||||||||||||||||||||
| if (!monitor.waitTurn(id)) { | ||||||||||||||||||||
| break; | ||||||||||||||||||||
| } | ||||||||||||||||||||
| output.print(message); | ||||||||||||||||||||
| onTick.run(); | ||||||||||||||||||||
| monitor.tickDone(); | ||||||||||||||||||||
|
Comment on lines
33
to
+35
|
||||||||||||||||||||
| output.print(message); | |
| onTick.run(); | |
| monitor.tickDone(); | |
| try { | |
| output.print(message); | |
| onTick.run(); | |
| } finally { | |
| monitor.tickDone(); | |
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,5 +1,45 @@ | ||||||||||||||||||
| package hse.java.lectures.lecture6.tasks.synchronizer; | ||||||||||||||||||
|
|
||||||||||||||||||
| public class StreamingMonitor { | ||||||||||||||||||
| // impl your sync here | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| private final int[] sortedIds; | ||||||||||||||||||
| private final int targetTicks; | ||||||||||||||||||
| private int currentIdx = 0; | ||||||||||||||||||
| private int completedTicks = 0; | ||||||||||||||||||
|
|
||||||||||||||||||
| public StreamingMonitor(int[] sortedIds, int targetTicks) { | ||||||||||||||||||
|
||||||||||||||||||
| public StreamingMonitor(int[] sortedIds, int targetTicks) { | |
| public StreamingMonitor(int[] sortedIds, int targetTicks) { | |
| if (sortedIds == null || sortedIds.length == 0) { | |
| throw new IllegalArgumentException("sortedIds must be non-null and non-empty"); | |
| } | |
| if (targetTicks < 0) { | |
| throw new IllegalArgumentException("targetTicks must be non-negative"); | |
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,17 +17,24 @@ public Synchronizer(List<StreamWriter> tasks, int ticksPerWriter) { | |
| this.ticksPerWriter = ticksPerWriter; | ||
| } | ||
|
|
||
| /** | ||
| * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks | ||
| * in strict ascending id order. | ||
| */ | ||
| public void execute() { | ||
| // add monitor and sync | ||
| int[] sortedIds = tasks.stream() | ||
| .mapToInt(StreamWriter::getId) | ||
| .sorted() | ||
| .toArray(); | ||
|
|
||
| StreamingMonitor monitor = new StreamingMonitor(sortedIds, ticksPerWriter); | ||
|
|
||
|
Comment on lines
+21
to
+27
|
||
| for (StreamWriter writer : tasks) { | ||
| writer.attachMonitor(monitor); | ||
| } | ||
|
|
||
| for (StreamWriter writer : tasks) { | ||
| Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); | ||
| worker.setDaemon(true); | ||
| worker.start(); | ||
| } | ||
| } | ||
|
|
||
| } | ||
| monitor.awaitDone(); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current implementation exits the writer thread once
waitTurn()returns false. This conflicts with the task contract intask.mdthat describesStreamWriter.run()as an infinite loop; a safer approach is usually to keep the thread alive but block it after completion (or otherwise align the contract + implementation).