[Home](../index.ipynb) / FFT
***
<span style="font-size:20pt;">FFT</span>

In [7]:
###############################################################################
# WindowedRingBuffer
#
# A RingBuffer where the values are "windowed" with a Nuttall-windows.
###############################################################################

from math import pi, cos

class WindowedRingBuffer :
    def __init__(self,length):
        self.length   = length
        self.iLast    = length
        self.aData    = [0]*length # [i for i in range( length )]

        # Nuttall window, see https://en.wikipedia.org/wiki/Window_function#Hann_and_Hamming_windows
        self.aWindow  = [0.355768-0.487396*cos(2*pi*n/length)+0.144232*cos(4*pi*n/length)-0.012604*cos(6*pi*n/length) for n in range(length)] # Hanning window
                        # Hanning = 0.5 – 0.5*cos(2*pi * n / length )

    def get(self, iIndex):
        return self.aData[ (self.iLast + iIndex) % self.length ]*self.aWindow[ iIndex ]
        
    def add(self,val):
        self.iLast = (self.iLast - 1) % self.length
        self.aData[self.iLast] = val

    def __str__(self) :
        return( "{} {}".format( self.__class__, [ self.get(iIndex) for iIndex in range(self.length) ] ) )

In [8]:
###############################################################################
# Timer
#
# Measuring time used by a process.
###############################################################################

# Usefull class to get elapsed time:
import time

class Timer():
    def __init__(self, name=None):
        self.name = name
        self.getTime    = time.time_ns   # Python
        self.TIME_SCALE = 10E9
#        self.getTime    = time.ticks_ms # MicroPython
#        self.TIME_SCALE = 10E6
    def __enter__(self):
        self.start()
        
    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.stop()

    def start( self ):
        self.t_start = self.getTime()

    def stop( self ):
        if self.name:
            print('[{}]'.format(self.name), end=" " )
            
        print('Total time: {} s'.format((self.getTime() - self.t_start)/self.TIME_SCALE))
        
#==============================================================================
# Usage:        
#with Timer():
#    print( "do" )


In [9]:
###############################################################################
# ValueSource
#
# Wrapper for getting data.
###############################################################################


class ValueSource :
    def getValue( self ):
        return 0
    
    
    def iter( self, length = -1 ):
        if  length < 0 :
            while True:
                yield self.getValue()
        else:
            for iIndex in range( length ):
                yield self.getValue()
    

#==============================================================================

import wave
import struct

class WavValueSource( ValueSource ):
    
    def __init__( self, framesPerGet = 1 ):
        self.objSoundFile      = None
        self.framesPerGet      = framesPerGet
        self.iNumberOfFrames   = 1
        self.iNumberOfChannels = 1
        self.samplesPerGet     = 1
        
    def __enter__(self):
        self.open()
        
    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.close()

    def open( self ):
        self.objSoundFile = wave.open( "sample.wav", 'r' )
        print( self.objSoundFile )
        self.iNumberOfFrames   = self.objSoundFile.getnframes()
        self.iNumberOfChannels = self.objSoundFile.getnchannels()
        self.samplesPerGet     = self.iNumberOfChannels*self.framesPerGet
        
        #self.objSoundFile.getsampwidth()
        #self.objSoundFile.getframerate()
        #self.objSoundFile.getparams   ()
        
        return self
    
    def close(self):
        try:
            self.objSoundFile.close()
            self.objSoundFile = None

        except: pass

    def getValue( self ):
        return int(sum(struct.unpack("%ih" % self.samplesPerGet, self.objSoundFile.readframes(self.framesPerGet) ))/(self.samplesPerGet)+0.5)
    

#==============================================================================
# Usage:
#N_SAMPLE = 128
#
#import matplotlib.pyplot as plt
#
#aX = [x for x in range(N_SAMPLE)]
#with WavValueSource( 16 ) as source:
#    aY = [ s for s in source.iter( N_SAMPLE ) ]
#    
## print( aY )
#plt.figure(figsize=(20, 2))
#plt.scatter(aX, aY, s=1, marker='o', color="black" )
#plt.show()
    
    

In [10]:
###############################################################################
# FFT
#
# fft(bufXIn) bufXIn has to implement get( iIndex ).
###############################################################################


import math

class FFT:
    TWO_PI = 2 * math.pi
    
    def __init__( self, iSamples = 128 ):
        self.iSamples = iSamples
        
        self.yIn  = [0]*iSamples
        self.xOut = [0]*iSamples
        self.yOut = [0]*iSamples

        self.bufX = WindowedRingBuffer( iSamples )

    def fft( self, bufXIn, direction = 1 ):
        n = bufXIn.length
        factor = self.TWO_PI * direction

        for i in range( n ):
            self.xOut[i] = 0
            self.yOut[i] = 0

            arg = factor * i / n

            for k in range( n ):
                a = k * arg
                cosarg = math.cos(a);
                sinarg = math.sin(a);

                self.xOut[i] += ( bufXIn.get(k) * cosarg - self.yIn[k] * sinarg )
                self.yOut[i] += ( bufXIn.get(k) * sinarg + self.yIn[k] * cosarg )

        return self.yOut

    
#    def fft( xIn, yIn, xOut, yOut, direction = -1 ):
#        n = xIn.length
#        factor = -direction * self.TWO_PI
#
#        for i in range( n ):
#            self.xOut[i] = 0
#            self.yOut[i] = 0
#
#            arg = factor * i / n
#
#            for k in range( n ):
#                a = k * arg
#                cosarg = math.cos(a);
#                sinarg = math.sin(a);
#
#                xOut[i] += ( xIn.get(k) * cosarg - yIn[k] * sinarg )
#                yOut[i] += ( xIn.get(k) * sinarg + yIn[k] * cosarg )
#
#        return (xOut, yOut)
    

In [11]:
import matplotlib.animation
import matplotlib.pyplot as plt
import numpy as np
plt.rcParams["animation.html"] = "jshtml"
plt.rcParams['figure.dpi'] = 150  
plt.ioff()

figure, ax = plt.subplots()

N_SAMPLE = 2**8
#aX = np.linspace(0,10,100)

bufferX = WindowedRingBuffer( N_SAMPLE )
fft     = FFT( N_SAMPLE )

aX      = [x for x in range(N_SAMPLE)]

timer = Timer()
timer.start()
source = WavValueSource()
source.open()

for iIndex in range( N_SAMPLE -1):
    bufferX.add(source.getValue())

#print( bufferX )

def animate(t):
    global source
    
    #print( source.objSoundFile )
    
    global bufferX
    global fft
    plt.cla()
    bufferX.add(source.getValue())
    #print( bufferX )
    fft.fft( bufferX )
    plt.scatter(aX[:int(len(fft.yOut)/20)], fft.yOut[:int(len(fft.yOut)/20)], s=1, marker='o', color="black" )
    plt.ylim(-3000,3000)

anim = matplotlib.animation.FuncAnimation(figure, animate )#, frames=50)

#timer.stop()    

HTML(anim.to_jshtml())
#HTML(anim.to_html5_video())

#source.close()
#print( "Done." )
#=========================================
# Save animation as video (if required)
# ani.save('dynamic_images.mp4')

<wave.Wave_read object at 0x7f7f387d8c70>
