In [1]:
# Initialize Otter
import otter
grader = otter.Notebook("Lab_2_functions.ipynb")

# Lab 2: Statistical analysis of data using numpy

[Lab slides](https://docs.google.com/presentation/d/1ykwwcQ0onMvAjUxfJmKl9tbo-rJPdB5pRwDEmpDsd-g/edit?usp=sharing)

For this lab the goal is to write functions to pull out one data channel and print out statistics for different types of hand motions. We will write a function to load data from a single file, a second to combine data from multiple files, a third to pull out the data for a selected data channel, then a fourth to calculate statistics.

Written properly, you only need one function to do stats for any data channel, whether for the entire channel or for just one type of hand motion. In the homework you'll use this logic to apply statistics to a specific data channel.

In [2]:
# Libraries that we need to import - numpy and json (for loading the description file)
import numpy as np
import json as json
# os is needed for calling os.path.basename
import os

### Reading in data

Based on your code from Lab 1, write a function `get_data` that loads the data from a single CSV and adds a column at the end containing the hand motion ID. Then, write a function `get_data_from_files` that loads data from a list of CSV files into a single numpy array using `get_data`.

Both functions will operate on a dictionary containing `"csv_path"` which holds the path to the data file and `"motion_id"` which holds the ID of the motion contained within the CSV file. You will write a third function `get_file_info` that returns this dictionary for a given CSV file, which will extract the motion type from the file name.

Note: The functions initially just contain `pass`, which is just a placeholder that you should remove. In Python, `pass` does nothing at all by design, but removing it and having an empty function is illegal ([docs](https://docs.python.org/3/tutorial/controlflow.html#pass-statements)).

In [3]:
# Numeric ids to indicate hand motion type from Lab 1.
# get_file_info maps the C/F/S character of a file name onto these ids, and
# get_data writes the id into the extra last column of each loaded array.
clap_id = 1
high_five_id = 2
snap_id = 3

In [4]:
# SCRATCH cell
fname_chop_up = "Data/S01C01.csv"
# os.path.basename drops the directory part, leaving only the file name
res = os.path.basename(fname_chop_up)

# TODO:  look at res. How would you get out the C F or S character?
# File names have the form S##[C|F|S]##.csv, so the motion character is at index 3
middle_char = res[3]

In [5]:
# TODO: Write the function to take csv_path and return a dictionary
# Step 1: Use `os.path.basename` to get the filename of the CSV file (e.g., S10F01.csv),
# Step 2: Extract the C/F/S character which will appear at a fixed offset in the filename,
# and then 
# Step 3: find the right motion ID based on that character. Remember that C is a
# clap, F is a high five, and S is a snap.
# Step 4: Make a dictionary and return it (see @return for details)
def get_file_info(csv_path):
    """Build the file_info dictionary for one hand-motion CSV file.
    @param csv_path - path to a CSV file containing a hand motion
    @return A dictionary with key "csv_path", containing csv_path, and "motion_id", containing the id of the motion encoded in the file name (0 if the character is not C, F, or S)."""
    # The base file name is of the form S##[C|F|S]##.csv, so the motion
    # character sits at index 3 once the directory part is stripped.
    motion_char = os.path.basename(csv_path)[3]

    # Lookup table from the motion character to the notebook-level motion ids
    id_for_char = {'C': clap_id, 'F': high_five_id, 'S': snap_id}

    # Unknown characters fall back to 0, which is not one of the three motion ids
    return {'csv_path': csv_path, 'motion_id': id_for_char.get(motion_char, 0)}


In [6]:
# Test cell: If you wrote the above function correctly, this should work!
# Clap file: the character at index 3 of the file name is C
assert get_file_info("Data/S01C01.csv") == {"csv_path": "Data/S01C01.csv", "motion_id": clap_id}

# TODO... Write another assert test for the high five data
# High five file: the character at index 3 of the file name is F
assert get_file_info("Data/S01F01.csv") == {"csv_path": "Data/S01F01.csv", "motion_id": high_five_id}

In [7]:
# TODO: Actually read in the data and add the motion id to the end
#   Reminder: file_info will be the dictionary you just created in the above function
def get_data(file_info):
    """ Function that returns the data from the given CSV file.
    @param file_info - a dictionary with keys "csv_path" and "motion_id"
    @return A 2-D float array: the rows of the CSV file, plus one extra column at the end in which every entry is the motion_id."""

    # ndmin=2 keeps the result two-dimensional even when the file holds a single
    # row (or a single column). Without it np.loadtxt returns a 1-D array, and the
    # row/column counts taken from its shape would be wrong.
    file_data = np.loadtxt(fname=file_info["csv_path"], dtype="float", delimiter=',', ndmin=2)

    # One motion-id entry per data row, shaped as a column so it can be appended
    motion_column = np.full((file_data.shape[0], 1), file_info["motion_id"], dtype=float)

    # Original columns first, motion id as the last column
    return np.hstack((file_data, motion_column))


In [8]:
# Test cell: TODO: Write a test that 
#   Calls get_file_info to create a dictionary
#   Passes that dictionary to get_data
#   Checks that the returned data is correct
#     1) the last column is the expected motion id
#     2) the data is correct

#calling functions
file_info = get_file_info("Data/S01C01.csv")
file_data = get_data(file_info)

# Independent read of the same file to compare against
original_data = np.loadtxt(fname="Data/S01C01.csv", dtype="float", delimiter=',') #reading original data

# 1) Every entry of the last column must equal the motion id. The comparison has to sit
#    inside np.all(...): np.all(column) == id only compares "is the column non-zero?" with the id.
assert np.all(file_data[:, -1] == file_info["motion_id"])
# 2) All remaining columns must match the data read straight from the file
assert np.allclose(file_data[:, 0:-1], original_data)

In [9]:
# TODO: This function reads in multiple data sets (not just one)
#  Reminder: Each element of file_list is a dictionary 
#   You should be calling your get_data function from this function...
def get_data_from_files(file_list):
    """ Load every file described in file_list and join the results into one array.
    @param file_list - a list of dictionaries, where each dictionary contains `csv_path` and `motion_id`.
    @return A single array holding the data of all the given input files, in list order."""
    # One array per file; get_data adds the motion-id column to each of them
    per_file_arrays = [get_data(file_info) for file_info in file_list]

    # np.concatenate joins along its default axis (0), i.e. the rows are stacked
    return np.concatenate(per_file_arrays)

In [10]:
# SCRATCH cell
# TODO: Use this cell to check that all three functions are working correctly
# Independent read of the file to compare against
original_data = np.loadtxt(fname="Data/S01C01.csv", dtype="float", delimiter=',') #reading original data
file_info = get_file_info("Data/S01C01.csv")
# Exercise all three functions together using a one-element file list
all_data_check = get_data_from_files([file_info])

# Every entry of the last column must equal the motion id. The comparison belongs inside
# np.all(...): np.all(column) == id only compares "is the column non-zero?" with the id.
assert np.all(all_data_check[:, -1] == file_info["motion_id"])
# All remaining columns must match the data read straight from the file
assert np.allclose(all_data_check[:, 0:-1], original_data)


In [11]:
# Read in data from the files in lab 1 using the functions you just wrote.
# The files are passed in the order clap, high five, snap; get_data_from_files
# keeps that order when it joins the per-file arrays.
all_data = get_data_from_files([
    get_file_info("Data/S01C01.csv"),
    get_file_info("Data/S01F01.csv"),
    get_file_info("Data/S01S01.csv"),
])

# The autograder will check that get_file_info is correct, then that
#  the last columns of all_data are set to the correct motion ids,
# then check that the actual data in all_data is correct
# Reminder that you can look in the files (eg, S01F01.csv) to see what the values should be
# Reminder 2: If you have print statements in your functions that will also cause the autograder to fail

In [12]:
grader.check("get_data")

## Doing the slice

Get the data for one of the channels. 

In [13]:
# This reads in the json data describing the data channels
# NOTE(review): when the file is missing only a message is printed, so this cell
#  does not assign data_description and later cells that use it will fail.
try:
    with open("Data/data_description.json", "r") as fp:
        data_description = json.load(fp)
except FileNotFoundError:
    print(f"The file was not found; check that the data directory is in the current one and the file is in that directory")
        

In [14]:
# TODO: Return the descriptor dictionary for the given name
#  Reminder that data_description is a dictionary with the "data channels" key
#   containing the list of dictionaries...
def get_descriptor(data_description, name):
    """ Look up the descriptor of a single data channel by its name.
    @param data_description - dictionary whose "data_channels" key holds the list of channel dictionaries
    @param name - The name of the data channel to look for.
    @return the first dictionary that has name as the 'name' key, or None if no channel matches"""

    # Lazily filter the channel list; next() stops at the first match
    matching = (channel for channel in data_description["data_channels"] if channel["name"] == name)
    return next(matching, None)

In [15]:
# TODO: Write a test that, eg, get_descriptor with "Timestamp" returns the first dictionary
#  in the list in data_description.json

# Look up the channel by name...
test_description = get_descriptor(data_description, "Timestamp")

# ...and compare it against the first element of data_channels (name = Timestamp)
assert test_description == data_description["data_channels"][0] 

In [16]:
# TODO get the channel data that starts at index_offset and has n_dims
def get_channel_data(all_data, index_offset, n_dims):
    """ Slice the columns of one channel (e.g., right hand accelerometer) out of the data
    @param all_data - numpy array containing data from one (or more) files
    @param index_offset - the column to begin getting data from
    @param n_dims - number of dimensions for the data channel
    @return Returned array should be: number of rows in all_data X n_dims"""

    # The channel occupies n_dims consecutive columns starting at index_offset
    past_last_col = index_offset + n_dims
    channel_cols = slice(index_offset, past_last_col)

    # Keep every row, only the channel's columns
    return all_data[:, channel_cols]

In [17]:
# Test 1 - the right hand accelerometer data
# The descriptor supplies the starting column ("index_offset") and the number of columns ("dimensions")
rh_accelerometer_descriptor = get_descriptor(data_description, "Right hand accelerometer")
rh_accelerometer_data = get_channel_data(all_data, index_offset=rh_accelerometer_descriptor["index_offset"], n_dims=rh_accelerometer_descriptor["dimensions"])


In [18]:
# SELF TESTS
# Each line prints the computed value next to the value it is expected to have;
# the shape, the first row and the last row of the slice are checked by eye.
print(f"Shape of rhs_accelerometer_data is {rh_accelerometer_data.shape}, should be 285 X 3")
print(f"First row, first column value {rh_accelerometer_data[0, 0]:0.2f}, should be 0.70")
print(f"First row, last column value {rh_accelerometer_data[0, -1]:0.2f}, should be -0.41")
print(f"Last row, first column value {rh_accelerometer_data[-1, 0]:0.2f}, should be 0.07")
print(f"Last row, last column value {rh_accelerometer_data[-1, -1]:0.2f}, should be -0.98")


Shape of rhs_accelerometer_data is (285, 3), should be 285 X 3
First row, first column value 0.70, should be 0.70
First row, last column value -0.41, should be -0.41
Last row, first column value 0.07, should be 0.07
Last row, last column value -0.98, should be -0.98


In [19]:
# Tests for Left hand gyroscope
# Same lookup-then-slice steps as for the right hand accelerometer, for a different channel name
lh_gyroscope_descriptor = get_descriptor(data_description, "Left hand gyroscope")
lh_gyroscope_data = get_channel_data(all_data, index_offset=lh_gyroscope_descriptor["index_offset"], n_dims=lh_gyroscope_descriptor["dimensions"])

In [20]:
# Check size and first, last element
# The slice must keep every row of all_data and have three columns
assert(lh_gyroscope_data.shape == (all_data.shape[0], 3))
# np.isclose is used because the values are floating point
assert(np.isclose(lh_gyroscope_data[0, 0], 429.34))
assert(np.isclose(lh_gyroscope_data[-1, -1], -154.36))

In [21]:
grader.check("check_slice")

## Compute stats: Write a function to calculate the four stats

This is a variation on what you did in lab 1; in this case, we're going to do it with two functions. The first (**calc_stats**) calculates the stats and returns them in a dictionary; the second does the **for** loop to make one dictionary for each dimension in the data.

- Step 1 [this problem] - do the **calc_stats** function
- Step 2 [next problem] - do the loop to calculate the stats for each x,y,z channel

In [22]:
# TODO Fill in the function that calculates the stats for the data
def calc_stats(data):
    """Compute the minimum, maximum, mean and standard deviation over all values of an array
    @param data a numpy array
    @return a dictionary with the keys Min, Max, Mean, and SD"""

    # Key -> numpy reduction, in the order the keys should appear.
    # np.std with its default arguments is the population standard deviation.
    reducers = (("Min", np.min), ("Max", np.max), ("Mean", np.mean), ("SD", np.std))

    return {label: reduce_fn(data) for label, reduce_fn in reducers}

In [23]:
# Test the function with known data: 10 evenly spaced values from 0 to 1 (inclusive)
test_data = np.linspace(0, 1, 10)
ret_dict = calc_stats(test_data)

# np.isclose instead of == because the results are floating point
assert(np.isclose(ret_dict["Min"], 0.0))
assert(np.isclose(ret_dict["Max"], 1.0))
assert(np.isclose(ret_dict["Mean"], 0.5))
# The SD is only compared to within 0.01 of the rounded reference value
assert(np.isclose(ret_dict["SD"], 0.319, atol=0.01))

In [24]:
grader.check("stats_channel")

### Now do the second half - 

This function calculates the stats for an entire channel of the data, and stores the result in a list of dictionaries

In [25]:
def calc_stats_for_channel(data, n_dims):
    """ Calculate the stats for each dimension of a channel
    @param data - an n_timestamps * n_dims size array; a 1-D array is treated as a channel with a single dimension
    @param n_dims - 1, 2, or 3 (just x; x,y; or x,y,z)
    @return A list of dictionaries with keys Min, Max, Mean, and SD, one per dimension. The list is the length of n_dims"""

    data = np.asarray(data)
    if data.ndim == 1:
        # A flat array holds one dimension only; turn it into a single column so
        # the column slicing below works for it too.
        data = data.reshape(-1, 1)

    stats_list = []

    # range() supplies the column index on every pass, so no manual counter is needed
    for dim in range(n_dims):
        column = data[:, dim]  # all time steps of this dimension
        stats_list.append({
            "Min": np.min(column),
            "Max": np.max(column),
            "Mean": np.mean(column),
            "SD": np.std(column)
        })

    return stats_list

In [26]:
# SCRATCH CELL
# If you're having trouble, try setting n_dims to 1 and use test_data for the data input

In [27]:
# Testing with known data - make a fake data set with 5 time steps and x, y, z data
#  Every column is constant, so its mean is simply the value the column was filled with
test_stats = np.zeros((5, 3))
# Set the x data to be ones
test_stats[:, 0] = np.ones(5)
# Set the y data to be twos
test_stats[:, 1] = np.ones(5) * 2
# Set the z data to be threes
test_stats[:, 2] = np.ones(5) * 3

# Now get the actual stats
ret_stats_array = calc_stats_for_channel(test_stats, n_dims=3)

# Check the mean result for x, y, and z - should be 1, 2, and 3 respectively
assert(ret_stats_array[0]["Mean"] == 1.0)
assert(ret_stats_array[1]["Mean"] == 2.0)
assert(ret_stats_array[2]["Mean"] == 3.0)

In [28]:
# This should work: stats for the x, y, z columns of the right hand accelerometer slice
ret_stats_rh_accelerometer = calc_stats_for_channel(rh_accelerometer_data, 3)

In [29]:
# As should this: stats for the three columns of the left hand gyroscope slice
res_stats_lh_gyroscope = calc_stats_for_channel(lh_gyroscope_data, 3)

In [30]:
grader.check("loop_data_calc_stats")

## Boolean slicing to get successful versus unsuccessful statistics out

Use the functions you just wrote to get out the min and max z values for each type of hand motion.

For this problem I have written code that is *incorrect*. You know the functions themselves are correct - you just tested them. The following bits of code have something wrong with either the way the function is called OR with the way the results are gotten back.

In [31]:
# Boolean filters for getting rows for a specific motion type. 
# Motion type should extract the last column in the data
#   uncomment and get the last column
motion_type = all_data[:, -1]

# These should match the specific IDs for each motion type, eg clap_id.
# Uncomment and compare motion type to the appropriate XX_id 
# Each b_* array has one True/False entry per row of all_data
b_clap = motion_type == clap_id
b_snap = motion_type == snap_id
b_high_five = motion_type == high_five_id

# Use b_clap to pick out the rows that are for claps. Send all column data for the selected rows.
#   Right hand accelerometer has 3 dimensions (x,y,z)
#   There's two errors here - one that actually will create incorrect results, one that just *happens* to work
#   correctly, although it doesn't do what the first sentence says...
# The call below passes only the clap rows (all three columns) of the right hand accelerometer slice
ret_rh_accelerometer_clap = calc_stats_for_channel(rh_accelerometer_data[b_clap], n_dims=3)

# The minimum should be in the third (last) element in the list, the "Min" key
# (index 2 of the returned list is the z dimension)
z_min_clap = ret_rh_accelerometer_clap[2]["Min"]
z_max_clap = ret_rh_accelerometer_clap[2]["Max"]

# Now, do the same thing above, but for snap and high_five

ret_rh_accelerometer_snap = calc_stats_for_channel(rh_accelerometer_data[b_snap], n_dims=3) # stats for the snap rows only
z_min_snap = ret_rh_accelerometer_snap[2]["Min"] # z dimension, minimum
z_max_snap = ret_rh_accelerometer_snap[2]["Max"] # z dimension, maximum

ret_rh_accelerometer_high_five = calc_stats_for_channel(rh_accelerometer_data[b_high_five], n_dims=3) # stats for the high five rows only
z_min_high_five = ret_rh_accelerometer_high_five[2]["Min"] # z dimension, minimum
z_max_high_five = ret_rh_accelerometer_high_five[2]["Max"] # z dimension, maximum

print(f"Clap: Minimum {z_min_clap} and maximum {z_max_clap} value of right hand accelerometer z channel")
print(f"Snap: Minimum {z_min_snap} and maximum {z_max_snap} value of right hand accelerometer z channel")
print(f"High five: Minimum {z_min_high_five} and maximum {z_max_high_five} value of right hand accelerometer z channel")

Clap: Minimum -1.04 and maximum 9.06 value of right hand accelerometer z channel
Snap: Minimum -2.77 and maximum 1.05 value of right hand accelerometer z channel
High five: Minimum -0.74 and maximum -0.32 value of right hand accelerometer z channel


In [32]:
grader.check("boolean_slicing")

## Optional/Extra credit: print out all of the rows where the minimum z value for a clap motion was reached

See the tutorial on **np.where** (c_tutorial_where.ipynb)

TODO: Use **np.where** to pick out the row that has the minimum z value for a clap motion.

In [33]:
# TODO Use np.where to get out the indices. You can use == OR np.isclose() here; either works. In general, use .isclose for 
#  floating point comparisons.
# Look at JUST the z values (column 2) of rh_accelerometer_data
z_values = rh_accelerometer_data[:, 2]

# A row matches when it belongs to a clap AND its z value is the clap minimum.
# Both masks have one entry per row of the full data set, so the indices found below are
# real row numbers of rh_accelerometer_data / all_data. Searching inside
# rh_accelerometer_data[b_clap] instead would give positions within the clap-only subset,
# which are the same numbers only when the clap rows come first.
conditions = b_clap & np.isclose(z_values, z_min_clap)

# np.where returns a tuple with one index array per dimension; [0] holds the row numbers
all_rows_with_min = np.where(conditions)[0]

In [34]:
grader.check("optional_where")

## Hours and collaborators
Required for every assignment - fill out before you hand-in.

Listing names and websites helps you to document who you worked with and what internet help you received in the case of any plagiarism issues. You should list names of anyone (in class or not) who has substantially helped you with an assignment - or anyone you have *helped*. You do not need to list TAs.

Listing hours helps us track if the assignments are too long.

In [35]:

# List of names (creates a set)
worked_with_names = {"N/A"}
# List of URLS 2S5 (creates a set)
websites = {"https://numpy.org/doc/stable/reference/generated/numpy.std.html"}
# Approximate number of hours, including lab/in-class time
hours = 4

In [36]:
grader.check("hours_collaborators")

### To submit

- Do a restart then run all to make sure everything runs ok
- Save the file
- Submit just this .ipynb file through gradescope, Lab 2, functions
- You do NOT need to submit the data files - we will supply those
- Where there are given variable/file names (eg, foo = ...) DON'T change those, or the autograder will fail

If the Gradescope autograder fails, please check here first for common reasons for it to fail
    https://docs.google.com/presentation/d/1tYa5oycUiG4YhXUq5vHvPOpWJ4k_xUPp2rUNIL7Q9RI/edit?usp=sharing

Most likely failure for this assignment is not naming the data directory and files correctly; capitalization matters for the Gradescope grader. 