<a href="https://colab.research.google.com/github/TesisDeepcodeUcuenca/Tesis-Deepcode-Ucuenca/blob/main/Codigos%20Convencionales/C%C3%B3digos_convencionales.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title Convolucional 1/2

import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
import commpy.channelcoding.convcode as cc
import commpy.utilities as util
import scipy.stats as stats  # Importar scipy.stats
from google.colab import files

# Define la función BSC (Binary Symmetric Channel)
def bsc(input_bits, p_t):
    """
    Binary Symmetric Channel.

    Parameters
    ----------
    input_bits : 1D ndarray containing {0, 1}
        Input array of bits to the channel.

    p_t : float in [0, 1]
        Transition/Error probability of the channel.

    Returns
    -------
    output_bits : 1D ndarray containing {0, 1}
        Output bits from the channel.
    """
    output_bits = input_bits.copy()
    flip_locs = (np.random.rand(len(output_bits)) <= p_t)
    output_bits[flip_locs] = 1 ^ output_bits[flip_locs]
    return output_bits

# Define los parámetros del código convolucional
memory = np.array(2, ndmin=1)
g_matrix = np.array((0o5, 0o7), ndmin=2)  # Tasa de codificación 1/2
trellis = cc.Trellis(memory, g_matrix)
tb_depth = 5 * (memory[0] + 1)  # Profundidad de traceback recomendada

# Probabilidades de error a probar
error_probabilities = np.linspace(0.0001, 0.15, 20)

# Número de tramas y longitud de cada mensaje
num_block = 100000
batch_size = 100
num_messages_per_frame = 100
message_length = 50
num_epochs = 2

# Almacena los BER promedio para cada probabilidad de error
ber_results = {p_t: [] for p_t in error_probabilities}  # Diccionario para almacenar BER para cada probabilidad en cada época


for epoch in range(num_epochs):
    print("")
    print(f"Epoch {epoch + 1}/{num_epochs}")
    print(" ")

    for p_t in error_probabilities:
        total_bit_errors = 0
        total_bits = 0

        print(f"Simulating for error probability p_t={p_t}...")

        for _ in tqdm(range(int(num_block/batch_size))):
            # Generar bits de mensaje aleatorios en un batch
            message_bits_batch = np.random.randint(0, 2, (num_messages_per_frame, message_length))

            # Codificar los bits del mensaje en un batch
            coded_bits_batch = np.array([cc.conv_encode(message_bits, trellis) for message_bits in message_bits_batch])

            # Pasar los bits codificados a través del canal BSC en un batch
            received_bits_batch = np.array([bsc(coded_bits, p_t) for coded_bits in coded_bits_batch])

            # Decodificar los bits recibidos en un batch
            decoded_bits_batch = np.array([cc.viterbi_decode(received_bits.astype(float), trellis, tb_depth) for received_bits in received_bits_batch])

            # Calcular el número de errores de bit en un batch
            for message_bits, decoded_bits in zip(message_bits_batch, decoded_bits_batch):
                num_bit_errors = util.hamming_dist(message_bits, decoded_bits[:len(message_bits)])
                total_bit_errors += num_bit_errors
                total_bits += len(message_bits)

        # Calcular la BER promedio para esta época
        ber = total_bit_errors / total_bits
        ber_results[p_t].append(ber)
        print(f"BER for p_t={p_t}: {ber}")

# Convertir los resultados en un DataFrame
results_df = pd.DataFrame({
    'Epoch': np.repeat(range(1, num_epochs + 1), len(error_probabilities)),
    'Probabilidad de Error (p_t)': np.tile(error_probabilities, num_epochs),
    'BER': [ber for sublist in ber_results.values() for ber in sublist]
})

# Guardar los resultados en un archivo CSV
csv_path = '/content/ber_vs_probabilidades.csv'
results_df.to_csv(csv_path, index=False)

# Calcular el promedio y el intervalo de confianza
mean_ber = []
conf_intervals = []

