In [1]:
import sionna
import tensorflow as tf
import numpy as np

from sionna.mimo import StreamManagement

from sionna.ofdm import ResourceGrid, ResourceGridMapper, LSChannelEstimator, LMMSEEqualizer
from sionna.ofdm import OFDMModulator, OFDMDemodulator, ZFPrecoder, RemoveNulledSubcarriers

from sionna.channel.tr38811 import Antenna, AntennaArray, CDL, DenseUrban, Urban, SubUrban
from sionna.channel.tr38811.utils import gen_single_sector_topology as gen_topology
from sionna.channel import subcarrier_frequencies, cir_to_ofdm_channel, cir_to_time_channel
from sionna.channel import ApplyOFDMChannel, ApplyTimeChannel, OFDMChannel

from sionna.fec.ldpc.encoding import LDPC5GEncoder
from sionna.fec.ldpc.decoding import LDPC5GDecoder

from sionna.mapping import Mapper, Demapper

from sionna.utils import BinarySource, ebnodb2no, sim_ber, QAMSource
from sionna.utils.metrics import compute_ber

sionna.config.xla_compat=True

2024-09-03 14:53:53.237298: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-09-03 14:53:53.281264: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-09-03 14:53:53.281971: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [5]:
class Model(tf.keras.Model):
    """Simulate OFDM MIMO transmissions over a 3GPP 38.901 model.
    """
    def __init__(self, scenario, perfect_csi, doppler_enabled, elevation_angle = 80.0):
        super().__init__()
        self._scenario = scenario
        self._perfect_csi = perfect_csi

        # Internally set parameters
        self._carrier_frequency = 2.0e9
        self._fft_size = 128
        self._subcarrier_spacing = 30e3
        self._num_ofdm_symbols = 14
        self._cyclic_prefix_length = 20
        self._pilot_ofdm_symbol_indices = [2, 11]
        self._num_bs_ant = 8
        self._num_ut = 2
        self._num_ut_ant = 1
        self._num_bits_per_symbol = 2
        self._coderate = 0.5
        self._direction = "uplink"
        self._sat_height = 600000.0
        self._elevation_angle = elevation_angle
        self._doppler_enabled = doppler_enabled

        # Create an RX-TX association matrix
        # rx_tx_association[i,j]=1 means that receiver i gets at least one stream
        # from transmitter j. Depending on the transmission direction (uplink or downlink),
        # the role of UT and BS can change.
        bs_ut_association = np.zeros([1, self._num_ut])
        bs_ut_association[0, :] = 1
        self._rx_tx_association = bs_ut_association
        self._num_tx = self._num_ut
        self._num_streams_per_tx = self._num_ut_ant


        # Setup an OFDM Resource Grid
        self._rg = ResourceGrid(num_ofdm_symbols=self._num_ofdm_symbols,
                                fft_size=self._fft_size,
                                subcarrier_spacing=self._subcarrier_spacing,
                                num_tx=self._num_tx,
                                num_streams_per_tx=self._num_streams_per_tx,
                                cyclic_prefix_length=self._cyclic_prefix_length,
                                pilot_pattern="kronecker",
                                pilot_ofdm_symbol_indices=self._pilot_ofdm_symbol_indices)

        # Setup StreamManagement
        self._sm = StreamManagement(self._rx_tx_association, self._num_streams_per_tx)

        # Configure antenna arrays
        self._ut_array = AntennaArray(
                                num_rows=1,
                                num_cols=1,
                                polarization="single",
                                polarization_type="V",
                                antenna_pattern="omni",
                                carrier_frequency=self._carrier_frequency)

        self._bs_array = AntennaArray(num_rows=1,
                                    num_cols=int(self._num_bs_ant/2),
                                    polarization="dual",
                                    polarization_type="cross",
                                    antenna_pattern="38.901",
                                    carrier_frequency=self._carrier_frequency)

        # Configure the channel model
        if scenario == "dur":
            self._channel_model = DenseUrban(carrier_frequency=self._carrier_frequency,
                                    ut_array=self._ut_array,
                                    bs_array=self._bs_array,
                                    direction=self._direction,
                                    elevation_angle=self._elevation_angle,
                                    enable_pathloss=True,
                                    enable_shadow_fading=True,
                                    doppler_enabled=self._doppler_enabled)
        elif scenario == "sur":
            self._channel_model = SubUrban(carrier_frequency=self._carrier_frequency,
                                    ut_array=self._ut_array,
                                    bs_array=self._bs_array,
                                    direction=self._direction,
                                    elevation_angle=self._elevation_angle,
                                    enable_pathloss=True,
                                    enable_shadow_fading=True,
                                    doppler_enabled=self._doppler_enabled)
        elif scenario == "urb":
            self._channel_model = Urban(carrier_frequency=self._carrier_frequency,
                                    ut_array=self._ut_array,
                                    bs_array=self._bs_array,
                                    direction=self._direction,
                                    elevation_angle=self._elevation_angle,
                                    enable_pathloss=True,
                                    enable_shadow_fading=True,
                                    doppler_enabled=self._doppler_enabled)

        # Instantiate other building blocks
        self._binary_source = BinarySource()
        self._qam_source = QAMSource(self._num_bits_per_symbol)

        self._n = int(self._rg.num_data_symbols*self._num_bits_per_symbol) # Number of coded bits
        self._k = int(self._n*self._coderate)                              # Number of information bits
        self._encoder = LDPC5GEncoder(self._k, self._n)
        self._decoder = LDPC5GDecoder(self._encoder)
        self._mapper = Mapper("qam", self._num_bits_per_symbol)
        self._rg_mapper = ResourceGridMapper(self._rg)

        self._ofdm_channel = OFDMChannel(self._channel_model, self._rg, add_awgn=True,
                                        normalize_channel=True, return_channel=True)

        self._remove_nulled_subcarriers = RemoveNulledSubcarriers(self._rg)
        self._ls_est = LSChannelEstimator(self._rg, interpolation_type="nn")
        self._lmmse_equ = LMMSEEqualizer(self._rg, self._sm)
        self._demapper = Demapper("app", "qam", self._num_bits_per_symbol)



    def new_topology(self, batch_size):
        topology = gen_topology(batch_size, self._num_ut, self._scenario, self._sat_height)
        self._channel_model.set_topology(*topology)
        return topology
    
    #@tf.function(jit_compile=True)
    def call(self, batch_size, ebno_db):

        topology = self.new_topology(batch_size)
        self._channel_model.set_topology(*topology)
        no = ebnodb2no(ebno_db, self._num_bits_per_symbol, self._coderate, self._rg)
        b = self._binary_source([batch_size, self._num_tx, self._num_streams_per_tx, self._k])
        c = self._encoder(b)
        x = self._mapper(c)
        x_rg = self._rg_mapper(x)



        y, h = self._ofdm_channel([x_rg, no])
        if self._perfect_csi:
            h_hat = self._remove_nulled_subcarriers(h)
            err_var = 0.0
        else:
            h_hat = self._remove_nulled_subcarriers(h)
            err_var = 0.0
            #h_hat, err_var = self._ls_est ([y, no])
        x_hat, no_eff = self._lmmse_equ([y, h_hat, err_var, no])
        llr = self._demapper([x_hat, no_eff])
        b_hat = self._decoder(llr)
        return b, b_hat

