Spinorama plot with Altair

In [1]:
import math
import pandas as pd
import altair as alt
import numpy as np

import sys,os,os.path
sys.path.append(os.path.expanduser('../src'))

from spinorama.load import graph_melt
from spinorama.load_parse import parse_all_speakers, parse_graphs_speaker
from spinorama.compute_normalize import normalize_mean, normalize_graph
from spinorama.graph import graph_params_default, graph_freq

# df = parse_graphs_speaker('../datas', 'Adam', 'Adam S2V', 'klippel')
# df = parse_graphs_speaker('../datas', 'Neumann', 'Neumann KH 80', 'klippel')
df = parse_graphs_speaker('../datas', 'Genelec', 'Genelec 8341A', 'klippel')
# df = parse_graphs_speaker('../datas', 'Genelec', 'Genelec 8030A', 'princeton')
# print(df)

In [2]:
nearest = alt.selection(
    type='single',
    nearest=True,
    on='mouseover',
    fields=['Freq'],
    empty='none')

In [31]:
def graph_spinorama(dfu, graph_params):
    xmin = graph_params['xmin']
    xmax = graph_params['xmax']
    ymin = graph_params['ymin']
    ymax = graph_params['ymax']
    if xmax == xmin:
        logging.error('Graph configuration is incorrect: xmin==xmax')
    if ymax == ymin:
        logging.error('Graph configuration is incorrect: ymin==ymax')
    # add selectors                                                                                                                                      
    selectorsMeasurements = alt.selection_multi(
        fields=['Measurements'],
        bind='legend')
    scales = alt.selection_interval(
        bind='scales'
    )
    # main charts                                                                                                                                        
    xaxis =alt.X('Freq:Q', title='Freqency (Hz)',
                scale=alt.Scale(type='log', base=10, nice=False, domain=[xmin, xmax]),
                axis=alt.Axis(format='s'))
    yaxis = alt.Y('dB:Q', title='Sound Pressure (dB)',
                  scale=alt.Scale(domain=[ymin, ymax]))
    di_yaxis = alt.Y('dB:Q', title='Sound Pressure DI (dB)',
                    scale=alt.Scale(domain=[-5,ymax-ymin-5]))
    color = alt.Color('Measurements', type='nominal', sort=None)
    opacity = alt.condition(selectorsMeasurements, alt.value(1), alt.value(0.2))

    chart_dfu = alt.Chart(dfu)
    
    chart_spin = chart_dfu.transform_filter(
        alt.FieldOneOfPredicate(
            field='Measurements',
            oneOf=['On Axis', 'Listening Window', 'Early Reflections', 'Sound Power']))
    
    line = chart_spin.mark_line().encode(x=xaxis, y=yaxis, color=color, opacity=opacity)
    circle = chart_spin.mark_circle(size=100).encode(
            x=xaxis, y=yaxis, color=color,
            opacity=alt.condition(nearest, alt.value(1), alt.value(0)),
            tooltip=['Measurements', 'Freq', 'dB']
    )
    
    chart_di = chart_dfu.transform_filter(
        alt.FieldOneOfPredicate(
            field='Measurements',
            oneOf=['Early Reflections DI', 'Sound Power DI']))

    di = chart_di.mark_line().encode(x=xaxis, y=di_yaxis, color=color, opacity=opacity)
    circle_di = chart_di.mark_circle(size=100).encode(
        x=xaxis, y=di_yaxis, color=color,
        opacity=alt.condition(nearest, alt.value(1), alt.value(0)),
        tooltip=['Measurements', 'Freq', 'dB']
    )

    # assemble elements together                                                                                                                         
    spin = alt.layer(alt.layer(line+circle), alt.layer(di+circle_di)).resolve_scale(y='independent'
    ).add_selection(selectorsMeasurements
    ).add_selection(scales
    ).add_selection(nearest
    ).properties(width=graph_params['width'], height=graph_params['height']  
    ).interactive()
    return spin



dfu = df['CEA2034']
params = graph_params_default
params['ymin'] = -40
params['ymax'] = 10
params['width'] = 800
params['height'] = 500
graph_spinorama(dfu, params)

Compare computed sound power wrt the version done by Klippel
Matching is good for low frequency but error increases with frequency

In [None]:
from math import log10, isnan

