In [9]:
%matplotlib inline
import os
import subprocess as sub
import matplotlib.pyplot as plt
import psycopg2
import psycopg2.extras
import matplotlib
from datetime import datetime, timedelta
from tabulate import tabulate

# Connection credentials and the ingest selector (city + wave) are read from the
# runtime environment, so nothing sensitive lives in the notebook. A missing
# variable raises KeyError right here, before any work is done.
db_host = os.environ['SQL_LOCAL_SERVER']
db_host_port = int(os.environ['SQL_LOCAL_PORT'])
db_user = os.environ['SQL_USER']
city_num = int(os.environ['INGEST_CITY'])
wave_num = int(os.environ['INGEST_WAVE'])
db_name = 'interact_db'
db_schema = 'level_0'

# City number -> short prefix and display name.
city_data = {
    1: {'prefix': 'vic', 'name': 'Victoria'},
    2: {'prefix': 'van', 'name': 'Vancouver'},
    3: {'prefix': 'ssk', 'name': 'Saskatoon'},
    4: {'prefix': 'mtl', 'name': 'Montreal'},
}

# Everything below is derived from the settings above.
wavenumstr = f"{wave_num:02}"  # zero-padded wave number, e.g. "02"
city_prefix, city_name = (city_data[city_num][field] for field in ('prefix', 'name'))
datadir = ("/media/cedar/projects/def-dfuller/interact/permanent_archive/"
           f"{city_name}/Wave{wave_num}/SenseDoc")
telemetrydir = f"{datadir}/{city_name.lower()}_{wavenumstr}"
linkage_file = f"{datadir}/../linkage.csv"

# Users to be excluded from a particular study. The linkage CSV files do not
# encode study numbers, and adding that would break a lot of code and risk
# contaminating the CSVs through sloppy edits. Such exclusions are rare, so they
# live here, at the head of the notebook, where they are obvious and apparent.
# Currently empty.
suppressors = {}


def log(instr):
    """Print *instr* to stdout, prefixed with a local wall-clock timestamp and an arrow."""
    stamp = datetime.now().strftime("[%Y-%m-%d %H:%M:%S] ")
    message = f"→ {instr}"
    print(stamp + message)
    
# Echo the selected city and wave so the operator can confirm them before anything is ingested.
log(f"Initial parameters registered. You are about to ingest Wave {wave_num} SenseDoc data for the city of {city_name}.")

[2021-08-17 09:47:35] → Initial parameters registered. You are about to ingest Wave 2 SenseDoc data for the city of Saskatoon.


**After running the block:** The output line above is telling you which city, wave, and telemetry type you are about to ingest. *Read that line carefully!* If any of it is wrong, you've probably forgotten to set your environment variables correctly. You'll have to shut down Jupyter Lab, update your environment, and then re-launch Jupyter.

## Linkage Data
The first step of the ingest is to ensure that every contributing participant has a complete user record in the system linking their interact_id to the SenseDoc device(s) they were assigned, and to the time window within which their data was captured. This linkage data must be validated and ingested before the telemetry data can be loaded.


### Verify Linkage CSV
This step assesses the CSV file to ensure it's in a parsable format, then confirms that the users listed match those who have contributed telemetry data. If any errors or mismatches are reported here, update the linkage file records until this verification test passes cleanly.


#### Possible problems and their solutions

**User in linkage file but has no telemetry data:**
This can happen when a user was assigned an Ethica account, but either dropped out of the study or switched to SenseDoc before data collection began. If the study coordinator confirms that the user dropped out and produced no usable data, then mark the user with 'ignore' in the data_disposition field of the linkage file. This will exclude them from the list of ids who are expected to have telemetry data.

**User has telemetry data but is not in linkage file:**
This usually happens when data got collected from a staff member who was testing the system. For these people, there *is* data, but we want to remove it from the ingest. So we don't just ignore these records, we have to actively cull the associated data. This is indicated by marking the data_disposition field with 'cullsd'.

In [10]:
# Helper Functions: will be used further down to verify the linkage CSV file

import os
import re
import sys
import csv
import subprocess

# In one city, the linkage file labels the Ethica ID column 'Ethica ID';
# in another, it is labelled 'ethica_id'.
# We may want to make that column name an input parameter

def validate_iid(idstr, logfailures=True):
    """
    Check that an Interact ID string is well formed.

    The ID must be longer than eight characters once surrounding whitespace
    is stripped, and must parse as an integer. On success the integer value
    is added to the module-level ``iids`` set.

    Returns True when both checks pass, False otherwise. Failures are
    reported through log() unless logfailures is False.
    """
    global iids
    if len(idstr.strip()) <= 8:
        if logfailures:
            log(f"Interact ID '{idstr}' is not long enough.")
        return False
    try:
        parsed = int(idstr)
    except ValueError:
        if logfailures:
            log(f"Interact ID '{idstr}' is not an integer.")
        return False
    iids.add(parsed)
    return True


def validate_eid(idstr, logfailures=True):
    """
    Given an Ethica ID in string form, ensure it's well formed and unique.

    The ID must parse as an integer (filter B) and must not already be in the
    module-level ``eids`` set (filter A). Accepted IDs are stored in ``eids``
    in canonical string form (``str(int(idstr))``), so " 0123" and "123" count
    as the same ID. All later tests against the id are done as string compares.

    Returns True if the ID is accepted, False otherwise. Failures are logged
    unless logfailures is False.
    """
    global eids
    try:
        eid = int(idstr)
    except (TypeError, ValueError):  # filter B (TypeError covers a None field)
        if logfailures: log(f"Ethica ID '{idstr}' is not an integer.")
        return False
    # eids holds strings, so the membership test must use the string form too.
    # (Testing the int against the string set could never match, which made
    # the redundancy check dead.)
    eid_key = str(eid)
    if eid_key in eids:  # filter A
        if logfailures: log(f"Redundant record for Ethica ID '{idstr}'.")
        return False
    eids.add(eid_key)
    return True


def validate_sdid(instr, logfailures=True, require_unique=False):
    """
    Parse a SenseDoc ID string and record it in the module-level ``sdids`` set.

    Anything after the first dash is treated as a revision suffix and dropped
    ("383-2" -> 383). Accepted IDs are stored in ``sdids`` as zero-padded
    3-digit strings ("045").

    Uniqueness: the original check compared the int ID against the set of
    zero-padded strings, so it could never fire. The linkage cell calls this
    once per device assignment, and the same physical device is reused across
    participants, so enforcing uniqueness there would discard valid
    assignments. Uniqueness is therefore opt-in via ``require_unique``, and
    when requested it now compares like with like.

    Returns the integer device ID on success, or 0 on failure. Note that 0 is
    also what a literal device id "000" would parse to, so callers cannot
    distinguish the two. Messages are logged unless logfailures is False.
    """
    global sdids
    if '-' in instr:
        idstr = instr.partition('-')[0]
        if logfailures: log(f"Extracted {idstr} from {instr}")
    else:
        idstr = instr

    try:
        sdid = int(idstr)
    except ValueError:
        if logfailures: log(f"SenseDoc ID '{idstr}' is not an integer.")
        return 0
    padded = f"{sdid:03}"
    if require_unique and padded in sdids:
        if logfailures: log(f"Redundant record for SenseDoc ID '{idstr}'")
        return 0
    sdids.add(padded)
    return sdid

