-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
186 lines (156 loc) · 5.53 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import librosa
import math
import time
import numpy as np
from fastapi import FastAPI
class Args:
    """Bundle of analysis settings handed to the BPM/offset routines.

    Attributes:
        filename: Path passed to the audio loader.
        window: Analysis window length; multiplied by the sample rate to
            obtain the window size in samples.
        thres: Threshold applied to the normalised autocorrelation when
            collecting beat-period candidates.
        decimation: Downsampling factor used before autocorrelation.
        n_fft: FFT size forwarded to the onset-strength computation.
        win_length: Window length forwarded to the onset-strength
            computation.
        hop_length: Hop length forwarded to the onset-strength computation.
        use_sine: When truthy, a generated test tone is analysed instead of
            loading ``filename``.
        plot: Stored as given; not read by the code visible in this file.
    """

    def __init__(self, filename, window, thres, decimation, n_fft,
                 win_length, hop_length, use_sine, plot):
        # Input and autocorrelation settings.
        self.filename, self.window, self.thres = filename, window, thres
        self.decimation = decimation
        # Onset-strength (STFT) settings.
        self.n_fft, self.win_length, self.hop_length = (
            n_fft, win_length, hop_length)
        # Mode flags.
        self.use_sine, self.plot = use_sine, plot
# Application object; the route decorator below registers its handler on it.
app = FastAPI()
@app.get("/bpm/")
async def get_bpm(filename: str):
    """Analyse an audio file and return its tempo and beat offset.

    Args:
        filename: Path of the audio file to analyse.

    Returns:
        A dict with keys ``"bpm"`` and ``"offset"`` holding the two values
        returned by ``process_file``.
    """
    import asyncio

    # NOTE(review): `filename` is supplied by the HTTP caller and is passed
    # unchanged to the audio loader as a path. Restrict it to an allowed
    # directory before exposing this endpoint to untrusted clients.
    args = Args(filename, 4, 0.07, 16, 512, 512, 128, False, False)

    # process_file is blocking and CPU-bound; running it inline in a
    # coroutine would stall the event loop (and every other request) for the
    # whole analysis, so hand it to the default thread-pool executor.
    loop = asyncio.get_running_loop()
    bpm, offset = await loop.run_in_executor(None, process_file, args)
    return {"bpm": bpm, "offset": offset}
def gen_sin():
    """Build a synthetic test tone.

    Returns:
        A ``(samples, sample_rate)`` pair: the tone produced by
        ``librosa.tone(5678, duration=10, sr=44100)`` and the sample rate
        44100.
    """
    sample_rate = 44100
    tone = librosa.tone(5678, duration=10, sr=sample_rate)
    return tone, sample_rate
def gcd_spb(spb_candidate, spb_peak):
    """Pick the beat period that best explains all candidate periods.

    The peak period is scaled by every simple fraction ``i / j`` with
    ``i, j`` in 1..9. Each scaled period ("unit") whose tempo lies within
    80-240 BPM is scored by:

    * how far its tempo is from a whole BPM value, plus
    * one penalty point for every distinct candidate period that is not
      (within 0.01) an integer multiple of the unit.

    The unit with the lowest score wins; on ties the first one tried is kept.

    Args:
        spb_candidate: Array-like of candidate seconds-per-beat values.
        spb_peak: Seconds-per-beat of the strongest autocorrelation peak.

    Returns:
        The selected seconds-per-beat value, or ``-1`` when there are no
        candidates, when ``spb_peak`` is not a positive number, or when no
        scaled period falls inside the 80-240 BPM range.
    """
    # A zero, negative or NaN peak cannot be scaled into a valid tempo;
    # without this guard a zero peak divides by zero below.
    if len(spb_candidate) == 0 or not spb_peak > 0:
        return -1

    # Collapse runs of near-identical candidates so each distinct period is
    # counted once. This does not depend on the unit being tried, so it is
    # done once here instead of inside the fraction search.
    distinct = []
    last_spb = -1
    for val in np.sort(spb_candidate):
        if abs(val - last_spb) < 0.01:
            continue
        distinct.append(val)
        last_spb = val

    min_rel_err = float('inf')
    result = -1
    # Try every simple fraction of the peak period
    for i in range(1, 10):
        for j in range(1, 10):
            unit = spb_peak * (i / j)
            unit_bpm = 60 / unit
            # Limit BPM to 80-240
            if unit_bpm > 240 or unit_bpm < 80:
                continue
            # Prefer whole BPM
            rel_err = abs(round(unit_bpm) - unit_bpm)
            # Penalise every candidate that is not a multiple of the unit
            for val in distinct:
                ratio = val / unit
                if not abs(round(ratio) - ratio) < 0.01:
                    rel_err += 1
            if rel_err < min_rel_err:
                min_rel_err = rel_err
                result = unit
    return result
