In [2]:
!pip install XGBOOST


Collecting XGBOOST
  Downloading xgboost-2.1.1-py3-none-win_amd64.whl.metadata (2.1 kB)
Downloading xgboost-2.1.1-py3-none-win_amd64.whl (124.9 MB)
   ---------------------------------------- 0.0/124.9 MB ? eta -:--:--
   ---------------------------------------- 0.0/124.9 MB ? eta -:--:--
   ---------------------------------------- 0.0/124.9 MB ? eta -:--:--
   ---------------------------------------- 0.1/124.9 MB 1.1 MB/s eta 0:01:55
   ---------------------------------------- 0.1/124.9 MB 1.1 MB/s eta 0:01:55
   ---------------------------------------- 0.2/124.9 MB 803.1 kB/s eta 0:02:36
   ---------------------------------------- 0.2/124.9 MB 803.1 kB/s eta 0:02:36
   ---------------------------------------- 0.2/124.9 MB 793.0 kB/s eta 0:02:38
   ---------------------------------------- 0.2/124.9 MB 793.0 kB/s eta 0:02:38
   ---------------------------------------- 0.4/124.9 MB 855.4 kB/s eta 0:02:26
   ---------------------------------------- 0.4/124.9 MB 855.4 kB/s eta 0:02:26
   

In [3]:
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt

from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, precision_score, recall_score, f1_score

In [5]:
class DVBS2X:
    def __init__(self, num_symbols, samples_per_symbol, carrier_freq):
        self.num_symbols = num_symbols
        self.samples_per_symbol = samples_per_symbol
        self.carrier_freq = carrier_freq

    def generate_bpsk(self):
        bits = np.random.randint(0, 2, self.num_symbols)
        symbols = 2 * bits - 1  # Map 0 to -1 and 1 to 1
        return self.modulate(symbols), symbols, bits

    def generate_qpsk(self):
        bits = np.random.randint(0, 2, 2 * self.num_symbols)
        qpsk_map = {
            (0, 0): 1 + 1j, (0, 1): -1 + 1j,
            (1, 1): -1 - 1j, (1, 0): 1 - 1j
        }
        symbols = np.array([qpsk_map[tuple(bits[i:i+2])] for i in range(0, len(bits), 2)])
        return self.modulate(symbols), symbols, bits

    # Add methods for APSK modulation (omitted for brevity, you already have them)
    def generate_8apsk(self):
        return self._generate_apsk(8, 3, [1, 2.6], [1, 7])

    def generate_16apsk(self):
        return self._generate_apsk(16, 4, [1, 2.6], [4, 12])

    def generate_32apsk(self):
        return self._generate_apsk(32, 5, [1, 2.6, 4.15], [4, 12, 16])

    def generate_64apsk(self):
        return self._generate_apsk(64, 6, [1, 1.6, 2.4, 3.5], [4, 12, 20, 28])

    def generate_128apsk(self):
        return self._generate_apsk(128, 7, [1, 1.5, 2.2, 3.0, 3.8], [4, 12, 20, 40, 52])

    def generate_256apsk(self):
        return self._generate_apsk(256, 8, [1, 1.4, 1.9, 2.5, 3.2, 4.0], [4, 12, 20, 28, 60, 132])
    
    def _generate_apsk(self, m, bits_per_symbol, radii, points_per_ring):
        bits = np.random.randint(0, 2, bits_per_symbol * self.num_symbols)

        constellation = []
        for r, n in zip(radii, points_per_ring):
            for k in range(n):
                angle = 2 * np.pi * k / n
                constellation.append(r * np.exp(1j * angle))
        constellation = np.array(constellation)

        constellation /= np.sqrt(np.mean(np.abs(constellation)**2))

        symbols = np.zeros(self.num_symbols, dtype=complex)
        for i in range(self.num_symbols):
            bit_chunk = bits[i*bits_per_symbol:(i+1)*bits_per_symbol]
            symbol_index = int(''.join(map(str, bit_chunk)), 2)
            symbols[i] = constellation[symbol_index]

        return self.modulate(symbols), symbols, bits

    def modulate(self, symbols):
        t = np.arange(self.num_symbols * self.samples_per_symbol) / (self.carrier_freq * self.samples_per_symbol)
        upsampled = np.repeat(symbols, self.samples_per_symbol)
        carrier = np.exp(2j * np.pi * self.carrier_freq * t)
        signal = np.real(upsampled * carrier)
        return t, signal

    def add_noise(self, signal, snr_db):
        """ Add Gaussian noise to the signal based on the desired SNR in dB. """
        signal_power = np.mean(np.abs(signal)**2)
        snr_linear = 10 ** (snr_db / 10.0)
        noise_power = signal_power / snr_linear
        noise = np.sqrt(noise_power) * np.random.normal(size=signal.shape)
        noisy_signal = signal + noise
        return noisy_signal, noise

    def calculate_snr(self, signal, noise):
        """ Calculate the Signal-to-Noise Ratio (SNR) based on the signal and noise. """
        signal_power = np.mean(np.abs(signal)**2)
        noise_power = np.mean(np.abs(noise)**2)
        snr_linear = signal_power / noise_power
        snr_db = 10 * np.log10(snr_linear)
        return snr_db

    def plot_signal(self, t, signal, modulation_type, num_symbols_to_plot=10):
        """ Plot the signal for a limited number of symbols. """
        samples_to_plot = num_symbols_to_plot * self.samples_per_symbol
        plt.figure(figsize=(12, 6))
        plt.plot(t[:samples_to_plot], signal[:samples_to_plot])
        plt.title(f'{modulation_type} Modulated Signal with Noise (Zoomed-in on {num_symbols_to_plot} symbols)')
        plt.xlabel('Time')
        plt.ylabel('Amplitude')
        plt.grid(True)
        plt.show()

    def plot_constellation(self, symbols, modulation_type):
        """ Plot the constellation diagram. """
        plt.figure(figsize=(8, 8))
        plt.scatter(symbols.real, symbols.imag, c='r', alpha=0.5)
        plt.title(f'{modulation_type} Constellation Diagram')
        plt.xlabel('In-phase')
        plt.ylabel('Quadrature')
        plt.grid(True)
        plt.axis('equal')
        plt.show()

