# Chunk size experiment 
Use this Jupyter notebook to generate the image stimulus matrix.

Goal:
    We need 1000 chunks of stimulus EEG data,
    so we need to present the stimuli 0-9 a total of 1000 times, which means:
    
    0 - display 100 times
    1 - display 100 times
    .
    .
    9 - display 100 times

Information:
    Chunk size = 64 samples
    EEG sampling = 128 Hz with 16 electrode nodes
    
    
Requirements:
1. Generate a sequence of 1000 numbers drawn from the digits 0-9
2. Divide it into 20 blocks, 50 images per block
3. The current trial must not show the same number as the previous trial


In [1]:
import random
import pylsl
import numpy as np
import pandas as pd
import time
import itertools
import math
import psychopy 
from psychopy import visual, core, event
from datetime import datetime
from IPython.display import clear_output


In [2]:
#==============================================
# experiment parameters
#==============================================
# NOTE(review): the names below are used the other way round from what they
# suggest. The sequence cell builds num_trail*images_per_block stimuli and
# reshapes them to (-1, num_trail), so num_trail is the number of images shown
# in one block and images_per_block ends up being the number of blocks.
num_trail  = 10
images_per_block = 5
num_chunk  = 640          # presumably the number of EEG chunks to collect — TODO confirm
# Integer division: a block count has to be an int (true division gave 64.0,
# which cannot be passed to range() or used as a shape).
num_block  = num_chunk//num_trail
stim_time = 1             # how long each image stays on screen, in seconds
stim_blink_time = 0.2     # blank-screen pause after each image, in seconds

fixation_time = 2  # inter trial interval, i.e., how long the fixation will stay in second


#==============================================
# Configuration
#==============================================
# Folder holding the stimulus images; drawTrial loads "<image_folder>/<digit>.png"
image_folder='numerical_stimuli'


# LSL marker stream used to time-stamp stimulus events.
#name, type, channel_count, sampling rate, channel format, source_id
#info = StreamInfo('CytonMarkers', 'Markers', 1, 0.0, 'int32', 'CytonMarkerID')#make an outlet
# One string channel; a sampling rate of 0.0 declares an irregular-rate stream.
info = pylsl.StreamInfo('CytonMarkers', 'Markers', 1, 0.0, 'string', 'CytonMarkerID')#make an outlet
outlet = pylsl.StreamOutlet(info)


### Generate the shuffled stimulus number sequence

In [3]:

def genNumbers(num):
    """Return a shuffled list holding each digit 0-9 ``int(num/10)`` times.

    Parameters
    ----------
    num : number
        Requested sequence length. Only whole multiples of ten are honoured;
        any remainder is dropped.

    Returns
    -------
    list
        The digits (numpy integers) in random order, shuffled with the
        ``random`` module's global generator.
    """
    repeats = int(num/10)
    trail_sequence = [digit for _ in range(repeats) for digit in np.arange(10)]
    random.shuffle(trail_sequence)
    return trail_sequence


# Scan for adjacent duplicates and hand each one to fixConsecutive
def findConsecutive(data):
    """Walk ``data`` once and repair every adjacent pair of equal values.

    For each index whose value equals the value right after it,
    ``fixConsecutive`` is called on the second element of the pair.
    ``data`` is modified in place; nothing is returned.
    """
    for left in range(len(data) - 1):
        right = left + 1
        if data[left] == data[right]:
            fixConsecutive(data, right)

# Repair one adjacent duplicate by swapping it with a compatible element
def fixConsecutive(data, position):
    """Swap ``data[position]`` with another element so it stops repeating.

    The first choice is the original strategy: swap with the first element
    *after* ``position`` that holds a different value. When no such element
    exists (the duplicate sits at the tail of the list, or only equal values
    follow it) the function now falls back to an element *before* the pair,
    chosen so that the swap creates no new adjacent repeat. Previously such
    tail duplicates were silently left in place.

    Parameters
    ----------
    data : mutable sequence
        Stimulus sequence, modified in place.
    position : int
        Index of the element to move away (the second element of a repeated
        pair when called from ``findConsecutive``). Out-of-range indices are
        ignored.

    Returns
    -------
    None
        If no safe swap partner exists, ``data`` is left unchanged.
    """
    size = len(data)
    if not 0 <= position < size:
        return
    value = data[position]

    # Preferred repair: first later element with a different value.
    for candidate in range(position + 1, size):
        if data[candidate] != value:
            data[position], data[candidate] = data[candidate], data[position]
            return

    # Fallback: everything after `position` equals `value`, so look backwards.
    # Start two slots back; the direct neighbour is the other half of the pair.
    for candidate in range(position - 2, -1, -1):
        incoming = data[candidate]
        # The value brought to `position` must differ from the moved value
        # and from the left neighbour it will sit next to.
        if incoming == value or incoming == data[position - 1]:
            continue
        # `value` must not land next to an equal value at its new location.
        if candidate > 0 and data[candidate - 1] == value:
            continue
        if data[candidate + 1] == value:
            continue
        data[position], data[candidate] = data[candidate], data[position]
        return
            