for p_t in error_probabilities:
    data = ber_results[p_t]
    mean = np.mean(data)
    confidence_interval = stats.t.interval(0.95, len(data)-1, loc=mean, scale=stats.sem(data))

    mean_ber.append(mean)
    conf_intervals.append(confidence_interval)

# Convertir listas a arrays para facilitar la manipulación
mean_ber = np.array(mean_ber)
conf_intervals = np.array(conf_intervals)

# Graficar los resultados
plt.figure()
plt.errorbar(error_probabilities, mean_ber,
             yerr=[mean_ber - conf_intervals[:, 0], conf_intervals[:, 1] - mean_ber],
             fmt='o', capsize=5)
plt.xlabel('Probabilidad de error (p_t)')
plt.ylabel('Tasa de Error de Bit (BER)')
plt.title('BER promedio vs Probabilidad de Error con intervalos de confianza')
plt.grid(True)
# plt.xscale('log')  # Escala logarítmica para el eje x si es necesario
plt.yscale('log')  # Escala logarítmica para el eje y
plt.show()

# Descargar el archivo CSV
files.download(csv_path)

In [None]:
#@title Convolucional 1/3

import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
import commpy.channelcoding.convcode as cc
import commpy.utilities as util
import scipy.stats as stats  # Importar scipy.stats

# Define la función BSC (Binary Symmetric Channel)
def bsc(input_bits, p_t):
    """
    Binary Symmetric Channel.

    Parameters
    ----------
    input_bits : 1D ndarray containing {0, 1}
        Input array of bits to the channel.

    p_t : float in [0, 1]
        Transition/Error probability of the channel.

    Returns
    -------
    output_bits : 1D ndarray containing {0, 1}
        Output bits from the channel.
    """
    output_bits = input_bits.copy()
    flip_locs = (np.random.rand(len(output_bits)) <= p_t)
    output_bits[flip_locs] = 1 ^ output_bits[flip_locs]
    return output_bits

# Define los parámetros del código convolucional
memory = np.array(3, ndmin=1)
g_matrix = np.array((0o7, 0o7, 0o5), ndmin=2)  # Tasa de codificación 1/2
trellis = cc.Trellis(memory, g_matrix)
tb_depth = 5 * (memory[0] + 1)  # Profundidad de traceback recomendada

# Probabilidades de error a probar
#error_probabilities = np.linspace(0.0001, 0.15, 20)

error_probabilities = np.array([0.134221053, 0.142110526, 0.15])



# Número de tramas y longitud de cada mensaje
num_block = 100000
batch_size = 100
num_messages_per_frame = 100
message_length = 50
num_epochs = 1

# Almacena los BER promedio para cada probabilidad de error
ber_results = {p_t: [] for p_t in error_probabilities}  # Diccionario para almacenar BER para cada probabilidad en cada época


for epoch in range(num_epochs):
    print("")
    print(f"Epoch {epoch + 1}/{num_epochs}")
    print(" ")

    for p_t in error_probabilities:
        total_bit_errors = 0
        total_bits = 0

        print(f"Simulating for error probability p_t={p_t}...")

        for _ in tqdm(range(int(num_block/batch_size))):
            # Generar bits de mensaje aleatorios en un batch
            message_bits_batch = np.random.randint(0, 2, (num_messages_per_frame, message_length))

            # Codificar los bits del mensaje en un batch
            coded_bits_batch = np.array([cc.conv_encode(message_bits, trellis) for message_bits in message_bits_batch])

            #print(coded_bits_batch.shape)

            # Pasar los bits codificados a través del canal BSC en un batch
            received_bits_batch = np.array([bsc(coded_bits, p_t) for coded_bits in coded_bits_batch])
            #print(received_bits_batch.shape)
            # Decodificar los bits recibidos en un batch
            decoded_bits_batch = np.array([cc.viterbi_decode(received_bits.astype(float), trellis, tb_depth) for received_bits in received_bits_batch])

            # Calcular el número de errores de bit en un batch
            for message_bits, decoded_bits in zip(message_bits_batch, decoded_bits_batch):
                num_bit_errors = util.hamming_dist(message_bits, decoded_bits[:len(message_bits)])
                total_bit_errors += num_bit_errors
                total_bits += len(message_bits)

        # Calcular la BER promedio para esta época
        ber = total_bit_errors / total_bits
        ber_results[p_t].append(ber)
        print(f"BER for p_t={p_t}: {ber}")

