Skip to content

Commit

Permalink
Adding necessary transforms and visualizations.
Browse files Browse the repository at this point in the history
  • Loading branch information
gvanhoy authored and Garrett Vanhoy committed Nov 8, 2022
1 parent 96a4bf1 commit 94dbc9c
Show file tree
Hide file tree
Showing 24 changed files with 4,184 additions and 98 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
__pycache__/
*.pyc
*.mdb
2 changes: 2 additions & 0 deletions torchsig/transforms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from . import system_impairment
from . import wireless_channel
from . import signal_processing
from . import spectrogram_transforms
from . import deep_learning_techniques
from . import target_transforms
from .functional import *
Expand All @@ -10,5 +11,6 @@
from torchsig.transforms.wireless_channel import *
from torchsig.transforms.expert_feature import *
from torchsig.transforms.signal_processing import *
from torchsig.transforms.spectrogram_transforms import *
from torchsig.transforms.deep_learning_techniques import *
from torchsig.transforms.target_transforms import *
2 changes: 1 addition & 1 deletion torchsig/transforms/deep_learning_techniques/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .dlt import *
from .dlt_functional import *
from .functional import *
455 changes: 447 additions & 8 deletions torchsig/transforms/deep_learning_techniques/dlt.py

Large diffs are not rendered by default.

102 changes: 102 additions & 0 deletions torchsig/transforms/deep_learning_techniques/functional.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import numpy as np


def cut_out(
tensor: np.ndarray,
cut_start: float,
cut_dur: float,
cut_type: str,
) -> np.ndarray:
"""Performs the CutOut using the input parameters
Args:
tensor: (:class:`numpy.ndarray`):
(batch_size, vector_length, ...)-sized tensor.
cut_start: (:obj:`float`):
Start of cut region in range [0.0,1.0)
cut_dur: (:obj:`float`):
Duration of cut region in range (0.0,1.0]
cut_type: (:obj:`str`):
String specifying type of data to fill in cut region with
Returns:
transformed (:class:`numpy.ndarray`):
Tensor that has undergone cut out
"""
num_iq_samples = tensor.shape[0]
cut_start = int(cut_start * num_iq_samples)

# Create cut mask
cut_mask_length = int(num_iq_samples * cut_dur)
if cut_mask_length + cut_start > num_iq_samples:
cut_mask_length = num_iq_samples - cut_start

if cut_type == "zeros":
cut_mask = np.zeros(cut_mask_length, dtype=np.complex64)
elif cut_type == "ones":
cut_mask = np.ones(cut_mask_length) + 1j*np.ones(cut_mask_length)
elif cut_type == "low_noise":
real_noise = np.random.randn(cut_mask_length)
imag_noise = np.random.randn(cut_mask_length)
noise_power_db = -100
cut_mask = (10.0**(noise_power_db/20.0))*(real_noise + 1j*imag_noise)/np.sqrt(2)
elif cut_type == "avg_noise":
real_noise = np.random.randn(cut_mask_length)
imag_noise = np.random.randn(cut_mask_length)
avg_power = np.mean(np.abs(tensor)**2)
cut_mask = avg_power*(real_noise + 1j*imag_noise)/np.sqrt(2)
elif cut_type == "high_noise":
real_noise = np.random.randn(cut_mask_length)
imag_noise = np.random.randn(cut_mask_length)
noise_power_db = 40
cut_mask = (10.0**(noise_power_db/20.0))*(real_noise + 1j*imag_noise)/np.sqrt(2)
else:
raise ValueError("cut_type must be: zeros, ones, low_noise, avg_noise, or high_noise. Found: {}".format(cut_type))

# Insert cut mask into tensor
tensor[cut_start:cut_start+cut_mask_length] = cut_mask

return tensor