def detect_bpm(data, fs, args):
    """Find beat-period candidates in one window using autocorrelation.

    The window is downsampled by ``args.decimation`` (taking the maximum of
    each block), mean-centred and autocorrelated. Lags corresponding to
    30-240 BPM are kept, divided by ``midpoint - lag`` and scaled to unit
    Euclidean norm. Every lag whose value exceeds ``args.thres`` becomes a
    candidate.

    Args:
        data: 1-D sequence of audio samples for the window.
        fs: Sample rate of ``data``.
        args: Object providing ``decimation`` and ``thres``.

    Returns:
        A tuple ``(spb_candidate, spb_peak, correl)``:

        * ``spb_candidate`` - array of shape ``(k, 1)`` with the
          seconds-per-beat of every lag above the threshold,
        * ``spb_peak`` - seconds-per-beat of the strongest lag,
        * ``correl`` - normalised autocorrelation value at that lag.

        When the window is empty, too short to cover the lag range, or has
        no usable signal (zero or NaN norm), an empty ``(0, 1)`` array and
        ``0.0, 0.0`` are returned.
    """
    decimation = args.decimation
    rate = fs / decimation  # sample rate after downsampling
    min_ndx = math.floor(60.0 / 240 * rate)
    max_ndx = math.floor(60.0 / 30 * rate)

    if len(data) == 0:
        return np.empty((0, 1)), 0.0, 0.0

    # Downsample: pad only up to the next multiple of the block size. A
    # length that is already a multiple gets no padding (previously a whole
    # extra block of zeros was appended in that case).
    zeros_needed = (-len(data)) % decimation
    data = np.pad(data, (0, zeros_needed), mode='constant')
    data = np.max(data.reshape(-1, decimation), axis=1)
    # Remove the mean
    data = data - np.mean(data)
    # ACF
    correl = np.correlate(data, data, "full")
    midpoint = len(correl) // 2
    # Keep lags below `midpoint` so the divisor below stays positive and the
    # slice and divisor always have the same length.
    max_ndx = min(max_ndx, midpoint)
    if max_ndx <= min_ndx:
        return np.empty((0, 1)), 0.0, 0.0
    acf = correl[midpoint + min_ndx:midpoint + max_ndx]
    # Weaken higher tempo
    acf = acf / (midpoint - np.arange(min_ndx, max_ndx))
    # Normalize; a zero (silent window) or NaN norm carries no tempo
    # information and would only yield NaN results.
    norm = np.linalg.norm(acf)
    if not norm > 0:
        return np.empty((0, 1)), 0.0, 0.0
    acf = acf / norm
    # Detect candidates
    high_ndx = np.argwhere(acf > args.thres)
    spb_candidate = (high_ndx + min_ndx) / rate
    # Detect peak
    peak_ndx = np.argmax(acf)
    spb_peak = (peak_ndx + min_ndx) / rate
    return spb_candidate, spb_peak, acf[peak_ndx]
