# Tools for verifying that every record is correctly processed and saved

## Initialize database files

Manually resets the master.db file into which results are saved.

In [1]:
%cd twitteranalysis
from DataTools.SqliteTools import initialize_master_db, delete_master_db, initialize_word_map_db
import environment


(bookmark:twitteranalysis) -> /Users/adam/Dropbox/PainNarrativesLab/TwitterDataAnalysis
/Users/adam/Dropbox/PainNarrativesLab/TwitterDataAnalysis


In [None]:
delete_master_db()
initialize_master_db()

In [2]:
initialize_word_map_db(environment.TWEET_DB_NO_STOP)

In [None]:
test = '%s/user-test.db' % environment.LOG_FOLDER_PATH
initialize_word_map_db(test)

## Run the user parser

In [None]:
%cd twitteranalysis
%run -i Executables/process_user_descriptions_into_words2.py

## Manually clear server queue

This should not normally be needed. 

In [None]:
%cd twitteranalysis

from Servers.ClientSide import Client
c = Client()

# Each of the listening request handlers needs its queue flushed
c.send_flush_command()

#j = c.send_shutdown_command()

In [None]:
j

In [None]:
# add_indexes(environment.TWEET_DB_MASTER)

# Check integrity of saved data

In [1]:
%cd twitteranalysis
import environment
import sqlite3
from DataTools import SqliteDataTools as DT

(bookmark:twitteranalysis) -> /Users/adam/Dropbox/PainNarrativesLab/TwitterDataAnalysis
/Users/adam/Dropbox/PainNarrativesLab/TwitterDataAnalysis


In [None]:
actualUsers = 1328927
numberProcessed = 4352
numberEmpty = 332
expectedUsers = numberProcessed - numberEmpty
print('should have %s' % expectedUsers)

In [2]:
DT.count_rows(environment.TWEET_DB_NO_STOP)

58758327 rows in /Users/adam/Dropbox/PainNarrativesLab/private_data/tweet-databases/tweets-no-stop.db


58758327

In [None]:
%%time
count_words(environment.USER_DB_NO_STOP)
# environment.USER_DB_NO_STOP
# 11.4 without index
# 6.31 with index

In [None]:
DT.count_rows(environment.TWEET_DB_MASTER)

In [None]:
DT.count_tweets(environment.TWEET_DB_MASTER)

In [None]:
num_users = DT.count_users()

In [None]:
# Users that should have been saved but were not found
missing = expectedUsers - num_users
# Share of the processed users that were lost, as a fraction between 0 and 1
pct_problem = missing / numberProcessed
# Apply the observed loss rate to the full user population
expected_missing = round(actualUsers * pct_problem)

# pct_problem is a fraction; scale it by 100 so the value matches the "pct" label
print("%s users were not saved; this is %.2f pct of the total processed" % (missing, 100 * pct_problem))
print("Projecting %s problem cases" % expected_missing)

NB: 
    - numberUsers processed = 4352
    - users w empty descriptions = 332
    - users non-english = 129
    - expected users: 4020 (less empty)

5/15 6.06
    - master only 
    - 3984 unique ids based on master
    - 67500 rows in master
    - 36 missing
    - 10993 projected problems
    
5/15 1.36
    - lock added to sqlite writer
    - 4085 unique ids based on files
    - 3950 unique ids based on master
    - 67002 rows in master and based on files


5/15 11.12
    - 4059 unique ids based on files
    - 3923 unique ids based on master
    - 66507 rows in master and based on files

5/15 10.39
    - 3653 unique ids based on files
    - 3529 unique ids based on master
    - 60060 rows in master and based on files
    

5/14 9.00
    - 4014 unique ids based on files
    - 2510 unique ids based on master
    - 41973 rows in master and based on files


Before changing to a class method with a separate call to flush
    - 3591 Unique user ids
    - 47952 rows in master.db

After
    - 4014 Unique user ids
    - 67940 rows in master.db
    



### Figure out which users are missing

In [8]:
%cd twitteranalysis
import sqlite3
import environment
import DataTools.Cursors
cursor = DataTools.Cursors.WindowedUserCursor( language='en' )


(bookmark:twitteranalysis) -> /Users/adam/Dropbox/PainNarrativesLab/TwitterDataAnalysis
/Users/adam/Dropbox/PainNarrativesLab/TwitterDataAnalysis
connection ready


In [9]:
cursor.next()

