https://betterexplained.com/articles/intuitive-convolution/
https://medium.com/analytics-vidhya/sum-of-two-random-variables-or-the-rocky-path-to-understanding-convolutions-of-probability-b0fc29aca3b5https://medium.com/analytics-vidhya/sum-of-two-random-variables-or-the-rocky-path-to-understanding-convolutions-of-probability-b0fc29aca3b5

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from ipywidgets import interactive, fixed
import pandas as pd
from matplotlib.gridspec import GridSpec
from itertools import product

ipywidgets version: 8.1.0


### Convolution of 1D arrays without taking into account probabilities

In [2]:
# Define the arrays
array_1 = np.array([3, 3, 3])
array_2 = np.array([1, 2, 3, 4, 5])

# Calculate the final convolution result
final_result = np.convolve(array_1, array_2, 'full')

# Function to plot arrays and simulate convolution
def plot_arrays(offset):
    plt.figure(figsize=(5, 5))
    gs = GridSpec(3, 2)
    
    ext_length = len(array_1) + 2 * len(array_2) - 2
    extended_array_1 = np.zeros(ext_length)
    start_index1 = len(array_2) - 1
    end_index1 = start_index1 + len(array_1)
    extended_array_1[start_index1:end_index1] = array_1
    
    extended_array_2 = np.zeros(ext_length)
    start_index_2 = ext_length - offset - len(array_2)
    end_index_2 = start_index_2 + len(array_2)
    extended_array_2[start_index_2:end_index_2] = array_2
    
    # Create a DataFrame to display the extended arrays
    df = pd.DataFrame({
        'Extended array_1': extended_array_1,
        'Extended array_2 Reversed': extended_array_2[::-1]  # Reverse the second extended array
    }).T
    display(df)
    
    result = np.zeros(ext_length)
    partial_result = 0
    column_prods = df.prod()
    column_prods_df = pd.DataFrame({'Element-wise row multipl.': column_prods}).T
    partial_result = column_prods.sum()
    display(column_prods_df)
    print(f"Partial result: {partial_result}")
        
    result[offset] = partial_result  # Store the partial result at the current offset
    
    plt.subplot(gs[0, 0])
    plt.stem(extended_array_1, linefmt='-g', markerfmt='go', basefmt=" ")
    plt.title("array_1")
    
    plt.subplot(gs[1, 0])
    plt.stem(extended_array_2[::-1], linefmt='-b', markerfmt='bo', basefmt=" ")  # Reverse only for visualization
    plt.title("array_2 (Visualized as Reversed and Sliding)")
    
    plt.subplot(gs[2, 0])
    plt.stem(result, linefmt='-r', markerfmt='ro', basefmt=" ")
    plt.title("Convolution Result (Partial)")
    
    plt.subplot(gs[2, 1])
    plt.stem(final_result, linefmt='-m', markerfmt='mo', basefmt=" ")
    plt.title("Final Convolution Result")
    plt.xlabel("Offset")
    
    print(f"\nFinal convolution result: {final_result}")
    
    plt.tight_layout()
    plt.show()
    
interactive_plot = interactive(plot_arrays, offset=(0, len(array_1) + len(array_2) - 2))
interactive_plot.children[0].value = 0  # Set initial value of the slider to 0
output = interactive_plot.children[-1]
interactive_plot