def validate_csv_dates(row, colnames, enforce=False):
    """
    Given a CSV record and a list of date columns, check that the date fields
    are all present and well formed.

    Wear dates are only valid for SenseDoc data. With SD, data capture begins
    the moment the device is booted, so it captures activity of the
    coordinator prior to delivery to the participant, and again after the
    device has been recovered. Wear dates are used in that situation to
    filter out the coordinator data. (Ethica data is different and has no
    coordinator "pollution" that needs to be removed.)

    By default (enforce=False) this returns True without inspecting the row.
    That matches the previous implementation, which returned True
    unconditionally; its remaining body was unreachable and referenced
    undefined names. The linkage cell relies on rows with blank wear dates
    still being accepted. Pass enforce=True to actually run the checks.

    When enforcing, colnames is read as consecutive (start, end) pairs:
      - every value must begin with a YYYY-MM-DD date (anything after the
        first 10 characters, e.g. a time of day, is ignored, mirroring the
        [:10] slice the linkage cell stores);
      - each start must be strictly before its end.

    Returns True if the row passes, False otherwise (the reason is logged).
    """
    if not enforce:
        return True
    ident = row.get('interact_id') or row.get('ethica_id') or '?'
    parsed = []
    for colname in colnames:
        date_str = row.get(colname)
        if not date_str:  # absent column, short row (None) or blank cell
            log(f"Record '{ident}' is missing date field '{colname}'")
            return False
        try:
            parsed.append(datetime.strptime(date_str.strip()[:10], "%Y-%m-%d"))
        except ValueError:
            log(f"Record '{ident}' has invalid date '{date_str}' in field '{colname}'")
            return False
    for start, end in zip(parsed[0::2], parsed[1::2]):
        if start >= end:
            log(f"Record '{ident}' has invalid date window {start:%Y-%m-%d} - {end:%Y-%m-%d}")
            return False
    return True


def get_possible_values_from_csv(filename, colnum):
    """
    Return the distinct numeric values found in one column of a CSV file.

    The header row is skipped. Values are extracted with a
    `tail -n +2 | cut -d, -f N | sort -u` shell pipeline rather than
    csv.DictReader: on the large telemetry files the pipeline is orders of
    magnitude faster. Because `cut` splits on every comma, quoted fields
    that themselves contain commas are NOT handled correctly.

    Parameters
    ----------
    filename : str
        Path to the CSV file.
    colnum : int or str
        1-based column index, as understood by `cut -f`.

    Returns
    -------
    list of str
        Distinct values made only of numeric characters, in `sort -u` order.
        Non-numeric values are logged and dropped. An empty list is returned
        (with a warning) when the file does not exist.
    """
    if not os.path.isfile(filename):
        log(f"WARNING: Unable to find file '{filename}'")
        return []
    log(f"Scanning file: {filename}")

    skipcmd = ['tail', '-n', '+2', filename]       # skip the first line, which has column names
    cutcmd = ['cut', '-d,', '-f', str(colnum)]     # Popen needs string args; accept an int colnum too
    sortcmd = ['sort', '-u']                       # create a list of distinct values

    p0 = subprocess.Popen(skipcmd, stdout=subprocess.PIPE)
    p1 = subprocess.Popen(cutcmd, stdin=p0.stdout, stdout=subprocess.PIPE)
    # Close the parent's copies of the intermediate pipes. The children hold
    # their own ends, and this lets an upstream process get SIGPIPE if its
    # reader exits early, instead of leaking the file handles.
    p0.stdout.close()
    # errors='replace': a stray non-UTF-8 byte becomes a non-numeric value
    # (logged below) instead of aborting the whole scan.
    p2 = subprocess.Popen(sortcmd, stdin=p1.stdout, stdout=subprocess.PIPE,
                          encoding='utf8', errors='replace')
    p1.stdout.close()
    output, _ = p2.communicate()
    # Reap the upstream processes so they don't linger as zombies.
    p0.wait()
    p1.wait()

    failed = [cmd[0] for cmd, proc in ((skipcmd, p0), (cutcmd, p1), (sortcmd, p2)) if proc.returncode]
    if failed:
        log(f"WARNING: {', '.join(failed)} exited with an error while scanning '{filename}'")

    values = []
    for val in output.split('\n'):
        if not val:  # skip empty lines
            continue
        if val.isnumeric():
            values.append(val)
        else:
            log(f"Unexpected non-numeric entry in column {colnum} = '{val}'")
    return values

# Quick test of the above routine
# These are only relevant to Ethica
#log("Testing function: get_possible_values_from_csv()")
#foo = get_possible_values_from_csv(os.path.join(datadir, 'montreal_en_01/raw/gps.csv'), '1')
#log(f"Found {len(foo)} users in English data.")
#foo = get_possible_values_from_csv(os.path.join(datadir, 'montreal_fr_01/raw/gps.csv'), '1')
#log(f"Found {len(foo)} users in French data.")

def detect_missing_values_in_csv(master_values, csvfilelist, colnum, colname, streamname):
    """
    Compare the ids found in one column of several telemetry CSVs against the
    master list of ids from the linkage file, and log any discrepancies.

    NOTE: This is only used for Ethica validation, because SenseDoc keeps a unique
          telemetry file for each user.

    Parameters
    ----------
    master_values : iterable of str
        Ids the linkage file says should have data (converted to a set here).
    csvfilelist : dict
        Maps each telemetry CSV path to the study number it belongs to.
    colnum : int or str
        1-based column index holding the id; passed to get_possible_values_from_csv().
    colname : str
        Not used by this function; kept so existing calls keep working.
    streamname : str
        Label for the data stream, used only in log messages.

    Side effects
    ------------
    Records each id's study number in the module-level ``participant_study``
    dict, based on which file the id appears in. A warning is logged when an
    id shows up under two different studies; the file processed last wins.
    Ids in the module-level ``culled_eids`` set are not reported as missing.

    Returns
    -------
    (missing_values, unexpected_values) : tuple of set
        Expected ids with no data (excluding culled ids), and ids that have
        data but are not listed in the linkage file.
    """
    master_values = set(master_values)
    found_values = set()
    for csvfile, studynum in csvfilelist.items():
        log(f"File {csvfile} is associated with study number {studynum}")
        # The header row is skipped inside get_possible_values_from_csv().
        users_in_file = get_possible_values_from_csv(csvfile, colnum)
        for user in users_in_file:
            if user in participant_study and participant_study[user] != studynum:
                log(f"WARNING: User {user} has data in conflicting studies: {studynum} and {participant_study[user]}")
            participant_study[user] = studynum
        found_values.update(users_in_file)

    missing_values = master_values - found_values - culled_eids
    unexpected_values = found_values - master_values

    log(f"{len(master_values)} user_ids found in master linkage CSV")
    log(f"{len(found_values)} distinct ethica_id values found in telemetry CSV data")
    if missing_values:  # filter D
        log(f"{len(missing_values)} expected column {colnum} values missing from {streamname} data")
        log(','.join(str(x) for x in sorted(missing_values)))
    else:
        log(f"All expected column {colnum} values found for {streamname}")
    if unexpected_values:  # filter E
        log(f"{len(unexpected_values)} column {colnum} values in {streamname} data were unexpected:")
        log(','.join(str(x) for x in sorted(unexpected_values)))
    else:
        log(f"All values found for column {colnum} in {streamname} were expected")

    # Full listings, logged once. Master/found values used to be logged both
    # before and after the summary.
    log(f"Master values: {sorted(master_values)}")
    log(f"Found values: {sorted(found_values)}")
    log(f"Missing values: {sorted(missing_values)}")
    log(f"Unexpected values: {sorted(unexpected_values)}")
    return missing_values, unexpected_values
    