# from the standard appendix                                                                                                                             
# weigth http://emis.impa.br/EMIS/journals/BAG/vol.51/no.1/b51h1san.pdf                                                                                  
sp_weigths = {
    'On Axis': 0.000604486,
       '180°': 0.000604486,
    #                                                                                                                                                    
    '10°':   0.004730189,
    '170°':  0.004730189,
    '-170°': 0.004730189,
    '-10°':  0.004730189,
    #                                                                                                                                                    
    '20°':   0.008955027,
    '160°':  0.008955027,
    '-160°': 0.008955027,
    '-20°':  0.008955027,
    #                                                                                                                                                    
    '30°':   0.012387354,
    '150°':  0.012387354,
    '-150°': 0.012387354,
    '-30°':  0.012387354,
    #                                                                                                                                                    
    '40°':   0.014989611,
    '140°':  0.014989611,
    '-140°': 0.014989611,
    '-40°':  0.014989611,
    #                                                                                                                                                    
    '50°':   0.016868154,
    '130°':  0.016868154,
    '-130°': 0.016868154,
    '-50°':  0.016868154,
    #                                                                                                                                                    
    '60°':   0.018165962,
    '120°':  0.018165962,
    '-120°': 0.018165962,
    '-60°':  0.018165962,
    #                                                                                                                                                    
    '70°':   0.019006744,
    '110°':  0.019006744,
    '-110°': 0.019006744,
    '-70°':  0.019006744,
    #                                                                                                                                                    
    '80°':   0.019477787,
    '100°':  0.019477787,
    '-100°': 0.019477787,
    '-80°':  0.019477787,
    #                                                                                                                                                    
    '90°':   0.019629373,
    '-90°':  0.019629373,
}

sp_weigths_hv = {}
for k,v in sp_weigths.items():
    sp_weigths_hv[k] = v
    sp_weigths_hv['{0}_h'.format(k)] = v
    sp_weigths_hv['{0}_v'.format(k)] = v

def spl2pressure(spl: float) -> float:
    try:
        p = pow(10, (spl-105.0)/20.0)
        return p
    except TypeError as e:
        print('spl={0} e={1}'.format(spl, e))
        logging.error('spl={0} e={1}'.format(spl, e))


def pressure2spl(p: float) -> float:
    if p<0.0:
        print('pressure is negative')
    return 105.0+20.0*log10(p)


def column_trim(c):
    if c[-2:] == '_v' or c[-2:] == '_h':
        return c[:-2]
    return c


def column_valid(c):
    if c[0] == 'O':
        return True
    elif c[0] == 'F':
        return False
    elif int(column_trim(c)[:-1]) % 10 == 0:
        return True
    return False

def spatial_average(sp_window, func='rms'):
    sp_cols = sp_window.columns
    result = pd.DataFrame({
        'Freq': sp_window.Freq,
    })
    
    def weighted_rms(spl):
        avg = [sp_weigths_hv[c] * spl[c]**2 for c in sp_cols if column_valid(c)]
        wsm = [sp_weigths_hv[c] for c in sp_cols if column_valid(c)]
        return np.sqrt(np.sum(avg)/np.sum(wsm))

    def rms(spl):
        avg = [spl[c]**2 for c in sp_cols if column_valid(c)]
        n = len(avg)
        r = np.sqrt(np.sum(avg)/n)
        if isnan(r):
            print(spl)
        return r

    if func == 'rms':
        result['dB'] = sp_window\
            .drop(columns=['Freq'])\
            .apply(spl2pressure)\
            .apply(rms, axis=1)\
            .apply(pressure2spl)
    elif func == 'weighted_rms':
        result['dB'] = sp_window\
            .drop(columns=['Freq'])\
            .apply(spl2pressure)\
            .apply(weighted_rms, axis=1)\
            .apply(pressure2spl)

    shape = result.shape
    # if shape[0] != 200 or shape[1] != 2:
    #     print('Shape is {0},{1}; func={2} columns={2}'.format(shape[0], shape[1], func, sp_cols))
    #     print(result)
        
    return result.reset_index(drop=True)


def spatial_average1(spl, sel, func='rms'):
    if spl is None:
        return None
    spl_window = spl[[c for c in spl.columns if c in sel]]
    # print(spl_window.iloc[0:3])
    sa = spatial_average(spl_window, func)
    # print(sa.iloc[0:3])
    return sa


def spatial_average2(h_spl: pd.DataFrame, h_sel, 
                     v_spl: pd.DataFrame, v_sel,
                     func='rms') -> pd.DataFrame:
    if v_spl is None and h_spl is None:
        return None
    if v_spl is None:
        return spatial_average1(h_spl, h_sel, func)
    if h_spl is None:
        return spatial_average1(v_spl, v_sel, func)
    h_spl_sel = h_spl[[c for c in h_spl.columns if c in h_sel]]
    v_spl_sel = v_spl[[c for c in v_spl.columns if c in v_sel]]
    sp_window = h_spl_sel.merge(
        v_spl_sel,
        left_on='Freq', right_on='Freq', suffixes=('_h', '_v')
    )
    # print(sp_window.iloc[0:3])
    sa = spatial_average(sp_window, func)
    # print(sa.iloc[0:3])
    return sa


