In [915]:
import plotly.graph_objects as go
import numpy as np
from math import ceil

In [916]:
def draw(indices, amplitudes):

    fig_cont = go.Figure()
    fig_cont.add_trace(
        go.Scatter(x=indices, y=amplitudes, mode="lines", name="Continuous Signal")
    )

    fig_cont.update_layout(
        title="Continuous Signal",
        xaxis_title="Index",
        yaxis_title="Amplitude",
        width=1000,
        height=500,
    )

    fig_cont.show()  # For notebooks
    # st.plotly_chart(fig_cont)  # For Streamlit

    # Discrete Signal
    fig_disc = go.Figure()

    fig_disc.add_trace(
        go.Scatter(x=indices, y=amplitudes, mode="markers", name="Discrete Signal")
    )

    # Set titles and labels
    fig_disc.update_layout(
        title="Discrete Signal",
        xaxis_title="Index",
        yaxis_title="Amplitude",
        width=1000,
        height=500,
    )

    # fig_disc.show()  # For notebooks
    # st.plotly_chart(fig_disc)  # For Streamlit

In [917]:
def readSignal(fileName):
    with open(fileName, "r") as f:
        timeFlag = f.readline()
        periodicFlag = f.readline()

        nOfSamples = int(f.readline())
        indices = []
        amplitudes = []
        for _ in range(nOfSamples):
            line = f.readline().strip().split(" ")
            indices.append(int(line[0]))
            amplitudes.append(float(line[1]))
    return indices, amplitudes

In [918]:
def generate_signal(sineFlag, A, f, fs, theta):
    signal = []
    indices = np.linspace(0, fs - 1, fs)

    if sineFlag:  # if sineFlag is True, then it's a sine wave
        for i in range(fs):
            signal.append(A * np.sin(2 * np.pi * f / fs * i + theta))
    else:
        for i in range(fs):
            signal.append(A * np.cos(2 * np.pi * f / fs * i + theta))
    return indices, signal

In [919]:
def addSignals(firstSignalFile, secondSignalFile):
    # Read the files
    indices1, amplitudes1 = readSignal(firstSignalFile)
    indices2, amplitudes2 = readSignal(secondSignalFile)
    if len(indices1) != len(indices2):
        print("The two files must be the same size")
        return
    addedAmplitudes = list(x + y for x, y in zip(amplitudes1, amplitudes2))
    draw(indices1, addedAmplitudes)
    return indices1, addedAmplitudes

In [920]:
addSignals(
    "signals/task2/input signals/Signal1.txt", "signals/task2/input signals/Signal2.txt"
)[1]