def detect_missing_telemetry_files_for_sensedoc_users(csvfile, telemetry_dir):
    """
    Cross-check the linkage's SenseDoc device assignments against the telemetry
    directories on disk, and collect the SDB files to ingest.

    Expected layout: one directory per assignment under ``telemetry_dir``, named
    ``{interact_id}_{sensedoc_id:03}``, containing SDB files whose names match
    ``SD{sensedoc_id}fw...sdb``.

    Reads the module-level ``device_assignments`` dict
    ({interact_id: [zero-padded sensedoc id, ...]}). Appends every matching SDB
    path to the module-level ``filepaths_for_ingest`` list; the caller is
    responsible for emptying it beforehand. Logs the following:
      - expected pairings with no directory, or with no SDB file;
      - directories belonging to participants not in the linkage;
      - directories for device assignments the linkage does not list.

    Parameters
    ----------
    csvfile : str
        Unused; kept for backward compatibility with existing callers.
    telemetry_dir : str
        Directory holding one sub-directory per device assignment.

    Returns
    -------
    (unknown_contributors, unexpected_device_assignments) : tuple of list
    """
    numiids = len(device_assignments)
    numdirsfound = 0
    numdirsexpected = 0

    # Pass 1: every pairing in the linkage should have a directory with SDB data.
    for iid, assigned_sdids in device_assignments.items():
        numdirsexpected += len(assigned_sdids)
        for sdid in assigned_sdids:
            pairstring = f"{iid}_{int(sdid):03}"
            user_telemetry_dirname = os.path.join(telemetry_dir, pairstring)
            if not os.path.isdir(user_telemetry_dirname):
                log(f"No telemetry directory found for expected pairing '{pairstring}'")
                continue
            numdirsfound += 1
            # Look for SDB files with the expected device prefix. The dot before
            # 'sdb' is escaped so only real ".sdb" files match.
            sdb_pattern = re.compile(rf"SD{int(sdid)}fw.*\.sdb$")
            telemetry_files = sorted(x for x in os.listdir(user_telemetry_dirname) if sdb_pattern.match(x))
            if not telemetry_files:
                log(f"No SDB file found for expected pairing '{pairstring}'")
            filepaths_for_ingest.extend(os.path.join(user_telemetry_dirname, x) for x in telemetry_files)
    log(f"Checking {numiids} users and found {numdirsfound} of expected {numdirsexpected} dirs")

    # Pass 2: every directory on disk should correspond to a pairing in the linkage.
    # Sorted so the report is deterministic.
    unknown_contributors = []
    unexpected_device_assignments = []
    for entry in sorted(os.listdir(telemetry_dir)):
        if not os.path.isdir(os.path.join(telemetry_dir, entry)):  # skip non-directories
            continue
        iid, _, sdid = entry.partition('_')
        if iid not in device_assignments:
            unknown_contributors.append(iid)
        elif sdid not in device_assignments[iid]:
            try:
                assignment_str = f"{iid}_{int(sdid):03}"
            except ValueError:
                # Malformed directory name (no numeric device id). Report it
                # verbatim rather than aborting the whole check.
                assignment_str = entry
            unexpected_device_assignments.append(assignment_str)
    log(f"There were {len(unknown_contributors)} unexpected contributors:")
    if unknown_contributors:
        log(f"{unknown_contributors}")
    log(f"There were {len(unexpected_device_assignments)} unexpected device assignments found")
    if unexpected_device_assignments:
        log(f"{unexpected_device_assignments}")
    return unknown_contributors, unexpected_device_assignments

    
# Reaching this line means every helper definition above executed without error.
log("Code has been loaded.")

[2021-08-17 09:47:43] → Code has been loaded.


**After running the block:** A collection of helper functions have been loaded, but nothing has been executed. The output above should simply confirm that the code has been loaded. If an error was reported, it is most likely either a syntax error in the code or a missing import statement. Whatever it was, that needs to be fixed before continuing.

In [11]:
# Now parse the linkage file and be sure it's well formed.
#
# Every linkage row with at least one identifying field counts as a participant
# record, and lands in exactly one of three buckets:
#   - notsensedoc        : no usable SenseDoc id on the row
#   - skipped_sd_users   : SenseDoc row dropped (missing/invalid interact_id, or marked 'ignore')
#   - device_assignments : accepted SenseDoc user, keyed by interact_id
# so the three counts should add up to usercount. A repeated interact_id
# overwrites its earlier entry in device_assignments and shows up as a PROBLEM below.

ignorables = set([])      # int SenseDoc ids from rows marked 'ignore'
usercount = 0             # rows with at least one identifying field
notsensedoc = 0           # rows with no usable SenseDoc id
skipped_sd_users = 0      # SenseDoc rows skipped (bad interact_id or 'ignore')
colnames = ['sensedoc1_wear_start_date', 'sensedoc1_wear_end_date',
            'sensedoc2_wear_start_date', 'sensedoc2_wear_end_date']
iids = set([])            # populated by validate_iid()
eids = set([])            # populated by validate_eid()
sdids = set([])           # zero-padded SenseDoc id strings, populated by validate_sdid()
culled_eids = set([])
culled_sdids = set([])    # int SenseDoc ids from rows marked 'cullsd'
device_assignments = {}   # interact_id -> list of zero-padded SenseDoc ids
device_wear_windows = {}  # (interact_id, int SenseDoc id) -> (start, end) date strings
filepaths_for_ingest = []
participant_study = {} # a list of which study each participant is registered in

# Matches linkage columns such as 'sensedoc1_id' and captures the device slot number.
# Raw string: '\d' inside a plain string literal is an invalid escape sequence.
sdid_column_pattern = re.compile(r'sensedoc(?P<index>\d)_id')

# read the linkage file and validate key data fields
with open(linkage_file, 'r', encoding='ISO-8859-1', newline='') as fcsv:
    reader = csv.DictReader(fcsv, delimiter=',')
    # start=2 makes rownum the physical line number in the file (line 1 is the
    # header), so the date-format warning below points at the right line.
    for rownum, row in enumerate(reader, start=2):
        if not (row['sensedoc1_id'] or row['sensedoc2_id'] or row['interact_id'] or row['ethica_id']):
            # Blank row: not a participant record. It used to be left out of
            # usercount but still counted in notsensedoc, breaking the tally below.
            continue
        usercount += 1
        iid = row['interact_id'] or ''  # None on a short row
        row_sdids = {}  # device slot index -> int SenseDoc id
        for key, value in row.items():
            # key is None for surplus fields on an over-long row; skip those.
            m = sdid_column_pattern.match(key) if key else None
            if not m:
                continue
            index = int(m.group('index'))
            if index and value:
                sdid = validate_sdid(value, logfailures=False)
                if sdid:
                    row_sdids[index] = sdid
                else:
                    log(f"Linkage file specifies non-numeric device id '{value}' for user {iid}. Ignoring.")
        if not row_sdids:
            notsensedoc += 1
            continue  # we're only interested in sensedoc users
        log(f"User {iid} was assigned {len(row_sdids)} devices")
        if not iid.strip() or not validate_iid(iid, logfailures=False):  # filter C
            # row_sdids maps slot -> int id; the ids must be stringified before joining.
            log(f"Sensedoc user {'/'.join(str(x) for x in row_sdids.values())} has no iid")
            skipped_sd_users += 1
            continue
        disposition = row.get('data_disposition') or ''
        if 'cullsd' in disposition:
            log(f"Culling SD telemetry data from ethica user {iid}, as instructed by linkage record")
            # We leave the sdids in the list, so that they will not show up as unexpected ids,
            # but we also alert the operator that telemetry from this id will be culled at ingest time
            culled_sdids.update(row_sdids.values())
        if 'ignore' in disposition:
            # iid, not eid: no eid is defined in this cell (that was a NameError).
            log(f"Ignoring record for ethica user {iid}, as instructed by linkage record")
            ignorables.update(row_sdids.values())  # keep track of which IDs have been intentionally ignored
            skipped_sd_users += 1
            continue
        device_assignments[iid] = []
        for index, sdid in row_sdids.items():
            start_col = f"sensedoc{index}_wear_start_date"
            end_col = f"sensedoc{index}_wear_end_date"
            if not validate_csv_dates(row, [start_col, end_col]):
                log(f"Problem with date format in CSV, line {rownum}, for the {index}th assignment")
                continue
            device_assignments[iid].append(f"{sdid:03}")
            # Keep only the YYYY-MM-DD part. A missing column or cell is stored as ''.
            device_wear_windows[(iid, sdid)] = ((row.get(start_col) or '')[:10],
                                                (row.get(end_col) or '')[:10])

log(f"There were {usercount} participant records in linkage file.")
log(f"{notsensedoc} were not SenseDoc users.")
log(f"{len(device_assignments)} were SenseDoc users with {len(sdids)} distinct devices.")
log(f"{skipped_sd_users} SenseDoc records were skipped (missing/invalid interact_id or flagged 'ignore').")
if len(device_assignments) + notsensedoc + skipped_sd_users == usercount:
    log(f"CONFIRMED: Number of SenseDoc plus non-SenseDoc users equals number of linkage records.")
else:
    log(f"PROBLEM: SenseDoc users and non-SenseDoc users do not add up to number of linkage records.")
log(f"{len(ignorables)} were flagged as 'ignore'. (Probably test users.)")
log(f"{len(culled_sdids)} were flagged as 'cullsd'. (Probably due to known missing or corrupt data.)")

# Add the ignorables to the id list so that they will not be reported as unexpected.
# sdids holds zero-padded strings while ignorables holds ints, so convert first.
# Otherwise the set mixes types and sorted(sdids) below raises TypeError.
sdids = sdids.union(f"{x:03}" for x in ignorables)