# Convertir los resultados en un DataFrame
results_df = pd.DataFrame({
    'Epoch': np.repeat(range(1, num_epochs + 1), len(error_probabilities)),
    'Probabilidad de Error (p_t)': np.tile(error_probabilities, num_epochs),
    'BER': [ber for sublist in ber_results.values() for ber in sublist]
})

# Guardar los resultados en un archivo CSV
results_df.to_csv('ber_vs_probabilidades22.csv', index=False)

# Calcular el promedio y el intervalo de confianza
mean_ber = []
conf_intervals = []

for p_t in error_probabilities:
    data = ber_results[p_t]
    mean = np.mean(data)
    confidence_interval = stats.t.interval(0.95, len(data)-1, loc=mean, scale=stats.sem(data))

    mean_ber.append(mean)
    conf_intervals.append(confidence_interval)

# Convertir listas a arrays para facilitar la manipulación
mean_ber = np.array(mean_ber)
conf_intervals = np.array(conf_intervals)

# Graficar los resultados
plt.figure()
plt.errorbar(error_probabilities, mean_ber,
             yerr=[mean_ber - conf_intervals[:, 0], conf_intervals[:, 1] - mean_ber],
             fmt='o', capsize=5)
plt.xlabel('Probabilidad de error (p_t)')
plt.ylabel('Tasa de Error de Bit (BER)')
plt.title('BER promedio vs Probabilidad de Error con intervalos de confianza')
plt.grid(True)
# plt.xscale('log')  # Escala logarítmica para el eje x si es necesario
plt.yscale('log')  # Escala logarítmica para el eje y
plt.show()



In [None]:
#@title Codigos Hamming (15, 11) y datos crudos

import numpy as np
import csv
import pandas as pd
def guardar_matriz_en_csv(matriz, nombre_archivo):
    # Convertir la matriz a un DataFrame de pandas
    df = pd.DataFrame(matriz.T)  # Transponer para que cada columna corresponda a una probabilidad

    # Asignar nombres a las columnas como Probabilidad 1, Probabilidad 2, etc.
    df.columns = [f'Probabilidad {i + 1}' for i in range(matriz.shape[0])]

    # Guardar el DataFrame en un archivo CSV
    df.to_csv(nombre_archivo, index=False)
def canal_bsc(mensaje, ruido):
    channel_message = np.bitwise_xor(mensaje, ruido)
    return channel_message
def hamming_data(bits):
    matrix_dimension = int(np.ceil(np.log2(len(bits))))
    potencias_de_dos = [2 ** i for i in range(matrix_dimension + 1)]
    posiciones_info = [i for i in range(1, 2 ** matrix_dimension + 1) if i not in potencias_de_dos]
    return posiciones_info, matrix_dimension
def calculate_ber(original_bits, decoded_bits):
    error_count = np.sum(original_bits != decoded_bits)
    ber = error_count / len(original_bits)
    return ber
def hamming_decode(Rx_bits, posiciones_info):
    uncoded_bits = []
    parity_check = 0
    for i, bit in enumerate(Rx_bits):
        if bit:
            parity_check ^= i
    if parity_check != 0:
        Rx_bits[parity_check] ^= 1
    for pos in posiciones_info:
        uncoded_bits.append(Rx_bits[pos])

    return uncoded_bits
