-
Notifications
You must be signed in to change notification settings - Fork 41
/
Better way60_3.py
160 lines (129 loc) · 4.14 KB
/
Better way60_3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from threading import Thread
from queue import Queue
# Does not run on Python 3.6 or earlier (asyncio.run requires Python 3.7+).
class ClosableQueue(Queue):
    """Queue that can be closed and consumed with a plain ``for`` loop.

    ``close()`` enqueues a sentinel object. Iterating the queue blocks on
    ``get()`` and yields items until that sentinel is received.
    ``task_done()`` is called exactly once for every object taken off the
    queue, the sentinel included.
    """

    # Unique marker object; compared by identity, never by equality.
    SENTINEL = object()

    def close(self):
        """Signal consumers that no further items will be added."""
        self.put(self.SENTINEL)

    def __iter__(self):
        """Yield queued items until the sentinel is retrieved."""
        while True:
            item = self.get()
            if item is self.SENTINEL:
                # Account for the sentinel itself, then end the iteration so
                # the consuming loop (e.g. a worker thread) can finish.
                self.task_done()
                return
            try:
                yield item
            finally:
                # Runs when the consumer asks for the next item, or when the
                # generator is closed early.
                self.task_done()
# Three independent module-level queues, created in the order in, logic, out.
# NOTE(review): nothing in the code visible here reads them; they look like
# leftovers from a thread-based pipeline variant -- confirm before removing.
in_queue, logic_queue, out_queue = (ClosableQueue() for _ in range(3))
class StoppableWorker(Thread):
    """Thread that maps ``func`` over ``in_queue`` into ``out_queue``.

    The thread ends when iteration over ``in_queue`` ends.

    Args:
        func: Callable applied to each item taken from ``in_queue``.
        in_queue: Iterable source of work items.
        out_queue: Object with a ``put()`` method that receives each result.
        **kwargs: Forwarded unchanged to ``Thread.__init__``.
    """

    def __init__(self, func, in_queue, out_queue, **kwargs):
        super().__init__(**kwargs)
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        """Process items one at a time until the input is exhausted."""
        for item in self.in_queue:
            self.out_queue.put(self.func(item))
# Single-character cell states stored in the grid and printed verbatim:
# '*' marks a live cell, '-' marks an empty one.
ALIVE, EMPTY = '*', '-'
class SimulationError(Exception):
    """Exception type for simulation failures (judging by its name).

    NOTE(review): nothing in the code visible here raises or catches it.
    """
class Grid:
    """Rectangular grid of cell states whose coordinates wrap around.

    Every cell starts as ``EMPTY``. ``get`` and ``set`` reduce ``y`` modulo
    the height and ``x`` modulo the width, so out-of-range coordinates wrap
    to the opposite edge.
    """

    def __init__(self, height, width):
        self.height = height
        self.width = width
        # One independent list per row so rows never share storage.
        self.rows = [[EMPTY] * self.width for _ in range(self.height)]

    def get(self, y, x):
        """Return the state stored at row ``y``, column ``x`` (wrapping)."""
        row = self.rows[y % self.height]
        return row[x % self.width]

    def set(self, y, x, state):
        """Store ``state`` at row ``y``, column ``x`` (wrapping)."""
        row = self.rows[y % self.height]
        row[x % self.width] = state

    def __str__(self):
        """Return the grid as text, one newline-terminated line per row."""
        return ''.join(''.join(row) + '\n' for row in self.rows)
async def count_neighbors(y, x, get):
    """Return how many of the eight cells around ``(y, x)`` are ``ALIVE``.

    Args:
        y: Row of the centre cell.
        x: Column of the centre cell.
        get: Callable ``get(y, x)`` returning the state of a cell.

    Returns:
        The number of neighbouring cells whose state equals ``ALIVE``.
    """
    # (dy, dx) offsets in clockwise order: N, NE, E, SE, S, SW, W, NW.
    offsets = (
        (-1, 0), (-1, 1), (0, 1), (1, 1),
        (1, 0), (1, -1), (0, -1), (-1, -1),
    )
    # Read all eight neighbours first, then count the live ones.
    neighbor_states = [get(y + dy, x + dx) for dy, dx in offsets]
    # Placeholder from the original: blocking I/O would be performed here,
    # e.g. data = my_socket.recv(100)
    return sum(1 for state in neighbor_states if state == ALIVE)
