-
Notifications
You must be signed in to change notification settings - Fork 1
/
queue_test_v1.py
84 lines (74 loc) · 1.75 KB
/
queue_test_v1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# from queue import Queue
#
# # Create an empty queue
# q = Queue()
# while True:
# # Get input from the user
# input_str = str(input("Enter a value to add to the queue: "))
#
# # Add the input to the queue
# q.put(input_str)
#
# print("Prompt added to the queue:")
# print(input_str+"\n")
#
# while not q.empty():
# print(q.get())
# # Attempt 2
# import threading
# from queue import Queue
# import time
#
# # Create an empty queue
# q = Queue()
#
# def add_input_to_queue():
# while True:
# # Get input from the user
# input_str = input("Enter a value to add to the queue: ")
#
# # Add the input to the queue
# q.put(input_str)
#
# print("Value added to the queue.")
#
# # Create a new thread
# thread = threading.Thread(target=add_input_to_queue)
#
# # Start the thread
# thread.start()
#
# def print_queue():
# assert q, "Queue does not exist"
# while True:
# print(q.qsize())
# time.sleep(2)
#
# thread2 = threading.Thread(target=print_queue)
#
# # Start the thread
# thread2.start()
import threading
import time
from queue import Queue
# Create an empty queue
q = Queue()
def add_input_to_queue():
while True:
# Get input from the user
input_str = input("Enter a value to add to the queue: ")
# Add the input to the queue
q.put(input_str)
print("Value added to the queue.")
def print_queue_length():
while True:
# Print the number of items in the queue
print("Number of items in the queue:", q.qsize())
# Sleep for 5 seconds
time.sleep(5)
# Create two new threads
thread1 = threading.Thread(target=add_input_to_queue)
thread2 = threading.Thread(target=print_queue_length)
# Start the threads
thread1.start()
thread2.start()