In [None]:
import numpy as np
import matplotlib.pyplot as plt

# NOTE: these lines define global figure properties used for publication.
import matplotlib_inline.backend_inline
matplotlib_inline.backend_inline.set_matplotlib_formats('svg') # print figures in svg format
plt.rcParams.update({'font.size':14}) # set global font size

# correlatation

In [None]:
N = 30

# correlated random var
x = np.linspace(0,10,N) + np.random.rand(N)
y = x + np.random.randn(N)

# setup figure
_, axs = plt.subplots(2, 2, figsize=(6,6))

def common_axs_setting(axs):
    axs.set_xlabel("Variable x")
    axs.set_ylabel("Variable y")
    axs.set_xticks([])
    axs.set_yticks([])
    axs.axis("square")

axs[0,0].plot(x, y, "ko")
axs[0,0].set_title("Positive correlation", fontweight="bold")
common_axs_setting(axs[0,0])

axs[0,1].plot(x, -y, "ko")
axs[0,1].set_title("negative correlation", fontweight="bold")
common_axs_setting(axs[0,1])

axs[1,0].plot(np.random.randn(N), np.random.randn(N), "ko")
axs[1,0].set_title('Zero correlation',fontweight='bold')
common_axs_setting(axs[1,0])

# /20 to scale down
x = np.cos(np.linspace(0, 2*np.pi, N)) + np.random.randn(N)/20
y = np.sin(np.linspace(0, 2*np.pi, N)) + np.random.randn(N)/20
axs[1,1].plot(x, y, "ko")
axs[1,1].set_title('Zero correlation',fontweight='bold')
common_axs_setting(axs[1,1])

plt.tight_layout()
# plt.savefig('Figure_04_01.png',dpi=300) # write out the fig to a file
plt.show()

# Ex1

Write a Python function that takes two vectors as input and provides two numbers as output: the Pearson correlation coefficient and the cosine similarity value. Write code that follows the formulas presented in this chapter; don’t simply call `np.corrcoef` and `spatial.distance`.cosine.</br>
Check that the two output values are identical when the variables are already mean centered and different when the variables are not mean centered.

In [None]:
def corrAndCos(x, y):
    """
    calcs cosine similarity and pearson corr.
    """
    # cosine similarity
    cos = np.dot(x, y)/(np.linalg.norm(x)*np.linalg.norm(y))
    
    # pearson corr.
    xm = x - np.mean(x)
    ym = y - np.mean(y)
    num = np.dot(xm, ym)
    den = (np.linalg.norm(xm)*np.linalg.norm(ym)) 
    cor = num/den
    return cor, cos

a = np.random.randn(3)
b = np.random.randn(3)

cor, cos = corrAndCos(a, b)
np_ans = np.corrcoef(a, b)[0,1]
print(f"{cor = }\n{cos = }\n{np_ans = }")
assert round(cor, 3)==round(np_ans, 3)

In [None]:
# compare r and c without mean-centering
a = np.random.randn(3) + 10
b = np.random.randn(3)

c = a - np.mean(a)
d = b - np.mean(b)

print(f"No mean center (should differ): {np.round(corrAndCos(a, b), 4)}")
print(f"With mean center (should be same): {np.round(corrAndCos(c, d), 4)}")
print("Note that the pearson. corr for both cases are the same.")

# Ex2

Create a variable containing the integers 0 through 3, and a second variable equaling the first variable plus some offset. 

You will then create a simulation in which you systematically vary that offset between −50 and +50 (that is, the first iteration of the simulation will have the second variable equal to [−50, −49, −48, −47]). 

In a for loop, compute the correlation and cosine similarity between the two variables and store these results. Then make a line plot showing how the correlation and cosine similarity are affected by the mean offset. 

In [None]:
a = np.arange(4, dtype=int) # essentially one of the vector for calc correlation
offsets = np.arange(-50, 51)

results = np.zeros((len(offsets), 2))

for i in range(len(offsets)):
    results[i,:] = corrAndCos(a, a+offsets[i]) 
    
plt.figure(figsize=(8,4))
h = plt.plot(offsets,results)
h[0].set_color('k')
h[0].set_marker('o')
h[1].set_color([.7,.7,.7])
h[1].set_marker('s')