log(f"There are now {len(sdids)} known SenseDoc ids in the user list.")
if len(device_assignments) >= len(sdids) >= len(device_assignments)/3.0:
    log("CONFIRMED: Number of known device ids seems reasonable.")
elif len(sdids) < len(device_assignments)/3.0:
    log("PROBLEM: Number of known device ids seems unusually low.")
else:
    log("PROBLEM: Number of known device ids seems unusually high.")
log(sorted(sdids))

[2021-08-17 09:47:57] → Linkage file specifies non-numeric device id 'NA' for user 302000096. Ignoring.
[2021-08-17 09:47:57] → Linkage file specifies non-numeric device id 'NA' for user 302000096. Ignoring.
[2021-08-17 09:47:57] → Linkage file specifies non-numeric device id 'NA' for user 302000096. Ignoring.
[2021-08-17 09:47:57] → Linkage file specifies non-numeric device id 'NA' for user 302001622. Ignoring.
[2021-08-17 09:47:57] → Linkage file specifies non-numeric device id 'NA' for user 302001622. Ignoring.
[2021-08-17 09:47:57] → Linkage file specifies non-numeric device id 'NA' for user 302001622. Ignoring.
[2021-08-17 09:47:57] → Linkage file specifies non-numeric device id 'NA' for user 302002484. Ignoring.
[2021-08-17 09:47:57] → Linkage file specifies non-numeric device id 'NA' for user 302002484. Ignoring.
[2021-08-17 09:47:57] → Linkage file specifies non-numeric device id 'NA' for user 302002484. Ignoring.
[2021-08-17 09:47:57] → Linkage file specifies non-numeric devic

**After running the block:** The output above is telling you how many records were processed in the linkage.csv file, how many were assigned SenseDocs and how many were not. If the number of SenseDoc users and non-SenseDoc users don't add up to the number of user records processed, that will be reported with a PROBLEM line instead of the usual CONFIRMED line. Figure out what's going on before continuing.

Also look at the number of users reported as 'ignore' and 'cullsd'. These are users whose telemetry we either do not expect to find ('ignore') or must actively remove at ingest time ('cullsd'). Initially, both will probably be zero, but as we work through the process and discover test users, corrupt data, missing data, etc., we will flag those records in linkage.csv so that the ingest protocol knows how to handle them. Be sure you understand both numbers reported here, and that they make sense to you, before you continue.

Lastly, a count of the unique SenseDoc ids is reported, along with a list of their actual values. Be sure the id numbers look reasonable (usually a 3-digit integer). Also confirm that the number of distinct ids seems reasonable. Usually, it tends to be about 1/2 or 1/3 of the number of total SenseDoc users in the study, but if it seems off to you, consult with the regional coordinator to find out how many physical devices were used in the study. The two numbers should match.

In [12]:
# And now we can verify the telemetry file against the individual records we just loaded.
# detect_missing_telemetry_files_for_sensedoc_users() *appends* to filepaths_for_ingest,
# so empty the list first; otherwise re-running this cell would list every SDB file twice.
filepaths_for_ingest.clear()
detect_missing_telemetry_files_for_sensedoc_users(linkage_file, telemetrydir)

log(f"Found {len(filepaths_for_ingest)} ingestable SDB files")
log("Telemetry files all probed.")
# NOTE(review): the following note looks carried over from a Victoria ingest and may be stale here:
# Users 2032 and 2593 expected to be missing GPS data for Victoria

[2021-08-17 09:48:36] → No telemetry directory found for expected pairing '302955394_383'
[2021-08-17 09:48:44] → No SDB file found for expected pairing '303846901_421'
[2021-08-17 09:48:45] → Checking 32 users and found 33 of expected 34 dirs
[2021-08-17 09:48:45] → There were 0 unexpected contributors:
[2021-08-17 09:48:45] → There were 0 unexpected device assignments found
[2021-08-17 09:48:45] → Found 44 ingestable SDB files
[2021-08-17 09:48:45] → Telemetry files all probed.


**After running the block:** Read the output carefully. The first time you run it for a city, it will probably report a number of missing files and directories. This is where we catch the discrepancy between what the linkage file says happened, and what actually happened.

**No SDB file found for...:** This is usually caused by an SDB file that was created but not given the proper filename. Look in the indicated directory for a .sdb file. A properly named file should be in the form SD{DEVICEID}fw{REVNO}_{DATE}_{TIME}.sdb. The **sdb_generate_fname** script can often reconstruct the correct filename, but just as often it fails for the same reasons that the file was originally misnamed. The DEVICEID can be taken from the second half of the folder name; the REVNO may be encoded into some of the other filenames in the directory; and the timestamp info can be extracted from the SDB file using the **sdb_dates** script. Once you've gathered all these elements, rename the SDB file manually to conform to this format. 

Also, remember to update the .prvlog sidecar file to point to the new filename.

**No telemetry directory found for expected pairing...:** For some reason, no data was collected from that device assignment. This is usually a case of a directory being misnamed. So look at the 'unexpected contributors'. Chances are good that there's a typo in the directory name that's causing both entries. But if not, consult with the coordinator to see if there are other explanations.

### Ingest Linkage

Once the CSV file is known to be valid and complete, we can ingest those records into the sensedoc_assignments table, either adding records or updating them as necessary.

In [13]:
# Create new users based on records found in linkage file
create_sql = """
    INSERT INTO portal_dev.sensedoc_assignments (interact_id, sensedoc_serial, city_id, wave_id, started_wearing, stopped_wearing)
    VALUES (%s, %s, %s, %s, %s, %s);
    """

# Does this exact assignment already exist? The values come from the linkage CSV,
# so they are passed as query parameters instead of being formatted into the SQL.
exists_sql = """
    SELECT count(1) as count from portal_dev.sensedoc_assignments
    WHERE interact_id = %s
    AND city_id = %s
    AND wave_id = %s
    AND sensedoc_serial = %s;
    """

# Total assignments recorded for this city/wave, used for the final sanity check.
count_sql = """
    SELECT count(1) as count from portal_dev.sensedoc_assignments
    WHERE city_id = %s
    AND wave_id = %s;
    """

# Open a database session. Using a psycopg2 connection as a context manager commits
# on success and rolls back on error, but does NOT close it, so close it explicitly.
conn = psycopg2.connect(user=db_user, host=db_host, port=db_host_port, database=db_name)
try:
    with conn, conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        for (iid, sdid), (start_date, end_date) in device_wear_windows.items():
            # NOTE(review): blank wear dates in the linkage are inserted as empty strings
            # (see the "( - )" line in the captured output). Confirm that is intended.
            cur.execute(exists_sql, (iid, sdid, city_num, wave_num))
            row = cur.fetchone()
            if row['count'] == 0:
                cur.execute(create_sql, (iid, sdid, city_num, wave_num, start_date, end_date))
                if cur.rowcount == 1:
                    log(f"Interact User {iid} not previously assigned to SD {sdid} for city {city_num} wave {wave_num}. Created.  ({start_date} - {end_date})")
                else:
                    log(f"Problem trying to create participation record for user {iid} with SD {sdid} for city {city_num} wave {wave_num}")
            else:
                log(f"INTERACT USER {iid} ALREADY ASSIGNED TO SD {sdid} FOR CITY {city_num} WAVE {wave_num}")

        # Verify that we have the expected number of assignments for this city/wave
        cur.execute(count_sql, (city_num, wave_num))
        num_in_db = cur.fetchone()['count']
        num_expected = len(device_wear_windows)
        if num_in_db == num_expected:
            log(f"CONFIRMED: Number of SD assignments in DB for city {city_num} wave {wave_num} is as expected ({num_in_db})")
        else:
            # The expected and found counts were previously swapped in this message.
            log(f"PROBLEM: Expected to find {num_expected} SD assignments in DB for city {city_num} wave {wave_num} but found {num_in_db}")
finally:
    conn.close()