def patch_shuffle(
tensor: np.ndarray,
patch_size: int,
shuffle_ratio: float,
) -> np.ndarray:
"""Apply shuffling of patches specified by `num_patches`
Args:
tensor: (:class:`numpy.ndarray`):
(batch_size, vector_length, ...)-sized tensor.
patch_size (:obj:`int`):
Size of each patch to shuffle
shuffle_ratio (:obj:`float`):
Ratio of patches to shuffle
Returns:
transformed (:class:`numpy.ndarray`):
Tensor that has undergone patch shuffling
"""
num_patches = int(tensor.shape[0] / patch_size)
num_to_shuffle = int(num_patches * shuffle_ratio)
patches_to_shuffle = np.random.choice(
num_patches,
replace=False,
size=num_to_shuffle,
)

for patch_idx in patches_to_shuffle:
patch_start = int(patch_idx * patch_size)
patch = tensor[patch_start:patch_start+patch_size]
np.random.shuffle(patch)
tensor[patch_start:patch_start+patch_size] = patch

return tensor
2 changes: 1 addition & 1 deletion torchsig/transforms/expert_feature/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .eft import *
from .eft_functional import *
from .functional import *
8 changes: 4 additions & 4 deletions torchsig/transforms/expert_feature/eft.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Callable, Tuple, Any

from torchsig.utils.types import SignalData
from torchsig.transforms.expert_feature import eft_functional as F
from torchsig.transforms.expert_feature import functional as F
from torchsig.transforms.transforms import SignalTransform


Expand Down Expand Up @@ -170,7 +170,7 @@ def __call__(self, data: Any) -> Any:


class Spectrogram(SignalTransform):
""" Calculates power spectral density over time
"""Calculates power spectral density over time
Args:
nperseg (:obj:`int`):
Expand Down Expand Up @@ -224,14 +224,14 @@ def __init__(

def __call__(self, data: Any) -> Any:
if isinstance(data, SignalData):
data.iq_data = F.spectrogram(data.iq_data)
data.iq_data = F.spectrogram(data.iq_data, self.nperseg, self.noverlap, self.nfft, self.window_fcn, self.mode)
if self.mode == "complex":
new_tensor = np.zeros((2, data.iq_data.shape[0], data.iq_data.shape[1]), dtype=np.float32)
new_tensor[0, :, :] = np.real(data.iq_data).astype(np.float32)
new_tensor[1, :, :] = np.imag(data.iq_data).astype(np.float32)
data.iq_data = new_tensor
else:
data = F.spectrogram(data)
data = F.spectrogram(data, self.nperseg, self.noverlap, self.nfft, self.window_fcn, self.mode)
if self.mode == "complex":
new_tensor = np.zeros((2, data.shape[0], data.shape[1]), dtype=np.float32)
new_tensor[0, :, :] = np.real(data).astype(np.float32)
Expand Down
201 changes: 201 additions & 0 deletions torchsig/transforms/expert_feature/functional.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import pywt
import numpy as np
from scipy import signal
from typing import Callable


def interleave_complex(tensor: np.ndarray) -> np.ndarray:
"""Converts complex vectors to real interleaved IQ vector
Args:
tensor (:class:`numpy.ndarray`):
(batch_size, vector_length, ...)-sized tensor.
Returns:
transformed (:class:`numpy.ndarray`):
Interleaved vectors.
"""
new_tensor = np.empty((tensor.shape[0]*2,))
new_tensor[::2] = np.real(tensor)
new_tensor[1::2] = np.imag(tensor)
return new_tensor


def complex_to_2d(tensor: np.ndarray) -> np.ndarray:
"""Converts complex IQ to two channels representing real and imaginary
Args:
tensor (:class:`numpy.ndarray`):
(batch_size, vector_length, ...)-sized tensor.
Returns:
transformed (:class:`numpy.ndarray`):
Expanded vectors
"""

new_tensor = np.zeros((2, tensor.shape[0]), dtype=np.float64)
new_tensor[0] = np.real(tensor).astype(np.float64)
new_tensor[1] = np.imag(tensor).astype(np.float64)
return new_tensor


def real(tensor: np.ndarray) -> np.ndarray:
"""Converts complex IQ to a real-only vector
Args:
tensor (:class:`numpy.ndarray`):
(batch_size, vector_length, ...)-sized tensor.
Returns:
transformed (:class:`numpy.ndarray`):
real(tensor)
"""
return np.real(tensor)


def imag(tensor: np.ndarray) -> np.ndarray:
"""Converts complex IQ to a imaginary-only vector
Args:
tensor (:class:`numpy.ndarray`):
(batch_size, vector_length, ...)-sized tensor.
Returns:
transformed (:class:`numpy.ndarray`):
imag(tensor)
"""
return np.imag(tensor)


def complex_magnitude(tensor: np.ndarray) -> np.ndarray:
"""Converts complex IQ to a complex magnitude vector
Args:
tensor (:class:`numpy.ndarray`):
(batch_size, vector_length, ...)-sized tensor.
Returns:
transformed (:class:`numpy.ndarray`):
abs(tensor)
"""
return np.abs(tensor)


def wrapped_phase(tensor: np.ndarray) -> np.ndarray:
"""Converts complex IQ to a wrapped-phase vector
Args:
tensor (:class:`numpy.ndarray`):
(batch_size, vector_length, ...)-sized tensor.
Returns:
transformed (:class:`numpy.ndarray`):
angle(tensor)
"""
return np.angle(tensor)


def discrete_fourier_transform(tensor: np.ndarray) -> np.ndarray:
"""Computes DFT of complex IQ vector
Args:
tensor (:class:`numpy.ndarray`):
(batch_size, vector_length, ...)-sized tensor.
Returns:
transformed (:class:`numpy.ndarray`):
fft(tensor). normalization is 1/sqrt(n)
"""
return np.fft.fft(tensor, norm="ortho")


def spectrogram(
tensor: np.ndarray,
nperseg: int,
noverlap: int,
nfft: int,
window_fcn: Callable[[int], np.ndarray],
mode: str,
) -> np.ndarray:
"""Computes spectrogram of complex IQ vector
Args:
tensor (:class:`numpy.ndarray`):
(batch_size, vector_length, ...)-sized tensor.
nperseg (:obj:`int`):
Length of each segment. If window is str or tuple, is set to 256,
and if window is array_like, is set to the length of the window.
noverlap (:obj:`int`):
Number of points to overlap between segments.
If None, noverlap = nperseg // 8.
nfft (:obj:`int`):
Length of the FFT used, if a zero padded FFT is desired.
If None, the FFT length is nperseg.
window_fcn (:obj:`Callable`):
Function generating the window for each FFT
mode (:obj:`str`):
Mode of the spectrogram to be computed.
Returns:
transformed (:class:`numpy.ndarray`):
Spectrogram of tensor along time dimension
"""
_, _, spectrograms = signal.spectrogram(
tensor,
nperseg=nperseg,
noverlap=noverlap,
nfft=nfft,
window=window_fcn(nperseg),
return_onesided=False,
mode=mode,
axis=0
)
return np.fft.fftshift(spectrograms, axes=0)


def continuous_wavelet_transform(
tensor: np.ndarray,
wavelet: str,
nscales: int,
sample_rate: float
) -> np.ndarray:
"""Computes the continuous wavelet transform resulting in a Scalogram of the complex IQ vector
Args:
tensor (:class:`numpy.ndarray`):
(batch_size, vector_length, ...)-sized tensor.
wavelet (:obj:`str`):
Name of the mother wavelet.
If None, wavename = 'mexh'.
nscales (:obj:`int`):
Number of scales to use in the Scalogram.
If None, nscales = 33.
sample_rate (:obj:`float`):
Sample rate of the signal.
If None, fs = 1.0.
Returns:
transformed (:class:`numpy.ndarray`):
Scalogram of tensor along time dimension
"""
scales = np.arange(1, nscales)
cwtmatr, _ = pywt.cwt(
tensor,
scales=scales,
wavelet=wavelet,
sampling_period=1.0/sample_rate
)

# if the dtype is complex then return the magnitude
if np.iscomplexobj(cwtmatr):
cwtmatr = abs(cwtmatr)

return cwtmatr
Loading

0 comments on commit 94dbc9c

Please sign in to comment.