plt.xlabel('Mean offset')
plt.ylabel('r or c')
plt.legend(['Pearson','Cosine sim.'])
# plt.savefig('Figure_04_02.png',dpi=300) # write out the fig to a file
plt.show()

# EX3

There are several Python functions to compute the Pearson correlation coefficient. One of them is called pearsonr and is located in the stats module of the SciPy library. Open the source code for this file (hint: ??functionname) and make sure you understand how the Python implementation maps onto the formulas introduced in this chapter

In [None]:
from scipy.stats import pearsonr

??pearsonr

# Ex 4

Why do you ever need to code your own functions when they already exist in Python? Part of the reason is that writing your own functions has huge educational value, because you see that (in this case) the correlation is a simple computation and not some incredibly sophisticated black-box algorithm that only a computer-science PhD could understand. But another reason is that built-in functions are sometimes slower because of myriad input checks, dealing with additional input options, converting data types, etc. This increases usability but at the expense of computation time.

Your goal in this exercise is to determine whether your own bare-bones correlation function is faster than NumPy’s corrcoef function. Modify the function from Exercise 4-2 to compute only the correlation coefficient. Then, in a for loop over 1,000 iterations, generate two variables of 500 random numbers and compute the correlation between them. Time the for loop. Then repeat but using `np.corrcoef`. In my tests, the custom function was about 33% faster than np.corrcoef. In these toy examples, the differences are measured in milliseconds, but if you are running billions of correlations with large datasets, those milliseconds really add up! 

(Note that writing your own functions without input checks has the risk of input errors that would be caught by np.corrcoef.) (Also note that the speed advantage breaks down for larger vectors.)

In [None]:
import time

def rho(x,y):
  xm = x-np.mean(x)
  ym = y-np.mean(y)
  n  = np.dot(xm,ym)
  d  = np.linalg.norm(xm) * np.linalg.norm(ym)
  return n/d


it = 1000
n = 500

tic = time.time()
for i in range(it):
  x = np.random.randn(n,2)
  rho(x[:,0], x[:,1])
t1 = time.time() - tic

tic = time.time()
for i in range(it):
  x = np.random.randn(n,2)
  pearsonr(x[:,0], x[:,1])
t2 = time.time() - tic

# Note: time() returns seconds, so I multiply by 1000 for ms
print(f'My function took {t1*1000:.2f} ms')
print(f'   pearsonr took {t2*1000:.2f} ms')

# Ex5

Let’s build an edge detector. The kernel for an edge detector is very simple: [−1 +1]. The dot product of that kernel with a snippet of a time series signal with constant value (e.g., [10 10]) is 0. 
But that dot product is large when the signal has a steep change (e.g., [1 10] produces a dot product of 9). The signal we’ll work with is a plateau function. Graphs A and B in Figure 4-5 show the kernel and the signal. The first step in this exercise is to write code that creates these two time series.

<p align="center">
  <img src=".\imgs\book\plad_0405.png" alt="">
</p>

Next, write a for loop over the time points in the signal. At each time point, compute the dot product between the kernel and a segment of the time series data that has the same length as the kernel. You should produce a plot that looks like graph C in Figure 4-5. 
Notice that our edge detector returned 0 when the signal was flat, +1 when the signal jumped up, and −1 when the signal jumped down.

Feel free to continue exploring this code. For example, does anything change if you pad the kernel with zeros ([0 −1 1 0])? What about if you flip the kernel to be [1 −1]? How about if the kernel is asymmetric ([−1 2])?

In [None]:
kernel = np.array([-1, 1])

signal = np.zeros(30)
signal[10:20]=1

feature_map = np.zeros(len(signal))

for t in range(1, len(signal)-1):
    feature_map[t] = np.dot(kernel, signal[t-1:t+1]) 

_, axs = plt.subplots(1,3, figsize=(15,3))
axs[0].plot(kernel,"ks-")
axs[0].set_title("kernel")
axs[0].set_xlim([-15,15])

axs[1].plot(signal, 'ks-')
axs[1].set_title("Time series signal")

axs[2].plot(signal, 'ks-', label="signal", linewidth=3)
markers, stemlines, _ = axs[2].stem(range(len(feature_map)), feature_map, 
                                    basefmt=" ", linefmt = "", markerfmt="o",
                                    label="Edge detection")
plt.setp(stemlines, "color", [.7,.7,.7])
plt.setp(markers, "color", [.7,.7,.7])