interactive(children=(IntSlider(value=0, description='offset', max=6), Output()), _dom_classes=('widget-intera…

### Sum of all the combinations of arrays taking into account their probabilities without using convolution

In [3]:
def sum_diff_pmf(array_1, prob_array_1, array_2, prob_array_2, operation='sum'): # operation = 'sum' or 'difference'
    
    partial_df = pd.DataFrame(columns=["array_1", "array_2", "prob_1", "prob_2", operation, "prod_probs"])
    
    # Loop through all combinations of outcomes from array_1 and array_2
    for i, elem_1 in enumerate(array_1):
        for j, elem_2 in enumerate(array_2):
            
            # Get the probabilities
            prob_1 = prob_array_1[i]
            prob_2 = prob_array_2[j]

            # Calculate the operation and its probability
            if operation == "sum":
                res_oper = elem_1 + elem_2
            elif operation == "difference":
                res_oper = elem_1 - elem_2
            else:
               raise ValueError("Operation unknown!")
            
            prod_prob = prob_1 * prob_2

            # Populate the DataFrame
            partial_df = pd.concat([partial_df, pd.DataFrame([{"array_1": elem_1, "array_2": elem_2, "prob_1": prob_1, "prob_2": prob_2, operation: res_oper, "prod_probs": prod_prob}])], ignore_index=True)
    
    df_operation = partial_df[[operation, "prod_probs"]].copy(deep=True).groupby(operation).agg({'prod_probs': 'sum'}).reset_index()

    return partial_df, df_operation


# PMF 1
array_1 = np.array([1, 2])
prob_array_1 = np.array([0.5, 0.2])

# PMF 2
array_2 = np.array([7, 8, 9])
prob_array_2 = np.array([0.5, 0.2, 0.3])

# Calculate the sum of PMFs
partial_df_sum, df_sum = sum_diff_pmf(array_1, prob_array_1, array_2, prob_array_2, "sum")
partial_df_diff, df_diff = sum_diff_pmf(array_1, prob_array_1, array_2, prob_array_2, "difference")

# Display the DataFrame
print("\nSum of PMF's")
display(partial_df_sum)
display(df_sum)
print("\nDifference of PMF's")
display(partial_df_diff)
display(df_diff)


Sum of PMF's


Unnamed: 0,array_1,array_2,prob_1,prob_2,sum,prod_probs
0,1,7,0.5,0.5,8,0.25
1,1,8,0.5,0.2,9,0.1
2,1,9,0.5,0.3,10,0.15
3,2,7,0.2,0.5,9,0.1
4,2,8,0.2,0.2,10,0.04
5,2,9,0.2,0.3,11,0.06


Unnamed: 0,sum,prod_probs
0,8,0.25
1,9,0.2
2,10,0.19
3,11,0.06



Difference of PMF's


Unnamed: 0,array_1,array_2,prob_1,prob_2,difference,prod_probs
0,1,7,0.5,0.5,-6,0.25
1,1,8,0.5,0.2,-7,0.1
2,1,9,0.5,0.3,-8,0.15
3,2,7,0.2,0.5,-5,0.1
4,2,8,0.2,0.2,-6,0.04
5,2,9,0.2,0.3,-7,0.06


Unnamed: 0,difference,prod_probs
0,-8,0.15
1,-7,0.16
2,-6,0.29
3,-5,0.1


### Sum of arrays with probabilities using convolution

In [4]:
def find_all_sums(array_1, array_2):
    all_sums = [a + b for a, b in product(array_1, array_2)]
    return sorted(all_sums)

def find_step_size(arr):
    max_decimals = 0
    for num in arr:
        if '.' in str(num):
            decimal_part = str(num).split('.')[-1]
            non_zero_decimals = len(decimal_part.rstrip('0'))
            max_decimals = max(max_decimals, non_zero_decimals)
    step_size = 10 ** -max_decimals if max_decimals > 0 else 1
    return step_size

def extend_prob_array(arr, prob_arr, step_size):
    temp = np.arange(np.min(arr), np.max(arr) + step_size, step_size)
    # Remove any number greater than np.max(arr)
    temp = temp[temp < np.max(arr) + step_size]
    extended_array = np.zeros(len(temp))
    counter = 0
    for i in range(len(temp)):
        if any(np.isclose(temp[i], value) for value in arr):
            extended_array[i] = prob_arr[counter]
            counter += 1
    return extended_array, step_size

def fill_sum_array(input_array, step_size):
    min_value = np.min(input_array)
    max_value = np.max(input_array)
    # Generate the filled array using np.arange
    filled_array = np.arange(min_value, max_value + step_size, step_size)
    # Remove any number greater than max_value
    filled_array = filled_array[filled_array < max_value + step_size]
    return filled_array

def plot_arrays(offset, array_1, array_2, all_sums_filled):
    plt.figure(figsize=(5, 5))
    gs = GridSpec(3, 2)
    
    ext_length = len(array_1) + 2 * len(array_2) - 2
    extended_array_1 = np.zeros(ext_length)
    start_index1 = len(array_2) - 1
    end_index1 = start_index1 + len(array_1)
    extended_array_1[start_index1:end_index1] = array_1
    
    extended_array_2 = np.zeros(ext_length)
    start_index_2 = ext_length - offset - len(array_2)
    end_index_2 = start_index_2 + len(array_2)
    extended_array_2[start_index_2:end_index_2] = array_2
    
    df = pd.DataFrame({
        'Extended array_1': extended_array_1,
        'Extended array_2 Reversed': extended_array_2[::-1]  # Reverse the second extended array
    }).T
    display(df);
    
    result = np.zeros(ext_length)
    partial_result = round(df.prod().sum(), 4)
    
    display(pd.DataFrame({'Element-wise row multipl.': df.prod()}).T);
    print(f"Partial result: {partial_result:.2f}")
        
    result[offset] = partial_result  # Store the partial result at the current offset
    
    plt.subplot(gs[0, 0])
    plt.stem(extended_array_1, linefmt='-g', markerfmt='go', basefmt=" ")
    plt.title("array_1")
    
    plt.subplot(gs[1, 0])
    plt.stem(extended_array_2[::-1], linefmt='-b', markerfmt='bo', basefmt=" ")
    plt.title("array_2 (Visualized as Reversed and Sliding)")
    
    plt.subplot(gs[2, 0])
    plt.stem(result, linefmt='-r', markerfmt='ro', basefmt=" ")
    plt.title("Convolution Result (Partial)")
    
    final_result = np.convolve(array_1, array_2)
    plt.subplot(gs[2, 1])
    plt.stem(final_result, linefmt='-m', markerfmt='mo', basefmt=" ")
    plt.title("Final Convolution Result")
    plt.xlabel("Offset")
    plt.xticks(ticks=range(len(final_result)), labels=np.round(all_sums_filled, 4), rotation=90)
    
    print(f"\nFinal convolution result: {final_result}")
    
    nonzero_indices = np.where(final_result != 0)[0]
    nonzero_sums = all_sums_filled[nonzero_indices]
    nonzero_probs = final_result[nonzero_indices]
    
    plt.tight_layout()
    plt.show()
    df_result = pd.DataFrame({"sum":nonzero_sums, "prod_probs":nonzero_probs})
    print("Result without elements with prob = 0")
    display(df_result)

step_size = min(find_step_size(array_1), find_step_size(array_2))

extended_prob_array_1, step_size_array_1 = extend_prob_array(array_1, prob_array_1, step_size)
extended_prob_array_2, step_size_array_2 = extend_prob_array(array_2, prob_array_2, step_size)

all_sums = find_all_sums(array_1, array_2)
all_sums_filled =  fill_sum_array(all_sums, step_size)
interactive_plot = interactive(plot_arrays, offset=(0, len(extended_prob_array_1) + len(extended_prob_array_2) - 2), 
                               array_1=fixed(extended_prob_array_1), array_2=fixed(extended_prob_array_2), all_sums_filled=fixed(all_sums_filled))
interactive_plot.children[0].value = 0  # Set initial value of the slider to 0
output = interactive_plot.children[-1]
interactive_plot

interactive(children=(IntSlider(value=0, description='offset', max=3), Output()), _dom_classes=('widget-intera…

In [5]:
import numpy as np

def find_step_size(arr):
    # Replace this with your actual step size calculation logic
    return 0.1

def extend_prob_array(arr, prob_arr):
    step_size = find_step_size(arr)
    min_value = np.min(arr)
    max_value = np.max(arr)
    
    # Calculate the number of points needed
    num_points = int((max_value - min_value) / step_size) + 1

    # Generate the array using linspace
    temp = np.linspace(min_value, max_value, num_points, endpoint=True)
    extended_array = np.zeros(len(temp))
    
    counter = 0
    for i in range(len(temp)):
        if any(np.isclose(temp[i], value) for value in arr):
            extended_array[i] = prob_arr[counter]
            counter += 1
    return extended_array, step_size

# Test the function
arr = np.array([0.1, 0.5, 0.7])
prob_arr = np.array([0.2, 0.4, 0.6])
extended_array, step_size = extend_prob_array(arr, prob_arr)
print("Extended Array:", extended_array)
print("Step Size:", step_size)


Extended Array: [0.2 0.  0.  0.  0.  0.4]
Step Size: 0.1


In [6]:
import numpy as np
num_points = int((6 - 3.1) / 0.1) + 1  # Calculate the number of points
vector = np.linspace(3.1, 6, num_points)
print(vector)

[3.1        3.20357143 3.30714286 3.41071429 3.51428571 3.61785714
 3.72142857 3.825      3.92857143 4.03214286 4.13571429 4.23928571
 4.34285714 4.44642857 4.55       4.65357143 4.75714286 4.86071429
 4.96428571 5.06785714 5.17142857 5.275      5.37857143 5.48214286
 5.58571429 5.68928571 5.79285714 5.89642857 6.        ]


In [7]:
import numpy as np

def find_step_size(arr):
    # Your logic to find the step size
    return 0.1  # Replace this with your actual step size computation

def extend_prob_array(arr, prob_arr):
    step_size = find_step_size(arr)
    temp = np.arange(np.min(arr), np.max(arr) + step_size, step_size)
    
    # Remove any number greater than np.max(arr)
    temp = temp[temp <= np.max(arr)]
    
    extended_array = np.zeros(len(temp))
    counter = 0
    for i in range(len(temp)):
        if any(np.isclose(temp[i], value) for value in arr):
            extended_array[i] = prob_arr[counter]
            counter += 1
    return extended_array, step_size

# Example usage
arr = [1.1, 2.2, 3.3]
prob_arr = [0.2, 0.4, 0.1]
extended_array, step_size = extend_prob_array(arr, prob_arr)
print("Extended Array:", extended_array)
print("Step Size:", step_size)

Extended Array: [0.2 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.4 0.  0.  0.  0.  0.  0.
 0.  0.  0.  0. ]
Step Size: 0.1