def hamming_encode(bits, posiciones_info, matrix_dimension):
    bits_data = list(enumerate(bits))
    matriz_hamming = np.zeros(2**matrix_dimension, dtype=int)
    bit_activados = np.zeros((matrix_dimension, ((2*matrix_dimension)-1)), dtype=int)
    flags_paridad = np.zeros(matrix_dimension, dtype=int)
    bits_paridad = np.zeros(matrix_dimension, dtype=int)
    for i, pos in enumerate(posiciones_info): matriz_hamming[pos] = bits_data[i][1]
    for i, pos in enumerate(posiciones_info):
        pos_bin = format(pos, '04b')
        for j in range(matrix_dimension):
            if pos_bin[(matrix_dimension-1) - j] == '1':
                bit_activados[j, flags_paridad[j]] = matriz_hamming[pos]
                flags_paridad[j] += 1
    # for i in range(0, len(matriz_hamming), 4):print(matriz_hamming[i:i+4])
    for j in range(matrix_dimension):
        for i in bit_activados[j]:
            bits_paridad[j] ^= i
        matriz_hamming[2 ** j] = bits_paridad[j]
    return matriz_hamming
def tanda_hamming(batch_size, noise_block_len, block_len, qs, _, name):
    ber_q = np.zeros((len(qs), batch_size))
    for j, q in enumerate(qs):
        data = np.random.randint(0, 2, (batch_size, block_len, 1))
        noise = np.random.binomial(1, q, size=(batch_size, noise_block_len, 1)).astype(int)
        for i in range(0, batch_size):
            raw_data = (data[i:i + 1, :, :]).squeeze().squeeze()
            batch_noise = (noise[i:i + 1, :, :]).squeeze().squeeze()
            posiciones_info, matrix_dimension = hamming_data(raw_data)
            encoded_vector = hamming_encode(raw_data, posiciones_info, matrix_dimension)
            channel_vector = canal_bsc(encoded_vector, batch_noise)
            decoded_vector = hamming_decode(channel_vector, posiciones_info)
            ber = calculate_ber(raw_data, decoded_vector)
            ber_q[j][i] = ber
    return ber_q