[1000.0,
 1002.0,
 1004.0,
 1006.0,
 1008.0,
 1010.0,
 1012.0,
 1014.0,
 1016.0,
 1018.0,
 1020.0,
 1022.0,
 1024.0,
 1026.0,
 1028.0,
 1030.0,
 1032.0,
 1034.0,
 1036.0,
 1038.0,
 1040.0,
 1042.0,
 1044.0,
 1046.0,
 1048.0,
 1050.0,
 1052.0,
 1054.0,
 1056.0,
 1058.0,
 1060.0,
 1062.0,
 1064.0,
 1066.0,
 1068.0,
 1070.0,
 1072.0,
 1074.0,
 1076.0,
 1078.0,
 1080.0,
 1082.0,
 1084.0,
 1086.0,
 1088.0,
 1090.0,
 1092.0,
 1094.0,
 1096.0,
 1098.0,
 1100.0,
 1102.0,
 1104.0,
 1106.0,
 1108.0,
 1110.0,
 1112.0,
 1114.0,
 1116.0,
 1118.0,
 1120.0,
 1122.0,
 1124.0,
 1126.0,
 1128.0,
 1130.0,
 1132.0,
 1134.0,
 1136.0,
 1138.0,
 1140.0,
 1142.0,
 1144.0,
 1146.0,
 1148.0,
 1150.0,
 1152.0,
 1154.0,
 1156.0,
 1158.0,
 1160.0,
 1162.0,
 1164.0,
 1166.0,
 1168.0,
 1170.0,
 1172.0,
 1174.0,
 1176.0,
 1178.0,
 1180.0,
 1182.0,
 1184.0,
 1186.0,
 1188.0,
 1190.0,
 1192.0,
 1194.0,
 1196.0,
 1198.0,
 1200.0,
 1202.0,
 1204.0,
 1206.0,
 1208.0,
 1210.0,
 1212.0,
 1214.0,
 1216.0,
 1218.0,
 1220.0,
 

In [921]:
def subtractSignals(firstSignalFile, secondSignalFile):
    # Read the files
    indices1, amplitudes1 = readSignal(firstSignalFile)
    indices2, amplitudes2 = readSignal(secondSignalFile)
    if len(indices1) != len(indices2):
        print("The two files must be the same size")
        return
    subtractedAmplitudes = list(x - y for x, y in zip(amplitudes1, amplitudes2))
    draw(indices1, subtractedAmplitudes)
    return indices1, subtractedAmplitudes

In [922]:
subtractSignals(
    "signals/task2/input signals/Signal2.txt", "signals/task2/input signals/Signal1.txt"
)[1]

[1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 1000.0,
 

In [923]:
# Normalize the signal from 0 to 1
def normalizeSignal0(signal):
    max_value = max(np.abs(signal))
    normalized_signal = list(x / max_value for x in signal)
    return normalized_signal

In [924]:
# Normalize the signal from -1 to 1
def normalize_signal1(signal):
    min_val = min(signal)
    max_val = max(signal)
    signal = list((i - min_val) / (max_val - min_val) for i in signal)
    return signal

In [925]:
def accumulate(signal):
    return [
        signal[i] + accumulate(signal[:i])[-1] if i > 0 else 0
        for i in range(len(signal))
    ]

In [926]:
def accumulate(signal):
    for i in range(len(signal) - 1):
        signal[i + 1] += signal[i]
    return signal

In [927]:
accumulate([0, 1, 2, 3, 4, 5])

[0, 1, 3, 6, 10, 15]

In [928]:
def quantize(noOfLevels, samples):
    minValue = np.min(samples)
    maxValue = np.max(samples)
    delta = (maxValue - minValue) / noOfLevels
    levelsOfSamples = []
    quantizedValues = []
    minmax = []
    midpoints = []
    encodedLevels = []
    encodedLevelsOfSamples = []
    quantizationErrors = []
    for n in range(noOfLevels):
        minmax.append((minValue + delta * n, minValue + delta * (n + 1)))
        midpoints.append((minmax[n][0] + minmax[n][1]) / 2)
        encodedLevels.append(bin(n)[2:].zfill(int(np.ceil(np.log2(noOfLevels)))))
    for s in samples:
        for n in range(noOfLevels):
            if s >= minmax[n][0] and s <= minmax[n][1]:
                levelsOfSamples.append(n + 1)
                encodedLevelsOfSamples.append(encodedLevels[n])
                quantizedValues.append(midpoints[n])
                quantizationErrors.append(midpoints[n] - s)
    return levelsOfSamples, encodedLevelsOfSamples, quantizedValues, quantizationErrors

In [929]:
_, samples = readSignal("signals/task3/Quan2_input.txt")

In [930]:
quantize(4, samples)

([1, 3, 4, 3, 1, 1, 1, 1],
 ['00', '10', '11', '10', '00', '00', '00', '00'],
 [-1.4849999999999999,
  1.6149999999999998,
  3.1649999999999996,
  1.6149999999999998,
  -1.4849999999999999,
  -1.4849999999999999,
  -1.4849999999999999,
  -1.4849999999999999],
 [-0.2649999999999999,
  0.11499999999999977,
  -0.07500000000000062,
  -0.5850000000000004,
  -0.3849999999999998,
  0.7749999999999999,
  0.395,
  -0.2849999999999999])

In [931]:
def DCT(signal, m):
    N = len(signal)

    y = []

    for k in range(N):
        sum = 0
        for n in range(1, N + 1):
            sum += signal[n - 1] * np.cos(
                np.pi * (2 * (n - 1) - 1) * (2 * k - 1) / (4 * N)
            )

        y.append(np.sqrt(2 / N) * sum)

    with open("DCT_Output.txt", "w") as file:
        for value in y[:m]:
            file.write(f"{value}\n")

    return y

In [932]:
_, signal = readSignal(
    "/home/omar/codes/Digital-Signal-Processing/signals/task5/DCT/DCT/DCT_input.txt"
)  # signals/task5/DCT/DCT_input.txt

In [933]:
DCT(signal, 50)

[128.1597593167855,
 128.1597593167855,
 5.795813291240626,
 33.454448609780435,
 18.464289409252476,
 0.46378139392117945]

In [934]:
def norm_cross_crorrelation(signal1, signal2):
    N = len(signal1)
    result = []
    for j in range(N):
        sum = 0
        firstSum = 0
        secondSum = 0

        for n in range(N):
            sum += signal1[n] * signal2[j + n - N]
            firstSum += signal1[n] ** 2
            secondSum += signal2[n] ** 2

        numerator = sum / N
        denomurator = ((firstSum * secondSum) ** 0.5) / N
        result.append(numerator / denomurator)
    return result

In [935]:
norm_cross_crorrelation([2, 1, 0, 0, 3], [3, 2, 1, 1, 5])

[0.9719273929377941,
 0.5916079783099616,
 0.3803194146278325,
 0.4225771273642583,
 0.6761234037828133]

In [936]:
i, a = readSignal("signals/task7/FIR/Testcase 2/ecg_low_pass_filtered.txt")

In [937]:
draw(i, a)

In [938]:
def Low_Pass(fc, n):
    if n == 0:
        return 2 * fc
    omega_c = 2 * np.pi * fc
    return 2 * fc * (np.sin(n * omega_c) / (n * omega_c))

In [939]:
def High_Pass(fc, n):
    if n == 0:
        return 1 - (2 * fc)
    omega_c = 2 * np.pi * fc
    return -2 * fc * (np.sin(n * omega_c) / (n * omega_c))

In [940]:
def Band_Pass(f, n):
    if n == 0:
        return 2 * (f[1] - f[0])
    omega_1 = 2 * np.pi * f[0]
    omega_2 = 2 * np.pi * f[1]
    term1 = 2 * f[1] * (np.sin(n * omega_2) / (n * omega_2))
    term2 = 2 * f[0] * (np.sin(n * omega_1) / (n * omega_1))
    return term1 - term2

In [941]:
def Band_Stop(f, n):
    if n == 0:
        return 1 - 2 * (f[1] - f[0])
    omega_1 = 2 * np.pi * f[0]
    omega_2 = 2 * np.pi * f[1]
    term1 = 2 * f[0] * (np.sin(n * omega_1) / (n * omega_1))
    term2 = 2 * f[1] * (np.sin(n * omega_2) / (n * omega_2))
    return term1 - term2

In [942]:
FilterType = {
    "Low_Pass": Low_Pass,
    "High_Pass": High_Pass,
    "Band_Pass": Band_Pass,
    "Band_Stop": Band_Stop,
}

In [943]:
def RectangularWindow():
    return 1

In [944]:
def HanningWindow(n, N):
    if n==0:
        return 1
    return 0.5 + 0.5 * np.cos((2 * np.pi * n) / N)

In [945]:
def HammingWindow(n, N):
    if n==0:
        return 1
    return 0.54 + 0.46 * np.cos((2 * np.pi * n) / N)

In [946]:
def BlackmanWindow(n, N):
    if n==0:
        return 1
    term2 = 0.5 * np.cos((2 * np.pi * n) / (N - 1))
    term3 = 0.08 * np.cos((4 * np.pi * n) / (N - 1))
    return 0.42 + term2 + term3

In [947]:
WindowType = {
    "RectangularWindow": RectangularWindow,
    "HanningWindow": HanningWindow,
    "HammingWindow": HammingWindow,
    "BlackmanWindow": BlackmanWindow,
}

In [948]:
def SignalSamplesAreEqual(compareFile, indices, samples):
    expected_indices, expected_samples = readSignal(compareFile)

    if len(expected_samples) != len(samples):
        print(
            "Test case failed, your signal have different length from the expected one"
        )
        return
    for i in range(len(expected_samples)):
        if abs(samples[i] - expected_samples[i]) < 0.01:
            continue
        else:
            print(
                "Test case failed, your signal have different values from the expected one"
            )
            return
    print("Test case passed successfully")

In [949]:
def convolve_signals(
    signal1_indices, signal1_samples, signal2_indices, signal2_samples
):
    n = len(signal1_samples)
    m = len(signal2_samples)
    convolved_signal = [0] * (n + m - 1)

    for i in range(len(convolved_signal)):
        for j in range(max(0, i - m + 1), min(n, i + 1)):
            convolved_signal[i] += signal1_samples[j] * signal2_samples[i - j]

    convolved_indices = np.arange(
        signal1_indices[0] + signal2_indices[0],
        signal1_indices[-1] + signal2_indices[-1] + 1,
    )

    return convolved_signal, convolved_indices

In [964]:
def FIR_Filter(filterType, Fs, StopBandAttenuation, F, TransitionWidth):
    if StopBandAttenuation <= 21:
        window = "RectangularWindow"
        N = 0.9 / (TransitionWidth / Fs)
    elif StopBandAttenuation <= 44:
        window = "HanningWindow"
        N = 3.1 / (TransitionWidth / Fs)
    elif StopBandAttenuation <= 53:
        window = "HammingWindow"
        N = 3.3 / (TransitionWidth / Fs)
    elif StopBandAttenuation <= 74:
        window = "BlackmanWindow"
        N = 5.5 / (TransitionWidth / Fs)
    N = ceil(N)
    if N % 2 == 0:
        N += 1
    if filterType=='Low_Pass':
        F += (TransitionWidth / 2)
    elif filterType=='High_Pass':
        F -= (TransitionWidth / 2)
    F /= Fs
    h = []
    for n in range(ceil(N / 2)):
        h.append(FilterType[filterType](F, n) * WindowType[window](n, N))
    hReversed = h.copy()
    hReversed.reverse()
    hReversed.pop()
    return hReversed + h

In [965]:
i, a = readSignal("signals/task7/FIR/Testcase 4/ecg400.txt")

In [966]:
d = FIR_Filter("High_Pass", 8000, 70, 1500, 500)

In [967]:
ad, id = convolve_signals([0], d, i, a)

In [968]:
SignalSamplesAreEqual("signals/task7/FIR/Testcase 4/ecg_high_pass_filtered.txt", [0], ad)

Test case passed successfully


In [963]:
ad

[-8.470937539409223e-26,
 1.098676876302598e-12,
 -7.301317480293614e-12,
 -3.2250097343427344e-11,
 -3.2392940100342315e-11,
 4.060796769474499e-11,
 1.253174317661366e-10,
 6.28798758941582e-11,
 -1.6636383945550885e-10,
 -2.790304456304986e-10,
 1.4329608600889818e-11,
 4.864199860975075e-10,
 4.531499333883295e-10,
 -3.3645213989838674e-10,
 -1.0328643347377797e-09,
 -4.759998413413735e-10,
 1.1032903450462872e-09,
 1.7583391741286865e-09,
 4.7957402090745223e-11,
 -2.495945662895303e-09,
 -2.392254326225354e-09,
 1.317880448258136e-09,
 4.563167963326519e-09,
 2.3393587668825484e-09,
 -4.1627169112042935e-09,
 -6.982009008047709e-09,
 -6.684646985661253e-10,
 8.804243973257984e-09,
 8.790254483856703e-09,
 -3.884607028401226e-09,
 -1.5140492429522516e-08,
 -8.304266806580816e-09,
 1.2688606878859837e-08,
 2.2240161028890617e-08,
 2.8059910809817852e-09,
 -2.724634362852191e-08,
 -2.7911328860588372e-08,
 1.2501195910118892e-08,
 5.01556031814489e-08,
 2.697205224922697e-08,
 -5.17

In [None]:
# for f in range(len(a)):
#     a[f] /= 8000

In [957]:
ad

[-5.421400025221903e-18,
 7.031532008336627e-05,
 -0.0004672843187387912,
 -0.0020640062299793504,
 -0.0020731481664219083,
 0.00259890993246368,
 0.008020315633032743,
 0.004024312057226125,
 -0.010647285725152566,
 -0.017857948520351913,
 0.0009170949504569426,
 0.031130879110240477,
 0.029001595736853087,
 -0.021532936953496753,
 -0.0661033174232179,
 -0.030463989845847923,
 0.07061058208296239,
 0.11253370714423597,
 0.0030692737338077064,
 -0.15974052242529937,
 -0.1531042768784227,
 0.0843443486885207,
 0.2920427496528972,
 0.14971896108048316,
 -0.2664138823170747,
 -0.4468485765150534,
 -0.04278174070823193,
 0.563471614288511,
 0.562576286966829,
 -0.24861484981767829,
 -0.968991515489441,
 -0.5314730756211722,
 0.8120708402470296,
 1.4233703058489995,
 0.17958342918283388,
 -1.7437659922254025,
 -1.7863250470776557,
 0.8000765382476092,
 3.20995860361273,
 1.7262113439505258,
 -3.313880725543896,
 -5.924614632018731,
 0.8029595827552862,
 17.737638475589417,
 -27016.026564991

In [958]:
ad

[-5.421400025221903e-18,
 7.031532008336627e-05,
 -0.0004672843187387912,
 -0.0020640062299793504,
 -0.0020731481664219083,
 0.00259890993246368,
 0.008020315633032743,
 0.004024312057226125,
 -0.010647285725152566,
 -0.017857948520351913,
 0.0009170949504569426,
 0.031130879110240477,
 0.029001595736853087,
 -0.021532936953496753,
 -0.0661033174232179,
 -0.030463989845847923,
 0.07061058208296239,
 0.11253370714423597,
 0.0030692737338077064,
 -0.15974052242529937,
 -0.1531042768784227,
 0.0843443486885207,
 0.2920427496528972,
 0.14971896108048316,
 -0.2664138823170747,
 -0.4468485765150534,
 -0.04278174070823193,
 0.563471614288511,
 0.562576286966829,
 -0.24861484981767829,
 -0.968991515489441,
 -0.5314730756211722,
 0.8120708402470296,
 1.4233703058489995,
 0.17958342918283388,
 -1.7437659922254025,
 -1.7863250470776557,
 0.8000765382476092,
 3.20995860361273,
 1.7262113439505258,
 -3.313880725543896,
 -5.924614632018731,
 0.8029595827552862,
 17.737638475589417,
 -27016.026564991

In [959]:
ad

[-5.421400025221903e-18,
 7.031532008336627e-05,
 -0.0004672843187387912,
 -0.0020640062299793504,
 -0.0020731481664219083,
 0.00259890993246368,
 0.008020315633032743,
 0.004024312057226125,
 -0.010647285725152566,
 -0.017857948520351913,
 0.0009170949504569426,
 0.031130879110240477,
 0.029001595736853087,
 -0.021532936953496753,
 -0.0661033174232179,
 -0.030463989845847923,
 0.07061058208296239,
 0.11253370714423597,
 0.0030692737338077064,
 -0.15974052242529937,
 -0.1531042768784227,
 0.0843443486885207,
 0.2920427496528972,
 0.14971896108048316,
 -0.2664138823170747,
 -0.4468485765150534,
 -0.04278174070823193,
 0.563471614288511,
 0.562576286966829,
 -0.24861484981767829,
 -0.968991515489441,
 -0.5314730756211722,
 0.8120708402470296,
 1.4233703058489995,
 0.17958342918283388,
 -1.7437659922254025,
 -1.7863250470776557,
 0.8000765382476092,
 3.20995860361273,
 1.7262113439505258,
 -3.313880725543896,
 -5.924614632018731,
 0.8029595827552862,
 17.737638475589417,
 -27016.026564991