[2021-08-17 09:51:18] → Interact User 302045191 not previously assigned to SD 483 for city 3 wave 2. Created.  ( - )
[2021-08-17 09:51:18] → Interact User 302078583 not previously assigned to SD 468 for city 3 wave 2. Created.  (2020-10-17 - 2020-10-26)
[2021-08-17 09:51:18] → Interact User 302319371 not previously assigned to SD 391 for city 3 wave 2. Created.  (2020-10-24 - 2020-11-02)
[2021-08-17 09:51:18] → Interact User 302387519 not previously assigned to SD 460 for city 3 wave 2. Created.  (2021-01-05 - 2021-01-19)
[2021-08-17 09:51:18] → Interact User 302417689 not previously assigned to SD 347 for city 3 wave 2. Created.  (2020-10-17 - 2020-10-26)
[2021-08-17 09:51:18] → Interact User 302515834 not previously assigned to SD 319 for city 3 wave 2. Created.  (2020-10-30 - 2020-11-01)
[2021-08-17 09:51:18] → Interact User 302515834 not previously assigned to SD 403 for city 3 wave 2. Created.  (2020-11-21 - 2020-11-28)
[2021-08-17 09:51:18] → Interact User 302539875 not previousl

**After running the block:** The output above will list all new user records added to the DB as well as the records that were not added because they already exist. If you've never run this block before for this city/wave, then most of the records will be new, but if you have, then most/many of them will already exist. What matters is that after executing, the number in the table for this wave/city matches what the linkage file tells us we should have.

## Telemetry Data
Ingesting telemetry data happens in two distinct phases:
1. telemetry data from a user's SDB file is copied into a temporary table in the PostgreSQL DB
1. the temporary table is copied into the main data table, with interact_ids added and illegal records filtered out

The temporary tables created for a given CITY are:
 - level_0.{CITY}_w2_sd_gps
 - level_0.{CITY}_w2_sd_xls

### Process Details

Ideally, we want to use the psql COPY command, since it has been
optimized for fast loading, but the target telemetry table and the
incoming data file have different column names. We *COULD* just rename 
the columns in the CSV file, but that's a manual step that shouldn't
be embedded into the process.

So instead, we'll load the data into a temporary table, and then transfer it from there into the target table and match the records to their interact_ids at the same time. This method will more or less double the ingest time, but having a reliable ingest process that can be fully documented, without manual interventions, seems like the more robust path.

In [14]:
# declare a few ingest-specific functions

# get a list of ethica_ids that are supposed to be culled from the ingest
def _ids_with_disposition(csvfilepath, keyword):
    """Return the set of ethica_id values from linkage rows whose data_disposition contains *keyword*."""
    matches = set()
    # Same encoding as the main linkage parse; newline='' as the csv module recommends.
    with open(csvfilepath, 'r', encoding='ISO-8859-1', newline='') as fh:
        for record in csv.DictReader(fh):
            # Tolerate a missing data_disposition column (or a short row) instead of raising KeyError.
            # A row flagged both 'cullsd' and 'ignore' is returned for both keywords.
            if keyword in (record.get('data_disposition') or ''):
                matches.add(record['ethica_id'])
    return matches


def get_ids_to_be_culled(csvfilepath):
    """
    Return the set of ethica_id strings whose linkage data_disposition contains 'cullsd'.

    Logs the set, or a note that there are no cull requests.
    """
    culls = _ids_with_disposition(csvfilepath, 'cullsd')
    if culls:
        log(f"Ethica IDs to be culled as per linkage file:")
        log(f"{culls}")
    else:
        log("Linkage file includes no cull requests.")
    return culls


def get_ids_to_be_ignored(csvfilepath):
    """
    Return the set of ethica_id strings whose linkage data_disposition contains 'ignore'.

    Logs the set, or a note that there are no ignore requests.
    """
    ignores = _ids_with_disposition(csvfilepath, 'ignore')
    if ignores:
        log(f"Ethica IDs to be ignored as per linkage file:")
        log(f"{ignores}")
    else:
        log("Linkage file includes no ignore requests.")
    return ignores
# Reaching this line means the helper definitions in this cell executed without error.
log("Functions loaded")

[2021-08-17 09:52:11] → Functions loaded


**After running the block:** Nothing more to do. This one just declares a few useful functions for the ingest process.

Due to the way they are administered, SenseDoc devices often contain telemetry that was contributed by the project coordinator in addition to tracking the participant. To filter this, each device assignment record includes a start and end date (the wear-date window) to limit the ingest to just data contributed by the intended participant. Consequently, these files have to be very carefully vetted before ingesting. There are 2 things we need to check before accepting the data.

1) Drop data files that do not contain telemetry data from within the expected wear-date window
2) If a user contributes data in more than one file, and the data windows of any of those files overlap, drop all of that user's files

In [17]:
import sqlite3
import dateparser
from itertools import combinations

# Shared ingest state, read and updated by the functions defined below.
gps_file_row_counts = dict()  # SDB filepath -> number of rows in its gps table (filled by select_usable_files)
min_sample_threshold = 60 * 60  # 3600 samples; equates to 1 hr of usable data (presumably one GPS fix per second)
expected_gps_rows = 0  # running total of gps rows expected to be ingested (accumulated by ingest_sdb_file)
detect = False  # not currently trying to debug a memory leak

def debug(instr):
    """
    Print a timestamped diagnostic message, but only when debugging is enabled.

    Output is gated by the `debug.enabled` attribute (False by default), so
    diagnostics can be switched on from any cell with `debug.enabled = True`
    instead of editing and re-running this definition.
    """
    if getattr(debug, 'enabled', False):
        timestr = datetime.now().strftime("[%H:%M:%S] ")
        print(f"{timestr}→ {instr}")

# Debug output is off unless explicitly enabled.
debug.enabled = False

def _read_gps_file_stats(fpath):
    """
    Return (mindate, maxdate, count) for the gps table of one SDB file.

    mindate/maxdate are the `utcdate` values of the earliest and latest gps
    rows ordered by `ts`; count is the number of gps rows. Any value that could
    not be read stays 0. SQL errors are logged rather than raised. When all
    three queries succeed, the count is also stored in `gps_file_row_counts`
    for the post-ingest statistics.
    """
    from contextlib import closing

    # Accel tables are MUCH denser and take a lot longer to scan.
    # So to speed up the process, I'm assuming that if the gps table
    # contains good dates and data, the accel table will as well.
    sql_min = "SELECT utcdate from gps order by ts ASC limit 1"
    sql_max = "SELECT utcdate from gps order by ts DESC limit 1"
    sql_n   = "SELECT count(1) as n from gps"

    mindate = 0
    maxdate = 0
    count = 0
    try:
        # sqlite3's own context manager only commits/rolls back and never
        # closes the connection; closing() releases the file handle.
        with closing(sqlite3.connect(fpath)) as conn:
            c = conn.cursor()

            row = c.execute(sql_min).fetchone()
            if row:
                mindate = row[0]

            row = c.execute(sql_max).fetchone()
            if row:
                maxdate = row[0]

            row = c.execute(sql_n).fetchone()
            if row:
                count = int(row[0])

            # hang onto these counts for final stats
            # after the ingest is finished
            gps_file_row_counts[fpath] = count
    except Exception as e:
        log(f"   CAUGHT A SQL ERROR WHILE COUNTING RECORDS: {e}")
    return mindate, maxdate, count


def select_usable_files(filelist, iidstr):
    """
    Given a list of SDB filepaths for a single participant, perform basic acceptability tests on their contents.
    If the set contains acceptable data, return a list of [mindate, maxdate, count, fpath]
    entries, one per usable file. Otherwise, return None.

    The entire set should be considered unacceptable if:
        - the timespans of two or more datafiles overlap
        - the total number of samples across all the files < min_sample_threshold
    A specific file is removed from consideration if it contains no usable data
    (no first/last gps date, or no gps rows).
    """
    fstats = []
    for fpath in filelist:
        mindate, maxdate, count = _read_gps_file_stats(fpath)
        shortname = '/'.join(fpath.split('/')[-3:])

        # make a note of those stats if they're usable
        if mindate and maxdate and count:
            log(f"   Keeping {shortname}")
            fstats.append([mindate, maxdate, count, fpath])
        else: # otherwise leave this file out of the list
            log(f"   Skipping {shortname} (Unacceptable dates: {mindate}, {maxdate}, {count})")

    if len(fstats) > 1:
        # compare all possible pairings of files and reject
        # this filelist if any pair has overlap between
        # their min and max date stamps.
        debug(f"Testing overlaps for {iidstr}")
        for stat1, stat2 in combinations(fstats, 2):
            if test_overlapping_daterange(stat1[0], stat1[1],
                                          stat2[0], stat2[1]):
                log(f"   Overlapping timestamps for {iidstr} found between files")
                debug("  %s" % (stat1[3]))
                debug("    %s -> %s"%(stat1[0], stat1[1]))
                debug("    %s -> %s"%(stat2[0], stat2[1]))
                debug("  %s" % (stat2[3]))
                return None
            debug("Overlap clean.")

        # total the number of samples from all files and
        # reject the filelist if the number of samples does not
        # meet the minimum threshold.
        # Files rejected in this function are never passed to
        # ingest_sdb_file, so their row counts are never added to
        # expected_gps_rows.
        contribution = sum(x[2] for x in fstats)
        if contribution < min_sample_threshold:
            log(f"   Not enough samples from participant {iidstr} ({contribution})")
            return None
        return fstats
    elif len(fstats) == 1:
        # if there's only one file, there can't be any overlap
        # so just look at record count
        if fstats[0][2] >= min_sample_threshold:
            return fstats

    log(f"Not enough data from participant {iidstr}")
    return None