axs[2].legend()
# plt.savefig('Figure_04_04ac.png',dpi=300)
plt.show()

In [None]:
# define the kernel (a sorta-kinda Gaussian)
kernel = np.array([0,.1,.3,.8,1,.8,.3,.1,0])
kernel /= np.sum(kernel)
"""
Copilot
Divide kernel by np.sum(kernel): This is done to normalize the kernel. In other words, 
the sum of all elements in the kernel will be 1 after this operation. 

This is a common practice when using kernels for convolution in signal processing, 
as it ensures that the overall energy of the 
signal is preserved after the convolution operation.

Normalizing the kernel by its sum instead of its maximum value ensures that the overall energy or amplitude of the signal is 
preserved after the convolution operation.

When you normalize by the sum, the total weight of the kernel is 1. This means that when you apply the kernel to the signal, 
the sum of the original values in the window covered by the kernel will be the same before and after the convolution. 
This is important in many applications where the absolute scale of the signal matters.

If you normalize by the maximum value, the highest value in the kernel will be 1, but the sum of the kernel values may not be 1. 
This could result in the signal being amplified or attenuated after the convolution, depending on the shape of the kernel.

In summary, normalizing by the sum is a common practice in signal processing to preserve the energy or amplitude of the signal during convolution operations. However, the appropriate normalization method can depend on the specific application and the properties of the kernel and signal. I hope this helps! Let me know if you have any other questions.

Let us say we have a kernel of length 3: kernel = np.array([1, 2, 1]).
If we normalize by the sum, we divide each element by the sum of all elements (which is 4 in this case):

kernel = kernel / np.sum(kernel) 
       = [1/4, 2/4, 1/4] 
       = [0.25, 0.5, 0.25]

Now, let us say we apply this kernel to a part of the signal that is [3, 4, 3]. The convolution operation (dot product in this case) would be:

np.dot(kernel, signal) 
= 0.25*3 + 0.5*4 + 0.25*3 
= 0.75 + 2 + 0.75 
= 3.5

Notice that the result (3.5) is close to the original center value of the signal (4). 
This is because the kernel is normalized by sum, which preserves the overall energy of the signal.

On the other hand, if we normalize by the maximum value, the kernel becomes [1/2, 1, 1/2] = [0.5, 1, 0.5]. 
If we apply this kernel to the same part of the signal, the result would be:

np.dot(kernel, signal) 
= 0.5*3 + 1*4 + 0.5*3 
= 1.5 + 4 + 1.5 
= 7

This result (7) is larger than the original center value of the signal (4), which means the signal has been amplified. 
This is why normalizing by sum is often preferred in signal processing, as it preserves the overall energy of the signal.

"""

# length param.
Nkernel = len(kernel)
halfKrn = Nkernel//2 # find midpoint
"""
Copilot
finding the midpoint of the kernel. 

This is useful for the convolution operation, as the kernel is applied symmetrically 
around each point in the signal. By knowing the midpoint of the kernel, the coder can correctly
align the kernel with the signal during the convolution operation. The // operator is used for
integer division, which means that if the kernel length is odd, the midpoint will be rounded down. 
   
For example, if Nkernel is 9, halfKrn will be 4. This means the
kernel will be applied to the signal from 4 points before to 4 points after the current point. 
"""

# and the signal
Nsignal = 100
timeseries = np.random.randn(Nsignal)

# make a copy of the signal for filtering
filtsig = timeseries.copy()

# loop over the signal time points
for t in range(halfKrn+1,Nsignal-halfKrn):
  filtsig[t] = np.dot(kernel,timeseries[t-halfKrn-1:t+halfKrn])

# plot them
_,axs = plt.subplots(1,3,figsize=(25,4))
axs[0].plot(kernel,'ks-')
axs[0].set_title('Kernel')
axs[0].set_xlim([-1,Nsignal])

axs[1].plot(timeseries,'ks-')
axs[1].set_title('Time series signal')

axs[2].plot(timeseries,color='k',label='Original',linewidth=1)
axs[2].plot(filtsig,'--',color=[.6,.6,.6],label='Smoothed',linewidth=2)
axs[2].legend()

# plt.savefig('Figure_04_06c.png',dpi=300)
plt.show()