In [6]:
from scipy.signal import correlate
from scipy.fft import fftshift, fft

def cyclic_autocorrelation(signal, lag): 
    return np.mean(signal[:-lag] * np.conjugate(signal[lag:]))

def compute_scd(signal, freqs, alpha):
    """
    Computes the Spectral Correlation Density (SCD) at a given cyclic frequency (alpha).
    """
    N = len(signal)
    scd = np.zeros(len(freqs), dtype=complex)
    
    for i, f in enumerate(freqs):
        # Shift the signal in frequency by `f`
        shifted_signal = signal * np.exp(-2j * np.pi * f * np.arange(N))
        
        # Compute cyclic autocorrelation
        cyclic_autocorr = np.mean(shifted_signal[:-1] * np.conjugate(shifted_signal[1:]))
        
        # Apply the exponential factor for the given cyclic frequency alpha
        scd[i] = cyclic_autocorr * np.exp(2j * np.pi * alpha)
    return scd

In [9]:
import random
# Modulation types and noise levels
modulation_types = ['bpsk', 'qpsk', '8apsk', '16apsk', '32apsk', '64apsk', '128apsk', '256apsk']
noise_levels = [5, 10, 15, 20, 25, 30, 35]
num_symbol_choices = [10, 25, 50, 100]

# Create a list to store the features for each symbol
features_list = []