def sound_power(h_spl: pd.DataFrame, v_spl: pd.DataFrame) -> pd.DataFrame:
    # Sound Power                                                                                                                                        
    # The sound power is the weighted rms average of all 70 measurements,                                                                                
    # with individual measurements weighted according to the portion of the                                                                              
    # spherical surface that they represent. Calculation of the sound power                                                                              
    # curve begins with a conversion from SPL to pressure, a scalar magnitude.                                                                           
    # The individual measures of sound pressure are then weighted according                                                                              
    # to the values shown in Appendix C and an energy average (rms) is                                                                                   
    # calculated using the weighted values. The final average is converted                                                                               
    # to SPL.      
    
    h_cols = h_spl.columns
    v_cols = v_spl.columns #.drop(['On Axis', '180°'])
    # print(len(h_cols)+len(v_cols)) == 72 ok with 2x Freq
    return spatial_average2(h_spl, h_cols, v_spl, v_cols, 'weighted_rms')

sp = sound_power(df['SPL Horizontal_unmelted'], df['SPL Vertical_unmelted'])


In [None]:
dfu = df['CEA2034_unmelted']
check = pd.DataFrame({
    'Freq': dfu.Freq, 
    'Control': dfu['Sound Power'], 
    'Computed': sp.dB })
mcheck = graph_melt(check)
alt.Chart(mcheck).mark_line(clip=True).encode(
    x=alt.X('Freq', scale=alt.Scale(type='log', nice=False)),
    y=alt.Y('dB', scale=alt.Scale(domain=[80, 93])),
    color=alt.Color('Measurements')
).properties(width=800)


In [None]:
onaxis = df['CEA2034']
onaxis = onaxis.loc[onaxis['Measurements'] == 'On Axis']
onaxis_graph = graph_freq(onaxis, graph_params_default)
onaxis_reg = alt.Chart(onaxis).transform_filter(
   'datum.Freq>80 & datum.Freq<10000',
).transform_regression(method='log', on='Freq', regression='dB', extent=[20,20000]
).mark_line().encode(
   alt.X('Freq:Q'),
   alt.Y('dB:Q'),
   color=alt.value('red')
)
onaxis_graph + onaxis_reg


In [None]:
inroom = df['Estimated In-Room Response']
inroom_graph = graph_freq(inroom, graph_params_default)
inroom_reg = alt.Chart(inroom)\
.transform_filter('datum.Freq>80 & datum.Freq<10000')\
.transform_regression(method='log', on='Freq', regression='dB', extent=[20,20000])

inroom_reg3 = inroom_reg.transform_calculate(dBm3=alt.datum.dB-3
).transform_calculate(dBp3=alt.datum.dB+3
).transform_calculate(text='"Band ±3dB"'
)

inroom_reg1 = inroom_reg.transform_calculate(dBm1=alt.datum.dB-1.5
).transform_calculate(dBp1=alt.datum.dB+1.5
).transform_calculate(text='"Band ±1.5dB"')

inroom_reg = inroom_reg.transform_calculate(text='"Linear Regression"')

line = inroom_reg.mark_line().encode(
    x=alt.X('Freq:Q'),
    y=alt.Y('dB:Q', axis=alt.Axis(title='dB SPL')),
    color=alt.Color(
        'text:O', 
        scale=alt.Scale(
            domain=['Linear Regression', 'Band ±3dB', 'Band ±1.5dB'], 
            range=['firebrick', 'firebrick', 'firebrick']),
        legend=alt.Legend(title='Regression')))

err3 = inroom_reg3.mark_area(opacity=0.1).encode(x='Freq:Q', y='dBm3:Q', y2='dBp3:Q', color=alt.Color('text:O'))
err1 = inroom_reg1.mark_area(opacity=0.2).encode(x='Freq:Q', y='dBm1:Q', y2='dBp1:Q', color=alt.Color('text:O'))

alt.layer(err3+err1+line, inroom_graph).resolve_scale(color='independent')

In [None]:
from src.spinorama.cea2034 import early_reflections

early_computed = early_reflections(df['SPL Horizontal_unmelted'], df['SPL Vertical_unmelted'])
# print(early_computed)
early_computed_melted = graph_melt(early_computed)
mean = normalize_mean(graph_melt(dfu))
early_computed_normed = normalize_graph(early_computed_melted, mean)
# print(early_computed_normed)

# get klippel computed value from an ASR measurement
early_klippel = df['Early Reflections']