# Exchange the elements stored at two indices, in place
def swapPositions(list, pos1, pos2):
    """Swap ``list[pos1]`` and ``list[pos2]`` in place and return ``None``.

    The first parameter keeps its original name (which shadows the built-in
    ``list`` inside this function) so existing callers are unaffected.
    """
    from_second = list[pos2]
    from_first = list[pos1]
    list[pos1] = from_second
    list[pos2] = from_first


# Count adjacent pairs that hold the same value
def checkConsecutive(data):
    """Return how many indices ``i`` satisfy ``data[i] == data[i+1]``.

    A run of three equal values therefore counts as two. Returns 0 for
    sequences shorter than two elements; ``data`` is not modified.
    """
    return sum(1 for idx in range(len(data) - 1) if data[idx] == data[idx + 1])


In [4]:
# Build the shuffled digit sequence: num_trail*images_per_block stimuli in total
trails = genNumbers(num_trail*images_per_block)

# Report whether the raw shuffle contains adjacent repeats
repeats_before_fix = checkConsecutive(trails)
if repeats_before_fix:
    print("First consecutive check : There is some consecutive number in the list")

# Repair adjacent repeats in place
findConsecutive(trails)

# Verify the repair; a failure is only reported, not raised
repeats_after_fix = checkConsecutive(trails)
if not repeats_after_fix:
    print("Second consecutive check : PASS")
else:
    print("Second check : There is still two same consecutive number in the list")

# One row per block, num_trail stimuli in each row
blocks_imgs = np.array(trails).reshape(-1, num_trail)
blocks_imgs.shape
    

First consecutive check : There is some consecutive number in the list
Second consecutive check : PASS


(5, 10)

In [5]:

# Sanity check: how often digit 0 occurs in the whole stimulus matrix.
# A notebook cell only echoes its last expression, so the count is printed
# explicitly instead of being silently discarded.
print(np.count_nonzero(blocks_imgs==0))
blocks_imgs

array([[3, 5, 4, 7, 2, 4, 1, 0, 2, 3],
       [2, 4, 9, 2, 8, 0, 5, 3, 6, 7],
       [8, 6, 4, 9, 8, 1, 8, 5, 0, 7],
       [6, 0, 6, 1, 3, 9, 1, 7, 0, 7],
       [9, 6, 3, 9, 1, 5, 4, 5, 8, 2]])

### Experiment section

In [6]:
def drawTextOnScreen(massage) :
    """Show ``massage`` as dimmed text on the global window ``mywin``.

    Passing an empty string effectively blanks the screen, which is how the
    other drawing helpers clear a stimulus.
    """
    text_stim = visual.TextStim(mywin, text=massage, languageStyle='LTR')
    text_stim.contrast = 0.3
    text_stim.draw()   # render into the back buffer
    mywin.flip()       # swap buffers so the text becomes visible

    
def drawTrial( fileName, stimTime ) :
    """Present one stimulus image, mark it on the EEG stream, then blank.

    Parameters
    ----------
    fileName : value convertible with ``str``
        Base name of the image; ``<image_folder>/<fileName>.png`` is loaded.
    stimTime : float
        How long the image stays on screen (passed to ``core.wait``).

    After the image the screen is blanked for the global ``stim_blink_time``.
    """
    stimulus_path = "{}/{}.png".format(image_folder, str(fileName))
    stimulus = visual.ImageStim(mywin, image=stimulus_path)
    stimulus.size *= 2            # show the image at twice its default size
    stimulus.draw()
    mywin.flip()
    # Marker is pushed right after the flip that makes the image visible
    eegMarking(fileName, "img_stim")
    core.wait(stimTime)
    drawTextOnScreen('')          # blank screen between stimuli
    core.wait(stim_blink_time)
    

    
