In [None]:
import os
import time

import dask
import dask.array as da
import dask.multiprocessing
import matplotlib.pyplot as plt
import numpy as np
from dask import delayed
from dask.distributed import Client
from scipy.ndimage import convolve

In [None]:
# Use the standard dask client.
# NOTE(review): a 'processes' scheduler is configured and a distributed Client
# is created right after it; presumably the Client registers itself as the
# default scheduler and overrides that setting -- confirm whether the
# config line is still needed.
dask.config.set(scheduler='processes')
client = Client()
# Bare expression so the notebook displays the client summary as cell output.
client


In [None]:
def read_info_from_file(input_file, chunks=None):
    """Load a Game of Life board from a text file into a chunked dask array.

    The first line of the file holds the two board dimensions. Every following
    non-blank line starts with the two integer indices of a live cell; anything
    after those two numbers is ignored. Blank lines (for example a trailing
    newline at the end of the file) are skipped.

    Parameters
    ----------
    input_file : str or path-like
        Path of the text file to read.
    chunks : int or tuple, optional
        Chunk specification handed to ``da.from_array``. When omitted, the
        module-level ``chunk_size`` is used for both axes, which matches the
        previous behaviour of this function.

    Returns
    -------
    dask.array.Array
        ``uint8`` array of shape ``(w, h)`` holding 1 for live cells, 0 otherwise.

    Raises
    ------
    ValueError
        If the header or a non-blank cell line cannot be parsed as integers.
    IndexError
        If a cell index lies outside the declared board dimensions.
    """
    with open(input_file) as f:
        w, h = [int(x) for x in next(f).split()]
        # uint8 is the smallest dtype that can hold the 0/1 cell states
        board = np.zeros((w, h), dtype=np.uint8)
        for line in f:
            fields = line.split()
            if not fields:
                # tolerate blank lines instead of failing on the unpack below
                continue
            x, y = map(int, fields[:2])
            board[x, y] = 1

    if chunks is None:
        # fall back to the notebook-wide setting (defined in the config cell)
        chunks = (chunk_size, chunk_size)
    # the file is already closed here; only the in-memory board is wrapped
    return da.from_array(board, chunks=chunks)

In [None]:
# set the variables
chunk_size = 100
iterations = 1
overlap = 1

input_file = 'Data/input.txt'
# initialize the board from the input file
board = read_info_from_file(input_file)

# Kernel for counting live neighbours: ones everywhere except the centre,
# so a cell is never counted as its own neighbour.
# A plain NumPy array is used on purpose: the kernel is only 3x3 and is passed
# as `weights` to scipy's convolve, so a lazy dask array brings no benefit,
# and indexed item assignment is not supported by every dask version.
mask = np.ones((3, 3))
mask[1, 1] = 0

In [None]:
def process_cell(board_chunk, current_state=None, block_id=None):
    """Apply the Game of Life rules to one block of live-neighbour counts.

    Parameters
    ----------
    board_chunk : numpy.ndarray
        Number of live neighbours for every cell of the block.
    current_state : numpy.ndarray, optional
        State of the same cells in the generation being advanced, with the
        same shape as ``board_chunk``. ``prepare_next_board`` always supplies
        it. When omitted, the legacy fallback below is used.
    block_id : tuple of int, optional
        Index of the block, filled in by dask's ``map_blocks``. Only the
        legacy fallback reads it.

    Returns
    -------
    numpy.ndarray
        New ``uint8`` array with the next state of every cell. The inputs are
        left untouched.
    """
    if current_state is None:
        # Legacy call style (counts only): look the states up in the
        # module-level `board`. Offsets come from the real chunk sizes, so the
        # smaller blocks at the edge of the board are located correctly.
        # This path always reads the module-level board, so it is only
        # meaningful while that board is the generation being advanced.
        row_start = sum(board.chunks[0][:block_id[0]])
        col_start = sum(board.chunks[1][:block_id[1]])
        rows, cols = board_chunk.shape
        current_state = board[row_start:row_start + rows,
                              col_start:col_start + cols].compute()

    # 3 neighbours -> alive; 2 neighbours -> keeps its current state;
    # anything else -> dead. Built as a new array instead of overwriting the
    # input block.
    next_state = np.where(board_chunk == 3, 1,
                          np.where(board_chunk == 2, current_state, 0))
    return next_state.astype(np.uint8)


def prepare_next_board(steps, board, mask):
    """Advance ``board`` by ``steps`` generations.

    Parameters
    ----------
    steps : int
        Number of generations to advance.
    board : dask.array.Array
        Current board with 0/1 cell states.
    mask : array-like
        Convolution kernel used to count live neighbours.

    Returns
    -------
    dask.array.Array
        The board after ``steps`` generations; with ``steps == 0`` the input
        is returned unchanged.
    """
    # make sure the kernel is a plain NumPy array, converted once up front
    weights = np.asarray(mask)
    for _ in range(steps):
        # depth=1 lets every block see one row/column of its neighbouring
        # blocks; mode='constant', cval=0 makes convolve treat everything
        # beyond the outer border as dead cells
        num_live_neighbors = board.map_overlap(convolve, depth=1, boundary='none',
                                               weights=weights, mode='constant', cval=0)
        # pass the current generation blockwise next to the counts, so each
        # step uses the state it is actually advancing
        board = num_live_neighbors.map_blocks(process_cell, board, dtype=np.uint8)

    return board




In [None]:
import warnings

# Suppress Bokeh warnings
# NOTE(review): this filter ignores every UserWarning raised from here on,
# not only those coming from Bokeh.
warnings.filterwarnings('ignore', category=UserWarning)

# Describe the board after `iterations` generations; the actual values are
# produced by the .compute() call below.
final_board = prepare_next_board(iterations, board, mask)

# Evaluate the result and print it.
final_result = final_board.compute()
print(final_result)

Write the dimensions and the live cells of the final board to a text file

In [None]:
def write_output_file(output, board):
    """Write the board dimensions and its live cells to a text file.

    The first line contains the two board dimensions separated by a space.
    Each following line contains the row and column index of one cell whose
    value equals 1, in row-major order.

    Parameters
    ----------
    output : str or path-like
        Path of the file to create or overwrite.
    board : array-like
        Two-dimensional array of cell states.
    """
    grid = np.asarray(board)
    w = grid.shape[0]
    h = grid.shape[1]
    # argwhere returns the matching indices in row-major order, the same
    # order a nested row/column loop would visit them
    live_cells = np.argwhere(grid == 1)
    # the context manager closes the file even if a write fails
    with open(output, "w") as f:
        f.write("{} {}\n".format(w, h))
        f.writelines("{} {}\n".format(r, c) for r, c in live_cells)

In [None]:
# Store the result under the same file name as the input, inside Output_Files.
output_dir = 'Output_Files'
output = os.path.basename(input_file)
# create the target folder on first use so the write cannot fail because
# the directory is missing
os.makedirs(output_dir, exist_ok=True)
write_output_file(os.path.join(output_dir, output), final_result)

In [None]:
# Shut down the dask.distributed client created in the setup cell.
client.shutdown()