def guardar_csv(data, filename):
    filepath = filename  # Ajusta la ruta según dónde quieras guardar el archivo
    with open(filepath, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Q values', 'Average BER'])
        for row in data:
            writer.writerow(row)
def tanda_raw(batch_size, block_len, qs, _, name):
    ber_q = np.zeros((len(qs), batch_size))
    for j, q in enumerate(qs):
        data = np.random.randint(0, 2, (batch_size, block_len, 1))
        noise = np.random.binomial(1, q, size=(batch_size,block_len, 1)).astype(int)
        for i in range(0, batch_size):
            raw_data = (data[i:i + 1, :, :]).squeeze().squeeze()
            batch_noise = (noise[i:i + 1, :, :]).squeeze().squeeze()
            channel_vector = canal_bsc(raw_data, batch_noise)
            ber = calculate_ber(raw_data, channel_vector)
            ber_q[j][i] = ber
    return ber_q

if __name__ == "__main__":

    block_len = 11
    noise_block_len = 16
    batch_size = 100
    code_rate = 3
    num_block = 100000
    num_test_batch = int(num_block / batch_size)

    qs = np.linspace(0.0001, 0.15, 20)
    arrays_q_raw = []
    arrays_q_HAM = []
    print(f"Total:", num_test_batch, "tandas")
    for _ in range(num_test_batch):
        ber_q_HAM = tanda_hamming(batch_size, noise_block_len, block_len, qs, _,"H1.csv")
        arrays_q_HAM.append(ber_q_HAM)
        ber_q_raw = tanda_raw(batch_size, block_len, qs, _, "RAW1.csv")
        arrays_q_raw.append(ber_q_raw)

    qs_reshaped = qs.reshape(20, 1)

    HAM_stack = np.hstack(arrays_q_HAM)
    HAM_data = np.hstack((qs_reshaped, HAM_stack))
    HAM_data_transposed = HAM_data.T
    guardar_matriz_en_csv(HAM_data, "raw50.csv")

    raw_stack = np.hstack(arrays_q_raw)
    raw_data = np.hstack((qs_reshaped, raw_stack))
    RAW_data_transposed = raw_data.T
    guardar_matriz_en_csv(raw_data, "raw50.csv")


    print(raw_data.shape)
    print(HAM_data.shape)


In [None]:
#@title Turbo códigos

import sys
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import csv
from turbo import TurboEncoder, AWGN, TurboDecoder

def canal_bsc(mensaje, ruido):
    msg_bits = ((mensaje + 1)/2).astype(int)
    msg_canal_bits = np.bitwise_xor(msg_bits, ruido)
    msg_canal = (msg_canal_bits * 2) - 1
    return msg_canal

def calculate_ber(original_bits, decoded_bits):
    error_count = sum([x != y for x, y in zip(original_bits, decoded_bits)])
    ber = error_count / len(original_bits)
    return ber
def guardar_matriz_en_csv(matriz, nombre_archivo):
    # Convertir la matriz a un DataFrame de pandas
    df = pd.DataFrame(matriz.T)  # Transponer para que cada columna corresponda a una probabilidad

    # Asignar nombres a las columnas como Probabilidad 1, Probabilidad 2, etc.
    df.columns = [f'Probabilidad {i + 1}' for i in range(matriz.shape[0])]

    # Guardar el DataFrame en un archivo CSV
    df.to_csv(nombre_archivo, index=False)
def turbo_encode(message_bits):
    block_size = len(message_bits)
    interleaver = random.sample(range(0, block_size), block_size)
    encoder = TurboEncoder(interleaver)
    encoded_vector = encoder.execute(message_bits)
    return encoded_vector, interleaver

def turbo_decode(received_vector, interleaver):
    decoder = TurboDecoder(interleaver)
    decoded_vector = decoder.execute(received_vector)
    decoded_vector = [int(b > 0.0) for b in decoded_vector]
    return decoded_vector

def tanda_train(batch_size, block_len, code_rate, qs, _, progreso_tanda):
    ber_q = np.zeros((len(qs), batch_size))
    for j, q in enumerate(qs):
        data = np.random.randint(0, 2, (batch_size, block_len, 1))
        noise = np.random.binomial(1, q, size=(batch_size, (block_len*code_rate) + 6, 1)).astype(int)
        for i in range(0, batch_size):
            progreso_tanda.update(1)
            raw_data = (data[i:i+1, :, :]).squeeze().squeeze()
            batch_noise = (noise[i:i+1, :, :]).squeeze().squeeze()
            encoded_vector, interleaver = turbo_encode(raw_data)
            coded_antipodal = (encoded_vector * 2) - 1
            channel_vector = canal_bsc(coded_antipodal, batch_noise)
            decoded_vector = np.array(turbo_decode(channel_vector, interleaver))
            received_data = decoded_vector[:len(raw_data)]
            ber = calculate_ber(raw_data, received_data)
            ber_q[j][i] = ber
    return ber_q

def guardar_csv(data, filename):
    filepath = filename  # Ajusta la ruta según dónde quieras guardar el archivo
    with open(filepath, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Q values', 'Average BER'])
        for row in data:
            writer.writerow(row)



if __name__ == "__main__":
    names = ["turbo_1_3.csv"]

    for name in names:
        block_len = 50
        batch_size = 100
        code_rate = 3
        num_block = 100000
        num_test_batch = int(num_block/batch_size)
        qs = np.linspace(0.0001, 0.15, 20)
        num_epoch_test = 100

        arrays_q_turbo = []
        print(f"Total:", num_epoch_test, "tandas")
        progreso_tanda = tqdm(total=num_epoch_test*20*batch_size, desc=f"Progreso GENERAL")
        for _ in range(num_epoch_test):
            ber_q_turbo = tanda_train(batch_size, block_len, code_rate, qs, _, progreso_tanda)
            arrays_q_turbo.append(ber_q_turbo)
            #progreso_tanda.update(1)

        qs_reshaped = qs.reshape(20, 1)

        turbo_stack = np.hstack(arrays_q_turbo)
        turbo_data = np.hstack((qs_reshaped, turbo_stack))
        guardar_matriz_en_csv(turbo_data, "turbo_1_3.csv")
        print(turbo_data.shape)

In [None]:
#@title Códigos de repetición

import random
import pandas as pd
import numpy as np
from tqdm import tqdm
import csv


def canal_bsc(mensaje, ruido):
    msg_canal_bits = np.bitwise_xor(mensaje, ruido)
    return msg_canal_bits

def calculate_ber(original_bits, decoded_bits):
    error_count = sum([x != y for x, y in zip(original_bits, decoded_bits)])
    ber = error_count / len(original_bits)
    return ber
def guardar_matriz_en_csv(matriz, nombre_archivo):
    # Convertir la matriz a un DataFrame de pandas
    df = pd.DataFrame(matriz.T)  # Transponer para que cada columna corresponda a una probabilidad

    # Asignar nombres a las columnas como Probabilidad 1, Probabilidad 2, etc.
    df.columns = [f'Probabilidad {i + 1}' for i in range(matriz.shape[0])]

    # Guardar el DataFrame en un archivo CSV
    df.to_csv(nombre_archivo, index=False)
def repetition_encoder(message, repeat_factor):
    return np.repeat(message, repeat_factor)
def repetition_decoder(received_bits, repeat_factor):
    decoded_bits = np.reshape(received_bits, (len(received_bits) // repeat_factor, repeat_factor))
    decoded_bits = np.sum(decoded_bits, axis=1) > repeat_factor // 2
    return decoded_bits.astype(int)


def tanda_train(batch_size, block_len, code_rate, qs, _, progreso_tanda):
    ber_q = np.zeros((len(qs), batch_size))
    for j, q in enumerate(qs):
        data = np.random.randint(0, 2, (batch_size, block_len, 1))
        noise = np.random.binomial(1, q, size=(batch_size, (block_len*code_rate) , 1)).astype(int)
        for i in range(0, batch_size):
            progreso_tanda.update(1)
            raw_data = (data[i:i+1, :, :]).squeeze().squeeze()
            batch_noise = (noise[i:i+1, :, :]).squeeze().squeeze()
            encoded_vector = repetition_encoder(raw_data, 3)
            channel_vector = canal_bsc(encoded_vector, batch_noise)
            decoded_vector = repetition_decoder(channel_vector, 3)
            ber = calculate_ber(raw_data, decoded_vector)
            ber_q[j][i] = ber
    return ber_q

def guardar_csv(data, filename):
    filepath = filename  # Ajusta la ruta según dónde quieras guardar el archivo
    with open(filepath, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Q values', 'Average BER'])
        for row in data:
            writer.writerow(row)



if __name__ == "__main__":
    names = ["repeticion_1_3.csv"]

    for name in names:
        block_len = 50
        batch_size = 100
        code_rate = 3
        qs = np.linspace(0.0001, 0.15, 20)
        num_epoch_test = 100

        arrays_q_x3 = []
        print(f"Total:", num_epoch_test, "tandas")
        progreso_tanda = tqdm(total=num_epoch_test*20*batch_size, desc=f"Progreso GENERAL")
        for _ in range(num_epoch_test):
            ber_q_x3 = tanda_train(batch_size, block_len, code_rate, qs, _, progreso_tanda)
            arrays_q_x3.append(ber_q_x3)
            #progreso_tanda.update(1)

        qs_reshaped = qs.reshape(20, 1)

        x3_stack = np.hstack(arrays_q_x3)
        x3_data = np.hstack((qs_reshaped, x3_stack))
        guardar_matriz_en_csv(x3_data, "repeticion_1_3.csv")
        print(x3_data.shape)