ProgrammingError: (mysql.connector.errors.ProgrammingError) 1054 (42S22): Unknown column 'users.userID' in 'field list' [SQL: 'SELECT users.`userID` AS `users_userID`, users.screen_name AS users_screen_name, users.id_str AS users_id_str, users.name AS users_name, users.description AS users_description, users.lang AS users_lang, users.utc_offset AS users_utc_offset, users.verified AS users_verified, users.followers_count AS users_followers_count, users.friends_count AS users_friends_count, users.url AS users_url, users.time_zone AS users_time_zone, users.created_at AS users_created_at, users.entities AS users_entities, users.favourites_count AS users_favourites_count, users.statuses_count AS users_statuses_count, users.id AS users_id, users.location AS users_location, users.is_translation_enabled AS users_is_translation_enabled \nFROM users \nWHERE users.`userID` > %(userID_1)s ORDER BY users.`userID` \n LIMIT %(param_1)s'] [parameters: {'userID_1': 0, 'param_1': 4}] (Background on this error at: http://sqlalche.me/e/f405)

In [None]:

def find_missing_users(user_cursor=None, db_path=None):
    """Finds users which were not saved to master.db

    Pulls users one at a time from the user cursor and checks whether the
    `word_map_deux` table holds at least one row for each of them.

    Args:
        user_cursor: Object whose `next()` method returns a user exposing a
            `userID` attribute and raises StopIteration when exhausted.
            Defaults to the notebook-level `cursor`.
        db_path: Path of the sqlite database to inspect. Defaults to
            `environment.MASTER_DB`.

    Returns:
        List of the userIDs which have no row in `word_map_deux`, in the
        order the cursor produced them.
    """
    if user_cursor is None:
        user_cursor = cursor
    if db_path is None:
        db_path = environment.MASTER_DB

    # Bound parameter instead of string interpolation; only existence matters,
    # so a single constant column with a limit is enough
    query = "select 1 from word_map_deux where user_id = ? limit 1"
    missing = []

    conn = sqlite3.connect(db_path)
    try:
        curs = conn.cursor()
        try:
            while True:
                try:
                    user = user_cursor.next()
                except StopIteration:
                    # The cursor is exhausted: every user has been checked
                    break
                if curs.execute(query, (user.userID,)).fetchone() is None:
                    missing.append(user.userID)
        finally:
            curs.close()
    finally:
        # Release the database even when a lookup fails part way through
        conn.close()
    return missing

missing = find_missing_users()


print(len(missing))

In [None]:
import pandas as pd
from DataTools.DataConnections import MySqlConnection, DAO
conn = MySqlConnection(environment.CREDENTIAL_FILE)
conn._make_engine()

def get_description_for_id(userId, engine=None):
    """Loads a user's profile description from the `users` table.

    Args:
        userId: Numeric id of the user to look up.
        engine: Connection or engine handed to `pd.read_sql_query`. Defaults
            to `conn.engine`, the engine of the notebook-level connection.

    Returns:
        A tuple (userId, description), with userId exactly as it was passed.

    Raises:
        ValueError: if userId cannot be converted to an integer.
        IndexError: if no row exists for userId.
    """
    if engine is None:
        engine = conn.engine
    # Coerce to int so that only a numeric literal can reach the SQL text
    q = 'select description from users where userID = %d' % int(userId)
    v = pd.read_sql_query(q, engine).iloc[0].values[0]
    return (userId, v)

In [None]:
# Separate the missing users whose profile was empty from those
# that had a real description and should therefore have been saved

descripts = []        # (userId, description) for every missing user
substantive = []      # the subset of descripts with a non-empty description
substantive_ids = []  # just the ids of that subset
for userId in missing:
    found = get_description_for_id(userId)
    descripts.append(found)
    # a non-empty description means the user should have been saved
    if found[1] != '':
        substantive.append(found)
        substantive_ids.append(found[0])
print("%s users had non-empty profiles but were not saved. These are 'substantive errors'" % len(substantive))

In [None]:
substantive

In [None]:
names=['timestamp', 'userid', 'note']

# when a user was enqued for processing on client 
proc = pd.read_csv(environment.PROCESSING_ENQUE_LOG_FILE, header=None, names=names)
# when a user was enqued for saving on the client
enq = pd.read_csv(environment.CLIENT_ENQUE_LOG_FILE, header=None, names=names)
# when a user was sent to the server
sent = pd.read_csv(environment.CLIENT_SEND_LOG_FILE, header=None, names=names)
# when the server received each request
srv = pd.read_csv(environment.SERVER_RECEIVE_LOG_FILE, header=None, names=names)

print("%s users processed; %s users received by server" % (len(proc), len(set(srv.userid.tolist()))))

In [None]:
def h(frame, userId):
    """Returns the index label of the first row of `frame` for `userId`.

    Args:
        frame: DataFrame with a `userid` column.
        userId: Value to look for in the `userid` column.

    Returns:
        The index label of the first matching row, or None when no row
        matches.
    """
    matching = frame[frame.userid == userId]
    # Only "no match" maps to None; other failures (e.g. a frame without a
    # `userid` column) now surface instead of being silently swallowed
    if matching.empty:
        return None
    return matching.index[0]

def get_indexes(userId):
    """Looks up where `userId` first appears in each of the log frames.

    Returns a dict with the key 'id' plus one key per log frame
    ('processed', 'clientEnque', 'sent', 'received'); each of those holds
    whatever `h` returns for that frame.
    """
    stages = (
        ('processed', proc),
        ('clientEnque', enq),
        ('sent', sent),
        ('received', srv),
    )
    positions = {'id': userId}
    for stage_name, log_frame in stages:
        positions[stage_name] = h(log_frame, userId)
    return positions
    

In [None]:
get_indexes(1956700424)

In [None]:
proc[proc.userid == 1956700424].index[0]

In [None]:
sent[sent.userid == 1956700424].index

# figure out where in the process the substantive errors happened

## Substantive errors which were enqueued in processing

In [None]:

p = proc[proc.userid.isin(substantive_ids)]
# sub 
s = srv[srv.userid.isin(substantive_ids)]
snt = sent[sent.userid.isin(substantive_ids)]
ceq = enq[enq.userid.isin(substantive_ids)]

processed_ids = set(p.userid.tolist())
server_received_ids = set(s.userid.tolist())
sent_ids = set(snt.userid.tolist())
client_enq_ids = set(ceq.userid.tolist())

print("The following concerns the flow of users with substative errors through the system")
print("%s were enqueued for processing" % (len(processed_ids)))
print("%s were enqueued on the client to be sent to the server" % len(client_enq_ids))
print("%s were sent to the server" % len(sent_ids))
print("%s were received by the server" % len(server_received_ids))

### Processed but not enqueued for saving

In [None]:
proc_not_enq = [p for p in processed_ids if p not in client_enq_ids]
proc_not_enq

In [None]:
[get_description_for_id(id) for id in proc_not_enq]

### Enqueued on client but not sent

In [None]:
not_sent = [p for p in client_enq_ids if p not in sent_ids ]
not_sent

In [None]:
[get_description_for_id(id) for id in not_sent]

In [None]:
d = pd.DataFrame([get_indexes(id) for id in not_sent])
d.set_index('id', inplace=True)
d.sort_values('processed')

### Are these the same users each time?

In [None]:
prev_run = [1956700424.0, 1965229962.0,
 1943096588.0,
 2150423437.0,
 2163358988.0,
 1943901734.0,
 2163604778.0,
 1946121392.0,
 1958085936.0,
 2147790896.0,
 2167298995.0,
 2148304566.0,
 2151409467.0,
 2177120316.0,
 1966904126.0,
 1977458240.0,
 1978158529.0,
 2168963268.0,
 1967229895.0,
 1952156365.0,
 1974223567.0,
 1961129809.0,
 1947484375.0,
 2157188568.0,
 1942653919.0,
 2187999841.0,
 2153422184.0,
 2153945834.0,
 2148022776.0,
 1971054716.0]

In [None]:
[x for x in not_sent if x in prev_run]

In [None]:
processed_and_received_sub_errors = processed_ids.intersection(server_received_ids)
len(processed_and_received_sub_errors)

In [None]:
# these were processed by client and received by server
# but were not recorded
j = [x for x in substantive if x[0] in processed_and_received_sub_errors]
j

EXCLUSIVE
    - 30 seconds
    - no server side queue induced errors

IMMEDIATE
    - 30 seconds
    - no server side queue induced errors

DEFERRED
    - 30 seconds
    - no server side queue induced errors

Default (bare BEGIN)
    - 29 seconds
    - 60 server side errors



Autocommit
    - Long
    - 258 server side errors

In [None]:
len(sent_ids)

In [None]:
client_enq_ids

In [None]:
def c(row):
    """Returns the gap between `row`'s timestamp and that of the next row in `p`.

    "Next" means the following row by position in `p`, because `p` is a
    filtered frame whose index labels are not consecutive. The last row has
    no successor, so None is returned for it.

    NOTE(review): assumes the `timestamp` values support subtraction (numeric
    or datetime) — confirm how the log files store them.
    """
    # row.name is the index label of the row being processed by apply
    position = p.index.get_loc(row.name)
    if position + 1 >= len(p):
        return None
    return p.iloc[position + 1].timestamp - row.timestamp

# axis=1 hands each row (not each column) to c
j = p.apply(c, axis=1)

In [None]:
p

# Count the unique user ids and the rows held in each of the other databases
# NOTE(review): `otherDbNames` is not defined in any cell shown here; it must
# be set before this cell is run
uids = []
rows = []
print("Unique user ids; rows")
for db in otherDbNames:
    dbPath = '%s/%s' % (environment.DB_FOLDER, db)
    # A dedicated name keeps the notebook-level `conn` (used by
    # get_description_for_id) from being rebound to a closed sqlite connection
    db_conn = sqlite3.connect(dbPath)
    try:
        # One scan of the table yields both counts
        uid_count, row_count = db_conn.execute(
            "select count( distinct user_id), count( word) from word_map_deux"
        ).fetchone()
    finally:
        # Close the file even if the query fails
        db_conn.close()
    uids.append(uid_count)
    rows.append(row_count)

    print("%s : %s; %s " % (db, uid_count, row_count))