def ingest_user_files(iidstr, filepathlist):
    """
    Vet one participant's SDB files and load the acceptable ones into the DB.

    The candidate files are screened with select_usable_files(); if nothing
    survives, nothing is ingested and False is returned. Otherwise every
    surviving file is handed to ingest_sdb_file() tagged with `iidstr`. Each
    file is attempted even when an earlier one fails, because some SDB files
    contain duplicate timestamps and will not load cleanly. The participant
    counts as successfully ingested (True) if at least one file loaded.
    """
    usable_stats = select_usable_files(filepathlist, iidstr)
    if not usable_stats:
        log(f"   FILES FOUND WERE NOT USABLE")
        return False

    debug(f"User {iidstr} has ingestible files.")

    # Materialise every outcome first: any() over a lazy generator would stop
    # at the first success and skip ingesting the remaining files.
    outcomes = [
        ingest_sdb_file(sdbfile, city_num, wave_num, iidstr)
        for _first_dt, _last_dt, _nrows, sdbfile in usable_stats
    ]
    return any(outcomes)

def test_overlapping_daterange(dt1_st, dt1_end, dt2_st, dt2_end):
    """
    Given the start and end of two different date ranges, determine
    whether the ranges overlap.
    """
    # Make sure each start and end pair are in sorted order
    start1 = min(dt1_st, dt1_end)
    end1 = max(dt1_st, dt1_end)
    start2 = min(dt2_st, dt2_end)
    end2 = max(dt2_st, dt2_end)

    # If one range occurs entirely before the other, there is no overlap.
    # Otherwise there is.
    return not (end1 < start2 or start1 > end2)

def _is_blank_wear_value(value):
    """Return True if a wear-date value from the linkage table is NULL, empty, or only whitespace."""
    return value is None or not str(value).strip()


def get_participant_wear_dates(iid,city,wave,serial):
    """
    Given user information, get wear dates from the linkage table
    (portal_dev.sensedoc_assignments).

    Parameters
    ----------
    iid : interact_id of the participant
    city, wave : city_id and wave_id to match
    serial : compound SenseDoc serial "<devid>-<firmware rev>"; only the
             devid portion is used for the lookup

    Returns
    -------
    (wear_start, wear_end)
        wear_start is the stored `started_wearing` value, unchanged.
        wear_end is a datetime at 03:00 on the day after `stopped_wearing`.
        Both are '' if no row is found, if either stored date is NULL/blank,
        or if the query fails; wear_end alone is '' if `stopped_wearing`
        cannot be parsed as a date.
    """
    from contextlib import closing

    # When a SenseDoc device gets new firmware it effectively becomes a new
    # device, so the ingest code builds a compound sensedoc serial of the
    # form "<devid>-<firmware rev>". The term "serial number" is overloaded:
    # the sensedoc_serial column matched below holds only the device-id
    # portion, without the firmware revision. So strip the serial down to
    # that devid before looking it up.
    if '-' in serial:
        devid = serial.split('-')[0]
    else:
        devid = serial
        log(f"The given serial number ({serial}) was not in compound form.")

    # Values are passed as query parameters (not formatted into the SQL
    # text) so that psycopg2 handles quoting/escaping.
    sql = """
        SELECT  started_wearing as start, stopped_wearing as end
        FROM portal_dev.sensedoc_assignments
        WHERE interact_id = %(iid)s
        AND city_id = %(city)s
        AND wave_id = %(wave)s
        AND sensedoc_serial = %(devid)s;
        """
    params = {'iid': iid, 'city': city, 'wave': wave, 'devid': devid}
    debug(f"Getting wear window with sql: {sql} params: {params}")
    wear_start = ''
    wear_end = ''
    try:
        # psycopg2's connection context manager only ends the transaction;
        # it does not close the connection, so wrap it in closing().
        with closing(psycopg2.connect(user=db_user,
                                      host=db_host,
                                      port=db_host_port,
                                      database=db_name)) as conn:
            with conn.cursor() as c:
                c.execute(sql, params)
                row = c.fetchone()
        if row:
            # Require both date fields to have content. NULL must count as
            # empty: str(None) is the non-empty string "None".
            if not _is_blank_wear_value(row[0]) and not _is_blank_wear_value(row[1]):
                wear_start = row[0]
                wear_end = row[1]
            else:
                log("Empty wear dates for serial: '%s'"%serial)
            debug("Result was:")
            debug(row)
        else:
            log("Unable to find wear dates for serial: '%s'"%serial)
    except Exception as e:
        log(f"SQL CALL FAILED: {e}")

    # The protocol requires that the declared start date in
    # the linkage table be the day the participant actually
    # began wearing the device - not the day they received it.
    # Also assumes the declared final day of wearing includes
    # any data up to 3am of the following day.
    # So to implement this 'look-ahead', we'll
    # advance the wear_end to 03:00 the following day.
    # str() lets this work whether the column comes back as text or as a date.
    if not _is_blank_wear_value(wear_end):
        parsed_end = dateparser.parse(str(wear_end)[:10] + " 03:00:00")
        if parsed_end is None:
            log(f"Unable to parse wear end date '{wear_end}' for serial: '{serial}'")
            wear_end = ''
        else:
            wear_end = parsed_end + timedelta(days=1)
    return wear_start, wear_end


def create_table_prefix(fname, iidstr):
    """
    Build a compact table-name prefix for one participant's data file.

    The result is the iid, an underscore, and then `fname` with every
    non-alphanumeric character (dots, underscores, dashes, spaces, ...)
    removed, so it can be embedded in a SQL identifier.
    """
    alnum_chars = filter(str.isalnum, fname)
    return f"{iidstr}_" + "".join(alnum_chars)


