# Do Neural Networks Learn to Do Harmonic Analysis? 

### This is all the code for the circular data project. Included is datagen (for circular and survey), as well as the neural network.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from random import randint
import random
import numpy as np
import pandas as pd 
import torch.optim as optim
import os
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset
import torch
from scipy.stats import vonmises
import numpy.fft as fft 
import math

In [None]:
'''
    Class code for the network
'''
class CircleNet(nn.Sequential): 
    def __init__(self, res, dropout, n_out, hidden_nodes=4): 
   '''
       res :: int; input layer size
       dropout:: ; TODO
       n_out::int; output layer size 
   '''
        super(CircleNet, self).__init__()
        self.n_in = res
        self.affine1 = nn.Linear(self.n_in, hidden_nodes)
        self.drop1 = nn.Dropout(dropout)
        self.affine3 = nn.Linear(hidden_nodes, n_out)
    
    def forward(self, x): 
        '''
            Forward feed method
            Currently utilises ReLu activation function and softmax for output.
        '''
        x = F.relu(self.affine1(x))
        x = self.drop1(x)
        x = F.relu(self.affine3(x))
        x = F.softmax(x, dim = 1)
        return x

### The following code blocks are all for data generation. Data generation outputs to files so that we can keep them for consistent testing. Note: data generation for the hypercube distribution is at the end of the file because it is somewhat long.

In [None]:
'''
    Generate Uniform Circular Data
'''
num_points = 100 # Total number of data points for each sample
num_samples = 10000 # Total number of samples to generate
res = 8
mean = num_points / res
std = math.sqrt(1/res*(1-1/res)) * math.sqrt(num_points)
data = np.zeros([num_points,res + 1])
output_name = "uniform8ptnormalised.csv"

# generates uniform 
# Generates by coosing a random bucket
for i in range(num_points): 
    for j in range(num_samples): 
        num = randint(0,res-1)
        data[i][num]+= 1
        data[i][res] = -1
    data[i][:-1] = (data[i][:-1]-mean)/std
np.savetxt(output_name, data, delimiter = ",")

In [None]:
'''
    Generate unimodal von mesis data.
'''