def drawFixation(fileName, fixationTime):
    """Show a white fixation cross, mark it on the EEG stream, then blank.

    The cross is displayed for ``fixationTime - 0.5`` and followed by a
    0.5 blank screen, so the whole call lasts about ``fixationTime``.
    ``fileName`` is only forwarded to ``eegMarking``.
    """
    blank_tail = 0.5
    # Open polyline through the centre: vertical stroke, then horizontal stroke
    cross_vertices = ((0, -0.5), (0, 0.5), (0,0), (-0.5,0), (0.5, 0))
    cross = visual.ShapeStim(
        mywin,
        vertices=cross_vertices,
        lineWidth=5,
        closeShape=False,
        lineColor="white",
    )
    cross.draw()
    mywin.flip()
    eegMarking(fileName, "fixation")
    core.wait(fixationTime - blank_tail)
    drawTextOnScreen('')
    core.wait(blank_tail)
     

def eegMarking(img, stampType):   # use trial variable from main
    """Push one event marker string to the LSL ``outlet`` and echo it.

    Reads the notebook globals ``isTrianing``, ``block`` and ``trial``.

    Marker formats
    --------------
    * training (``isTrianing`` true): ``"Training"`` for every event
    * ``"img_stim"``: ``"<block+1>,<trial>,<img>"``
    * ``"fixation"``: ``"-<block+1>,*,*"`` (negative block number)

    Parameters
    ----------
    img : value convertible with ``str``
        Identifier of the stimulus being shown.
    stampType : str
        ``"img_stim"`` or ``"fixation"``.

    Raises
    ------
    ValueError
        If ``stampType`` is not recognised outside training. Previously this
        surfaced as an UnboundLocalError on ``markerString``.
    """
    if isTrianing:
        markerString = 'Training'
    elif stampType == "img_stim":
        markerString = f"{block + 1},{trial},{img}"
    elif stampType == "fixation":
        markerString = f"{-(block + 1)},*,*"
    else:
        raise ValueError(f"Unknown stampType {stampType!r}; expected 'img_stim' or 'fixation'")
    print("Marker string {}".format(markerString))
    outlet.push_sample([markerString])
  


In [7]:
# Scratch cell: draws one random integer in [1, 10]; the value is not assigned to anything.
random.randint(1, 10)

2

In [8]:
# mywin = visual.Window([1366, 768], fullscr=True, color="black", units='norm')    # full-screen variant
mywin = visual.Window([1920, 1080], color="black", units='norm')    # 1920x1080 stimulus window, normalised units

# Number of images shown during the training run
NUM_TRAINING_TRIALS = 3


def waitForSpace(prompt):
    """Keep showing ``prompt`` until the space bar is pressed."""
    while True:
        drawTextOnScreen(prompt)
        if 'space' in event.getKeys():
            return


# try/finally guarantees the window is closed even when the run is interrupted
# (the recorded run of this cell ended in a KeyboardInterrupt and left it open).
try:
    ##############
    ####  Training session
    # eegMarking reads this flag and emits 'Training' markers while it is set
    isTrianing = True
    waitForSpace('Training session, Press space bar to start')
    drawTextOnScreen('')
    # Train on the first few images of the last block (index 4 with the
    # current 5-block configuration).
    for trial, img in enumerate(blocks_imgs[-1][:NUM_TRAINING_TRIALS], start=1):
        drawTrial(img, stim_time)   # drawTrial(fileName, stimTime)

    drawFixation(img, fixation_time)
    drawTextOnScreen('End of trainning session')
    core.wait(5)
    isTrianing = False

    ################
    ####### Experiment session
    waitForSpace('Experiment session : Press space bar to start')
    start = time.time()
    drawTextOnScreen('')

    # `block` and `trial` stay module-level names because eegMarking reads them
    for block in range(len(blocks_imgs)) :
        for trial, img in enumerate(blocks_imgs[block], start=1):
            clear_output(wait=True)     # keep only the latest marker line in the cell output
            drawTrial(img, stim_time)   # drawTrial(fileName, stimTime)

        # One fixation period closes every block
        drawFixation(img, fixation_time)
    drawTextOnScreen('End of experiment, Thank you')

    stop  = time.time()
    print(f"Total experiment time = {stop-start} ")
    core.wait(10)
finally:
    mywin.close()

Marker string 1,4,7


KeyboardInterrupt: 

In [9]:
# Manual cleanup: close the PsychoPy window, e.g. after the experiment cell was interrupted before closing it.
mywin.close() 