# Ex7
Replace the 1 in the center of the kernel with −1 and mean center the kernel. Then rerun the filtering and plot code. What is the result? It actually accentuates the sharp features! In fact, this kernel is now a high-pass filter, meaning it dampens the smooth (low-frequency) features and highlights the rapidly changing (high-frequency) features.

In [None]:
# define the kernel (a sorta-kinda Gaussian)
kernel = np.array([0,.1,.3,.8,-1,.8,.3,.1,0])
kernel /= np.sum(kernel)
kernel -= np.mean(kernel)

_, axs = plt.subplots(1,3, figsize=(25,4))
axs[0].plot(kernel, 's-')
axs[0].set_title("Kernel")
axs[0].set_xlim([-1, Nsignal])

axs[1].plot(timeseries, "s-")
axs[1].set_title('Time series signal')

filtsig2 = timeseries.copy()
for t in range(halfKrn+1,Nsignal-halfKrn):
  filtsig2[t] = np.dot(kernel,timeseries[t-halfKrn-1:t+halfKrn])

plt.plot(timeseries,color='k',label='Original',linewidth=1)
plt.plot(filtsig2,color=[.9,.2,.7],label='Sharpened',linewidth=1)
plt.legend()
plt.show()

# Ex8
One way to determine an optimal k is to repeat the clustering multiple times (each time using randomly initialized cluster centroids) and assess whether the final clustering is the same or different. Without generating new data, rerun the k-means code several times using k = 3 to see whether the resulting clusters are similar (this is a qualitative assessment based on visual inspection). Do the final cluster assignments generally seem similar even though the centroids are randomly selected?

In [None]:
## Create data
nPerClust = 50

# blur around centroid (std units)
blur = 1

# XY centroid locations
A = [1, 1]
B = [-3, 1]
C = [3, 3]

# generate data
a = [A[0]+np.random.randn(nPerClust)*blur, A[1]+np.random.randn(nPerClust)*blur]
b = [B[0]+np.random.randn(nPerClust)*blur, B[1]+np.random.randn(nPerClust)*blur]
c = [C[0]+np.random.randn(nPerClust)*blur, C[1]+np.random.randn(nPerClust)*blur]

# concatanate into a matrix
data = np.transpose(np.concatenate((a,b,c),axis=1))

# plot data
plt.plot(data[:,0],data[:,1],'ko',markerfacecolor='w')
plt.title('Raw (preclustered) data')
plt.xticks([])
plt.yticks([])

plt.show()

In [None]:
## initialize random cluster centroids
k = 3 # extract three clusters

# random cluster centers (randomly sampled data points)
ridx = np.random.choice(range(len(data)), k, replace=False)
centroids = data[ridx,:]

# setup the figure
fig, axs = plt.subplots(2,2,figsize=(6,6))
axs = axs.flatten()
lineColors = [[0,0,0],[.4,.4,.4],[.8,.8,.8] ] #'rbm'

# plot data with initial random cluster centroids
axs[0].plot(data[:,0],data[:,1],'ko',markerfacecolor='w')
axs[0].plot(centroids[:,0],centroids[:,1],'ko')
axs[0].set_title('Iteration 0')
axs[0].set_xticks([])
axs[0].set_yticks([])

# loop over iterations
for iteri in range(3):
    
  # step 1: compute distances
  dists = np.zeros((data.shape[0],k))
  for ci in range(k):
    dists[:,ci] = np.sum((data-centroids[ci,:])**2,axis=1)
  """
  Copilot

  we can vectorize this for loop with 
  dists = np.sum((data[:, np.newaxis] - centroids) **2, axis=2)
  """
        
  # step 2: assign to group based on minimum distance
  groupidx = np.argmin(dists,axis=1)
    
  # step 3: recompute centers
  for ki in range(k):
    centroids[ki,:] = [np.mean(data[groupidx==ki,0]), 
                       np.mean(data[groupidx==ki,1]) ]
  
  # plot data points
  for i in range(len(data)):
    axs[iteri+1].plot([data[i,0], centroids[groupidx[i],0]],
                      [data[i,1], centroids[groupidx[i],1]],
                      color=lineColors[groupidx[i]])
  axs[iteri+1].plot(centroids[:,0], centroids[:,1], 'ko')
  axs[iteri+1].set_title(f'Iteration {iteri+1}')
  axs[iteri+1].set_xticks([])
  axs[iteri+1].set_yticks([])