def process_file(args):
    """Estimate the tempo and beat offset of an audio signal.

    The signal is split into consecutive windows of ``args.window * fs``
    samples. Each full window is analysed with ``detect_bpm``; the candidate
    beat periods of all windows are pooled and the window with the strongest
    peak is remembered. ``gcd_spb`` then selects the beat period, and the
    offset is taken from the strongest rise of the onset-strength envelope
    inside the remembered window.

    Args:
        args: Settings object providing ``filename``, ``use_sine``,
            ``window``, ``hop_length``, ``n_fft`` and ``win_length`` (plus
            whatever ``detect_bpm`` reads).

    Returns:
        A tuple ``(bpm, modded_offset)``. ``bpm`` is ``60 / spb`` and is not
        positive when no beat period was found (``gcd_spb`` returned -1).
        ``modded_offset`` is the offset modulo the beat period, in
        milliseconds.

    Raises:
        AssertionError: If a window slice does not have the expected length.
    """
    print("Loading file...")
    initial_time = time.time()
    samps, fs = gen_sin() if args.use_sine else librosa.load(args.filename)

    window_samps = int(args.window * fs)
    max_window_ndx = len(samps) // window_samps

    peak_correl = 0
    peak_spb = 0
    peak_samp_ndx = 0
    candidate_chunks = []

    # Iterate through all windows, collect spb candidates and peak
    print("Doing auto correlation...")
    for window_ndx in range(max_window_ndx):
        # Derive the start from the window index so that skipping a window
        # below can never leave the read position stuck.
        samps_ndx = window_ndx * window_samps
        data = samps[samps_ndx: samps_ndx + window_samps]
        if len(data) % window_samps != 0:
            raise AssertionError(str(len(data)))
        spbc, spbp, correl = detect_bpm(data, fs, args)
        if spbc is None:
            continue
        # Collect per-window candidates and join them once after the loop
        # instead of re-allocating the pooled array on every window.
        candidate_chunks.append(np.ravel(spbc))
        if correl > peak_correl:
            peak_correl = correl
            peak_spb = spbp
            peak_samp_ndx = samps_ndx

    if candidate_chunks:
        spb_candidate = np.concatenate(candidate_chunks)
    else:
        spb_candidate = np.array([], dtype=np.float64)

    # Calculate BPM by GCD
    spb = gcd_spb(spb_candidate, peak_spb)
    bpm = 60 / spb
    rounded_bpm = round(bpm)
    rel_err = bpm - rounded_bpm

    # Calculate offset by onset algorithm
    print("Calculating offset...")
    data = samps[peak_samp_ndx: peak_samp_ndx + window_samps]
    onset_env = librosa.onset.onset_strength(
        y=data,
        sr=fs,
        hop_length=args.hop_length,
        n_fft=args.n_fft,
        win_length=args.win_length,
    )
    onset_env = np.gradient(onset_env)
    # Scale to a peak of 1 only when the peak is positive: dividing by zero
    # would give NaN and dividing by a negative peak would flip the sign,
    # either of which corrupts the argmax below.
    onset_peak = np.max(onset_env)
    if onset_peak > 0:
        onset_env = onset_env / onset_peak
    # Ignore the first frames when searching for the strongest rise
    onset_trim = 8
    onset_env = onset_env[onset_trim:]
    raw_offset_ndx = np.argmax(onset_env)
    onset_offset = args.n_fft // (2 * args.hop_length)
    offset_ndx = (raw_offset_ndx + onset_trim) * args.hop_length + \
        peak_samp_ndx - onset_offset
    offset = offset_ndx / fs
    modded_offset = offset % spb * 1000

    # Print and return the results
    elapsed_time = time.time() - initial_time
    if bpm <= 0:
        print(f"Failed to get BPM: {rounded_bpm}")
    else:
        print("Completed!")
        print("- Beats Per Minute: %d" % rounded_bpm)
        print("- BPM Error: %.2f" % rel_err)
        print("- Offset: %.1fms" % modded_offset)
        print("- Offset Error: %.1fms" % (args.hop_length / fs * 1000))
        print("- Run Time: %.1fs" % elapsed_time)
    return bpm, modded_offset