async def game_logic(state, neighbors):
    """Return the next state of a cell given its live-neighbour count.

    Args:
        state: Current state of the cell.
        neighbors: Number of neighbouring cells that are alive.

    Returns:
        ``EMPTY`` for a live cell with fewer than 2 or more than 3 live
        neighbours, ``ALIVE`` for a non-live cell with exactly 3, and the
        unchanged ``state`` otherwise.
    """
    is_alive = state == ALIVE
    if is_alive and (neighbors < 2 or neighbors > 3):
        return EMPTY  # too few or too many live neighbours: the cell dies
    if not is_alive and neighbors == 3:
        return ALIVE  # exactly three live neighbours: the cell comes alive
    # Placeholder from the original: I/O would be performed here,
    # e.g. data = await my_socket.recv(100)
    return state
async def step_cell(y, x, get, set):
    """Compute the next state of cell ``(y, x)`` and store it via ``set``.

    Args:
        y: Row of the cell.
        x: Column of the cell.
        get: Callable ``get(y, x)`` used to read the current generation.
        set: Callable ``set(y, x, state)`` that receives the new state.
    """
    current = get(y, x)
    live_count = await count_neighbors(y, x, get)
    set(y, x, await game_logic(current, live_count))
class ColumnPrinter:
    """Render several multi-line strings side by side as numbered columns.

    The first output row is a header holding each column's index, centred
    over the column. The rows below place line ``j`` of every column next to
    each other, separated by ``' | '``.

    Columns may have different numbers of lines or line lengths: missing
    lines and short lines are padded with spaces to the column's width so
    the separators stay aligned.
    """

    def __init__(self):
        self.columns = []

    def append(self, data):
        """Add one column; ``data`` is a (possibly multi-line) string."""
        self.columns.append(data)

    def __str__(self):
        """Return the header row and all body rows joined by newlines.

        With no columns the result is the empty string.
        """
        # Split every column once up front instead of once per output cell.
        split_columns = [data.splitlines() for data in self.columns]
        # Width of a column is its longest line (0 for an empty column).
        widths = [
            max((len(line) for line in lines), default=0)
            for lines in split_columns
        ]
        body_rows = max((len(lines) for lines in split_columns), default=0)

        # Centre each index over its column so the header lines up with the
        # body even for even widths or multi-digit indices.
        header = ' | '.join(
            str(index).center(width) for index, width in enumerate(widths)
        )
        rows = [header]
        for j in range(body_rows):
            cells = []
            for lines, width in zip(split_columns, widths):
                # A column shorter than the tallest one contributes blanks.
                text = lines[j] if j < len(lines) else ''
                cells.append(text.ljust(width))
            rows.append(' | '.join(cells))
        return '\n'.join(rows)
import asyncio
async def simulate(grid):
    """Advance ``grid`` by one generation and return the result as a new grid.

    Args:
        grid: Current generation; read through ``grid.get`` only.

    Returns:
        A new ``Grid`` of the same height and width holding the next state
        of every cell.
    """
    next_grid = Grid(grid.height, grid.width)
    # Fan out: one step_cell coroutine per cell, row by row.
    pending = [
        step_cell(y, x, grid.get, next_grid.set)
        for y in range(grid.height)
        for x in range(grid.width)
    ]
    # Fan in: wait until every cell has been written to next_grid.
    await asyncio.gather(*pending)
    return next_grid
# Demo driver: seed a 5x9 grid with five live cells (the classic glider
# shape), then record five successive generations and print them side by side.
grid = Grid(5, 9)
for row, col in ((0, 3), (1, 4), (2, 2), (2, 3), (2, 4)):
    grid.set(row, col, ALIVE)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    # Each call runs a fresh event loop until this generation is complete.
    grid = asyncio.run(simulate(grid))

print(columns)