# Coefficient Symmetry Under Parallelism

Here we will try to verify our (manually) derived equations, describing how many MCM multiplications we will really need to implement given different coefficient symmetries. We'll be comparing polyphase structures (with a shared MCM block per phase) to FFA structures (where each MCM block is isolated).

We'll do this verification by implementing our equations and checking the results against a reference, using symbolic programming. This reference will:

  1. Generate symbols (using `sympy`) for each style of coefficient symmetry
  2. Rearrange and combine these coefficient sets into the appropriate subfilters for FFA or polyphase
  3. Simplify these terms and remove any duplicates, zeros, or negations of other terms
  4. Count the remaining terms!

Let's get started

In [None]:
import sympy as sym
import pandas as pd
from math import ceil, floor, log2
import enum

First, let's go from a flat set of coefficients to a nested list, describing equivalent FFA and polyphase subfilters.

In [None]:
def hffa(p, ws):
    """ Get all subfilter coefficient sets for FFA structures.
    
        `p` is the level of parallelism (should be a power-of-2)
        `ws` is the list of coefficients (values or symbols)
    """
    if p==1:
        return [ws]
    return hffa(p//2, ws[0::2]) + \
           hffa(p//2, [a+b for (a,b) in zip(ws[0::2],ws[1::2])]) + \
           hffa(p//2, ws[1::2])


def hpoly(p, ws):
    """ Get all subfilter coefficient sets for polyphase structures.
    
        `p` is the level of parallelism
        `ws` is the list of coefficients (values or symbols)
    """
    return [ws for _ in range(p)]

Now we'll define helpers for our reference, including checking equality symbolically, filtering out duplicates, and counting the survivors.

In [None]:
def eqTerms(x, y):
    """ Check if two terms are symbolically equivalent.
    
        `x` and `y` are symbols or expressions from the `sympy` library
    """
    return 0 == sym.simplify(x-y)


def uniqAbsTerms(xs):
    """ Filter our any convertible terms from a list.
    
        Two terms are _convertible_ if they are zero, equal, or the negation of each other.
        
        `xs` is a list of terms to check
    """
    uniqs = []
    for x in xs:
        xabs = sym.Abs(x)
        if not ( eqTerms(x,0) or any([eqTerms(xabs,u) for u in uniqs]) ):
            uniqs.append(xabs)
    # NOTE: We aren't checking if one term is a bitshifted version of an existing one...
    #       Can we prove this will not happen?
    return uniqs

def countMults(h):
    """ Count the number of multiplications in a set of subfilter sets.
    
        `h` is the list of subfilter coefficient sets
    """
    return sum([len(ws) for ws in h])

We also need to generate coefficient symbols for each style. Let's do this as one helper function, `genCoeffs`. 

In [None]:
_nonlinPhase = lambda n : [sym.Symbol('w'+str(i)) for i in range(n)]

_type2 = lambda n :       [sym.Symbol('w'+str(i)) for i in range(n//2)] + \
                          [sym.Symbol('w'+str(i)) for i in range(n//2-1,-1,-1)]

_type4 = lambda n :       [sym.Symbol('w'+str(i)) for i in range(n//2)] + \
                          [(-1)*sym.Symbol('w'+str(i)) for i in range(n//2-1,-1,-1)]

_type1Padded = lambda n : [sym.Symbol('w'+str(i)) for i in range(n//2 - 1)] + \
                          [sym.Symbol('w'+str(n//2-1))] + \
                          [sym.Symbol('w'+str(i)) for i in range(n//2-1-1,-1,-1)] + \
                          [0]

_type3Padded = lambda n : [sym.Symbol('w'+str(i)) for i in range(n//2 - 1)] + \
                          [sym.Symbol('w'+str(n//2-1))] + \
                          [(-1)*sym.Symbol('w'+str(i)) for i in range(n//2-1-1,-1,-1)] + \
                          [0]

def _halfBand(n):
    ws = [0 for _ in range(n)]
    if n//2 % 2 == 1:
        ws[::2] = _type1Padded(n//2+1)[:-1]
    else:
        ws[::2] = _type1Padded(n//2)
    return ws

class CoeffType(enum.Enum):
    """ Enum for coefficient set types """
    Type1Pad = 1
    Type2 = 2
    Type3Pad = 3
    Type4 = 4
    NonLinPhase = 5
    HalfBand = 6

def genCoeffs(coeffType, n):
    """ Generate symbolic coefficient sets for a given style
    
        `coeffType` is a `CoeffType` enum
        `n` is the number of taps
    """
    if coeffType == CoeffType.Type1Pad:
        return _type1Padded(n)
    if coeffType == CoeffType.Type2:
        return _type2(n)
    if coeffType == CoeffType.Type3Pad:
        return _type3Padded(n)
    if coeffType == CoeffType.Type4:
        return _type4(n)
    if coeffType == CoeffType.NonLinPhase:
        return _nonlinPhase(n)
    if coeffType == CoeffType.HalfBand:
        return _halfBand(n)

We'll also implement the equations we have derived for each of these styles. We collect them up into the `ffa_eqns` and `poly_eqns` functions.

In [None]:
def ffa_hnlp(par, n):
    p = int(log2(par))
    return n*3**(p)

def ffa_htype2(par, n):
    p = int(log2(par))
    return ceil(n/2.0)+2*n*sum([3**i for i in range(0,p)])

def ffa_htype4(par, n):
    p = int(log2(par))
    return floor(n/2.0)+2*n*sum([3**i for i in range(0,p)])

def ffa_htype1(par, n):
    p = int(log2(par))
    return n*(2 + sum([3**k for k in range(1,p)]) + 2*sum([sum ([3**j for j in range(0,i+1)]) for i in range(0,p-1)])) + (p-1)*(ceil(n/2.0))

def ffa_htype3(par, n):
    p = int(log2(par))
    return n*(2 + sum([3**k for k in range(1,p)]) + 2*sum([sum ([3**j for j in range(0,i+1)]) for i in range(0,p-1)])) + (p-1)*(floor(n/2.0))

def ffa_hhb(par, n):
    p = int(log2(par))
    if p == 1:
        return 2*ceil(n/2.0)
    return 2 * ffa_htype1(2**(p-1), n)

def ffa_eqns(ctype, par, n):
    if ctype == CoeffType.Type1Pad:
        return ffa_htype1(par, n)
    if ctype == CoeffType.Type2:
        return ffa_htype2(par, n)
    if ctype == CoeffType.Type3Pad:
        return ffa_htype3(par, n)
    if ctype == CoeffType.Type4:
        return ffa_htype4(par, n)
    if ctype == CoeffType.NonLinPhase:
        return ffa_hnlp(par, n)
    if ctype == CoeffType.HalfBand:
        return ffa_hhb(par,n)

def poly_eqns(ctype, par, n):
    taps = par*n
    if ctype == CoeffType.Type1Pad:
        return par*ceil(taps/2.0)
    if ctype == CoeffType.Type2:
        return par*floor(taps/2.0)
    if ctype == CoeffType.Type3Pad:
        return par*ceil(taps/2.0)
    if ctype == CoeffType.Type4:
        return par*floor(taps/2.0)
    if ctype == CoeffType.NonLinPhase:
        return par*taps
    if ctype == CoeffType.HalfBand:
        return par*ceil(taps/4.0)

Let's iterate through a set of parallelisms and taps, recording the symbolic reference and our equation results for FFA and polyphase filters. This might take a few minutes!

In [None]:
frame = pd.DataFrame(columns=['FIR', 'CoeffType', 'Parallelism', 'N', 'SubfiltN', 'Mults'])

for p in [2,4,8,16]:
    print(f'Running p{p}')
    for nByP in [1,2,3,4]:
        print(f'Running nByP{nByP}')
        for coeffType in CoeffType:
            ws = genCoeffs(coeffType, p*nByP)
            
            mults = ffa_eqns(coeffType, p, nByP)
            frame = frame.append(dict(FIR='FFAEqn', CoeffType=coeffType.name, Parallelism=p, N=(p*nByP), SubfiltN=nByP, Mults=mults, MultsPerChannel=mults/p), ignore_index=True)
            
            mults = countMults([uniqAbsTerms(h) for h in hffa(p, ws)])
            frame = frame.append(dict(FIR='FFA'  , CoeffType=coeffType.name, Parallelism=p, N=(p*nByP), SubfiltN=nByP, Mults=mults, MultsPerChannel=mults/p), ignore_index=True)
            
            mults = poly_eqns(coeffType, p, nByP)
            frame = frame.append(dict(FIR='PolyEqn', CoeffType=coeffType.name, Parallelism=p, N=(p*nByP), SubfiltN=nByP, Mults=mults, MultsPerChannel=mults/p), ignore_index=True)
            
            mults = countMults([uniqAbsTerms(h) for h in hpoly(p, ws)])
            frame = frame.append(dict(FIR='Poly' , CoeffType=coeffType.name, Parallelism=p, N=(p*nByP), SubfiltN=nByP, Mults=mults, MultsPerChannel=mults/p), ignore_index=True)

In [None]:
frame.to_csv('./outputs/coeffs_sym_and_eqn.csv', index=False, header=True)

Most importantly, let's check that our equation outputs match the symbolic reference _exactly_.

In [None]:
all(frame[frame['FIR']=='FFAEqn']['Mults'].values == frame[frame['FIR']=='FFA']['Mults'].values)

In [None]:
all(frame[frame['FIR']=='PolyEqn']['Mults'].values == frame[frame['FIR']=='Poly']['Mults'].values)

Woo hoo! We can also plot this if you'd like to explore the results a little more.

In [None]:
import plotly.express as px
px.line(frame, x='SubfiltN',y='MultsPerChannel', color='FIR', facet_row='Parallelism', facet_col='CoeffType',  category_orders={"CoeffType": ['NonLinPhase','Type1Pad','Type2','HalfBand']}, height=1000)

We'll re-run our equations only this time... just to generate a small set of results and output it to a set of CSV files for use in the paper, slides, and poster.

In [None]:
frame = pd.DataFrame(columns=['p', 'structure', 'coeff_type', 'N', 'mults'])

for p in [2,4,8,16]:
    for coeffType in CoeffType:
        for nByP in [1,2,3,4,5]:
            ws = genCoeffs(coeffType, p*nByP)
            frame = frame.append(dict(
                p=p,
                structure='FFA',
                coeff_type = coeffType.name,
                N=(p*nByP),
                mults=ffa_eqns(coeffType, p, nByP)
            ), ignore_index=True).append(dict(
                p=p,
                structure='Poly',
                coeff_type = coeffType.name,
                N=(p*nByP),
                mults=poly_eqns(coeffType, p, nByP)
            ), ignore_index=True)
            
frame = frame.pivot(index=['N'], columns=['structure','p', 'coeff_type'], values='mults')
frame.columns = ['_'.join([str(c) for c in col]) for col in frame.columns.values]
frame.to_csv(f'outputs/coeffs_eqns.csv', index = True, header=True)