# plt.savefig('Figure_04_03.png',dpi=300)
plt.show()

# Ex9
Repeat the multiple clusterings using k = 2 and k = 4. What do you think of these results?

In [None]:
## initialize random cluster centroids
k = 2 # extract three clusters

# random cluster centers (randomly sampled data points)
ridx = np.random.choice(range(len(data)), k, replace=False)
centroids = data[ridx,:]

# setup the figure
fig, axs = plt.subplots(2,2,figsize=(6,6))
axs = axs.flatten()
lineColors = [[0,0,0],[.4,.4,.4],[.8,.8,.8] ] #'rbm'

# plot data with initial random cluster centroids
axs[0].plot(data[:,0],data[:,1],'ko',markerfacecolor='w')
axs[0].plot(centroids[:,0],centroids[:,1],'ko')
axs[0].set_title('Iteration 0')
axs[0].set_xticks([])
axs[0].set_yticks([])

# loop over iterations
for iteri in range(3):
    
  # step 1: compute distances
  dists = np.zeros((data.shape[0],k))
  for ci in range(k):
    dists[:,ci] = np.sum((data-centroids[ci,:])**2,axis=1)
  """
  Copilot

  we can vectorize this for loop with 
  dists = np.sum((data[:, np.newaxis] - centroids) **2, axis=2)
  """
        
  # step 2: assign to group based on minimum distance
  groupidx = np.argmin(dists,axis=1)
    
  # step 3: recompute centers
  for ki in range(k):
    centroids[ki,:] = [np.mean(data[groupidx==ki,0]), 
                       np.mean(data[groupidx==ki,1]) ]
  
  # plot data points
  for i in range(len(data)):
    axs[iteri+1].plot([data[i,0], centroids[groupidx[i],0]],
                      [data[i,1], centroids[groupidx[i],1]],
                      color=lineColors[groupidx[i]])
  axs[iteri+1].plot(centroids[:,0], centroids[:,1], 'ko')
  axs[iteri+1].set_title(f'Iteration {iteri+1}')
  axs[iteri+1].set_xticks([])
  axs[iteri+1].set_yticks([])

# plt.savefig('Figure_04_03.png',dpi=300)
plt.show()

In [None]:
## initialize random cluster centroids
k = 4 # extract three clusters

# random cluster centers (randomly sampled data points)
ridx = np.random.choice(range(len(data)), k, replace=False)
centroids = data[ridx,:]

# setup the figure
fig, axs = plt.subplots(2,2,figsize=(6,6))
axs = axs.flatten()
lineColors = [[0,0,0],[.4,.4,.4],[.8,.8,.8],[.6,.6,.6]] #'rbm'

# plot data with initial random cluster centroids
axs[0].plot(data[:,0],data[:,1],'ko',markerfacecolor='w')
axs[0].plot(centroids[:,0],centroids[:,1],'ko')
axs[0].set_title('Iteration 0')
axs[0].set_xticks([])
axs[0].set_yticks([])

# loop over iterations
for iteri in range(3):
    
  # step 1: compute distances
  dists = np.zeros((data.shape[0],k))
  for ci in range(k):
    dists[:,ci] = np.sum((data-centroids[ci,:])**2,axis=1)
  """
  Copilot

  we can vectorize this for loop with 
  dists = np.sum((data[:, np.newaxis] - centroids) **2, axis=2)
  """
        
  # step 2: assign to group based on minimum distance
  groupidx = np.argmin(dists,axis=1)
    
  # step 3: recompute centers
  for ki in range(k):
    centroids[ki,:] = [np.mean(data[groupidx==ki,0]), 
                       np.mean(data[groupidx==ki,1]) ]
  
  # plot data points
  for i in range(len(data)):
    axs[iteri+1].plot([data[i,0], centroids[groupidx[i],0]],
                      [data[i,1], centroids[groupidx[i],1]],
                      color=lineColors[groupidx[i]])
  axs[iteri+1].plot(centroids[:,0], centroids[:,1], 'ko')
  axs[iteri+1].set_title(f'Iteration {iteri+1}')
  axs[iteri+1].set_xticks([])
  axs[iteri+1].set_yticks([])

# plt.savefig('Figure_04_03.png',dpi=300)
plt.show()