def execute_copy_via_shell(filepath, city, wave, iid, serial, tableid, wear_start, wear_end, tablename, selectionsql):
    """
    Copy one table out of an SDB (SQLite) file into the PostgreSQL database.

    On the command line we could pull data from the sqlite file with sqlite3
    and ingest it into postgres by piping the output directly to psql.

    In simple form, that console command would be:
    sqlite3 somefile.sdb SELECTIONSQL | psql interact_db INSERTSQL

    SELECTIONSQL is the SQL fragment used to select a table's content out of
    the SQLite file, and INSERTSQL is the SQL used to load the incoming data
    into the PostgreSQL database.

    Unfortunately, the data is known to contain occasional duplicate
    records, which happen when the SD device syncs its onboard clock
    with the real world. In these cases, the most accurate data for
    that timestamp are the values associated with the last record
    recorded, not the first.

    To drop those redundant timestamps, we have to wait until after
    we've ingested the file, because the COPY command does not
    permit ON CONFLICT or DISTINCT clauses.

    Furthermore, we can't use ON CONFLICT to drop dups in the main
    table because the main uses a brin index which can't test for
    uniqueness.

    So we load the entire SDB output into a staging table.
    Then we copy to a second staging table that has a btree index.
    At the same time, we drop any records outside the wear window.
    Then we finally absorb that clean incoming table into the main table.

    The two programs are started directly and connected by an OS pipe
    rather than through a shell. This avoids shell quoting of the file
    path and SQL text. More importantly, it lets us check the exit status
    of BOTH programs. In a plain `a | b` shell pipeline only psql's status
    is visible, so a failing sqlite3 would silently feed psql an empty
    stream and the load would be reported as a success.

    Parameters
    ----------
    filepath : path to the SDB file
    city, wave, iid, serial : not used here; kept for call compatibility
    tableid : unique suffix used to name the staging tables
    wear_start, wear_end : inclusive bounds on `ts` for records to keep
    tablename : destination table in db_schema (e.g. 'sd_gps')
    selectionsql : SQLite SELECT whose output COPY reads as '|'-delimited CSV

    Returns True if both sqlite3 and psql exit with status 0, False otherwise.
    """
    import tempfile

    # Assemble the SQL commands
    raw_tablename = f"tmpA_{tableid}"
    filtered_tablename = f"tmpB_{tableid}"
    idx_name = f"tmpB_{tableid}_btree_idx"
    log(f"   Loading tables for {tableid}")
    ingestcmd = f"""
             SET SCHEMA '{db_schema}';
             CREATE TABLE {raw_tablename} (LIKE {tablename});
             CREATE TABLE {filtered_tablename} (LIKE {tablename});
             ALTER TABLE {filtered_tablename} ADD CONSTRAINT {idx_name} UNIQUE (iid,ts);

             -- load the raw table
             COPY {raw_tablename} FROM STDIN delimiter '|' CSV;

             -- copy the raw table to the nodups tbl
             -- while dropping records outside wear window
             -- and dropping duplicate records as well
             INSERT INTO {filtered_tablename}
                SELECT * FROM {raw_tablename}
                WHERE ts >= '{wear_start}'
                AND   ts <= '{wear_end}'
             ON CONFLICT (iid,ts) DO NOTHING;
             
             -- now merge the filtered table into the main table
             INSERT INTO {tablename}
                SELECT * FROM {filtered_tablename};
                
             -- and then clean up the no-longer needed tables
             DROP TABLE IF EXISTS {raw_tablename};
             DROP TABLE IF EXISTS {filtered_tablename};

             -- and commit the transaction, all of which will
             -- be ignored and unrolled if any of the intermediate
             -- steps fail
             COMMIT;
             """

    # NOTE: We tend to want to ingest an entire wave at a time, but if
    # something goes wrong partway through, we don't have an elegant way
    # to roll back the ingest and start it again. Be careful when
    # re-running an ingest: the ON CONFLICT clause above only
    # de-duplicates rows within the filtered staging table. The final
    # INSERT into the main table has no conflict handling, so re-ingesting
    # a file that was already loaded will presumably insert its rows a
    # second time.
    # TODO(review): confirm the main table has no unique constraint; if it
    # doesn't, delete a participant's existing rows before re-ingesting.

    # Tell sqlite where its temp directory is (~/scratch), as the shell
    # form `SQLITE_TMPDIR=~/scratch sqlite3 ...` did.
    reader_env = dict(os.environ, SQLITE_TMPDIR=os.path.expanduser('~/scratch'))
    reader_cmd = ['sqlite3', filepath, selectionsql]
    writer_cmd = ['psql', '-h', str(db_host), '-p', str(db_host_port), '-U', str(db_user),
                  '-q', '-e', '-c', ingestcmd, str(db_name)]
    debug("Load pipeline: %s | %s" % (reader_cmd, writer_cmd))

    # Both programs write stderr to one temporary file, as they previously
    # shared the shell's stderr. Unlike an unread pipe, a file can never
    # fill up and stall either process.
    with tempfile.TemporaryFile() as errlog:
        try:
            reader = sub.Popen(reader_cmd, stdout=sub.PIPE, stderr=errlog, env=reader_env)
        except OSError as e:
            log(f"Ingesting file {filepath} failed: could not start sqlite3 ({e})")
            return False
        try:
            writer = sub.Popen(writer_cmd, stdin=reader.stdout, stderr=errlog)
        except OSError as e:
            reader.kill()
            reader.wait()
            log(f"Ingesting file {filepath} failed: could not start psql ({e})")
            return False
        finally:
            # Release the parent's copy of the pipe so that sqlite3 receives
            # SIGPIPE if psql exits early, instead of blocking on a full pipe.
            reader.stdout.close()
        writer_rc = writer.wait()
        reader_rc = reader.wait()
        errlog.seek(0)
        errtext = errlog.read().decode('utf-8', errors='replace')

    if writer_rc or reader_rc:
        # If table load fails, the ingest for that participant
        # should be reported as unsuccessful.
        # But if the error was caused by duplicate time values, it will fail with an
        # error that contains: duplicate key value violates unique constraint.
        # In this case, the user file has indeed failed to load, but it is not
        # an ingest error. We still return False, since the
        # file did not ingest, but we report the situation more clearly.
        if "duplicate key value violates unique constraint" in errtext:
            log(f"   SDB FILE CONTAINS DUPLICATE TIME RECORDS. SKIPPING.")
        elif writer_rc:
            log(f"Ingesting file {filepath} failed with return code: {writer_rc}")
        else:
            # psql succeeded but sqlite3 did not: any rows sqlite3 emitted
            # before failing may already have been committed by psql.
            log(f"Ingesting file {filepath} failed: sqlite3 exited with return code {reader_rc} "
                f"(rows it emitted before failing may already be loaded)")
        debug(f"ERR: {errtext}")
        return False

    return True


def _read_sdb_ref_date(filepath):
    """
    Return the 'refDate' value stored in an SDB file's ancillary table, as a string.

    All SDB timestamps (`ts`) are measured from this reference date. Returns ''
    if the file cannot be queried (the error is logged), has no refDate row,
    or stores NULL for it.
    """
    from contextlib import closing

    sql = "SELECT value FROM ancillary WHERE key='refDate'"
    try:
        # sqlite3's context manager does not close the connection; closing() does.
        with closing(sqlite3.connect(filepath)) as conn:
            debug("   Getting refDate with: %s"%sql)
            row = conn.execute(sql).fetchone()
    except Exception as e:
        log("   Caught a SQL error while ingesting %s"%filepath)
        log(e)
        return ''
    if row is None or row[0] is None:
        return ''
    ref_date = str(row[0])
    debug("   Got refDate of: %s"%ref_date)
    return ref_date


def _build_selection_sql(iidstr, refDate):
    """
    Return {destination postgres table: SQLite SELECT} for the tables loaded from an SDB file.

    Each table in the SQLite file needs its own SELECT to ensure data is read
    from the file in a known format. Insertion order matters: gps comes first so
    that failures on duplicate entries happen sooner and the more expensive
    accel table is skipped in those cases.
    """
    selectionsql = {}
    selectionsql['sd_gps'] = f"""
        SELECT '{iidstr}' AS iid,
            strftime('%Y-%m-%d %H:%M:%f', '{refDate}',
                    (ts/1000000.0)||' seconds') as ts,
                lat, lon,
                speed,
                course,
                -- ignoring mode, fix,
                -- ignoring mode1, mode2,
                -- following fields set to NODATA (-9999) if null
                IFNULL(alt,-9999),
                IFNULL(sat_used,-9999),
                IFNULL(pdop,-9999),
                IFNULL(hdop,-9999),
                IFNULL(vdop,-9999),
                IFNULL(sat_in_view,-9999)
            FROM gps
            WHERE   (lat is not null)
                AND (lon is not null)
                AND (lat BETWEEN -90 and 90)
                AND (lon BETWEEN -180 and 180)
                AND (alt is NULL or alt BETWEEN -10000 and 100000)
                AND (course is NULL or course BETWEEN -360 and 360)
                AND (speed is NULL or speed BETWEEN 0 and 1000)
                AND (sat_used is NULL or sat_used BETWEEN 0 and 50)
                AND (sat_in_view is NULL or sat_in_view BETWEEN 0 and 50)
                -- this clause filters out duplicate timestamps
                AND (rowid in (SELECT max(rowid) FROM gps GROUP BY ts))
            ORDER BY ts, ROWID DESC;
        """
    debug(f"SQL for gps:\n{selectionsql['sd_gps']}")
    selectionsql['sd_accel'] = f"""
                       SELECT '{iidstr}' AS iid,
                            strftime('%Y-%m-%d %H:%M:%f', '{refDate}',
                                    (ts/1000000.0)||' seconds') as ts,
                            x * 0.00390625 as x, ---convert to 0.0-1.0
                            y * 0.00390625 as y, ---convert to 0.0-1.0
                            z * 0.00390625 as z  ---convert to 0.0-1.0
                       FROM accel
                       WHERE x is not null
                         AND y is not null
                         AND z is not null
                         -- this clause filters out duplicate times
                         AND rowid in (SELECT max(rowid) FROM accel GROUP BY ts)
                       ORDER BY ts, ROWID DESC;
                       """
    debug(f"SQL for accel:\n{selectionsql['sd_accel']}")
    # NOTE: In the above queries, strftime() is used instead of
    # datetime() because datetime() truncates to seconds, whereas
    # the strftime() with %f keeps fractional seconds.
    # Also note that the .0 is important at the end of 1000000.0,
    # to preserve the floating-point nature of the result.
    # The rows are sorted by ts, and then ROWID DESC because
    # we need to filter later by the order in which the rows were
    # written to the file, keeping only the latest instance in cases
    # where records share the same timestamp info.
    return selectionsql