# Loop to randomly generate data
for _ in range(50):  # Generate data for 10 different configurations
    # Randomly select modulation type, noise level, and number of symbols
    modulation = random.choice(modulation_types)
    noise_level = random.choice(noise_levels)
    num_symbols = random.choice(num_symbol_choices)

    # Initialize the DVBS2X class with the selected number of symbols
    dvbs2x = DVBS2X(num_symbols=num_symbols, samples_per_symbol=8, carrier_freq=1e6)

    # Select the appropriate modulation function based on the modulation type
    modulation_func = {
        'bpsk': dvbs2x.generate_bpsk,
        'qpsk': dvbs2x.generate_qpsk,
        '8apsk': dvbs2x.generate_8apsk,
        '16apsk': dvbs2x.generate_16apsk,
        '32apsk': dvbs2x.generate_32apsk,
        '64apsk': dvbs2x.generate_64apsk,
        '128apsk': dvbs2x.generate_128apsk,
        '256apsk': dvbs2x.generate_256apsk
    }[modulation]

    (t, signal), symbols, bits = modulation_func()

    noisy_signal, noise = dvbs2x.add_noise(signal, noise_level)
    calculated_snr = dvbs2x.calculate_snr(signal, noise)
    """
    features = {
        'modulation': modulation,
        'num_symbols': num_symbols,
        'noise_level': noise_level,
        'calculated_snr': calculated_snr,
        'first_10_symbols': symbols[:10]
    }
    features_list.append(features)

    # Print the details
    print(f"Modulation: {modulation}, Num Symbols: {num_symbols}, Noise Level: {noise_level}, Calculated SNR: {calculated_snr:.2f} dB")
    print(f"First 10 Symbols: {symbols[:10]}")

    # Optionally plot the noisy signal
    dvbs2x.plot_signal(t, noisy_signal, modulation)
    # Optionally plot the constellation diagram
    # dvbs2x.plot_constellation(symbols, modulation)
    """
    
    lags = range(1, 6)
    alpha = 0.1
    freqs = np.fft.fftfreq(len(signal))

    caf_values = [cyclic_autocorrelation(noisy_signal, lag) for lag in lags]
    scd_values = compute_scd(noisy_signal, freqs, alpha)
    
    for i, symbol in enumerate(symbols):
        magnitude = np.abs(symbol)
        phase = np.angle(symbol)
        real = np.real(symbol)
        imag = np.imag(symbol)
        
        # Create a label based on the bits
        # label = ''.join(map(str, bits[i:i + dvbs2x.num_symbols // len(symbols)]))

        # Create a dictionary to store the features for this symbol
        features = {
            'modulation_type': modulation,
            'symbol': symbol,
            'magnitude': magnitude,
            'phase': phase,
            'caf_1': np.abs(caf_values[0]),  # Example for lag 1
            'caf_2': np.abs(caf_values[1]),  # Example for lag 2
            'caf_3': np.abs(caf_values[2]),  # Example for lag 3
            'caf_4': np.abs(caf_values[3]),  # Example for lag 4
            'caf_5': np.abs(caf_values[4]),  # Example for lag 5
            'scd_mean': np.mean(np.abs(scd_values)),  # Mean of SCD values
            'scd_max': np.max(np.abs(scd_values)),  # Maximum of SCD values
        }

        # Append the features to the list
        features_list.append(features)
random.shuffle(features_list)

# Create a pandas DataFrame from the list of features
df = pd.DataFrame(features_list)
print(df.head())

# Save the DataFrame to a CSV file
df.to_csv('symbol_features_random_noise_cyclostationary.csv', index=False)


  modulation_type              symbol  magnitude     phase     caf_1  \
0         128apsk -0.780197-0.566846j   0.964377 -2.513274  0.329639   
1          16apsk -0.563621-0.976221j   1.127243 -2.094395  0.328434   
2          32apsk -0.297861+0.000000j   0.297861  3.141593  0.333821   
3          32apsk  0.774439+0.000000j   0.774439  0.000000  0.304428   
4            qpsk  1.000000-1.000000j   1.414214 -0.785398  0.623898   

      caf_2     caf_3     caf_4     caf_5  scd_mean   scd_max  
0  0.043416  0.338222  0.247368  0.020392  0.329639  0.329639  
1  0.019397  0.254815  0.268422  0.114739  0.328434  0.328434  
2  0.013553  0.246627  0.276842  0.140586  0.333821  0.333821  
3  0.000498  0.205915  0.220828  0.106963  0.304428  0.304428  
4  0.028248  0.414307  0.532897  0.392044  0.623898  0.623898  