num_buckets = 8
data = np.zeros([10000,num_buckets+1])
std = math.sqrt(1/num_buckets*(1-1/num_buckets))
mean = num_points / num_buckets
SAMPLE_SIZE = 100
kappa = 1.5
num_samples = 10000
output_name = "vonmises8ptnormtest.csv"
for i in range(num_samples):
    # von mesis always chooses 0 as mean so we apply random rotation to 
    # get a different mean bucket on each sample set.
    rotate_amt = random.random()*2*np.pi
    rotate_amt2 = random.random()*2*np.pi # TODO why 2 rotations?
    vm_dist = vonmises.rvs(kappa, loc = rotate_amt, size=SAMPLE_SIZE//2)
    vm_dist2 = vonmises.rvs(kappa, loc = rotate_amt2, size = SAMPLE_SIZE//2)
    for l in range(SAMPLE_SIZE//2):
        # determines which buckets to put sample in
        vm_dist[l] = ((vm_dist[l] + np.pi)/(2*np.pi))%1*(num_buckets*10)//10
        vm_dist2[l] = ((vm_dist2[l] + np.pi)/(2*np.pi))%1*(num_buckets*10)//10
        data[i][int(vm_dist[l])] += 1
        data[i][int(vm_dist2[l])]+=1
    data[i][:-1] = (data[i][:-1] - mean)/std
    data[i,-1] = -7
np.savetxt(output_name, data, delimiter = ",")    

In [None]:
'''
    Generate Bimodal Von Mesis distribution.
'''

# TODO unhardcode number of buckets
num_samples = 10000
SAMPLE_SIZE = 100
output_name = "bimodal8pt100.csv"
data = np.zeros([num_samples,res+1])
for i in range(num_samples):
    kappa = 1
    # von mesis function always chooses 0 as mean, so we add a random rotation
    rotate_amt = random.random()*2*np.pi
    vm_dist = vonmises.rvs(kappa, loc = rotate_amt, size=SAMPLE_SIZE)
    for l in range(SAMPLE_SIZE):
        random_rotate_factor = randint(0,1)
        # determines which bucket to place sample in
        vm_dist[l] = ((vm_dist[l] + np.pi + np.pi*random_rotate_factor)/(2*np.pi))%1*(res*10)//10
    
    for k in range(SAMPLE_SIZE):
        data[i][int(vm_dist[k])] += 1
    data[i][:-1] = (data[i][:-1] - 12)/5
    data[i,-1] = -6
np.savetxt(output_name, data, delimiter = ",")   

In [None]:
'''
    Class for loading a previously saved dataset. Loads the data in such a way that 
    it can be used by the data loader.
    data_root::string; the name of the file to load
'''
class CircularDataset(Dataset): 
    def __init__(self, data_root): 
        self.data = []
        self.dist_coder = LabelEncoder()
        self.dist_list = []
        for path in data_root: 
            array = pd.read_csv(path)
            
            self.dist_list += [array.values[0,-1]] ##get the distribution type for each file and add it to the list
            if len(self.data) == 0: 
                self.data = array.values
            else:
                print(self.data.shape)
                print(array.values.shape)
                self.data = np.concatenate((self.data,array.values), axis = 0)
        self.dist_coder.fit(self.dist_list)
    def __getitem__(self,idx): 
        return self.data[idx][0:-1], self.to_one_hot(self.dist_coder,([self.data[idx,-1]]))[0]
    def __len__(self): 
        return len(self.data)
    def to_one_hot(self, codec, values):
        value_idxs = codec.transform(values)
        return torch.eye(len(codec.classes_))[value_idxs]

## The next set of code is for the actual training of the network

In [None]:
# Initialize the weights of the network with bias.
def init_weights(m):
    if type(m) == nn.Linear:
        torch.nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0.05)
        
rate = .005
batchsize = 100

'''
    dataset is the list of files to train the network on.
    testdata is the list of files to test the network's performance on.
    Change these filenames as needed
'''
dataset = CircularDataset(["uniform8ptdata.csv","vonmises8ptnorm.csv"])
testdata = CircularDataset(["uniform8ptnormalisedtest.csv", "vonmises8ptnormtest.csv"])

iterator = torch.utils.data.DataLoader(dataset, batch_size = batchsize, shuffle = True)
testiterator = torch.utils.data.DataLoader(testdata, batch_size = 1, shuffle = True)
# Network arguments: input layer size, dropout, output layer size
net = CircleNet(8,0,2)
optimizer = optim.Adam(net.parameters(),lr = rate)
running_loss = 0
netlist = []
epochs = 12
net.apply(init_weights)
net.train()
loss_func = nn.CrossEntropyLoss()
for e in range(epochs): 
    print(running_loss) # TODO what does this do, should we delete it?
    running_loss = 0
    accuracy = 0
    i = 0
    for dist, labels in iterator: 
        
        
        optimizer.zero_grad()
        out = net(dist.float())
        
        loss = loss_func(out,torch.max(labels, 1)[1])
       
           
        loss.backward()
        optimizer.step()
        running_loss += loss
    for dist, labels in testiterator: 
        ps = net(dist.float())

        i += 1
        top_p, top_class = ps.topk(1, dim=1)
       
        top_p2, top_class2 = labels.topk(1, dim = 1)
        
        equals = top_class == top_class2
        
        accuracy += torch.sum(equals.float())
        
    # TODO what precisely does this print?
    print(accuracy)
    print(i)
    accuracy = 0
    

In [None]:
'''
    Circular Classification Weighting Visualisation
    
    Once the network finishes training we use this block to output investigate the weights.
    TODO throw in the visualisation code here too, idk where it is
'''
paramlist2 = []
for parameter in net.parameters(): 
    paramlist2 += [parameter]
array2 = paramlist2[0].detach().numpy()
np.around(np.matmul(array2,fft.ifft(np.identity(8),8)),1)

In [None]:
Here are some example plots for the output. For explanation please read the report document.

![title](document_figures/image4.png)
![title](document_figures/image8.png)

In [None]:
'''
    Hypercube Classification Weighting Visualisation
    
    Run this code block once the network has finished training to output the weight matrix,
    as well as a plot of the weighting on each hadamard basis.
'''
paramlist2 = []
for parameter in net.parameters(): 
    paramlist2 += [parameter]
array2 = paramlist2[0].detach().numpy()
array2
basis = np.matmul(array2, inv(hadamard(8)))
for a in basis:
    plt.plot(a)

Here are some example plots for the output. For explanation please read the report document.

![title](document_figures/image7.png)
![title](document_figures/image1.png)

### The following code blocks are for the data generation. Both circular datasets and survey datasets

In [None]:
import random
import itertools
import functools
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm

'''
    Data generation for the survey data on a hypercube
'''

# Data Generation
# length: int; length of each binary string
def generateNormalData(length, sampleSize, sd=1, plot_res = False):
    # generate all binary strings of given length
    binaryStrings = [''.join(map(str,i)) for i in itertools.product([0,1], repeat=length)]
    meanBucket = random.choice(binaryStrings)
    
    # map each bucket to their distance from the mean bucket
    distances = {i : getDistance(meanBucket, i) for i in binaryStrings}
    samples = {i: 0 for i in binaryStrings} # sample data
    bucket_size = 2 * sd / max(distances.values()) # map buckets from 0 to 2*sd (MAKE THIS A PARAMETER LATER)
    
    # map each point to their pdf
    pdfs = {i : norm.pdf(distances[i] * bucket_size) for i in binaryStrings}
    # normalise and map each pdf to some value between 0 and 1
    norm_const = sum(pdfs.values()) 
    normalised_cumu = {i : 0 for i in binaryStrings}
    s = 0
    for key in pdfs:
        normalised_pdf = pdfs[key] / norm_const
        normalised_cumu[key] = normalised_pdf + s
        s += normalised_pdf
    
    # Maps the right end point of each bucket to its corresponding binary string
    cumu_to_bin = {normalised_cumu[i] : i for i in binaryStrings}
    
    # Get samples
    # find which bin it fits in first
    # This works because we iterate from smallest to largest bucket
    cdf_arr = sorted(list(cumu_to_bin.keys()))
    for n in range(sampleSize):
        roll = random.random()
        first_bucket = None
        for bucket in cdf_arr:
            if roll < bucket:
                first_bucket = bucket
                break
        samples[cumu_to_bin[first_bucket]] += 1

    # randomly invert all of the keys
    # TODO change this so that each digit has an independent chance of inversion
    random_inversion = random.random()
    if random_inversion < 0.5:
        # swap all the ones and zeros
        new_samples = {}
        for key in samples:
            new_key = ''.join(list(map(
                lambda x: "1" if x == "0" else "0", key 
            )))
            new_samples[new_key] = samples[key]
    
    # Optional plotting of distribution
    if plot_res:
        x = sorted(binaryStrings, key = lambda s: distances[s])
        y = [samples[i] for i in x]
        plt.plot(x, y)
        plt.show()
    return samples

# Helper function to get the distance between two binary strings
# b1, b2::string; Binary strings to find distance between
def getDistance(b1, b2):
    # Find the differences between each string
    if len(b1) != len(b2):
        raise Exception("b1, b2 not the same length")
    differences = ''
    for i in range(len(b1)):
        if b1[i] != b2[i]:
            differences += '1'
        else:
            differences += '0'
    return sum([int(i) for i in differences])
            
# Generate Uniform Data
'''
    Length::int; binary string length. ie dimension of the hypercube
    sampleSize::int; number of samples to generate
'''
def generateUniformData(length, sampleSize):
    # Generates a uniform distribution over binary survey results
    # Return: dictionary Strings -> ints
    binaryStrings = [''.join(map(str,i)) for i in itertools.product([0,1], repeat=length)]
    
   
    surveyChoices = {
        i : 0 for i in binaryStrings
    }
    for n in range(sampleSize):
        surveyChoices[random.choice(binaryStrings)] += 1
    
    return surveyChoices

'''
    Generates a bunch of normally distributed data and saves it to a file
    
    length::int; the dimension of the hypercube to generate data on
    data_points::int; how many data points to generate
    sample_size::int; how many samples each data point should be
    name::string; output file name
'''
def genNormalDataToFile(length, data_points, sample_size, name):
    # save to file.
    # save in order of binary strings
    data = np.zeros([data_points, 2**length+1])
    for i in range(data_points):
        samples = generateNormalData(length, sampleSize=sample_size)
        binaryStrings = sorted(samples.keys())
        for j in range(len(binaryStrings)):
            bs = binaryStrings[j]
            # j is binary string index
            data[i][j] = samples[bs]
        data[i][-1] = -11 # Arbritrary label
    np.savetxt(name, data, delimiter = ",") 

'''
    Generates a bunch of uniformally distributed data and saves it to a file
    
    length::int; the dimension of the hypercube to generate data on
    data_points::int; how many data points to generate
    sample_size::int; how many samples each data point should be
    name::string; output file name
'''
def genUniformDataToFile(length, data_points, sample_size, name):
    # save to file.
    # save in order of binary strings
    data = np.zeros([data_points, 2**length+1])
    for i in range(data_points):
        samples = generateUniformData(length, sampleSize=sample_size)
        binaryStrings = sorted(samples.keys())
        for j in range(len(binaryStrings)):
            bs = binaryStrings[j]
            # j is binary string index
            data[i][j] = samples[bs]
        data[i][-1] = -10 # Arbritrary label
    np.savetxt(name, data, delimiter = ",") 
    
# Code used to actually generate the data. Uncomment and adjust as needed
'''
for n in ["surveyDataNormal3qs.csv", "surveyDataNormal3qsTest.csv"]:
    genNormalDataToFile(length=3, data_points=10000, sample_size=250, name=n)

for n in ["surveyDataUniform3qs.csv", "surveyDataUniform3qsTest.csv"]:
    genUniformDataToFile(length=3, data_points=10000, sample_size=250, name=n)
'''

![title](document_figures/image5.png)