def ingest_sdb_file(filepath, city, wave, iidstr):
    """
    Given an SDB data file, load its gps and accel tables into the DB,
    tagged with the iid of the participant who produced it.
    Return True if the file ingests properly, False otherwise.

    The file is rejected (False, nothing loaded) when its refDate cannot be
    read, its filename does not yield a "<devid>-<rev>" serial, or no wear
    dates can be found for that serial. Tables are loaded gps first; if a
    table fails to load, the remaining tables are skipped.
    """
    import re

    global expected_gps_rows

    basename = os.path.basename(filepath)
    log("   Ingesting file: %s"%basename)

    refDate = _read_sdb_ref_date(filepath)
    if not refDate.strip(): #strip to be sure it isn't just spaces
        log("   Unable to get refDate from file %s"%filepath)
        return False

    selectionsql = _build_selection_sql(iidstr, refDate)

    # Based on the name of the file being processed, construct a
    # temporary tablename we can use for loading this data.
    # In some cases, we may want to leave the table around after
    # ingest (for forensic purposes) and if we want that to work
    # each table has to have a unique name.
    tableprefix = create_table_prefix(basename, iidstr)

    # The device id and firmware revision are encoded in the filename,
    # e.g. SD483fw2106_20210416_114244.sdb -> serial "483-2106".
    m = re.match(r"SD(?P<num>\d*)fw(?P<rev>\d*)_(?P<tail>.*)\.sdb", basename)
    if not m:
        log("   NOTE: Malformed SDB filename: %s does not match SD<devid>fw<rev>_<tail>.sdb" % basename)
        return False
    if not m.group('num'):
        log("   NOTE: Malformed SDB filename: %s is missing device id" % basename)
        return False
    if not m.group('rev'):
        log("   NOTE: Malformed SDB filename: %s is missing OS revision number" % basename)
        return False
    serial = "%s-%s" % (m.group('num'), m.group('rev'))

    # we also need to know the wear dates for this file in order to
    # filter out extraneous records
    wear_start, wear_end = get_participant_wear_dates(iidstr, city, wave, serial)
    if not wear_start or not wear_end:
        log("   No wear dates in linkage tbl for user %s city %s wave %s serial %s"%(iidstr, city, wave, serial))
        return False
    log("   Keeping records within wear dates: %s - %s."%(wear_start,wear_end))

    # We counted file rows when we were validating the files. Only now,
    # once the file has passed every pre-check and is actually about to be
    # loaded, add its row count to the total we're trying to ingest.
    # Counting earlier would inflate the expected total with files that are
    # rejected above and never contribute any rows.
    if filepath in gps_file_row_counts:
        expected_gps_rows += gps_file_row_counts[filepath]
    else:
        log("   SDB FILE was never row-counted.")

    # Now ingest each of the required tables into the DB, stopping at the
    # first table that fails.
    for main_tbl, table_sql in selectionsql.items():
        tableid = '%s_%s' % (tableprefix, main_tbl)
        loaded = execute_copy_via_shell(filepath,
                                        city, wave,
                                        iidstr, serial, tableid,
                                        wear_start, wear_end,
                                        main_tbl,
                                        table_sql)
        if not loaded:
            return False
    return True


log("Ingesting inventoried files...")
for n, iidstr in enumerate(iids):
    # Find every inventoried SDB path for this participant. De-duplicate with
    # a set, then sort so that files are processed (and logged) in the same
    # order on every run; iteration order of a set of strings varies between
    # interpreter sessions because of hash randomization.
    participant_files = sorted({x for x in filepaths_for_ingest if f"/{iidstr}_" in x})
    if participant_files:
        log(f"Inventory for {iidstr} {n+1}/{len(iids)} holds:")
        for f in participant_files:
            log(f"   {'/'.join(f.split('/')[-3:])}")
        result = ingest_user_files(iidstr, participant_files)
        if result:
            log("   User contributed useful data")
        else:
            log("   NO USABLE DATA CONTRIBUTED")
    else:
        log(f"Inventory ? {iidstr}")
log(f"All users' SDB files processed.")



[2021-08-17 10:04:53] → Ingesting inventoried files...
[2021-08-17 10:04:53] → Inventory ? 302955394
[2021-08-17 10:04:53] → Inventory for 302992006 2/32 holds:
[2021-08-17 10:04:53] →    saskatoon_02/302992006_408/SD408fw_20210406_191729.sdb
[2021-08-17 10:04:54] →    Keeping saskatoon_02/302992006_408/SD408fw_20210406_191729.sdb
[2021-08-17 10:04:54] →    Ingesting file: SD408fw_20210406_191729.sdb
[2021-08-17 10:04:54] →    NOTE: Malformed SDB filename: SD408fw_20210406_191729.sdb is missing OS revision number
[2021-08-17 10:04:54] →    NO USABLE DATA CONTRIBUTED
[2021-08-17 10:04:54] → Inventory for 302045191 3/32 holds:
[2021-08-17 10:04:54] →    saskatoon_02/302045191_483/SD483fw2106_20210416_114244.sdb
[2021-08-17 10:04:54] →    Keeping saskatoon_02/302045191_483/SD483fw2106_20210416_114244.sdb
[2021-08-17 10:04:54] →    Ingesting file: SD483fw2106_20210416_114244.sdb
[2021-08-17 10:04:55] → Empty wear dates for serial: '483-2106'
[2021-08-17 10:04:55] →    No wear dates in link

### Verification
As a final validation, compare the GPS (`sd_gps`) and accelerometer (`sd_accel`, labelled XLS in the output) tables to ensure that both contain data from the same set of users.

In [18]:
# Final check: every participant should appear in both sd_gps and sd_accel.
# A FULL OUTER JOIN keeps unmatched iids from BOTH sides, so the WHERE clause
# reports users missing from either table. (A RIGHT OUTER JOIN keeps only the
# accel side, so it can never report a user who has GPS data but no accel data.)
# Subqueries are used instead of scratch tables so the check leaves nothing
# behind in the schema.
mismatch_sql = f"""
    SELECT gps.iid AS gps_iid, xls.iid AS xls_iid
    FROM (SELECT DISTINCT iid FROM {db_schema}.sd_gps) AS gps
         FULL OUTER JOIN
         (SELECT DISTINCT iid FROM {db_schema}.sd_accel) AS xls
         ON gps.iid = xls.iid
    WHERE gps.iid IS NULL
       OR xls.iid IS NULL;
    """

conn = psycopg2.connect(user=db_user, host=db_host, port=db_host_port, database=db_name)
try:
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        print(f"\nFigure: SD users with data present in only one table\n")
        cur.execute(mismatch_sql)
        rows = cur.fetchall()
finally:
    # psycopg2's `with connection:` block would not close the connection.
    conn.close()

if rows:
    print(tabulate(rows, headers='keys', stralign='right'))
else:
    print('No mismatches found.')

# Interact_id 302562672 is known to have no GPS data for SSK
# Interact_id 201680208 is known to have no GPS data for VAN
# Interact_id 402495712 is known to have no GPS data for MTL SD W2


Figure: SD users with data present in only one table

No mismatches found.


Here endeth the ingest.