In [10]:
df = pd.read_csv("symbol_features_random_noise_cyclostationary.csv")
df.tail()


Unnamed: 0,modulation_type,symbol,magnitude,phase,caf_1,caf_2,caf_3,caf_4,caf_5,scd_mean,scd_max
2630,64apsk,(-0.5083135585988359-0.6996335919858019j),0.864795,-2.199115,0.307629,0.000217,0.220947,0.250278,0.132955,0.307629,0.307629
2631,bpsk,(1+0j),1.0,0.0,0.281663,0.055934,0.290217,0.301429,0.136693,0.281663,0.281663
2632,8apsk,(0.6596054463884006-0.8271190682773576j),1.057925,-0.897598,0.349678,0.037277,0.206413,0.273095,0.194056,0.349678,0.349678
2633,16apsk,(2.654758621716181e-17+0.43355498476205995j),0.433555,1.570796,0.328434,0.019397,0.254815,0.268422,0.114739,0.328434,0.328434
2634,bpsk,(1+0j),1.0,0.0,0.256847,0.064486,0.267808,0.236405,0.070778,0.256847,0.256847


In [15]:
label_enc = LabelEncoder()
df['modulation'] = label_enc.fit_transform(df['modulation_type'])

X = df[['magnitude', 'phase', 'caf_1', 'caf_2', 'caf_3', 'caf_4', 'caf_5', 'scd_mean', 'scd_max']]
y = df['modulation']
np.random.seed(42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)

rf_model.fit(X_train, y_train)
y_pred = rf_model.predict(X_test)

In [16]:
rf_model.score(X_test,y_test)

1.0

In [17]:
rf_accuracy = accuracy_score(y_test, y_pred)
print("\nRandom Forest Model Accuracy: {:.2f}%".format(rf_accuracy * 100))
print("\nClassification Report for Random Forest:")
print(classification_report(y_test, y_pred))


Random Forest Model Accuracy: 100.00%

Classification Report for Random Forest:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00        50
           1       1.00      1.00      1.00        41
           2       1.00      1.00      1.00       139
           3       1.00      1.00      1.00        51
           4       1.00      1.00      1.00        76
           5       1.00      1.00      1.00        78
           6       1.00      1.00      1.00        54
           7       1.00      1.00      1.00        38

    accuracy                           1.00       527
   macro avg       1.00      1.00      1.00       527
weighted avg       1.00      1.00      1.00       527



In [26]:
import time
import pickle

def weighted_score(model, X_test, y_test, weight_for_accuracy=0.7, weight_for_latency = 0.2, weight_for_model_size=0.1,):
    try:
        accuracy = model.score(X_test, y_test)
        start_time = time.time()
        _ = model.predict(X_test)
        end_time = time.time()
        latency = (end_time - start_time) / len(X_test)

        model_pickle = pickle.dumps(rf_model)
        model_size = len(model_pickle)

        max_latency = 0.01
        max_model_size = 1e6

        normalized_latency = latency/max_latency
        normalized_size = model_size/max_model_size

        score = (weight_for_accuracy * accuracy) - (weight_for_latency * normalized_latency) - (weight_for_model_size * normalized_size)
        
        result_string = (
            f"Accuracy: {accuracy:.3f}\n"
            f"Normalized Latency: {normalized_latency:.3f} and Latency: {latency}\n"
            f"Normalized Size: {normalized_size:.3f} and Size: {model_size} in Bytes\n"
            f"Final Power-Efficient Score: {score:.3f}")

        return result_string

    
    except Exception as e:
        print(f"Error calculating power-efficient score: {e}")
        return None

In [27]:
print(weighted_score(rf_model, X_test, y_test))

Accuracy: 1.000
Normalized Latency: 0.003 and Latency: 2.5245435068298086e-05
Normalized Size: 0.767 and Size: 767226 in Bytes
Final Power-Efficient Score: 0.623
