In [1]:
%matplotlib inline
import os
import subprocess as sub
import matplotlib.pyplot as plt
import psycopg2
import psycopg2.extras
#import seaborn as sns
import matplotlib
from tabulate import tabulate

# tell Seaborn that we're producing a document and not a slideshow or poster
#sns.set()
#sns.set_context('paper')

# Database connection settings are read from the local runtime environment.
# A missing variable raises KeyError right here, at the top of the notebook.
db_host=os.environ['SQL_LOCAL_SERVER']
db_host_port=int(os.environ['SQL_LOCAL_PORT'])
db_user=os.environ['SQL_USER']
# NOTE(review): the psycopg2.connect() calls visible in this notebook pass
# database='interact_db' literally rather than db_name -- confirm which is intended.
db_name=os.environ['SQL_DB']
db_schema=os.environ['SQL_SCHEMA']

# City-specific parameters: these are the values to edit when re-running the
# notebook for a different city.
wavenumstr = '01'
city_num = 4  # vic=1, van=2, ssk=3, mtl=4
city_prefix = 'mtl'
city_name = 'Montreal'
# Either a single int or a list of study numbers; the linkage-ingest cell checks
# isinstance(ethica_study_id, int) to tell the two cases apart.
ethica_study_id = [395, 396] #van=376 #vic=212 ssk=448 mtl=395/396
# NOTE(review): absolute, user-specific path -- must be edited to run anywhere else.
datadir = f"/home/jeffs/projects/def-dfuller/interact/permanent_archive/{city_name}/Wave1/Ethica"
# Layout used when the city has a single telemetry folder:
#telemetrydir = f"{datadir}/{city_name.lower()}_{wavenumstr}/raw"
# Montreal layout: this is the '_en_' folder; the verification cells derive the
# second folder by replacing '_en_' with '_fr_' in the resulting file path.
telemetrydir = f"{datadir}/{city_name.lower()}_en_{wavenumstr}/raw"

# Linkage files used for other cities, kept for reference:
#linkage_file = f"{datadir}/../INTERACT_Vancouver_ID_Somtam_20190320-flagged.csv"
#linkage_file = f"{datadir}/../Participant_Tracking_SASK_W1-norm-filtered-redated.csv"
linkage_file = f"{datadir}/../sensedoc_linkage-normalized.csv"

# Create a list of users who are to be specifically excluded from a particular study
# It's being done this way because the linkage CSV files do not encode anything about study numbers, and
# adding such a feature would break a lot of code, plus risk contamination of the CSV files through sloppy changes
# So since these cases are rare, I'm putting them here, at the head of the code where they should be
# obvious and apparent.
# NOTE(review): presumably maps user id -> study number to exclude that user from;
# no code visible in this part of the notebook reads it -- confirm where it is applied.
suppressors = {5904:395}


def log(instr):
    """Print one status line to stdout, prefixed with an arrow marker."""
    print("→ {}".format(instr))

# Ingest Log for Ethica Wave 1

City: {{city_name}}

After much investigation and discussion, we have decided to ingest the data in its raw form, with no filters applied, and then create a few obvious subtables from that ingested data. The reasoning is that, due to the nature of the data issues, there appears to be no single cleaning filter we can apply that will be appropriate for all downstream analysis. So we'll ingest the raw table, mark it as toxic so nobody uses it by accident, and then create a few sane starting points that people can actually use.

Building on the work done for the Saskatoon data, this notebook will attempt to parameterize the defining aspects of the ingest, so that the same notebook can be run to ingest all the other cities.

## Checklist
- Ensure that city-specific params have been set up in code section below
- Ensure that any users that are to be re-ingested from linkage have been removed from ethica_assignments table

## Prep
Steps to complete before ingesting the telemetry

- Verify the integrity of the linkage CSV file before ingesting it
- Ingest the linkage CSV file

### Verify Linkage CSV
This step will assess the CSV file to ensure it's in a parsable format and then confirm that the users it lists match those who have contributed telemetry data. If any errors or mismatches are reported here, the linkage file records should be updated until this verification test passes cleanly.

#### Possible problems and their solutions

##### User in linkage file but has no telemetry data
This can happen when a user was assigned an Ethica account, but either dropped out of the study or switched to SenseDoc before data collection began. If the study coordinator confirms that the user dropped out and produced no usable data, then mark the user with 'ignore' in the data_disposition field of the linkage file. This will exclude them from the list of ids who are expected to have telemetry data.

##### User has telemetry data but is not in linkage file
This usually happens when data got collected from a staff member who was testing the system. For these people, there *is* data, but we want to remove it from the ingest. So we don't just ignore these records, we have to actively cull the associated data. This is indicated by marking the data_disposition field with 'cull'.

**Should the information be placed in the linkage file or the linkage table?** The file is used at ingest validation time, while the table is used at actual telemetry ingest time, so probably both.

In [2]:
# Helper functions that will be used to verify the linkage CSV file

import os
import re
import sys
import csv
from datetime import datetime
import subprocess

# Module-level accumulators shared by the helper functions and cells below.
iids = set([])         # Interact IDs; validate_iid() adds each accepted id as an int
eids = set([])         # Ethica IDs; validate_eid() adds each accepted id as a str
culled_eids = set([])  # Ethica IDs whose linkage record is flagged 'cull'
participant_study = {} # dict: user id (as read from telemetry CSV) -> study number the user's data was found in

# In one city, the linkage file encodes the Ethica id as 'Ethica ID'
# In another, it is encoded as 'ethica_id'
# We may want to make that column name an input parameter

def validate_iid(idstr, logfailures=True):
    """
    Check that an Interact ID, supplied as a string, is well formed.

    The id must be longer than 8 characters once surrounding whitespace
    is stripped, and must parse as an integer. An accepted id is added
    (as an int) to the module-level `iids` set.

    Returns True when the id is accepted, False otherwise. Rejections
    are reported through log() unless logfailures is False.
    """
    global iids
    if len(idstr.strip()) <= 8:
        if logfailures:
            log(f"Interact ID '{idstr}' is not long enough.")
        return False
    try:
        parsed_id = int(idstr)
    except ValueError:
        if logfailures:
            log(f"Interact ID '{idstr}' is not an integer.")
        return False
    iids.add(parsed_id)
    return True


def validate_eid(idstr, logfailures=True):
    """
    Check that an Ethica ID, supplied as a string, parses as an integer,
    and remember it in the module-level `eids` set.

    Returns True when the id is accepted, False otherwise. Rejections
    are reported through log() unless logfailures is False.

    NOTE(review): the duplicate test below looks up the *int* form of
    the id, while ids are stored in `eids` as *strings*, so ids stored
    by this function never trigger the "Redundant record" branch. Do not
    "fix" this in isolation: the linkage-ingest cell calls this function
    again for ids that were already stored during validation, and an
    effective duplicate check would make it skip those users.
    """
    global eids
    try:
        numeric_id = int(idstr)
    except ValueError:  # filter B
        if logfailures:
            log(f"Ethica ID '{idstr}' is not an integer.")
        return False
    already_known = numeric_id in eids  # filter A
    if already_known:
        if logfailures:
            log(f"Redundant record for Ethica ID '{idstr}'.")
        return False
    # The int conversion above only proves the text is int-friendly;
    # every later comparison against the id is a string comparison.
    eids.add(str(numeric_id))
    return True


def validate_wear_dates(row):
    """
    Date-window validation for an Ethica record -- intentionally disabled.
    Always returns True.
                            DO NOT USE!
    Wear dates only make sense for SenseDoc data. A SenseDoc starts
    capturing the moment it is booted, so it records the coordinator's
    activity before the device is delivered to the participant and
    again after it has been recovered; wear dates are used there to
    filter that coordinator data out. Ethica data has no such
    coordinator "pollution" to remove.

    The original checks are kept below the early return, together with
    this explanation, so that nobody is tempted to add date filters
    back in at a later time.
    """
    return True

    # ---- unreachable: retained for reference only ----
    date_format = "%Y-%m-%d"
    try:
        start_str = row['start_date']
        end_str = row['end_date']
        start_date = datetime.strptime(start_str, date_format)
        end_date = datetime.strptime(end_str, date_format)
    except KeyError:
        log(f"EID '{row['ethica_id']}' has missing date field(s)")
        return False
    except ValueError:
        log(f"EID '{row['ethica_id']}' has missing/invalid date info {start_str} - {end_str}")
        return False
    if not start_date < end_date:
        log(f"EID '{row['ethica_id']}' has invalid date window {start_str} - {end_str}")
        return False
    log(f"EID '{row['ethica_id']}' has good date window {start_str} - {end_str}")
    return True


def get_possible_values_from_csv(filename, colnum):
    """
    Return the distinct numeric values found in one column of a CSV file.

    The header row is skipped and the column is extracted with a
    `tail | cut | sort -u` shell pipeline instead of Python's csv module:
    the telemetry files are large, and the pipeline is orders of
    magnitude faster than iterating with csv.DictReader. Equivalent to:
        tail -n +2 FILE | cut -d, -f COLNUM | sort -u

    filename -- path of the CSV file to scan
    colnum   -- 1-based field number as understood by `cut -f`
                (str or int)

    Returns a list of strings in the order produced by `sort -u`.
    Only entries for which str.isnumeric() is true are returned; any
    other non-empty entry is reported through log() and dropped.

    Fields are split on every comma, so quoted fields that themselves
    contain commas are not supported.
    """
    log(f"Scanning file: {filename}")

    # set up the piped commands
    skipcmd = ['tail', '-n', '+2', filename] # skip the first line, which has column names
    listuseridscmd = ['cut', '-d,', '-f', str(colnum)] # keep only the requested field
    distinctifycmd = ['sort', '-u'] # create a list of distinct ids

    # run the piped commands
    p0 = subprocess.Popen(skipcmd, stdout=subprocess.PIPE)
    p1 = subprocess.Popen(listuseridscmd, stdin=p0.stdout, stdout=subprocess.PIPE)
    p0.stdout.close()  # parent's copy; lets p0 receive SIGPIPE if p1 exits early
    p2 = subprocess.Popen(distinctifycmd, stdin=p1.stdout, stdout=subprocess.PIPE, encoding='utf8')
    p1.stdout.close()  # parent's copy; lets p1 receive SIGPIPE if p2 exits early
    output, _ = p2.communicate()

    # Reap every stage so no zombie processes are left behind, and surface
    # failures (e.g. unreadable file) that would otherwise look like "no users".
    for cmd, proc in ((skipcmd, p0), (listuseridscmd, p1), (distinctifycmd, p2)):
        if proc.wait() != 0:
            log(f"WARNING: '{cmd[0]}' exited with status {proc.returncode} while scanning {filename}")

    # parse the output
    values = []
    for val in output.split('\n'):
        if not val: # skip empty lines
            continue
        if val.isnumeric():
            values.append(val)
        else:
            log(f"Unexpected non-numeric entry in column {colnum} = '{val}'")

    return values

# Quick test of the above routine
#log("Testing function: get_possible_values_from_csv()")
#foo = get_possible_values_from_csv(os.path.join(datadir, 'montreal_en_01/raw/gps.csv'), '1')
#log(f"Found {len(foo)} users in English data.")
#foo = get_possible_values_from_csv(os.path.join(datadir, 'montreal_fr_01/raw/gps.csv'), '1')
#log(f"Found {len(foo)} users in French data.")

def detect_missing_values(master_values, csvfilelist, colnum, colname, streamname):
    """
    Cross-check the ids found in telemetry CSV files against the master
    set of expected ids, and log every discrepancy.

    master_values -- set of expected id strings (from the linkage file)
    csvfilelist   -- dict mapping each telemetry CSV path to its study number
    colnum        -- column number to extract; passed straight through to
                     get_possible_values_from_csv()
    colname       -- accepted but not used by this function
    streamname    -- label for the telemetry stream, used only in log output

    Side effects: the study number of every id seen is recorded in the
    module-level participant_study dict. If an id was already mapped to
    a different study a warning is logged and the mapping is overwritten.
    Ids in the module-level culled_eids set are never reported as
    missing. Returns None.
    """
    seen_ids = set()
    for csvpath, study in csvfilelist.items():
        log(f"File {csvpath} is associated with study number {study}")
        ids_in_file = get_possible_values_from_csv(csvpath, colnum)
        for uid in ids_in_file:
            if uid in participant_study:
                prior_study = participant_study[uid]
                if prior_study != study:
                    log(f"WARNING: User {uid} has data in conflicting studies: {study} and {prior_study}")
            participant_study[uid] = study
        seen_ids.update(ids_in_file)

    absent_ids = master_values - seen_ids - culled_eids
    log(f"{len(master_values)} user_ids found in master linkage CSV")
    log(f"{len(seen_ids)} distinct ethica_id values found in telemetry CSV data")
    if not absent_ids:
        log(f"All expected column {colnum} values found for {streamname}")
    else:  # filter D
        log(f"{len(absent_ids)} expected column {colnum} values missing from {streamname} data")
        log(','.join(str(x) for x in sorted(absent_ids)))

    surplus_ids = seen_ids - master_values
    if not surplus_ids:
        log(f"All values found for column {colnum} in {streamname} were expected")
    else:  # filter E
        log(f"{len(surplus_ids)} column {colnum} values in {streamname} data were unexpected:")
        log(','.join(str(x) for x in sorted(surplus_ids)))

    # Open notes carried over from the original author:
    #  - ALSO NEED TO POPULATE participant_study BASED ON APPEARANCE IN DATA STREAM
    #  - JUST BEWARE THAT THIS COUNTS THE HEADER ROW AS WELL AS THE DATA ROWS


In [3]:
# Now parse the linkage file and be sure it's well formed.
#
# Side effects: fills the module-level eids / iids sets (through the
# validate_* helpers) and culled_eids; defines ignorables, usercount and
# notethica, which feed the summary printed at the end of this cell.

ignorables = set([])
usercount = 0
notethica = 0

# read the linkage file and validate key data fields
with open(linkage_file,'r',encoding='ISO-8859-1') as fcsv:
    reader = csv.DictReader(fcsv,delimiter=',')
    # optimization, should probably reduce eid values to set
    # of unique strings first and then validate those
    for rownum,row in enumerate(reader):
        if row['ethica_id'] or row['interact_id'] or row['study_id']:
            usercount += 1
        eid = row['ethica_id']
        if not eid:
            notethica += 1
            continue # not an ethica row
        # A missing column, or a short row whose value is None, both mean
        # "no disposition recorded" rather than an error.
        disposition = row.get('data_disposition') or ''
        if 'cull' in disposition:
            log(f"Culling telemetry data from ethica user {eid}, as instructed by linkage record")
            culled_eids.add(eid)
            # We leave the eid in the list, so that it will not show up as an unexpected id,
            # but we also alert the operator that telemetry from this eid will be culled at ingest time
        if 'ignore' in disposition:
            log(f"Ignoring record for ethica user {eid}, as instructed by linkage record")
            ignorables.add(eid) # keep track of which IDs have been intentionally ignored
            continue
        if not validate_eid(eid, logfailures=False):
            notethica += 1
            continue # we're only interested in ethica users
        iid = row['interact_id']
        if not iid or not validate_iid(iid, logfailures=False): # filter C
            log(f"Ethica user {eid} has no iid")
        # NOTE(review): validate_iid() has already stored int(iid) on success;
        # this additionally stores the raw string, even when it is empty or
        # invalid. Left as-is because later cells may rely on it -- confirm.
        iids.add(iid)
log(f"\nThere were {usercount} participant records in linkage file.")
log(f"{len(eids)} were Ethica users.")
# 'ignore' marks real participants with missing/incomplete telemetry, while
# 'cull' marks test users whose telemetry must be removed (see the
# "Possible problems and their solutions" notes above).
log(f"{len(ignorables)} were flagged as 'ignore'. (Probably due to known missing or corrupt data.)")
log(f"{len(culled_eids)} were flagged as 'cull'. (Probably test users.)")

# add the ignorables to the eids so that they will not be
# reported as unexpected
eids = eids.union(ignorables)

log(f"There are now {len(eids)} known ethica ids in the master table.")

→ Culling telemetry data from ethica user 9409, as instructed by linkage record
→ Ignoring record for ethica user 5582, as instructed by linkage record
→ Culling telemetry data from ethica user 5597, as instructed by linkage record
→ Culling telemetry data from ethica user 5891, as instructed by linkage record
→ Culling telemetry data from ethica user 5677, as instructed by linkage record
→ Ignoring record for ethica user 5695, as instructed by linkage record
→ Culling telemetry data from ethica user 6227, as instructed by linkage record
→ Culling telemetry data from ethica user 5930, as instructed by linkage record
→ Culling telemetry data from ethica user 5819, as instructed by linkage record
→ Culling telemetry data from ethica user 5974, as instructed by linkage record
→ Culling telemetry data from ethica user 5865, as instructed by linkage record
→ Culling telemetry data from ethica user 5795, as instructed by linkage record
→ Culling telemetry data from ethica user 6091, as instr

In [5]:
# Verify the GPS telemetry against the linkage records loaded above.
# Cities with more than one telemetry directory (i.e. Mtl) need all of their
# data folders examined together, and each file has to carry the number of
# the study it belongs to: the '_en_' folder is study 395, its '_fr_' twin 396.
telemetrydirs = {
    os.path.join(telemetrydir, 'gps.csv').replace('_en_', folder_tag): study_number
    for folder_tag, study_number in (('_en_', 395), ('_fr_', 396))
}
detect_missing_values(eids, telemetrydirs, '1', 'user_id', 'GPS')
log(f"Participant Study Map has {len(participant_study)} users.")
#log(participant_study)

→ File /home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_en_01/raw/gps.csv is associated with study number 395
→ Scanning file: /home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_en_01/raw/gps.csv
→ File /home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_fr_01/raw/gps.csv is associated with study number 396
→ Scanning file: /home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_fr_01/raw/gps.csv
→ 574 user_ids found in master linkage CSV
→ 553 distinct ethica_id values found in telemetry CSV data
→ All expected column 1 values found for GPS
→ All values found for column 1 in GPS were expected
→ Participant Study Map has 553 users.


In [6]:
# Same verification for the accelerometer (XL) telemetry:
# the '_en_' folder is study 395, its '_fr_' twin is study 396.
telemetrydirs = {
    os.path.join(telemetrydir, 'accelerometer.csv').replace('_en_', folder_tag): study_number
    for folder_tag, study_number in (('_en_', 395), ('_fr_', 396))
}
detect_missing_values(eids, telemetrydirs, '1', 'user_id', 'Accel')
log(f"Participant Study Map has {len(participant_study)} users.")

→ File /home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_en_01/raw/accelerometer.csv is associated with study number 395
→ Scanning file: /home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_en_01/raw/accelerometer.csv
→ File /home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_fr_01/raw/accelerometer.csv is associated with study number 396
→ Scanning file: /home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_fr_01/raw/accelerometer.csv
→ 574 user_ids found in master linkage CSV
→ 558 distinct ethica_id values found in telemetry CSV data
→ All expected column 1 values found for Accel
→ All values found for column 1 in Accel were expected
→ Participant Study Map has 559 users.


In [None]:
# Create a hash of user_id -> study_id during the above verification DONE
# Then add a test to find users who appear in more than one study and display a warning. DONE
# Finally, add a mechanism to include a user in one study but exclude them from another

## Ingest Linkage

Once the CSV file is known to be valid and complete, we can ingest those records into the ethica_assignments table, either adding records or updating them as necessary.

- FILTER OUT cull RECORDS BECAUSE THEY AREN'T REAL PARTICIPANTS
  - MEANING: DON'T ADD THEM TO THE USER TABLE AND DON'T INGEST ANY TELEMETRY FOR THEM
- LOAD ignore RECORDS BECAUSE THEY'RE REAL PARTICIPANTS BUT HAVE INCOMPLETE/NO TELEMETRY
- THEN SKIP ignore USERS WHEN INGESTING TELEMETRY 
  - THEY ARE BEING IGNORED BECAUSE THEY'RE INCOMPLETE
  - BUT INCOMPLETE DOESN'T MEAN EMPTY
  - SO WE IGNORE THE PARTIAL DATA IF THERE IS ANY
  

In [7]:
# Create new users based on records found in linkage file.
#
# For every linkage record that belongs to a real Ethica participant, make sure
# a row exists in portal_dev.ethica_assignments for (interact_id, study_id),
# inserting one when it is missing. Records flagged 'ignore' or 'cull', and
# records without a valid Ethica id, are skipped.
create_sql = """
    INSERT INTO portal_dev.ethica_assignments (interact_id, ethica_id, study_id)
    VALUES (%s, %s, %s);
    """
# Values are passed as query parameters, never interpolated into the SQL text.
exists_sql = """
    SELECT count(1) AS count FROM portal_dev.ethica_assignments
    WHERE interact_id = %s AND study_id = %s;
    """

# Open a database session
conn = psycopg2.connect(user=db_user, host=db_host, port=db_host_port, database='interact_db')
try:
    # `with conn` commits on success and rolls back on error, but it does
    # NOT close the connection -- that is what the finally clause is for.
    with conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            # same encoding as the validation pass over this file
            with open(linkage_file,'r',encoding='ISO-8859-1') as fh:
                reader = csv.DictReader(fh)
                for record in reader:
                    # figure out whether to insert or skip, based on CSV record
                    disposition = record.get('data_disposition') or ''
                    ignore_user_rec = 'ignore' in disposition
                    real_user = 'cull' not in disposition
                    iid = record['interact_id']
                    eid = record['ethica_id']
                    if ignore_user_rec:
                        log(f"Skipping creation of record for ethica_id {eid} based on 'ignore' flag.")
                        continue
                    if not validate_eid(eid, logfailures=False): # ignore non-Ethica users
                        continue
                    if not real_user: # 'cull' marks test users, who get no participation record
                        log(f"Skipping creation of participation record for ethica test user {eid}")
                        continue
                    if not (iid or '').strip():
                        # An empty interact_id cannot be queried or inserted; skip it
                        # rather than aborting (and rolling back) the whole run.
                        log(f"Ethica user {eid} has no interact_id in linkage file. Skipping user.")
                        continue
                    if isinstance(ethica_study_id, int):
                        # single-study city: everyone goes into that study
                        assign_study = ethica_study_id
                    elif eid in participant_study:
                        # multi-study city: use the study the user's telemetry was found in
                        assign_study = participant_study[eid]
                    else:
                        log(f"Can't determine ethica_study_id for ethica_id {eid}. Skipping user.")
                        continue
                    cur.execute(exists_sql, (iid, assign_study))
                    row = cur.fetchone()
                    if row['count'] == 0:
                        cur.execute(create_sql, (iid, eid, assign_study))
                        if cur.rowcount == 1:
                            log(f"Interact User {iid} had no participation record for study {assign_study}. Created.")
                        else:
                            log(f"Problem trying to create participation record for user {iid} in study {assign_study}")
                    else:
                        log(f"INTERACT USER {iid} ALREADY EXISTS IN STUDY {assign_study}!")
finally:
    conn.close()

→ Interact User 401563709 had no participation record for study 396. Created.
→ Interact User 401228518 had no participation record for study 395. Created.
→ Interact User 401837149 had no participation record for study 395. Created.
→ Interact User 401274458 had no participation record for study 396. Created.
→ Interact User 401866249 had no participation record for study 396. Created.
→ Interact User 401657924 had no participation record for study 396. Created.
→ Skipping creation of participation record for ethica test user 9409
→ Interact User 401590820 had no participation record for study 396. Created.
→ Interact User 401630460 had no participation record for study 395. Created.
→ Interact User 401977519 had no participation record for study 395. Created.
→ Interact User 401341065 had no participation record for study 396. Created.
→ Interact User 401959928 had no participation record for study 396. Created.
→ Interact User 401593830 had no participation record for study 396. Cre

→ Interact User 401151515 had no participation record for study 395. Created.
→ Interact User 401533666 had no participation record for study 395. Created.
→ Interact User 401159940 had no participation record for study 396. Created.
→ Skipping creation of participation record for ethica test user 6377
→ Interact User 401573388 had no participation record for study 395. Created.
→ Interact User 401189457 had no participation record for study 396. Created.
→ Interact User 401484804 had no participation record for study 396. Created.
→ Interact User 401796223 had no participation record for study 395. Created.
→ Interact User 401309336 had no participation record for study 395. Created.
→ Interact User 401601193 had no participation record for study 396. Created.
→ Interact User 401367601 had no participation record for study 396. Created.
→ Interact User 401888182 had no participation record for study 396. Created.
→ Interact User 401149339 had no participation record for study 395. Cre

## Ingest Targets

The preliminary ingest for {{city_name}} Wave 1 will create the following tables in the level_0 schema:

  - {{city_prefix}}-w1-eth-gps-raw-TOXIC
  - {{city_prefix}}-w1-eth-xls-raw-TOXIC
  - {{city_prefix}}-w1-eth-xls-delduplrec (raw xls with all rec-time duplicates removed)
  - {{city_prefix}}-w1-eth-xls-delconflrec (raw xls with all rec-time conflicts removed)
  - {{city_prefix}}-w1-eth-gps-delduplsat (raw gps with all sat-time duplicates removed)
  - {{city_prefix}}-w1-eth-gps-delduplrec (raw gps with all rec-time duplicates removed)
  - {{city_prefix}}-w1-eth-gps-delconflsat (raw gps with all sat-time conflicts removed)
  - {{city_prefix}}-w1-eth-gps-delconflrec (raw gps with all rec-time conflicts removed)

### gps_raw_TOXIC

Ideally, we want to use the psql COPY command, since it has been
optimized for fast loading, but the target telemetry table and the
incoming CSV file have different column names. We *COULD* just rename 
the columns in the CSV file, but that's a manual step that shouldn't
be embedded into the process.

So instead, we'll load the data into a temporary table, and then transfer it from there into the target table and match the records to their interact_ids at the same time. This method will more or less double the ingest time, but having a reliable ingest process that can be fully documented, without manual interventions, seems like the more robust path.

In [7]:
# Helpers that list the ethica_ids flagged in the linkage file's
# data_disposition column.
def _get_ids_with_disposition(csvfilepath, flag):
    """Return the set of ethica_id values whose data_disposition contains `flag`."""
    matches = set()
    # same encoding as the validation pass over the linkage file
    with open(csvfilepath, 'r', encoding='ISO-8859-1') as fh:
        for record in csv.DictReader(fh):
            # a missing column or short row counts as "no disposition"
            if flag in (record.get('data_disposition') or ''):
                matches.add(record['ethica_id'])
    return matches


def get_ids_to_be_culled(csvfilepath):
    """
    Return the set of ethica_ids (as strings) that the linkage file at
    `csvfilepath` flags with 'cull', and log what was found.

    A record flagged both 'cull' and 'ignore' is included.
    """
    culls = _get_ids_with_disposition(csvfilepath, 'cull')
    if culls:
        log(f"Ethica IDs to be culled as per linkage file:")
        log(f"{culls}")
    else:
        log("Linkage file includes no cull requests.")
    return culls


def get_ids_to_be_ignored(csvfilepath):
    """
    Return the set of ethica_ids (as strings) that the linkage file at
    `csvfilepath` flags with 'ignore', and log what was found.

    A record flagged both 'cull' and 'ignore' is included.
    """
    ignores = _get_ids_with_disposition(csvfilepath, 'ignore')
    if ignores:
        log(f"Ethica IDs to be ignored as per linkage file:")
        log(f"{ignores}")
    else:
        log("Linkage file includes no ignore requests.")
    return ignores

In [8]:
# Function to create the temporary incoming table and load the raw GPS data into it
def load_raw_CSV_data(csvfilepaths, schema, tmptablename, tmpsqlvars, desttablename, destsqlvars, transfersql):
    """Stage raw telemetry CSV files, then transfer them into the destination table.

    Steps, all performed on a single connection:
      1. (Re)create the staging table `schema.tmptablename` and the destination
         table `schema.desttablename` from the supplied column definitions.
      2. COPY every CSV file into the staging table (header line skipped).
      3. Compare the staged row count with the line count of the CSV files;
         log an error and stop if they differ.
      4. Execute `transfersql` to move staged rows into the destination table.
      5. Log record/user counts for the destination table, and for staged rows
         whose user_id has no interact_id in portal_dev.ethica_assignments.

    Args:
        csvfilepaths: iterable of CSV file paths. It is iterated twice, so it
            must be re-iterable. Callers in this notebook pass a dict keyed by
            path; only the keys are used.
        schema: schema holding both tables.
        tmptablename: unqualified staging table name (dropped and recreated).
        tmpsqlvars: column definitions for the staging table. Must include a
            `user_id` column; the CSV columns are loaded in this order.
        desttablename: unqualified destination table name (dropped with CASCADE
            and recreated).
        destsqlvars: column definitions for the destination table. Must include
            an `iid` column.
        transfersql: SQL statement that copies rows from staging to destination.

    Returns:
        None. All results are reported through log().
    """
    log(f"Loading files '{csvfilepaths}' into table '{tmptablename}' of schema '{schema}'")
    with psycopg2.connect(user=db_user, host=db_host, port=db_host_port, database='interact_db') as conn:
        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

        # Create the temporary onboarding table
        fulltmptablename = f"{schema}.{tmptablename}"
        sql = f"""
            DROP TABLE IF EXISTS {fulltmptablename}; 
            CREATE TABLE {fulltmptablename} (
                {tmpsqlvars}
                );
                """
        log(f"Creating temp ingest table {tmptablename}")
        cur.execute(sql)
        # now add a comment describing table's purpose
        cur.execute(f"COMMENT ON TABLE {fulltmptablename} IS 'Temporary table for raw data ingest.';")

        # Create the final destination table
        fulldesttablename = f"{schema}.{desttablename}"
        sql = f"""
            DROP TABLE IF EXISTS {fulldesttablename} CASCADE; 
            CREATE TABLE {fulldesttablename} (
                {destsqlvars}
                )
                """
        log(f"Creating destination table {fulldesttablename}")
        cur.execute(sql)

        # With tables created, we can now load the data from the CSV files
        for csvfilepath in csvfilepaths:
            log(f"Ingesting raw CSV from {csvfilepath}")
            with open(csvfilepath, 'r') as f:
                # Notice that we don't need the `csv` module.
                next(f) # Skip the header row because copy_from doesn't want it
                cur.copy_from(f, fulltmptablename, sep=',')

        # Now collect some basic stats and validate the loaded raw data
        log(f"Checking success of raw data load")
        rowcount = 0
        for csvfilepath in csvfilepaths:
            with open(csvfilepath,'r') as fh:
                for line in fh:
                    rowcount += 1
            rowcount -= 1 #don't count the header row
        log(f"Expecting {rowcount:,} lines in table")
        sql = f"""SELECT SUM(num_recs) as num_recs, COUNT(1) as num_users
                  FROM (
                         SELECT COUNT(1) as num_recs
                         FROM {fulltmptablename}
                         GROUP BY user_id
                       ) as records_per_user
                  """
        cur.execute(sql)
        row = cur.fetchone()
        # SUM() over zero rows yields NULL/None, which cannot be formatted with ','
        num_raw_recs = row['num_recs'] or 0
        num_raw_users = row['num_users']
        log(f"Ingested {num_raw_recs:,} records across {num_raw_users:,} users.")
        if num_raw_recs == rowcount:
            log("Raw data ingest appears successful.")
            conn.commit()
        else:
            log(f"ERROR: INGESTED ROW COUNT ({num_raw_recs}) DOES NOT MATCH SOURCE FILE ({rowcount})")
            # NOTE(review): leaving the `with` block through a plain return still lets
            # psycopg2 commit the open transaction, so the staged rows are probably
            # kept even on this error path -- confirm that this is intended.
            return

        # And finally, we can transfer the data from the staging table into the final location
        log(f"Matching ethica_ids to interact_ids and transfering records into destination table")
        # Use the statement passed in by the caller (previously this read the
        # notebook-global `transfer_sql` and ignored the parameter).
        cur.execute(transfersql)

        # And collect some basic stats to validate the ingested data
        log(f"Checking success of ingest")
        sql = f"""SELECT SUM(num_recs) as num_recs, COUNT(1) as num_users
                  FROM (
                         SELECT COUNT(1) as num_recs
                         FROM {fulldesttablename}
                         GROUP BY iid
                       ) as records_per_user
                  """
        cur.execute(sql)
        row = cur.fetchone()
        # None when nothing was transferred (e.g. every user culled or unmatched)
        num_final_recs = row['num_recs'] or 0
        num_final_users = row['num_users']

        # count the number of records for which there are no matching interact_ids
        missing_sql = f"""
            SELECT count(1) as num_missing_users, sum(num_recs) as total_missing_recs, min(interact_id)
            FROM (
                SELECT count(1) as num_recs, min(interact_id) as interact_id, min(user_id), min(ethica_email)
                FROM 
                   portal_dev.ethica_assignments asgn
                   RIGHT OUTER JOIN {fulltmptablename} raw
                   ON asgn.ethica_id = raw.user_id
                GROUP BY user_id
            ) recs_per_unmatched_user
            WHERE interact_id IS NULL;
            """
        cur.execute(missing_sql)
        row = cur.fetchone()
        num_missing_recs = int(row['total_missing_recs'] or 0) #assign 0 if no records match
        num_missing_users = int(row['num_missing_users'] or 0) #assign 0 if no records match
        log(f"Identified {num_missing_recs:,} records across {num_missing_users:,} users for whom interact_id is not known.")

        # Report the success, partial success, or failure of the ingest
        log(f"Transfered {num_final_recs:,} records for {num_final_users:,} users.")
        # The function stops here: the row-count comparison below is currently
        # UNREACHABLE. It is kept for reference only, because culled users make a
        # plain staging-vs-final comparison fail by design (see the note below).
        return

        if num_raw_recs == num_final_recs:
            log("Data transfer and ingest operation complete. All records ingested.")
            conn.commit()
        elif num_raw_recs == num_final_recs + num_missing_recs:
            log(f"Data transfer and ingest operation complete. All records accounted for, but {num_missing_users:,} unknown users.")
            conn.commit()
        else:
            log("ERROR: TRANSFERED ROW COUNT DOES NOT MATCH STAGING TABLE")
            #log(f"Expected {num_missing_recs} missing recs but found {} to be missing.")

        """    
        A mismatched row count alone is not indicative of an ingest problem because some user data
        is suppressed by design. Any user marked "cull" in the linkage table will be ignored, even
        though they might have associated data in the telemetry file. (These are test users, for the 
        most part, but could also be problematic cases identified during ingest.)
        
        To create a more robust and accurate verification pass, we need to:
        - have a list of all users who are being actively culled
        - have a count of telemetry rows associated with each such culled account
          - can count with join against user_category
        - then compare the ingested raw ingest row count with the final ingest PLUS the culled rows
        """

        data = [ ['num_recs', num_final_recs], ['num_users', num_final_users], ]
        print(f"\nFigure: Counts for {fulldesttablename}")
        print(tabulate(data, floatfmt=',.0f', stralign='right' ))


        # If there were some unmatched user_ids, list them so operator can investigate
        if num_missing_recs:
            problems_sql = f"""
                SELECT count(1) as num_recs, 
                       min(user_id) as user_id
                FROM portal_dev.ethica_assignments asgn
                   RIGHT OUTER JOIN {fulltmptablename} raw
                   ON asgn.ethica_id = raw.user_id
                WHERE interact_id IS NULL
                GROUP BY user_id;
                """
            print(f"\nFigure: Unmatched user_ids from {tmptablename}")
            cur.execute(problems_sql)
            rows = cur.fetchall()
            print(tabulate(rows, headers='keys'))
            print(f"Total Unmatched Rows: {num_missing_recs:,}")
#        else:
#            log("Record count in CSV matches record count of ingested table")


In [27]:
# Set up the parameters for loading the GPS table data and then load it
telemetryfilename = "gps.csv"
fullcsvpath = os.path.join(telemetrydir, telemetryfilename)
tmptablename = "tmpgps"
desttablename = f"{city_prefix}_w1_eth_gps_raw_TOXIC"

ids_to_cull = get_ids_to_be_culled(linkage_file)
ids_to_ignore = get_ids_to_be_ignored(linkage_file)

# make sure we cull any data from both cull records and ignore records
ids_to_cull.update(ids_to_ignore)

# Render the id lists as explicit SQL lists. Interpolating a Python tuple only
# works by accident: a one-element tuple renders as "(395,)" and an empty one
# as "()", and both are invalid SQL.
if not ethica_study_id:
    raise ValueError("ethica_study_id must list at least one Ethica study id")
study_ids_sql = ", ".join(str(int(sid)) for sid in ethica_study_id)

# ids come out of the linkage CSV as strings; blank entries cannot match a BIGINT user_id
cull_id_values = sorted({int(eid) for eid in ids_to_cull if str(eid).strip()})
if cull_id_values:
    cull_filter_sql = f"WHERE raw.user_id NOT IN ({', '.join(str(eid) for eid in cull_id_values)})"
else:
    cull_filter_sql = ""  # nothing to cull, so no WHERE clause at all

# Column layout of the staging table; the CSV columns are loaded in this order
tmpsqlvars = """
    user_id BIGINT NOT NULL,
    date TEXT,
    device_id TEXT NOT NULL,
    record_time TIMESTAMP WITH TIME ZONE NOT NULL,
    timestamp TEXT,
    accu DOUBLE PRECISION,
    alt DOUBLE PRECISION,
    bearing DOUBLE PRECISION,
    lat DOUBLE PRECISION NOT NULL,
    lon DOUBLE PRECISION NOT NULL,
    provider TEXT,
    satellite_time TIMESTAMP WITH TIME ZONE,
    speed DOUBLE PRECISION
    """

# Column layout of the final raw table, keyed by interact_id instead of ethica user_id
destsqlvars = """
    iid BIGINT NOT NULL,  -- interact_id
    record_time TIMESTAMP WITH TIME ZONE NOT NULL, -- participant's UTC time, to millisec, from phone clock
    satellite_time TIMESTAMP WITH TIME ZONE NOT NULL, -- timestamp taken from satellite data
    lat DOUBLE PRECISION NOT NULL,
    lon DOUBLE PRECISION NOT NULL,
    speed DOUBLE PRECISION DEFAULT 'NaN',
    course DOUBLE PRECISION DEFAULT 'NaN',
    alt DOUBLE PRECISION DEFAULT 'NaN',
    accu DOUBLE PRECISION DEFAULT 'NaN',
    provider TEXT DEFAULT ''
    """

# Staging -> destination: map user_id to interact_id for this city's studies
# and leave out every culled/ignored user.
transfer_sql = f"""
   INSERT INTO {db_schema}.{desttablename} (iid,record_time,satellite_time,lat,lon,speed,course,alt,accu,provider)
        SELECT asgn.interact_id,
                raw.record_time,
                raw.satellite_time,
                raw.lat,
                raw.lon,
                raw.speed,
                raw.bearing,
                raw.alt,
                raw.accu,
                raw.provider
        FROM {db_schema}.{tmptablename} raw 
            INNER JOIN portal_dev.ethica_assignments AS asgn
            ON raw.user_id = asgn.ethica_id AND asgn.study_id IN ({study_ids_sql})
        {cull_filter_sql}
        """

# One CSV per Ethica study (English and French); only the dict keys (paths) are read by the loader
telemetryfiles = {os.path.join(telemetrydir, 'gps.csv'):395, 
                 os.path.join(telemetrydir, 'gps.csv').replace('_en_', '_fr_'):396}
load_raw_CSV_data(telemetryfiles, db_schema, tmptablename, tmpsqlvars, desttablename, destsqlvars, transfer_sql)

→ Ethica IDs to be culled as per linkage file:
→ {'7820', '6402', '7441', '10581', '6091', '6227', '8889', '7712', '8236', '5795', '5930', '6044', '5597', '9321', '5819', '5699', '9409', '9005', '6376', '6377', '5974', '5677', '5891', '5865'}
→ Ethica IDs to be ignored as per linkage file:
→ {'10581', '5992', '5582', '9026', '5695', '9321', '9000', '5699'}
→ Loading files '{'/home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_en_01/raw/gps.csv': 395, '/home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_fr_01/raw/gps.csv': 396}' into table 'tmpgps' of schema 'level_0'
→ Creating temp ingest table tmpgps
→ Creating destination table level_0.mtl_w1_eth_gps_raw_TOXIC
→ Ingesting raw CSV from /home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_en_01/raw/gps.csv
→ Ingesting raw CSV from /home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_

In [28]:
# Validate the ingested table
# Count the number of ingested records and compare with expected number
desttablename = f"{city_prefix}_w1_eth_gps_raw_TOXIC"
fulldesttablename = f"{db_schema}.{desttablename}"
# Name the staging table explicitly instead of inheriting `tmptablename` from
# whichever load cell happened to run last; otherwise running the cells out of
# order would compare the GPS table against the accelerometer staging table.
tmptablename = "tmpgps"

# count records produced by users who were not ingested
# First, generate the list of culled users
culled_eids = get_ids_to_be_culled(linkage_file)
ignored_eids = get_ids_to_be_ignored(linkage_file)
log(f"There were {len(culled_eids)} ethica_ids culled from the ingest.")
#log(f"Culled IDs: {culled_eids}")
log(f"There were {len(ignored_eids)} ethica_ids ignored from the ingest.")
#log(f"Ignored IDs: {ignored_eids}")

# for the purposes of which ids are expected to have absent data, we include the ignored ids as well as culled
culled_eids.update(ignored_eids)
log(f"All user_ids expected to have data missing: {culled_eids}")

# ids come out of the linkage CSV as strings; user_id is BIGINT, so pass integers
culled_id_list = [int(eid) for eid in culled_eids if str(eid).strip()]

with psycopg2.connect(user=db_user, host=db_host, port=db_host_port, database='interact_db') as conn:
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    
    # Next, count how many records are associated with those culled ids.
    # "= ANY(array)" is used rather than "IN %s" because an empty list is then
    # still valid SQL, whereas "IN ()" is a syntax error.
    sql = f"""
           SELECT COUNT(1) AS total_samples, 
                  SUM(1) FILTER (WHERE user_id = ANY(%s)) AS num_culled_samples
           FROM {db_schema}.{tmptablename}"""
    cur.execute(sql, (culled_id_list,))
    row = cur.fetchone()
    # the filtered SUM is NULL when no staged row belongs to a culled user
    num_culled_samples = row['num_culled_samples'] if row['num_culled_samples'] else 0
    total_raw_samples = row['total_samples']
    
    # Count number of actual records ingested into final table
    sql = f"""SELECT SUM(num_recs) as num_recs, COUNT(1) as num_users
              FROM (
                     SELECT COUNT(1) as num_recs
                     FROM {fulldesttablename}
                     GROUP BY iid
                   ) as records_per_user
              """
    cur.execute(sql)
    row = cur.fetchone()
    data = [['total_raw_samples', total_raw_samples], 
            ['culled_samples', num_culled_samples],   
            ['expected_final_samples', total_raw_samples - num_culled_samples],
            ['actual_final_samples', row['num_recs']], ]

    # every staged row must be either culled or present in the final table
    if total_raw_samples - num_culled_samples == row['num_recs']:
        log("Ingest passes basic validation.")
    else:
        log("ERROR: INGEST VALIDATION FAILED")
        
    print(f"\nFigure: Validating table '{fulldesttablename}'\n")
    print(tabulate(data, floatfmt=',.0f', stralign='right'))

→ Ethica IDs to be culled as per linkage file:
→ {'7820', '6402', '7441', '10581', '6091', '6227', '8889', '7712', '8236', '5795', '5930', '6044', '5597', '9321', '5819', '5699', '9409', '9005', '6376', '6377', '5974', '5677', '5891', '5865'}
→ Ethica IDs to be ignored as per linkage file:
→ {'10581', '5992', '5582', '9026', '5695', '9321', '9000', '5699'}
→ There were 24 ethica_ids culled from the ingest.
→ There were 8 ethica_ids ignored from the ingest.
→ All user_ids expected to have data missing: {'7820', '6402', '7441', '10581', '6091', '5992', '6227', '8889', '7712', '8236', '5795', '5930', '6044', '5597', '9321', '5695', '5819', '5699', '9409', '9005', '6376', '5582', '6377', '5974', '9000', '5677', '5891', '9026', '5865'}
→ Ingest passes basic validation.

Figure: Validating table 'level_0.mtl_w1_eth_gps_raw_TOXIC'

----------------------  ----------
     total_raw_samples  84,086,076
        culled_samples     470,210
expected_final_samples  83,615,866
  actual_final_samples 

### xls_raw_TOXIC
Proceeds as per the GPS table, but with accelerometry source file and column names.

In [9]:
# Set up the parameters for loading the XL table data and then load it
telemetryfilename = "accelerometer.csv"
fullcsvpath = os.path.join(telemetrydir, telemetryfilename)
tmptablename = "tmpxl"
desttablename = f"{city_prefix}_w1_eth_xls_raw_TOXIC"

ids_to_cull = get_ids_to_be_culled(linkage_file)
ids_to_ignore = get_ids_to_be_ignored(linkage_file)

# make sure we cull any data from both cull records and ignore records
ids_to_cull.update(ids_to_ignore)

# Render the id lists as explicit SQL lists. Interpolating a Python tuple only
# works by accident: a one-element tuple renders as "(395,)" and an empty one
# as "()", and both are invalid SQL.
if not ethica_study_id:
    raise ValueError("ethica_study_id must list at least one Ethica study id")
study_ids_sql = ", ".join(str(int(sid)) for sid in ethica_study_id)

# ids come out of the linkage CSV as strings; blank entries cannot match a BIGINT user_id
cull_id_values = sorted({int(eid) for eid in ids_to_cull if str(eid).strip()})
if cull_id_values:
    cull_filter_sql = f"WHERE raw.user_id NOT IN ({', '.join(str(eid) for eid in cull_id_values)})"
else:
    cull_filter_sql = ""  # nothing to cull, so no WHERE clause at all

# Column layout of the staging table; the CSV columns are loaded in this order
tmpsqlvars = """
    user_id BIGINT NOT NULL,
    date TEXT,
    device_id TEXT NOT NULL,
    record_time TIMESTAMP WITH TIME ZONE NOT NULL,
    timestamp TEXT,
    accu DOUBLE PRECISION,
    x_axis DOUBLE PRECISION NOT NULL,
    y_axis DOUBLE PRECISION NOT NULL,
    z_axis DOUBLE PRECISION NOT NULL
    """

# Column layout of the final raw table, keyed by interact_id instead of ethica user_id
destsqlvars = """
    iid BIGINT NOT NULL,   -- interact_id
    record_time TIMESTAMP WITH TIME ZONE NOT NULL, -- participant's UTC time, to millisec 
    x DOUBLE PRECISION NOT NULL,
    y DOUBLE PRECISION NOT NULL,
    z DOUBLE PRECISION NOT NULL
    """

# Staging -> destination: map user_id to interact_id for this city's studies
# and leave out every culled/ignored user.
transfer_sql = f"""
    INSERT INTO {db_schema}.{desttablename} (iid,record_time,x,y,z)
    SELECT asgn.interact_id,
        raw.record_time,
        raw.x_axis,
        raw.y_axis,
        raw.z_axis
    FROM {db_schema}.{tmptablename} raw 
        INNER JOIN portal_dev.ethica_assignments asgn
        ON raw.user_id = asgn.ethica_id AND asgn.study_id IN ({study_ids_sql})
        {cull_filter_sql}
        """

# One CSV per Ethica study (English and French); only the dict keys (paths) are read by the loader
telemetryfiles = {os.path.join(telemetrydir, 'accelerometer.csv'):395, 
                 os.path.join(telemetrydir, 'accelerometer.csv').replace('_en_', '_fr_'):396}

load_raw_CSV_data(telemetryfiles, db_schema, tmptablename, tmpsqlvars, desttablename, destsqlvars, transfer_sql)

→ Ethica IDs to be culled as per linkage file:
→ {'5699', '6091', '9409', '5974', '7441', '6227', '5819', '6044', '9321', '6376', '7820', '5795', '9005', '6402', '6377', '8236', '5677', '5891', '5865', '5930', '10581', '8889', '7712', '5597'}
→ Ethica IDs to be ignored as per linkage file:
→ {'5699', '5582', '5992', '9000', '9026', '10581', '9321', '5695'}
→ Loading files '{'/home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_en_01/raw/accelerometer.csv': 395, '/home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_fr_01/raw/accelerometer.csv': 396}' into table 'tmpxl' of schema 'level_0'
→ Creating temp ingest table tmpxl
→ Creating destination table level_0.mtl_w1_eth_xls_raw_TOXIC
→ Ingesting raw CSV from /home/jeffs/projects/def-dfuller/interact/permanent_archive/Montreal/Wave1/Ethica/montreal_en_01/raw/accelerometer.csv
→ Ingesting raw CSV from /home/jeffs/projects/def-dfuller/interact/permanent_archive/Mon

In [11]:
# Validate the ingested table
# Count the number of ingested records and compare with expected number
desttablename = f"{city_prefix}_w1_eth_xls_raw_TOXIC"
fulldesttablename = f"{db_schema}.{desttablename}"
# Name the staging table explicitly instead of inheriting `tmptablename` from
# whichever load cell happened to run last; otherwise running the cells out of
# order would compare the accelerometer table against the GPS staging table.
tmptablename = "tmpxl"

log(f"Validating {desttablename}...")
# count records produced by users who were not ingested
# First, generate the list of users whose telemetry is being culled
culled_eids = get_ids_to_be_culled(linkage_file)
ignored_eids = get_ids_to_be_ignored(linkage_file)
log(f"There were {len(culled_eids)} ethica_ids culled from the ingest.")
#log(f"Culled IDs: {culled_eids}")
log(f"There were {len(ignored_eids)} ethica_ids ignored from the ingest.")
#log(f"Ignored IDs: {ignored_eids}")

# for the purposes of which ids are expected to have absent data, we include the ignored ids as well as culled
culled_eids.update(ignored_eids)

# ids come out of the linkage CSV as strings; user_id is BIGINT, so pass integers
culled_id_list = [int(eid) for eid in culled_eids if str(eid).strip()]

with psycopg2.connect(user=db_user, host=db_host, port=db_host_port, database='interact_db') as conn:
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    # Next, count how many records are associated with those culled ids.
    # "= ANY(array)" is used rather than "IN %s" because an empty list is then
    # still valid SQL, whereas "IN ()" is a syntax error.
    sql = f"""
           SELECT COUNT(1) AS total_samples, 
                  SUM(1) FILTER (WHERE user_id = ANY(%s)) AS num_culled_samples
           FROM {db_schema}.{tmptablename}"""
    cur.execute(sql, (culled_id_list,))
    row = cur.fetchone()
    # the filtered SUM is NULL when no staged row belongs to a culled user
    num_culled_samples = row['num_culled_samples'] if row['num_culled_samples'] else 0
    total_raw_samples = row['total_samples']
    
    # Count number of actual records ingested into final table
    sql = f"""SELECT SUM(num_recs) as num_recs, COUNT(1) as num_users
              FROM (
                     SELECT COUNT(1) as num_recs
                     FROM {fulldesttablename}
                     GROUP BY iid
                   ) as records_per_user
              """
    cur.execute(sql)
    row = cur.fetchone()
    data = [['total_raw_samples', total_raw_samples], 
            ['culled_samples', num_culled_samples],   
            ['expected_final_samples', total_raw_samples - num_culled_samples],
            ['actual_final_samples', row['num_recs']], ]

    # every staged row must be either culled or present in the final table
    if total_raw_samples - num_culled_samples == row['num_recs']:
        log("Ingest passes basic validation.")
    else:
        log("ERROR: INGEST VALIDATION FAILED")
        
    print(f"\nFigure: Validating table '{fulldesttablename}'\n")
    print(tabulate(data, floatfmt=',.0f', stralign='right'))    

→ Validating mtl_w1_eth_xls_raw_TOXIC...
→ Ethica IDs to be culled as per linkage file:
→ {'5699', '6091', '9409', '5974', '7441', '6227', '5819', '6044', '9321', '6376', '7820', '5795', '9005', '6402', '6377', '8236', '5677', '5891', '5865', '5930', '10581', '8889', '7712', '5597'}
→ Ethica IDs to be ignored as per linkage file:
→ {'5699', '5582', '5992', '9000', '9026', '10581', '9321', '5695'}
→ There were 24 ethica_ids culled from the ingest.
→ There were 8 ethica_ids ignored from the ingest.
→ Ingest passes basic validation.

Figure: Validating table 'level_0.mtl_w1_eth_xls_raw_TOXIC'

----------------------  -------------
     total_raw_samples  3,459,362,823
        culled_samples     21,305,557
expected_final_samples  3,438,057,266
  actual_final_samples  3,438,057,266
----------------------  -------------


#### Comparing population of both tables
As a final validation, verify that the GPS and XLS raw tables both have data from the same users.

In [12]:
# List every interact_id that has data in only one of the two raw tables.
with psycopg2.connect(user=db_user, host=db_host, port=db_host_port, database='interact_db') as conn:
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
   
    # Build one helper table of distinct iids per raw table, then join them.
    # The join must be a FULL OUTER JOIN: with a RIGHT OUTER JOIN the xls side
    # can never be NULL, so users with GPS data but no accelerometer data would
    # silently go unreported.
    # The statements run as one batch; fetchall() below returns the rows of the
    # final SELECT.
    mismatch_sql = f"""
        SET SCHEMA '{db_schema}';
        DROP TABLE IF EXISTS tmpgpsiids;
        DROP TABLE IF EXISTS tmpxlsiids;
        CREATE TABLE tmpgpsiids AS SELECT DISTINCT iid FROM {city_prefix}_w1_eth_gps_raw_toxic ;
        CREATE TABLE tmpxlsiids AS SELECT DISTINCT iid FROM {city_prefix}_w1_eth_xls_raw_toxic ;
        SELECT tmpgpsiids.iid AS gps_iid, tmpxlsiids.iid AS xls_iid 
        FROM tmpgpsiids 
             FULL OUTER JOIN tmpxlsiids 
             ON tmpgpsiids.iid = tmpxlsiids.iid 
        WHERE tmpgpsiids.iid IS NULL 
           OR tmpxlsiids.iid IS NULL;
        """
    
    print(f"\nFigure: {city_name} users with data present in only one table\n")
    cur.execute(mismatch_sql)
    rows = cur.fetchall()
    if rows:
        # a NULL gps_iid means xls-only data; a NULL xls_iid means gps-only data
        print(tabulate(rows, headers='keys', stralign='right'))
    else:
        print('No mismatches found.')

    # Interact_id 302562672 is known to have no GPS data for SSK
    # Interact_id 201680208 is known to have no GPS data for VAN


Figure: Montreal users with data present in only one table

No mismatches found.


### Table *-xls-delduplrec
This table will be extracted from the raw XLS data, with all record_time **duplicates** removed.

In [4]:
# Derived accelerometer table: keep only (iid, record_time) pairs that occur
# exactly once in the raw table.
sourcetable = f"{city_prefix}_w1_eth_xls_raw_TOXIC"
tablename = f"{city_prefix}_w1_eth_xls_delduplrec"
with psycopg2.connect(user=db_user, host=db_host, port=db_host_port, database='interact_db') as conn:
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    # Rebuild the table from the raw source, then index it on its natural key
    log(f"Creating {tablename} from scratch...")
    sql = f"""
        DROP TABLE IF EXISTS {db_schema}.{tablename};
        CREATE TABLE {db_schema}.{tablename}
        AS
            SELECT iid, record_time, x, y, z
            FROM (
                SELECT count(1) as numrows, 
                       iid, 
                       record_time,
                       min(x) as x,
                       min(y) as y,
                       min(z) as z
                FROM {db_schema}.{sourcetable}
                GROUP BY iid, record_time
                ) as count_collisions
            WHERE numrows = 1; -- there are no duplicates at all
        CREATE UNIQUE INDEX {tablename}_idx ON {db_schema}.{tablename} (iid, record_time);
            """
    cur.execute(sql)
    conn.commit()
    log("Done")

    # Summarise the new table: total records and number of distinct users
    print(f"\nFigure: Basic counts for table {tablename}\n")
    sql = f"""
        SELECT SUM(num_recs) as num_recs, COUNT(1) as num_users
        FROM (
             SELECT COUNT(1) as num_recs
             FROM {db_schema}.{tablename}
             GROUP BY iid
           ) as records_per_user
        """
    cur.execute(sql)
    row = cur.fetchone()
    data = [[field, row[field]] for field in ('num_recs', 'num_users')]
    print(tabulate(data, floatfmt=',.0f', stralign='right'))


→ Creating mtl_w1_eth_xls_delduplrec from scratch...
→ Done

Figure: Basic counts for table mtl_w1_eth_xls_delduplrec

---------  -------------
 num_recs  3,012,538,959
num_users            545
---------  -------------


### Table *-gps-delduplrec 
This table will be extracted from the raw GPS data, with all record_time **duplicates** removed.

In [5]:
# Derived GPS table: keep only (iid, record_time) pairs that occur exactly
# once in the raw table.
sourcetable = f"{city_prefix}_w1_eth_gps_raw_TOXIC"
tablename = f"{city_prefix}_w1_eth_gps_delduplrec"
with psycopg2.connect(user=db_user, host=db_host, port=db_host_port, database='interact_db') as conn:
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    # Rebuild the table from the raw source, then index it on its natural key
    log(f"Creating table {tablename} from scratch...")
    sql = f"""
        SET SCHEMA '{db_schema}';
        DROP TABLE IF EXISTS {tablename};
        CREATE TABLE {tablename}
        AS
            SELECT iid, record_time, satellite_time, minlat as lat, minlon as lon
            FROM (
                SELECT count(1) as numrows, 
                       iid, 
                       record_time, 
                       min(satellite_time) as satellite_time,
                       min(lat) as minlat, 
                       max(lat) as maxlat,
                       min(lon) as minlon,
                       max(lon) as maxlon
                FROM {sourcetable}
                GROUP BY iid, record_time
                ) as count_collisions
            WHERE numrows = 1; -- there are no duplicates at all
            CREATE UNIQUE INDEX {tablename}_idx ON {tablename} (iid, record_time);
            """
    cur.execute(sql)
    conn.commit()
    log("Done")

    # Summarise the new table: total records and number of distinct users
    print(f"\nFigure: Basic counts for table {tablename}\n")
    sql = f"""
    SET SCHEMA '{db_schema}';
    SELECT SUM(num_recs) as num_recs, COUNT(1) as num_users
        FROM (
             SELECT COUNT(1) as num_recs
             FROM {tablename}
             GROUP BY iid
           ) as records_per_user
        """
    cur.execute(sql)
    row = cur.fetchone()
    data = [[field, row[field]] for field in ('num_recs', 'num_users')]
    print(tabulate(data, floatfmt=',.0f', stralign='right'))

→ Creating table mtl_w1_eth_gps_delduplrec from scratch...
→ Done

Figure: Basic counts for table mtl_w1_eth_gps_delduplrec

---------  ----------
 num_recs  27,800,268
num_users         545
---------  ----------


### Table *-gps-delconflrec 
This table will be extracted from the raw GPS data, with all record_time **conflicts** removed.

In [6]:
# Derived GPS table: one row per (iid, record_time), dropping only those
# timestamps whose duplicate rows disagree on lat/lon.
sourcetable = f"{city_prefix}_w1_eth_gps_raw_TOXIC"
tablename = f"{city_prefix}_w1_eth_gps_delconflrec"
with psycopg2.connect(user=db_user, host=db_host, port=db_host_port, database='interact_db') as conn:
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    # Rebuild the table from the raw source, then index it on its natural key
    log(f"Creating table {tablename} from scratch...")
    sql = f"""
        SET SCHEMA '{db_schema}';
        DROP TABLE IF EXISTS {tablename};
        CREATE TABLE {tablename}
        AS
            SELECT iid, record_time, satellite_time, minlat as lat, minlon as lon
            FROM (
                SELECT count(1) as numrows, 
                       iid, 
                       record_time, 
                       min(satellite_time) as satellite_time,
                       min(lat) as minlat, 
                       max(lat) as maxlat,
                       min(lon) as minlon,
                       max(lon) as maxlon
                FROM {sourcetable}
                GROUP BY iid, record_time
                ) as count_collisions
            WHERE numrows = 1 -- there are no duplicates at all
               OR (minlon = maxlon AND minlat = maxlat); -- the duplicates are identical
        CREATE UNIQUE INDEX {tablename}_idx ON {tablename} (iid, record_time);
        """
    cur.execute(sql)
    conn.commit()
    log("Done")

    # Summarise the new table: total records and number of distinct users
    print(f"\nFigure: Basic counts for table {tablename}\n")
    sql = f"""
    SET SCHEMA '{db_schema}';
    SELECT SUM(num_recs) as num_recs, COUNT(1) as num_users
        FROM (
             SELECT COUNT(1) as num_recs
             FROM {tablename}
             GROUP BY iid
           ) as records_per_user
        """
    cur.execute(sql)
    row = cur.fetchone()
    data = [[field, row[field]] for field in ('num_recs', 'num_users')]
    print(tabulate(data, floatfmt=',.0f', stralign='right'))

→ Creating table mtl_w1_eth_gps_delconflrec from scratch...
→ Done

Figure: Basic counts for table mtl_w1_eth_gps_delconflrec

---------  ----------
 num_recs  28,065,265
num_users         545
---------  ----------


### Table *-xls-delconflrec 
This table will be extracted from the raw XLS data, with all record_time **conflicts** removed.

In [7]:
# Derived accelerometer table: one row per (iid, record_time), dropping only
# those timestamps whose duplicate rows disagree on x, y or z.
sourcetable = f"{city_prefix}_w1_eth_xls_raw_TOXIC"
tablename = f"{city_prefix}_w1_eth_xls_delconflrec"
with psycopg2.connect(user=db_user, host=db_host, port=db_host_port, database='interact_db') as conn:
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    # Rebuild the table from the raw source, then index it on its natural key
    log(f"Creating table {tablename} from scratch...")
    sql = f"""
        SET SCHEMA '{db_schema}';
        DROP TABLE IF EXISTS {tablename};
        CREATE TABLE {tablename}
        AS
            SELECT iid, record_time, minx as x, miny as y, minz as z
            FROM (
                SELECT count(1) as numrows, 
                       iid, 
                       record_time,
                       min(x) as minx,
                       max(x) as maxx,
                       min(y) as miny,
                       max(y) as maxy,
                       min(z) as minz,
                       max(z) as maxz
                FROM {sourcetable}
                GROUP BY iid, record_time
                ) as count_collisions
            WHERE numrows = 1 -- there are no duplicates at all
               OR (minx = maxx AND miny = maxy AND minz = maxz); -- the duplicates are identical
            CREATE UNIQUE INDEX {tablename}_idx ON {tablename} (iid, record_time);
            """
    cur.execute(sql)
    conn.commit()
    log("Done")

    # Summarise the new table: total records and number of distinct users
    print(f"\nFigure: Basic counts for table {tablename}\n")
    sql = f"""
    SET SCHEMA '{db_schema}';
    SELECT SUM(num_recs) as num_recs, COUNT(1) as num_users
        FROM (
             SELECT COUNT(1) as num_recs
             FROM {tablename}
             GROUP BY iid
           ) as records_per_user
        """
    cur.execute(sql)
    row = cur.fetchone()
    data = [[field, row[field]] for field in ('num_recs', 'num_users')]
    print(tabulate(data, floatfmt=',.0f', stralign='right'))

→ Creating table mtl_w1_eth_xls_delconflrec from scratch...
→ Done

Figure: Basic counts for table mtl_w1_eth_xls_delconflrec

---------  -------------
 num_recs  3,020,062,703
num_users            545
---------  -------------


### Table *-gps-delduplsat
This table will be extracted from the raw GPS data, with all satellite_time **duplicates** removed.

In [8]:
# Derived GPS table: keep only (iid, satellite_time) pairs that occur exactly
# once in the raw table.
sourcetable = f"{city_prefix}_w1_eth_gps_raw_TOXIC"
tablename = f"{city_prefix}_w1_eth_gps_delduplsat"
with psycopg2.connect(user=db_user, host=db_host, port=db_host_port, database='interact_db') as conn:
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    # Rebuild the table from the raw source, then index it on its natural key
    log(f"Creating table {tablename} from scratch...")
    sql = f"""
        SET SCHEMA '{db_schema}';
        DROP TABLE IF EXISTS {tablename};
        CREATE TABLE {tablename}
        AS
            SELECT iid, record_time, satellite_time, minlat as lat, minlon as lon
            FROM (
                SELECT count(1) as numrows, 
                       iid, 
                       min(record_time) as record_time, 
                       satellite_time,
                       min(lat) as minlat, 
                       max(lat) as maxlat,
                       min(lon) as minlon,
                       max(lon) as maxlon
                FROM {sourcetable}
                GROUP BY iid, satellite_time
                ) as count_collisions
            WHERE numrows = 1; -- there are no duplicates at all
        CREATE UNIQUE INDEX {tablename}_idx ON {tablename} (iid, satellite_time);
        """
    cur.execute(sql)
    conn.commit()
    log("Done")

    # Summarise the new table: total records and number of distinct users
    print(f"\nFigure: Basic counts for table {tablename}\n")
    sql = f"""
    SET SCHEMA '{db_schema}';
    SELECT SUM(num_recs) as num_recs, COUNT(1) as num_users
        FROM (
             SELECT COUNT(1) as num_recs
             FROM {tablename}
             GROUP BY iid
           ) as records_per_user
        """
    cur.execute(sql)
    row = cur.fetchone()
    data = [[field, row[field]] for field in ('num_recs', 'num_users')]
    print(tabulate(data, floatfmt=',.0f', stralign='right'))

→ Creating table mtl_w1_eth_gps_delduplsat from scratch...
→ Done

Figure: Basic counts for table mtl_w1_eth_gps_delduplsat

---------  ----------
 num_recs  24,944,462
num_users         544
---------  ----------


### Table *-gps-delconflsat
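This table will be extracted from the raw GPS data, with all satellite_time **conflicts** removed: rows that share an (iid, satellite_time) pair are collapsed into a single row when they all agree on lat and lon, and are dropped entirely when they disagree.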

In [9]:
# Build the *_gps_delconflsat table: one row per (iid, satellite_time) pair from
# the raw GPS table, keeping a pair when it occurs once or when every row in the
# group has the same lat and lon. Groups whose rows disagree are dropped.
tablename = f'{city_prefix}_w1_eth_gps_delconflsat'
sourcetable = f"{city_prefix}_w1_eth_gps_raw_TOXIC"

# psycopg2's `with connection:` block only ends the transaction; it does NOT
# close the connection. Open the connection explicitly and close it in a
# `finally` so that running this cell never leaves a server session behind.
# NOTE(review): the database name is hard-coded here although db_name is read
# from the environment at the top of the notebook -- confirm which is intended.
conn = psycopg2.connect(user=db_user, host=db_host, port=db_host_port, database='interact_db')
try:
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    log(f"Creating table {tablename} from scratch...")
    # min == max on both coordinates means every row in the group carries the
    # same position, so the group can be represented by a single row. The
    # earliest record_time of the group is the one that is kept.
    sql = f"""
        SET SCHEMA '{db_schema}';
        DROP TABLE IF EXISTS {tablename};
        CREATE TABLE {tablename}
        AS
            SELECT iid, record_time, satellite_time, minlat as lat, minlon as lon
            FROM (
                SELECT count(1) as numrows, 
                       iid, 
                       min(record_time) as record_time, 
                       satellite_time,
                       min(lat) as minlat, 
                       max(lat) as maxlat,
                       min(lon) as minlon,
                       max(lon) as maxlon
                FROM {sourcetable}
                GROUP BY iid, satellite_time
                ) as count_collisions
            WHERE numrows = 1 -- there are no duplicates at all
               OR (minlon = maxlon AND minlat = maxlat); -- the duplicates are identical
        CREATE UNIQUE INDEX {tablename}_idx ON {tablename} (iid, satellite_time);
        """
    cur.execute(sql)
    # Make the new table and its index durable before reporting on them.
    conn.commit()
    log("Done")

    # Report basic stats on the newly created table:
    # total number of rows and number of distinct iid values.
    print(f"\nFigure: Basic counts for table {tablename}\n")
    sql = f"""
    SET SCHEMA '{db_schema}';
    SELECT SUM(num_recs) as num_recs, COUNT(1) as num_users
        FROM (
             SELECT COUNT(1) as num_recs
             FROM {tablename}
             GROUP BY iid
           ) as records_per_user
        """
    cur.execute(sql)
    row = cur.fetchone()
    data = [['num_recs',row['num_recs']], ['num_users',row['num_users']],]
    print(tabulate(data, floatfmt=',.0f', stralign='right'))
finally:
    # Closing discards any transaction left open by an exception above and
    # releases the server-side session.
    conn.close()

→ Creating table mtl_w1_eth_gps_delconflsat from scratch...
→ Done

Figure: Basic counts for table mtl_w1_eth_gps_delconflsat

---------  ----------
 num_recs  27,282,423
num_users         545
---------  ----------


In [None]:
# Quick block that can be run at any time to validate counts on any of the above-created tables.
# Point `tablename` at the table to check; the query is read-only.
tablename = f'{city_prefix}_w1_eth_xls_delduplrec'

# psycopg2's `with connection:` block only ends the transaction; it does NOT
# close the connection. Open the connection explicitly and close it in a
# `finally` so that running this cell never leaves a server session behind.
# NOTE(review): the database name is hard-coded here although db_name is read
# from the environment at the top of the notebook -- confirm which is intended.
conn = psycopg2.connect(user=db_user, host=db_host, port=db_host_port, database='interact_db')
try:
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    # Collect the same basic stats as computed during ingest:
    # total number of rows and number of distinct iid values.
    sql = f"""
    SET SCHEMA '{db_schema}';
    SELECT SUM(num_recs) as num_recs, COUNT(1) as num_users
        FROM (
             SELECT COUNT(1) as num_recs
             FROM {tablename}
             GROUP BY iid
           ) as records_per_user
        """
    cur.execute(sql)
    row = cur.fetchone()
    data = [['num_recs',row['num_recs']], ['num_users',row['num_users']],]
    print(f"Figure: Basic counts for table {tablename}")
    print(tabulate(data, floatfmt=',.0f', stralign='right'))
finally:
    # Nothing to commit here; closing ends the read-only transaction and
    # releases the server-side session.
    conn.close()

Here endeth the ingest.