In [75]:
scenario = "dur"
perfect_csi = False
doppler_enabled = False

model = Model(scenario=scenario,
                perfect_csi=perfect_csi,
                doppler_enabled=doppler_enabled)

In [3]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

import datetime




In [6]:
# Set TensorFlow to log detailed error messages
#tf.debugging.set_log_device_placement(True)
#tf.debugging.enable_check_numerics()  # This will raise errors when invalid values are encountered

scenario = "dur"
perfect_csi = False
doppler_enabled = False
batch_size = 16
ebno = 10.0


model = Model(scenario=scenario,
                perfect_csi=perfect_csi,
                doppler_enabled=doppler_enabled)



# Set up TensorBoard logging directory
log_dir = "logs/fit/model_architecture"
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

# To log the model architecture, you can use the following:
model(batch_size,ebno)
#model.build(input_shape=[(1,),(1,)])  # Specify the input shape as needed
model.summary()  # This will print the model summary

with tf.summary.create_file_writer(log_dir).as_default():
    tf.summary.graph(tf.get_default_graph())

# Save the model architecture to TensorBoard
with tf.summary.create_file_writer(log_dir).as_default():
    tf.summary.text("Model Architecture", model.summary())



Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 binary_source_4 (BinarySou  multiple                  0         
 rce)                                                            
                                                                 
 qam_source_3 (QAMSource)    multiple                  0 (unused)
                                                                 
 ldpc5g_encoder_1 (LDPC5GEn  multiple                  0         
 coder)                                                          
                                                                 
 ldpc5g_decoder_1 (LDPC5GDe  multiple                  0         
 coder)                                                          
                                                                 
 mapper_5 (Mapper)           multiple                  0         
                                                           

ValueError: Attempt to convert a value (None) with an unsupported type (<class 'NoneType'>) to a Tensor.

In [None]:
SIMS = {
    "ebno_db" : list(np.arange(-5, 16, 1.0)),
    "scenario" : ["dur", "urb", "sur"],
    "perfect_csi" : [True, False],
    "ber" : [],
    "bler" : [],
    "duration" : None,
    "doppler_enabled" : [True, False]
}

ber, bler = sim_ber(model,
                    SIMS["ebno_db"],
                    batch_size=128,
                    max_mc_iter=1000,
                    num_target_block_errors=1000)