This repository has been archived by the owner on Dec 15, 2023. It is now read-only.
/
virtual_awg.py
811 lines (675 loc) · 33.1 KB
/
virtual_awg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
# -*- coding: utf-8 -*-
"""
Created on Wed Aug 31 13:04:09 2016
@author: diepencjv
"""
# %%
import numpy as np
import scipy.signal
import logging
import warnings
import qcodes
from qcodes import Instrument
from qcodes.plots.pyqtgraph import QtPlot
from qcodes.data.data_array import DataArray
import qtt
import qtt.utilities.tools
logger = logging.getLogger(__name__)
# %%
class virtual_awg(Instrument):
"""
Attributes:
_awgs (list): handles to instruments
awg_map (dict)
hardware (Instrument): contains AWG to plunger values
corr (float): unknown
delay_FPGA (float): time delay of signals going through fridge
"""
def __init__(self, name, instruments=[], awg_map=None, hardware=None, verbose=1, **kwargs):
super().__init__(name, **kwargs)
logger.info('initialize virtual_awg %s' % name)
self._awgs = instruments
self.awg_map = awg_map
self.hardware = hardware
self.verbose = verbose
self.delay_FPGA = 2.0e-6 # should depend on filterboxes
self.corr = .0 # legacy code, specific for FPGA board not used any more
self.maxdatapts = 16e6 # This used to be set to the fpga maximum, but that maximum should not be handled here
self.awg_seq = None
if len(self._awgs) == 0 and self.verbose:
print('no physical AWGs connected')
elif len(self._awgs) == 1:
self.awg_cont = self._awgs[0]
self.awg_cont.set('run_mode', 'CONT')
elif len(self._awgs) == 2 and 'awg_mk' in self.awg_map:
self.awg_cont = self._awgs[self.awg_map['awg_mk'][0]]
self.awg_cont.set('run_mode', 'CONT')
self.awg_seq = self._awgs[(self.awg_map['awg_mk'][0] + 1) % 2]
self._set_seq_mode(self.awg_seq)
self.delay_AWG = self.hardware.parameters['delay_AWG'].get()
else:
raise Exception(
'Configuration of AWGs not supported by virtual_awg instrument')
self.AWG_clock = 1e8
self.ch_amp = 4.0
for awg in self._awgs:
awg.set('clock_freq', self.AWG_clock)
awg.delete_all_waveforms_from_list()
for i in range(1, 5):
awg.set('ch%s_amp' % i, self.ch_amp)
def _set_seq_mode(self, a):
a.set('run_mode', 'SEQ')
a.sequence_length.set(1)
a.set_sqel_trigger_wait(1, 0)
def get_idn(self):
''' Overrule because the default VISA command does not work '''
IDN = {'vendor': 'QuTech', 'model': 'virtual_awg',
'serial': None, 'firmware': None}
return IDN
def awg_gate(self, gate):
""" Return true of the gate can be controlled by the awg
Args:
gate ()
"""
if gate is None:
return False
if isinstance(gate, dict):
# vector scan, assume we can do it fast if all components are fast
return np.all([self.awg_gate(g) for g in gate])
if self.awg_map is None:
return False
if gate in self.awg_map:
return True
else:
return False
def stop(self, verbose=0):
''' Stops all AWGs and turns of all channels '''
for awg in self._awgs:
awg.stop()
for i in range(1, 5):
awg.set('ch%d_state' % i, 0)
if verbose:
print('Stopped AWGs')
def sweep_init(self, waveforms, period=1e-3, delete=True, samp_freq=None):
''' Send waveform(s) to gate(s)
Arguments:
waveforms (dict): the waveforms with the gates as keys
period (float): period of the waveform in seconds
Returns:
sweep_info (dict): the keys are tuples of the awgs and channels to activate
Example:
--------
>> sweep_info = sweep_init(waveforms)
'''
sweepgates = [g for g in waveforms]
if delete:
for awg in self._awgs:
awg.delete_all_waveforms_from_list()
awgs = [self._awgs[self.awg_map[g][0]] for g in sweepgates]
if 'fpga_mk' in self.awg_map:
marker_info = self.awg_map['fpga_mk']
marker_delay = self.delay_FPGA
marker_name = 'fpga_mk'
elif 'm4i_mk' in self.awg_map:
marker_info = self.awg_map['m4i_mk']
if samp_freq is not None:
pretrigger_period = 16 / samp_freq
else:
pretrigger_period = 0
marker_delay = self.delay_FPGA + pretrigger_period
marker_name = 'm4i_mk'
awgs.append(self._awgs[marker_info[0]])
sweep_info = dict()
wave_len = len(waveforms[sweepgates[0]]['wave'])
for g in sweepgates:
sweep_info[self.awg_map[g]] = dict()
sweep_info[self.awg_map[g]]['waveform'] = waveforms[g]['wave']
sweep_info[self.awg_map[g]]['marker1'] = np.zeros(wave_len)
sweep_info[self.awg_map[g]]['marker2'] = np.zeros(wave_len)
if 'name' in waveforms[g]:
sweep_info[self.awg_map[g]]['name'] = waveforms[g]['name']
else:
sweep_info[self.awg_map[g]]['name'] = 'waveform_%s' % g
if marker_info[:2] == self.awg_map[g]:
sweep_info[marker_info[:2]]['delay'] = marker_delay
# marker points
marker_points = np.zeros(wave_len)
marker_points[int(marker_delay * self.AWG_clock):(int(marker_delay * self.AWG_clock) + wave_len // 20)] = 1.0
if marker_info[:2] not in sweep_info:
sweep_info[marker_info[:2]] = dict()
sweep_info[marker_info[:2]]['waveform'] = np.zeros(wave_len)
sweep_info[marker_info[:2]]['marker1'] = np.zeros(wave_len)
sweep_info[marker_info[:2]]['marker2'] = np.zeros(wave_len)
for g in sweepgates:
marker_name += '_%s' % g
sweep_info[marker_info[:2]]['name'] = marker_name
sweep_info[marker_info[:2]]['delay'] = marker_delay
sweep_info[marker_info[:2]]['marker%d' % marker_info[2]] = marker_points
self._awgs[marker_info[0]].set(
'ch%i_m%i_low' % (marker_info[1], marker_info[2]), 0)
self._awgs[marker_info[0]].set(
'ch%i_m%i_high' % (marker_info[1], marker_info[2]), 2.6)
# awg marker
if getattr(self, 'awg_seq', None) is not None:
awg_info = self.awg_map['awg_mk']
if awg_info[:2] not in sweep_info:
awgs.append(self._awgs[awg_info[0]])
sweep_info[awg_info[:2]] = dict()
sweep_info[awg_info[:2]]['waveform'] = np.zeros(wave_len)
sweep_info[awg_info[:2]]['marker1'] = np.zeros(wave_len)
sweep_info[awg_info[:2]]['marker2'] = np.zeros(wave_len)
sweep_info[awg_info[:2]]['name'] = 'awg_mk'
awg_marker = np.zeros(wave_len)
awg_marker[0:wave_len // 20] = 1
awg_marker = np.roll(
awg_marker, wave_len - int(self.delay_AWG * self.AWG_clock))
sweep_info[awg_info[:2]]['marker%d' %
self.awg_map['awg_mk'][2]] = awg_marker
self._awgs[awg_info[0]].set(
'ch%i_m%i_low' % (awg_info[1], awg_info[2]), 0)
self._awgs[awg_info[0]].set(
'ch%i_m%i_high' % (awg_info[1], awg_info[2]), 2.6)
# send waveforms
if delete:
for sweep in sweep_info:
try:
self._awgs[sweep[0]].send_waveform_to_list(sweep_info[sweep]['waveform'], sweep_info[
sweep]['marker1'], sweep_info[sweep]['marker2'], sweep_info[sweep]['name'])
except Exception as ex:
print(ex)
print('sweep_info[sweep][waveform] %s' % (sweep_info[sweep]['waveform'].shape,))
print('sweep_info[sweep][marker1] %s' % (sweep_info[sweep]['marker1'].shape,))
print('sweep_info[sweep][marker2] %s' % (sweep_info[sweep]['marker2'].shape,))
return sweep_info
def sweep_run(self, sweep_info):
''' Activate AWG(s) and channel(s) for the sweep(s).
Arguments:
sweep_info (dict): the keys are tuples of the awgs and channels to activate
'''
for sweep in sweep_info:
if hasattr(self, 'awg_seq') and self._awgs[sweep[0]] == self.awg_seq:
self._awgs[sweep[0]].set_sqel_waveform(
sweep_info[sweep]['name'], sweep[1], 1)
self._awgs[sweep[0]].set_sqel_loopcnt_to_inf(1)
self._awgs[sweep[0]].set_sqel_event_jump_target_index(
sweep[1], 1)
self._awgs[sweep[0]].set_sqel_event_jump_type(1, 'IND')
else:
self._awgs[sweep[0]].set(
'ch%i_waveform' % sweep[1], sweep_info[sweep]['name'])
for sweep in sweep_info:
self._awgs[sweep[0]].set('ch%i_state' % sweep[1], 1)
awgnrs = set([sweep[0] for sweep in sweep_info])
for nr in awgnrs:
self._awgs[nr].run()
def make_sawtooth(self, sweeprange, period, width=.95, repetitionnr=1, start_zero=False):
'''Make a sawtooth with a decline width determined by width. Not yet scaled with
awg_to_plunger value.
Arguments:
sweeprange (float): the range of voltages to sweep over
period (float): the period of the triangular signal
Returns:
wave_raw (array): raw data which represents the waveform
'''
samplerate = 1. / self.AWG_clock
tt = np.arange(0, period * repetitionnr + samplerate, samplerate)
v_wave = float(sweeprange / ((self.ch_amp / 2.0)))
wave_raw = (v_wave / 2) * scipy.signal.sawtooth(2 * np.pi * tt / period, width=width)
# idx_zero = np.argmin(np.abs(wave_raw))
# wave_raw = np.roll(wave_raw, wave_raw.size-idx_zero)
if start_zero:
o = int((wave_raw.size) * (1 - width) / 2)
wave_raw = np.roll(wave_raw, o)
return wave_raw
def make_pulses(self, voltages, waittimes, reps=1, filtercutoff=None, mvrange=None):
"""Make a pulse sequence with custom voltage levels and wait times at each level.
Arguments:
voltages (list of floats): voltage levels to be applied in the sequence
waittimes (list of floats): duration of each pulse in the sequence
reps (int): number of times to repeat the pulse sequence in the waveform
filtercutoff (float): cutoff frequency of a 1st order butterworth filter to make the pulse steps smoother
Returns:
wave_raw (array): raw data which represents the waveform
"""
if len(waittimes) != len(voltages):
raise Exception('Number of voltage levels must be equal to the number of wait times')
samples = [int(x * self.AWG_clock) for x in waittimes]
if mvrange is None:
mvrange = [max(voltages), min(voltages)]
v_wave = float((mvrange[0] - mvrange[1]) / self.ch_amp)
v_prop = [2 * ((x - mvrange[1]) / (mvrange[0] - mvrange[1])) - 1 for x in voltages]
wave_raw = np.concatenate([x * v_wave * np.ones(y) for x, y in zip(v_prop, samples)])
if filtercutoff is not None:
b, a = scipy.signal.butter(1, 0.5 * filtercutoff / self.AWG_clock, btype='low', analog=False, output='ba')
wave_raw = scipy.signal.filtfilt(b, a, wave_raw)
wave_raw = np.tile(wave_raw, reps)
return wave_raw
def check_frequency_waveform(self, period, width):
""" Check whether a sawtooth waveform with specified period can be generated """
old_sr = self.AWG_clock
new_sr = 5 / (period * (1 - width))
if (new_sr) > old_sr:
warnings.warn('awg sampling frequency %.1f MHz is too low for signal requested (sr %.1f [MHz], period %.1f [ms])' % (
old_sr / 1e6, new_sr / 1e6, 1e3 * period), UserWarning)
return new_sr
def sweep_gate(self, gate, sweeprange, period, width=.95, wave_name=None, delete=True):
''' Send a sawtooth signal with the AWG to a gate to sweep. Also
send a marker to the measurement instrument.
Args:
gate (string): the name of the gate to sweep
sweeprange (float): the range of voltages to sweep over
period (float): the period of the triangular signal
Returns:
waveform (dict): The waveform being send with the AWG.
sweep_info (dict): the keys are tuples of the awgs and channels to activate
Example:
>>> waveform, sweep_info = sweep_gate('P1',sweeprange=60,period=1e-3)
'''
self.check_frequency_waveform(period, width)
self.check_amplitude(gate, sweeprange)
start_zero = True
waveform = dict()
wave_raw = self.make_sawtooth(sweeprange, period, width, start_zero=start_zero)
awg_to_plunger = self.hardware.parameters['awg_to_%s' % gate].get()
wave = wave_raw / awg_to_plunger
waveform[gate] = dict()
waveform[gate]['wave'] = wave
if wave_name is None:
waveform[gate]['name'] = 'sweep_%s' % gate
else:
waveform[gate]['name'] = wave_name
sweep_info = self.sweep_init(waveform, period, delete)
self.sweep_run(sweep_info)
waveform['width'] = width
waveform['start_zero'] = start_zero
waveform['sweeprange'] = sweeprange
waveform['samplerate'] = 1 / self.AWG_clock
waveform['period'] = period
for channels in sweep_info:
if 'delay' in sweep_info[channels]:
waveform['markerdelay'] = sweep_info[channels]['delay']
return waveform, sweep_info
def sweep_gate_virt(self, gate_comb, sweeprange, period, width=.95, delete=True):
''' Send a sawtooth signal with the AWG to a linear combination of
gates to sweep. Also send a marker to the measurement instrument.
Arguments:
gate_comb (dict): the gates to sweep and the coefficients as values
sweeprange (float): the range of voltages to sweep over
period (float): the period of the triangular signal
Returns:
waveform (dict): The waveform being send with the AWG.
sweep_info (dict): the keys are tuples of the awgs and channels to activate
'''
self.check_frequency_waveform(period, width)
waveform = dict()
for g in gate_comb:
self.check_amplitude(g, gate_comb[g] * sweeprange)
for g in gate_comb:
wave_raw = self.make_sawtooth(sweeprange, period, width)
awg_to_plunger = self.hardware.parameters['awg_to_%s' % g].get()
wave = wave_raw * gate_comb[g] / awg_to_plunger
waveform[g] = dict()
waveform[g]['wave'] = wave
waveform[g]['name'] = 'sweep_%s' % g
sweep_info = self.sweep_init(waveform, period, delete)
self.sweep_run(sweep_info)
waveform['width'] = width
waveform['sweeprange'] = sweeprange
waveform['samplerate'] = 1 / self.AWG_clock
waveform['period'] = period
for channels in sweep_info:
if 'delay' in sweep_info[channels]:
waveform['markerdelay'] = sweep_info[channels]['delay']
return waveform, sweep_info
def sweepandpulse_gate(self, sweepdata, pulsedata, wave_name=None, delete=True, shift_zero=True):
''' Makes and outputs a waveform which overlays a sawtooth signal to sweep
a gate, with a pulse sequence. A marker is sent to the measurement instrument
at the start of the waveform.
IMPORTANT: The function offsets the voltages values so that the last point is 0 V on all gates (i.e. it centers the pulse sequence on the last point)
Args:
sweepdata (dict): inputs for the sawtooth (gate, sweeprange, period, width).
See sweep_gate for more info.
pulsedata (dict): inputs for the pulse sequence (gate_voltages, waittimes).
See pulse_gates for more info.
Returns:
waveform (dict): The waveform being sent with the AWG.
sweep_info (dict): the keys are tuples of the awgs and channels to activate
'''
sweepgate = sweepdata['gate']
sweeprange = sweepdata['sweeprange']
period = sweepdata['period']
width = sweepdata.get('width', 0.95)
gate_voltages = pulsedata['gate_voltages'].copy()
if shift_zero:
for g in gate_voltages:
gate_voltages[g] = [x - gate_voltages[g][-1] for x in gate_voltages[g]]
waittimes = pulsedata['waittimes']
filtercutoff = pulsedata.get('filtercutoff', None)
pulsesamp = [int(round(x * self.AWG_clock)) for x in waittimes]
sawsamp = int(round(period * width * self.AWG_clock))
pulsereps = int(np.ceil(self.AWG_clock * period * width / sum(pulsesamp)))
allvoltages = np.concatenate([v for v in gate_voltages.values()])
mvrange = [max(allvoltages), min(allvoltages)]
self.check_frequency_waveform(period, width)
waveform = dict()
wave_sweep = self.make_sawtooth(sweeprange, period, width)
for g in gate_voltages:
self.check_amplitude(g, sweeprange + (mvrange[0] - mvrange[1]))
for g in gate_voltages:
wave_raw = self.make_pulses(gate_voltages[g], waittimes, reps=pulsereps,
filtercutoff=filtercutoff, mvrange=mvrange)
wave_raw = wave_raw[:sawsamp]
wave_raw = np.pad(wave_raw, (0, len(wave_sweep) - len(wave_raw)), 'edge')
if sweepgate == g:
wave_raw += wave_sweep
awg_to_plunger = self.hardware.parameters['awg_to_%s' % g].get()
wave = wave_raw / awg_to_plunger
waveform[g] = dict()
waveform[g]['wave'] = wave
if wave_name is None:
waveform[g]['name'] = 'sweepandpulse_%s' % g
else:
waveform[g]['name'] = wave_name
sweep_info = self.sweep_init(waveform, period, delete)
self.sweep_run(sweep_info)
waveform['width'] = width
waveform['sweeprange'] = sweeprange
waveform['samplerate'] = 1 / self.AWG_clock
waveform['period'] = period
waveform['pulse_voltages'] = gate_voltages
waveform['pulse_waittimes'] = waittimes
for channels in sweep_info:
if 'delay' in sweep_info[channels]:
waveform['markerdelay'] = sweep_info[channels]['delay']
return waveform, sweep_info
def sweep_process(self, data, waveform, Naverage=1, direction='forwards', start_offset=1):
""" Process the data returned by reading out based on the shape of
the sawtooth send with the AWG.
Args:
data (list or Nxk array): the data (N is the number of samples)
waveform (dict): contains the wave and the sawtooth width
Naverage (int): number of times the signal was averaged
direction (string): option to use backwards signal i.o. forwards
Returns:
data_processed (array): The data after dropping part of it.
Example:
>> data_processed = sweep_process(data, waveform, 25)
"""
width = waveform['width']
if isinstance(data, list):
data = np.array(data)
if direction == 'forwards':
end = int(np.floor(width * data.shape[0] - 1))
data_processed = data[start_offset:end]
elif direction == 'backwards':
begin = int(np.ceil(width * data.shape[0] + 1))
data_processed = data[begin:]
data_processed = data_processed[::-1]
data_processed = np.array(data_processed) / Naverage
return data_processed
def sweep_2D(self, samp_freq, sweepgates, sweepranges, resolution, width=.95, comp=None, delete=True):
''' Send sawtooth signals to the sweepgates which effectively do a 2D
scan.
The first sweepgate is the fast changing gate (on the horizontal axis).
Arguments:
samp_freq (float): sampling frequency of the measurement instrument in Hertz.
sweepgates (list): two strings with names of gates to sweep
sweepranges (list): two floats for sweepranges in milliVolts
Returns:
waveform (dict): The waveforms being send with the AWG.
sweep_info (dict): the keys are tuples of the awgs and channels to activate
'''
# JP: I think FPGA exceptions should not be handled by awg
# if resolution[0] * resolution[1] > self.maxdatapts:
# raise Exception('resolution is set higher than FPGA memory allows')
if self.corr != 0:
raise Exception('please do not use the .corr setting any more')
error_corr = resolution[0] * self.corr
period_horz = resolution[0] / samp_freq + error_corr
period_vert = resolution[1] * period_horz
self.check_frequency_waveform(period_horz, width)
for g, r in zip(sweepgates, sweepranges):
self.check_amplitude(g, r)
waveform = dict()
# horizontal waveform
wave_horz_raw = self.make_sawtooth(
sweepranges[0], period_horz, repetitionnr=resolution[1])
awg_to_plunger_horz = self.hardware.parameters[
'awg_to_%s' % sweepgates[0]].get()
wave_horz = wave_horz_raw / awg_to_plunger_horz
waveform[sweepgates[0]] = dict()
waveform[sweepgates[0]]['wave'] = wave_horz
waveform[sweepgates[0]]['name'] = 'sweep_2D_horz_%s' % sweepgates[0]
# vertical waveform
wave_vert_raw = self.make_sawtooth(sweepranges[1], period_vert)
awg_to_plunger_vert = self.hardware.parameters[
'awg_to_%s' % sweepgates[1]].get()
wave_vert = wave_vert_raw / awg_to_plunger_vert
waveform[sweepgates[1]] = dict()
waveform[sweepgates[1]]['wave'] = wave_vert
waveform[sweepgates[1]]['name'] = 'sweep_2D_vert_%s' % sweepgates[1]
if comp is not None:
for g in comp:
if g not in sweepgates:
waveform[g] = dict()
waveform[g]['wave'] = comp[g]['vert'] * \
wave_vert + comp[g]['horz'] * wave_horz
waveform[g]['name'] = 'sweep_2D_comp_%s' % g
else:
raise Exception('Can not compensate a sweepgate')
sweep_info = self.sweep_init(waveform, period=period_vert, delete=delete, samp_freq=samp_freq)
self.sweep_run(sweep_info)
waveform['width_horz'] = width
waveform['sweeprange_horz'] = sweepranges[0]
waveform['width_vert'] = width
waveform['sweeprange_vert'] = sweepranges[1]
waveform['resolution'] = resolution
waveform['samplerate'] = 1 / self.AWG_clock
waveform['period'] = period_vert
waveform['period_horz'] = period_horz
for channels in sweep_info:
if 'delay' in sweep_info[channels]:
waveform['markerdelay'] = sweep_info[channels]['delay']
return waveform, sweep_info
def sweep_2D_virt(self, samp_freq, gates_horz, gates_vert, sweepranges, resolution, width=.95, delete=True):
''' Send sawtooth signals to the linear combinations of gates set by
gates_horz and gates_vert which effectively do a 2D scan of two virtual
gates.
The horizontal direction is the direction where the AWG signal is changing fastest. It is the first element in the resolution and sweepranges.
Arguments:
samp_freq (float): sampling frequency of the measurement instrument in Hertz.
gates_horz (dict): the gates for the horizontal direction and their coefficients
gates_vert (dict): the gates for the vertical direction and their coefficients
sweepranges (list): two floats for sweepranges in milliVolts
resolution (list): two ints for numbers of pixels
Returns:
waveform (dict): The waveforms being send with the AWG.
sweep_info (dict): the keys are tuples of the awgs and channels to activate
'''
# JP: I think FPGA exceptions should not be handled by awg
# if resolution[0] * resolution[1] > self.maxdatapts:
# raise Exception('resolution is set higher than memory allows')
error_corr = resolution[0] * self.corr
period_horz = resolution[0] / samp_freq + error_corr
period_vert = resolution[1] * period_horz
new_sr = self.check_frequency_waveform(period_horz, width)
# self.reset_AWG(new_sr)
waveform = dict()
# horizontal virtual gate
for g in gates_horz:
self.check_amplitude(g, sweepranges[0] * gates_horz[g])
for g in gates_horz:
wave_raw = self.make_sawtooth(sweepranges[0], period_horz, repetitionnr=resolution[1])
awg_to_plunger = self.hardware.parameters['awg_to_%s' % g].get()
wave = wave_raw * gates_horz[g] / awg_to_plunger
waveform[g] = dict()
waveform[g]['wave'] = wave
waveform[g]['name'] = 'sweep_2D_virt_%s' % g
# vertical virtual gate
for g in gates_vert:
self.check_amplitude(g, sweepranges[1] * gates_vert[g])
for g in gates_vert:
wave_raw = self.make_sawtooth(sweepranges[1], period_vert)
awg_to_plunger = self.hardware.parameters['awg_to_%s' % g].get()
wave = wave_raw * gates_vert[g] / awg_to_plunger
if g in waveform:
waveform[g]['wave'] = waveform[g]['wave'] + wave
else:
waveform[g] = dict()
waveform[g]['wave'] = wave
waveform[g]['name'] = 'sweep_2D_virt_%s' % g
# TODO: Implement compensation of sensing dot plunger
sweep_info = self.sweep_init(waveform, period=period_vert, delete=delete, samp_freq=samp_freq)
self.sweep_run(sweep_info)
waveform['width_horz'] = width
waveform['sweeprange_horz'] = sweepranges[0]
waveform['width_vert'] = width
waveform['sweeprange_vert'] = sweepranges[1]
waveform['resolution'] = resolution
waveform['samplerate'] = 1 / self.AWG_clock
waveform['period'] = period_vert
waveform['period_horz'] = period_horz
for channels in sweep_info:
if 'delay' in sweep_info[channels]:
waveform['markerdelay'] = sweep_info[channels]['delay']
return waveform, sweep_info
def sweep_2D_process(self, data, waveform, diff_dir=None):
''' Process data from sweep_2D
Arguments:
data (list): the raw measured data
waveform (dict): The waveforms that was sent with the AWG.
Returns:
data_processed (list): the processed data
'''
width_horz = waveform['width_horz']
width_vert = waveform['width_vert']
resolution = waveform['resolution']
# split up the fpga data in chunks of horizontal sweeps
chunks_ch1 = [data[x:x + resolution[0]] for x in range(0, len(data), resolution[0])]
chunks_ch1 = [chunks_ch1[i][1:int(width_horz * len(chunks_ch1[i]))] for i in range(0, len(chunks_ch1))]
data_processed = chunks_ch1[:int(width_vert * len(chunks_ch1))]
if diff_dir is not None:
data_processed = qtt.utilities.tools.diffImageSmooth(data_processed, dy=diff_dir, sigma=1)
return data_processed
def pulse_gates(self, gate_voltages, waittimes, reps=1, filtercutoff=None, reset_to_zero=False, delete=True):
''' Send a pulse sequence with the AWG that can span over any gate space.
Sends a marker to measurement instrument at the start of the sequence.
Only works with physical gates.
Arguments:
gate_voltages (dict): keys are gates to apply the sequence to, and values
are arrays with the voltage levels to be applied in the sequence
waittimes (list of floats): duration of each pulse in the sequence
reset_to_zero (bool): if True, the function offsets the voltages values so that the last point is 0V
on all gates (i.e. it centers the pulse sequence on the last point).
Returns:
waveform (dict): The waveform being send with the AWG.
sweep_info (dict): the keys are tuples of the awgs and channels to activate
'''
period = sum(waittimes)
if reset_to_zero:
for g in gate_voltages:
gate_voltages[g] = [x - gate_voltages[g][-1] for x in gate_voltages[g]]
allvoltages = np.concatenate([v for v in gate_voltages.values()])
mvrange = [max(allvoltages), min(allvoltages)]
waveform = dict()
for g in gate_voltages:
wave_raw = self.make_pulses(gate_voltages[g], waittimes, reps=reps,
filtercutoff=filtercutoff, mvrange=mvrange)
awg_to_plunger = self.hardware.parameters['awg_to_%s' % g].get()
wave = wave_raw / awg_to_plunger
waveform[g] = dict()
waveform[g]['wave'] = wave
waveform[g]['name'] = 'pulses_%s' % g
sweep_info = self.sweep_init(waveform, period, delete)
self.sweep_run(sweep_info)
waveform['voltages'] = gate_voltages
waveform['samplerate'] = 1 / self.AWG_clock
waveform['waittimes'] = waittimes
for channels in sweep_info:
if 'delay' in sweep_info[channels]:
waveform['markerdelay'] = sweep_info[channels]['delay']
return waveform, sweep_info
def reset_AWG(self, clock=1e8):
""" Reset AWG to videomode and scanfast """
self.AWG_clock = clock
for a in self._awgs:
a.clock_freq.set(clock)
a.trigger_mode.set('CONT')
a.trigger_source.set('INT')
for ii in range(1, 5):
f = getattr(a, 'ch%d_amp' % ii)
val = f()
if val != 4.0:
warnings.warn('AWG channel %d output not at 4.0 V' % ii)
if self.awg_seq is not None:
self._set_seq_mode(self.awg_seq)
def set_amplitude(self, amplitude):
""" Set the AWG peak-to-peak amplitude for all channels
Args:
amplitude (float): peak-to-peak amplitude (V)
"""
if amplitude < 0.02:
warnings.warn('Trying to set AWG amplitude too low, setting it to minimum (20mV)')
amplitude = 0.02
elif amplitude > 4.5:
warnings.warn('Trying to set AWG amplitude too high, setting it to maximum (4.5V)')
amplitude = 4.5
# tektronics 5014 has precision of 1mV
self.ch_amp = round(amplitude, 3)
for awg in self._awgs:
for i in range(1, 5):
awg.set('ch%s_amp' % i, self.ch_amp)
def check_amplitude(self, gate, mvrange):
""" Calculates the lowest allowable AWG peak-to-peak amplitude based on the
ranges to be applied to the gates. If the AWG amplitude is too low, it gives
a warning and increases the amplitude.
Args:
gate (str): name of the gate to check
mvrange (float): voltage range, in mV, that the gate needs to reach
"""
min_amp = mvrange / self.hardware.parameters['awg_to_%s' % gate].get()
if min_amp > 4:
raise(Exception('Sweep range of gate %s is larger than maximum allowed by the AWG' % gate))
if self.ch_amp < min_amp:
min_amp = np.ceil(min_amp * 10) / 10
self.set_amplitude(min_amp)
warnings.warn('AWG amplitude too low for this range, setting to %.1f' % min_amp)
# %%
def plot_wave_raw(wave_raw, samplerate=None, station=None):
''' Plot the raw wave
Arguments:
wave_raw (array): raw data which represents the waveform
Returns:
plot (QtPlot): the plot showing the data
'''
if samplerate is None:
if station is None:
raise Exception('There is no station')
samplerate = 1 / station.awg.getattr('AWG_clock')
else:
samplerate = samplerate
horz_var = np.arange(0, len(wave_raw) * samplerate, samplerate)
x = DataArray(name='time(s)', label='time (s)',
preset_data=horz_var, is_setpoint=True)
y = DataArray(
label='sweep value (mV)', preset_data=wave_raw, set_arrays=(x,))
plot = QtPlot(x, y)
return plot
def sweep_2D_process(data, waveform, diff_dir=None):
''' Process data from sweep_2D
Arguments:
data (list): the raw measured data
waveform (dict): The waveforms that was sent with the AWG.
Returns:
data_processed (list): the processed data
'''
width_horz = waveform['width_horz']
width_vert = waveform['width_vert']
resolution = waveform['resolution']
# split up the fpga data in chunks of horizontal sweeps
chunks_ch1 = [data[x:x + resolution[0]] for x in range(0, len(data), resolution[0])]
chunks_ch1 = [chunks_ch1[i][1:int(width_horz * len(chunks_ch1[i]))] for i in range(0, len(chunks_ch1))]
data_processed = chunks_ch1[:int(width_vert * len(chunks_ch1))]
if diff_dir is not None:
data_processed = qtt.utilities.tools.diffImageSmooth(data_processed, dy=diff_dir, sigma=1)
return data_processed