In [1]:
from pyspark.sql import SparkSession
import time

# Criar a sessão Spark
spark = SparkSession.builder \
    .appName("Jogo da Vida") \
    .getOrCreate()

In [2]:
def wall_time():
    tv = time.time()

    return tv

In [3]:
def normaliza(i, j, tam):
    return (i * (tam + 2)) + j

In [4]:
def init_tabuleiro(tam):
    tabuleiro_entrada = [0] * ((tam+2)*(tam+2))
    tabuleiro_saida = [0] * ((tam+2)*(tam+2))
    
    tabuleiro_entrada[normaliza(1, 2, tam)] = 1;
    tabuleiro_entrada[normaliza(2, 3, tam)] = 1;
    tabuleiro_entrada[normaliza(3, 1, tam)] = 1;
    tabuleiro_entrada[normaliza(3, 2, tam)] = 1;
    tabuleiro_entrada[normaliza(3, 3, tam)] = 1;
    return tabuleiro_entrada, tabuleiro_saida

In [5]:
def printa_separador(first, last):
    for i in range(first, last+1):
        print("=", end="")
    print("=")

def dump_tabuleiro(tabuleiro, tam, primeiro, ultimo, msg):
    print(f"{msg}; Dump posicoes [{primeiro}:{ultimo}, {primeiro}:{ultimo}] de tabuleiro {tam} x {tam}")

    printa_separador(primeiro, ultimo)

    for i in range(normaliza(primeiro, 0, tam), normaliza(ultimo+1, 0, tam), normaliza(1, 0, tam)):
        for j in range(i+primeiro, i+ultimo+1):
            print("X" if tabuleiro[j] else ".", end="")
        print()

    printa_separador(primeiro, ultimo)

In [6]:
def correto(tabuleiro, tam):
    cnt = sum(tabuleiro)

    return (
        cnt == 5
        and tabuleiro[normaliza(tam-2, tam-1, tam)]
        and tabuleiro[normaliza(tam-1, tam, tam)]
        and tabuleiro[normaliza(tam, tam-2, tam)]
        and tabuleiro[normaliza(tam, tam-1, tam)]
        and tabuleiro[normaliza(tam, tam, tam)]
    )

In [7]:
def uma_vida(x, tabuleiro_entrada, tabuleiro_saida, tam):
    v, i = x
    if(i < 1 or i > tam):
        return []

    for j in range(1, tam+1):
        vizinhos_vivos = tabuleiro_entrada[normaliza(i-1, j-1, tam)] + tabuleiro_entrada[normaliza(i-1, j, tam)] + \
                         tabuleiro_entrada[normaliza(i-1, j+1, tam)] + tabuleiro_entrada[normaliza(i, j-1, tam)] + \
                         tabuleiro_entrada[normaliza(i, j+1, tam)] + tabuleiro_entrada[normaliza(i+1, j-1, tam)] + \
                         tabuleiro_entrada[normaliza(i+1, j, tam)] + tabuleiro_entrada[normaliza(i+1, j+1, tam)]

        if tabuleiro_entrada[normaliza(i, j, tam)] and vizinhos_vivos < 2:
            tabuleiro_saida[j] = 0
        elif tabuleiro_entrada[normaliza(i, j, tam)] and vizinhos_vivos > 3:
            tabuleiro_saida[j] = 0
        elif not tabuleiro_entrada[normaliza(i, j, tam)] and vizinhos_vivos == 3:
            tabuleiro_saida[j] = 1
        else:
            tabuleiro_saida[j] = tabuleiro_entrada[normaliza(i, j, tam)]
    return tabuleiro_saida

In [8]:
POWMIN = 3
POWMAX = 10

for pow in range(POWMIN, POWMAX+1):
    tam = 1 << pow
    
    tempo0 = wall_time();
    tabuleiro_entrada, tabuleiro_saida = init_tabuleiro(tam)
    
    # Criar um RDD (Resilient Distributed Dataset) a partir das células iniciais
    entrada_rdd = spark.sparkContext.parallelize(tabuleiro_entrada)
    saida_rdd = spark.sparkContext.parallelize(tabuleiro_saida)
    
    tempo1 = wall_time();
    
    for i in range(2*(tam-3)):
        entrada_rdd = spark.sparkContext.parallelize(tabuleiro_entrada)
        rdd_index = entrada_rdd.zipWithIndex()
        saida_rdd = rdd_index.map(lambda x: uma_vida(x, tabuleiro_entrada, [0]*(tam+2), tam))
        saida_rdd = saida_rdd.filter(lambda x : x != [])
        tabuleiro_saida = [[0]*(tam+2)]
        tabuleiro_saida.extend(saida_rdd.collect())
        tabuleiro_saida.append([0]*(tam+2))
        tabuleiro_saida_resultante = [elemento for lista in tabuleiro_saida for elemento in lista]
        tabuleiro_saida = tabuleiro_saida_resultante
        
        saida_rdd = spark.sparkContext.parallelize(tabuleiro_saida)
        rdd_index = saida_rdd.zipWithIndex()
        entrada_rdd = rdd_index.map(lambda x: uma_vida(x, tabuleiro_saida, [0]*(tam+2), tam))
        entrada_rdd = entrada_rdd.filter(lambda x : x != [])
        tabuleiro_entrada = [[0]*(tam+2)]
        tabuleiro_entrada.extend(entrada_rdd.collect())
        tabuleiro_entrada.append([0]*(tam+2))
        tabuleiro_entrada_resultante = [elemento for lista in tabuleiro_entrada for elemento in lista]
        tabuleiro_entrada = tabuleiro_entrada_resultante
    
    tempo2 = wall_time();
    
    if correto(tabuleiro_entrada, tam):
        print("**RESULTADO CORRETO**")
    else:
        print("**RESULTADO ERRADO**")
    
    tempo3 = wall_time()
    
    print("** Dados de execução **")
    print(f"tamanho = {tam} Blocos")
    print("Tempos:")
    print(f"    Inicialização:      {tempo1 - tempo0:.7f} Segundos")
    print(f"    Computa vida:       {tempo2 - tempo1:.7f} Segundos")
    print(f"    Verifica corretude: {tempo3 - tempo2:.7f} Segundos")
    print(f"Tempo total: {tempo3 - tempo0:.7f} Segundos\n\n")

**RESULTADO CORRETO**
** Dados de execução **
tamanho = 8 Blocos
Tempos:
    Inicialização:      0.2867811 Segundos
    Computa vida:       10.7237015 Segundos
    Verifica corretude: 0.0000951 Segundos
Tempo total: 11.0105777 Segundos


**RESULTADO CORRETO**
** Dados de execução **
tamanho = 16 Blocos
Tempos:
    Inicialização:      0.0063810 Segundos
    Computa vida:       18.5989878 Segundos
    Verifica corretude: 0.0000739 Segundos
Tempo total: 18.6054428 Segundos


**RESULTADO CORRETO**
** Dados de execução **
tamanho = 32 Blocos
Tempos:
    Inicialização:      0.0061274 Segundos
    Computa vida:       40.8299544 Segundos
    Verifica corretude: 0.0000970 Segundos
Tempo total: 40.8361788 Segundos


**RESULTADO CORRETO**
** Dados de execução **
tamanho = 64 Blocos
Tempos:
    Inicialização:      0.0057714 Segundos
    Computa vida:       88.3846455 Segundos
    Verifica corretude: 0.0001426 Segundos
Tempo total: 88.3905594 Segundos


**RESULTADO CORRETO**
** Dados de execução **