for item in ('Rear Wall Bounce', 'Front Wall Bounce', 'Floor Bounce', 'Ceiling Bounce', 'Side Wall Bounce', 'Total Early Reflection'):
    item_computed = early_computed_normed.loc[early_computed_normed['Measurements'] == item].reset_index()
    item_klippel = early_klippel.loc[early_klippel['Measurements'] == item].reset_index()
    # print(item_computed.shape, item_klippel.shape)
    # print(item_computed)

    check_item = pd.DataFrame({
        'Freq': item_klippel.Freq,
        'Control': item_klippel.dB,
        'Computed': item_computed.dB-5 })
    # print(check_item)
    g_item = alt.Chart(graph_melt(check_item)
    ).mark_line(clip=True
    ).encode(
        x=alt.X('Freq', scale=alt.Scale(type='log', nice=False)),
        y=alt.Y('dB'),
        color=alt.Color('Measurements')
    ).properties(width=600)
    print('{0} done'.format(item))
    g_item.display()





In [None]:
from src.spinorama.compute_cea2034 import horizontal_reflections

hr_computed = horizontal_reflections(df['SPL Horizontal_unmelted'], df['SPL Vertical_unmelted'])
hr_computed_melted = graph_melt(hr_computed)
mean = normalize_mean(graph_melt(dfu))
hr_computed_normed = normalize_graph(hr_computed_melted, mean)

# get klippel computed value from an ASR measurement
hr_klippel = df['Horizontal Reflections']

for item in ('Front', 'Side', 'Rear'):
    item_computed = hr_computed_normed.loc[hr_computed_normed['Measurements'] == item].reset_index()
    item_klippel = hr_klippel.loc[hr_klippel['Measurements'] == item].reset_index()
    # print(item_computed.shape, item_klippel.shape)
    # print(item_computed)

    check_item = pd.DataFrame({
        'Freq': item_klippel.Freq,
        'Control': item_klippel.dB,
        'Computed': item_computed.dB-5 })
    # print(check_item)
    g_item = alt.Chart(graph_melt(check_item)
    ).mark_line(clip=True
    ).encode(
        x=alt.X('Freq', scale=alt.Scale(type='log', nice=False)),
        y=alt.Y('dB'),
        color=alt.Color('Measurements')
    ).properties(width=600)
    print('{0} done'.format(item))
    g_item.display()

In [None]:
from src.spinorama.compute_cea2034 import vertical_reflections

vr_computed = vertical_reflections(df['SPL Horizontal_unmelted'], df['SPL Vertical_unmelted'])
vr_computed_melted = graph_melt(vr_computed)
mean = normalize_mean(graph_melt(dfu))
vr_computed_normed = normalize_graph(vr_computed_melted, mean)

# get klippel computed value from an ASR measurement
vr_klippel = df['Vertical Reflections']

for item in ('Floor Reflection', 'Ceiling Reflection'):
    item_computed = vr_computed_normed.loc[vr_computed_normed['Measurements'] == item].reset_index()
    item_klippel = vr_klippel.loc[vr_klippel['Measurements'] == item].reset_index()
    # print(item_computed.shape, item_klippel.shape)
    # print(item_computed)

    check_item = pd.DataFrame({
        'Freq': item_klippel.Freq,
        'Control': item_klippel.dB,
        'Computed': item_computed.dB-5 })
    # print(check_item)
    g_item = alt.Chart(graph_melt(check_item)
    ).mark_line(clip=True
    ).encode(
        x=alt.X('Freq', scale=alt.Scale(type='log', nice=False)),
        y=alt.Y('dB'),
        color=alt.Color('Measurements')
    ).properties(width=600)
    print('{0} done'.format(item))
    g_item.display()

In [None]:
from src.spinorama.compute_cea2034 import compute_cea2034

cea2034_computed = compute_cea2034(df['SPL Horizontal_unmelted'], df['SPL Vertical_unmelted'])
cea2034_computed_melted = graph_melt(cea2034_computed)
mean = normalize_mean(graph_melt(dfu))
cea2034_computed_normed = normalize_graph(cea2034_computed_melted, mean)

# get klippel computed value from an ASR measurement
cea2034_klippel = df['CEA2034']

for item in ('On Axis', 'Listening Window', 'Early Reflections', 'Sound Power', 'Sound Power DI', 'Early Reflections DI'):
    item_computed = cea2034_computed_normed.loc[cea2034_computed_normed['Measurements'] == item].reset_index()
    item_klippel = cea2034_klippel.loc[cea2034_klippel['Measurements'] == item].reset_index()
    #print(item_computed.shape, item_klippel.shape)
    #print(item_computed)

    check_item = pd.DataFrame({
        'Freq': item_klippel.Freq,
        'Control': item_klippel.dB,
        'Computed': item_computed.dB-5 })
    # print(check_item)
    g_item = alt.Chart(graph_melt(check_item)
    ).mark_line(clip=True
    ).encode(
        x=alt.X('Freq', scale=alt.Scale(type='log', nice=False)),
        y=alt.Y('dB'),
        color=alt.Color('Measurements')
    ).properties(width=600)
    print('{0} done'.format(item))
    g_item.display()