# Goal
1. functional refactor
1. plus num_edits_all_time_pre_treatment
plus has_email_configured (as proxy for emailable)

In [11]:
from itertools import islice

def window_seq(seq, n=2):
    "Returns a sliding window (of width n) over data from the iterable"
    "   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
    it = iter(seq)
    result = tuple(islice(it, n))
    if len(result) == n:
        yield result
    for elem in it:
        result = result[1:] + (elem,)
        yield result
        
def wmftimestamp(bytestring):
    if bytestring:
        s = bytestring.decode('utf-8')
        return dt.strptime(s, '%Y%m%d%H%M%S')
    else:
        return bytestring
    
def to_wmftimestamp(date):
    return date.strftime('%Y%m%d%H%M%S')
    
    

def decode_or_nan(b):
    return b.decode('utf-8') if b else float('nan')


import json
import requests

from sqlalchemy import create_engine
from sqlalchemy.exc import ProgrammingError
from pymysql.err import InternalError, OperationalError
import sys
import os
import pandas as pd
import numpy as np

import mwclient
import mwviews
import mwapi

from datetime import datetime as dt
from datetime import timedelta as td

%pylab inline

langs_non_de = ['ar', 'fa', 'pl']
langs = langs_non_de + ['de']

# site = mwclient.Site(('https', f'{langcode}.wikipedia.org'), path = '/w/')

# os.makedirs(datadir, exist_ok=True)

constr = 'mysql+pymysql://{user}:{pwd}@{host}:{port}'.format(user=os.environ['MYSQL_USERNAME'],
                                                      pwd=os.environ['MYSQL_PASSWORD'],
                                                      host=os.environ['MYSQL_HOST'],
                                                    port=os.environ['MYSQL_PORT'],
                                                         charset='utf8',
                                                        use_unicode=True)

con = create_engine(constr, encoding='utf-8')


con.execute(f'use enwiki_p;')

#Tuesday Mar 6 2018
#Friday March 23 2018

sim_treatment_dates = (dt(2018,3,6), dt(2018,3, 23))
sim_treatment_date = sim_treatment_dates[0]

sim_experiment_end_date = sim_treatment_date + td(days=90)
sim_observation_start_date = sim_treatment_date - td(days=90)

# wikipedia_start_date = dt(2001,1,1)
wikipedia_start_date = dt(2015,1,1)

CACHE_ROOT = os.getenv('CACHE_DIR', './cache')
GRAT_ROOT = os.getenv('GRAT_DIR', '../gratitude/outputs/')

Populating the interactive namespace from numpy and matplotlib


In [12]:
import mwreverts.api
import mwapi

# We'll use the mwreverts API check.  In order to do that, we need an API session
de_session = mwapi.Session("https://de.wikipedia.org", 
                        user_agent="Max Klein Newcomer Quality <max@notconfusing.com>")

# for a single user, get their revisions and determine how many are flagged or not

def get_revisions_and_flagged_data(user_id, treatment_date):
    rev_flag_sql = """
        select rev_id, 
                rev_page, 
                page_namespace, 
                rev_timestamp, 
                fr_timestamp, 
                (select max(fr_timestamp) from flaggedrevs where fr_page_id=rev_page and fr_timestamp < {treatment_date}) max_fr_ts 
            from (
                  select rev_id, rev_page, rev_timestamp, page_namespace from revision_userindex
                    join page on page_id = rev_page where rev_user={user_id} 
                    and rev_timestamp < {treatment_date}
                    order by rev_timestamp desc limit 500) auser
            left join flaggedrevs on
                fr_page_id = rev_page and
                fr_rev_id = rev_id;
                    """.format(user_id=user_id, treatment_date=treatment_date.strftime('%Y%m%d%H%M%S'))
    con.execute('use dewiki_p;')
    rev_flag = pd.read_sql(rev_flag_sql, con)
    rev_flag['fr_timestamp'] = rev_flag['fr_timestamp'].apply(wmftimestamp)
    rev_flag['max_fr_ts'] = rev_flag['max_fr_ts'].apply(wmftimestamp)
    rev_flag['rev_timestamp'] = rev_flag['rev_timestamp'].apply(wmftimestamp)
    return rev_flag

def was_reverted(rev_id):
    try:
        _, reverted, reverted_to = mwreverts.api.check(
            de_session, rev_id, radius=3,  # most reverts within 5 edits
            window=48*60*60,  # 2 days
            rvprop={'user', 'ids'})  # Some properties we'll make use of
        return True if reverted else False
    except (KeyError, mwapi.session.APIError) as err:
        print('Error getting revert status for {rev_id}'.format(rev_id=rev_id))
        return True #because even if it was deleted from the DB for our purposes its still a bad edit 

def decide_flagged(row):
    """Was this revision flagged (or generally high quality)?"""
    namespace = row['page_namespace']
    rev_time = row['rev_timestamp']
    flagged_time = row['fr_timestamp']
    last_flagged_time = row['max_fr_ts']
    was_reverted = row['was_reverted']
    
    #namespace check
    if namespace != 0:
        return True # because often user-page edits are never approved   
    # check if explictly flagged
    elif pd.notnull(flagged_time):
        return True
    #check if reverted
    elif was_reverted:
        return False
    #check if the last flagged time is after the edit
    elif last_flagged_time:
        if rev_time < last_flagged_time:
            return True
        #the revision exists but it hasn't been flagged yet.
        else:
            return False
    else:
        #not sure what else would get to this stage , but...
        return False
        
def get_flagged_decision_df(user_id, treatment_date):
    rev_df = get_revisions_and_flagged_data(user_id, treatment_date)
    rev_df['was_reverted'] = rev_df.apply(lambda row: was_reverted(row['rev_id']) if pd.isnull(row['fr_timestamp']) else 'no_check', axis=1)
    rev_df['flagged'] = rev_df.apply(decide_flagged, axis=1)
    return rev_df


def get_flagged_revs(user_id, treatment_date, return_all=False):
    needed_columns = ['user_id', 'rev_id', 'rev_timestamp','was_flagged', 'was_reverted']
    rev_df = get_flagged_decision_df(user_id, treatment_date)
    rev_df['user_id'] = user_id
    rev_df['was_flagged'] = rev_df.apply(decide_flagged, axis=1)
    if return_all:
        return rev_df
    #limit to only flagged
    rev_df = rev_df[rev_df['was_flagged']==True]
    # limit ot non-talk pages
    rev_df = rev_df[rev_df['page_namespace'].apply(lambda ns: ns%2==0)]
    # Limit to 17
    if rev_df.empty:
        return pd.DataFrame(columns=needed_columns)
    rev_df = rev_df.sort_values('rev_timestamp', ascending=False)
    rev_df = rev_df.iloc[:17]

    return rev_df[needed_columns]

In [13]:
def make_cached_df(cache_sub_dir):
    #decorator factory
    def cached_df(df_returning_fn):
        #decorator
        def get_with_cache(*args, **kwargs):
            #wrapping function
            cache_dir = os.path.join(CACHE_ROOT, cache_sub_dir)
            if not os.path.exists(cache_dir):
                    os.makedirs(cache_dir, exist_ok=True)
            fname = f'{"_".join([str(a) for a in args])}'
            cache_key = os.path.join(cache_dir, fname)
            try:
                return pd.read_pickle(cache_key)
            except FileNotFoundError:
                df = df_returning_fn(*args, **kwargs)
                df.to_pickle(cache_key)
                return df
            
        return get_with_cache
    return cached_df

In [39]:
def get_active_users(lang, start_date, end_date, min_rev_id):
    """
    Rterun the first and last edits of only active users in `lang`wiki
    between the start_date and end_date.
    """
    con.execute(f'use {lang}wiki_p;')
    active_sql = """select distinct(rev_user) from revision 
    where {start_date} <= rev_timestamp and rev_timestamp <= {end_date}
    and rev_id > {min_rev_id}
    ;
                """.format(start_date=to_wmftimestamp(start_date),
           end_date=to_wmftimestamp(end_date),
           lang=lang, min_rev_id=min_rev_id)
    active_df = pd.read_sql(active_sql, con)
    return active_df

In [None]:
@make_cached_df('spans')
def get_users_edit_spans(lang, start_date, end_date):
    """
    Return the the first and last edits of all users in `lang`wiki
    between the start_date and end_date
    """
    db_prefix = f'{lang}wiki_p'
    con.execute(f'use {db_prefix};')
    reg_sql = '''select '{lang}' as lang, user_id, user_name, user_registration,
       (select min(rev_timestamp) from revision_userindex where rev_user=user_id and {start_date} <= rev_timestamp <= {end_date}) as first_edit, 
       (select max(rev_timestamp) from revision_userindex where rev_user=user_id and {start_date} <= rev_timestamp <= {end_date}) as last_edit
from user where coalesce(user_registration, 20010101000000) <= {end_date} 
     and 
                coalesce(user_registration, 20010101000000) >= {start_date};
'''.format(start_date=to_wmftimestamp(start_date),
           end_date=to_wmftimestamp(end_date),
           lang=lang)
    span_df = pd.read_sql(reg_sql, con)
    span_df['user_registration'] = span_df['user_registration'].apply(wmftimestamp)
    span_df['first_edit'] = span_df['first_edit'].apply(wmftimestamp)
    span_df['last_edit'] = span_df['last_edit'].apply(wmftimestamp)
    span_df['user_name'] = span_df['user_name'].apply(decode_or_nan)
    return span_df    

In [9]:
def make_populations(start_date=wikipedia_start_date, end_date=sim_treatment_date):
    """for every registered user get first and last edit (or not of those users didn't edit in the period)"""
    span_dfs = []
    for lang in langs:
        span_df = get_users_edit_spans(lang, start_date, end_date)
        span_dfs.append(span_df)
    return pd.concat(span_dfs)

NameError: name 'wikipedia_start_date' is not defined

In [8]:
def remove_inactive_users(df):
    """remove users who have no edits in the period"""
    return df[(pd.notnull(df['first_edit'])) | (pd.notnull(df['last_edit']))]

In [19]:
@make_cached_df('thanks')
def get_thanks_thanking_user(lang, user_name, start_date, end_date):
    con.execute(f"use {lang}wiki_p;")
    user_thank_sql = f"""
                    select thank_timestamp, sender, receiver, ru.user_id as receiver_id, su.user_id as sender_id from
                        (select log_timestamp as thank_timestamp, replace(log_title, '_', ' ') as receiver, log_user_text as sender
                        from logging_logindex where log_title = '{user_name.replace(' ', '')}'
                        and log_action = 'thank'
                        and {to_wmftimestamp(start_date)} <= log_timestamp <= {to_wmftimestamp(end_date)}) t
                    left join user ru on ru.user_name = t.receiver
                    left join user su on su.user_name = t.sender """
    df = pd.read_sql(user_thank_sql, con)
    df['thank_timestamp'] = df['thank_timestamp'].apply(wmftimestamp)
    df['sender'] = df['sender'].apply(decode_or_nan)
    df['receiver'] = df['receiver'].apply(decode_or_nan)
    return df
    

In [10]:
def add_thanks(df, start_date, end_date, col_name):
    user_thank_count_dfs = []
    for lang in langs:
        user_names =  user_names = df[df['lang']==lang]['user_name'].values
        for user_name in user_names:
            user_thank_df = get_thanks_thanking_user(lang, user_name, start_date, end_date)
            user_thank_count_df = pd.DataFrame.from_dict({col_name:[len(user_thank_df)],
                                           'user_name':[user_name], 
                                           'lang':[lang]}, orient='columns')
            user_thank_count_dfs.append(user_thank_count_df)
    
    thank_counts_df = pd.concat(user_thank_count_dfs)
    df = pd.merge(df, thank_counts_df, how='left', on=['lang', 'user_name'])
    return df

def add_thanks_pre_treatment(df):
    return add_thanks(df, start_date=sim_observation_start_date, end_date=sim_treatment_date, col_name='num_thanks_received_pre_treatment')


In [29]:
def bin_from_td(delta):
    bins_log2 = (0, 90, 180, 365, 730, 1460, 2920, 5840)
    delta_days = delta.days
    prev_threshold=0
    for threshold in bins_log2:
        if delta_days > threshold:
            prev_threshold=threshold
            continue
        else:
            break
    return f'bin_{prev_threshold}'


def add_experience_bin(df):
    df['days_since_registration'] = sim_treatment_date - df['user_registration']
    df['experience_level_pre_treatment'] = df['days_since_registration'].apply(bin_from_td)
    del df['days_since_registration']
    return df

In [59]:
def output_bin_stats(df):
    bin_stats = pd.DataFrame(df.groupby(['experience_level_pre_treatment','lang']).size()).rename(columns={0:'users_with_at_least_days_experience'})
    bin_stats.to_csv('outputs/bin_stats_df_one_edit_min.csv', index=False)

# Subsetting representative sample

In [None]:
active_lang_bin_groups = full_df.groupby(['active_in_90_pre_treatment', 'lang', 'experience_level_pre_treatment'])

In [None]:
actual_sample_size = 200
guess_needed_sample_size = 100 # some of the subsamples will not be valid because after testing for quality they won't have sufficient
subsamples = []
for (is_active, lang, bin_name), group in active_lang_bin_groups:
    sample_size = guess_needed_sample_size
    if bin_name == 'bin_0':
        sample_size = sample_size * 3
    if len(group) < sample_size:
        sample_size = len(group)
#     print(group.shape, sample_size)
    subsample = group.sample(n=sample_size)
    subsamples.append(subsample)

In [None]:
sub_df = pd.concat(subsamples)

In [None]:
sub_df['lang'].value_counts()

In [None]:
sub_df['experience_level_pre_treatment'].value_counts()

In [None]:
sub_df['active_in_90_pre_treatment'].value_counts()

# Get Revisions of Editors

In [53]:
def get_90_stroke_50_recent_edits(userid, lang):
    '''this will get all the rev_ids for a user that occured less than 90 days before their last edit
    and no more than 50 edits in total '''
    con.execute('use {lang}wiki_p;'.format(lang=lang))
    revsql = '''
            select rev_user, ts, rev_id from
            (select a.rev_user as rev_user, timestamp(a.rev_timestamp) as ts, a.rev_id as rev_id, timestamp(b.mts) as mts
            from
            (select rev_user, rev_timestamp, rev_id from revision_userindex where rev_user = {userid}) a
            join
            (select rev_user, max(rev_timestamp) as mts from revision_userindex where rev_user = {userid})  b
            on a.rev_user = b.rev_user
            ) uhist
            where ts > date_sub(mts, interval 90 day)
            limit 50;'''.format(userid=userid)
    udf = pd.read_sql(revsql, con)
    return udf

def get_50_edits_before_sim_treatment(userid, lang):
    '''this will get all the rev_ids and timestamps for a user that occured before the simulated treatment date and just the 
        50 most recent of those '''
    con.execute('use {lang}wiki_p;'.format(lang=lang))
    revsql = '''select rev_id, rev_timestamp, page_id, page_namespace from (select rev_id, rev_timestamp, rev_page from revision_userindex where rev_user = {userid}
                and rev_timestamp < {datestr}
                order by rev_timestamp desc
                                            limit 50) revs
                                            join page
                                            on revs.rev_page = page.page_id;'''.format(datestr=sim_treatment_date.strftime('%Y%m%d%H%M%S'), userid=userid)
    udf = pd.read_sql(revsql, con)
    udf['rev_timestamp'] = udf['rev_timestamp'].apply(wmftimestamp)
    return udf.to_dict(orient='records')

In [54]:
sub_df['revisions_before_sim_treatment_50'] = sub_df.apply(lambda row: get_50_edits_before_sim_treatment(row['user_id'], row['lang']), axis=1)

NameError: name 'sub_df' is not defined

In [None]:
sub_df['len_revs'] = sub_df['revisions_before_sim_treatment_50'].apply(len)

In [None]:
sub_df['len_revs'].hist()

In [None]:
sub_df = sub_df[sub_df['len_revs']!=0]

In [None]:
sub_df.to_pickle('checkpoints/sub_df_revs_1.pickle')

In [None]:
sub_df = pd.read_pickle('checkpoints/sub_df_revs_1.pickle')

# Get ORES

In [None]:
ores_string = 'https://ores.wikimedia.org/v3/scores/{context}/{revid}/{model}'

def list_of_ores_scores(rev_ids, model, lang):
#     print('lang', lang)
    context = f'{lang}wiki'
    ores_scores = []
    ores_predicitons = []
    for rev_id in rev_ids:
        ores_req = ores_string.format(context=context, revid=rev_id, model=model)
#         print(f'context: {context}')
        # print(f'ores_req: {ores_req}')
        ores_resp = requests.get(url=ores_req)
        ores_data = ores_resp.json()
#         print(f'ores_data: {ores_data}')
        try:
            ores_score = ores_data[context]['scores'][str(rev_id)][model]['score']['probability']['true']
            ores_prediction = ores_data[context]['scores'][str(rev_id)][model]['score']['prediction']
            ores_scores.append(ores_score)
            ores_predicitons.append(ores_prediction)
        except KeyError:
            ores_scores.append(float('nan'))
            ores_predicitons.append(float('nan'))
    return {'ores_scores':ores_scores, 'ores_predictions':ores_predicitons}

In [None]:
count = 0
def get_from_ores_with_cache(rev_ids, model, lang):
    global count
    count += 1
    if count % 100 == 0:
        print(count, dt.now())
    datadir = os.path.join('data', lang, model)
    os.makedirs(datadir, exist_ok=True)
    fname = os.path.join(datadir,f'revlist_starting_{rev_ids[0]}')
    if not os.path.exists(fname):
        ores_data = list_of_ores_scores(rev_ids, model, lang)
        json.dump(ores_data, open(fname, 'w'))
        return ores_data
    else:
        return json.load(open(fname,'r'))
    

In [None]:
# sub_sub_df = sub_df.iloc[:100]

In [None]:
# %%time
# sub_sub_df['ores_damaging_data'] = sub_sub_df.apply(lambda row: get_from_ores_with_cache(
#     rev_ids=[d['rev_id'] for d in row['revisions_before_sim_treatment_50']],
#     model='damaging',
#     lang=row['lang']),
#     axis=1)

In [None]:
# sub_sub_df

In [None]:
count = 0
sub_df['ores_damaging_data'] = sub_df.apply(lambda row: get_from_ores_with_cache(
    rev_ids=[d['rev_id'] for d in row['revisions_before_sim_treatment_50']],
    model='damaging',
    lang=row['lang']),
    axis=1)

In [None]:
sub_df.to_pickle('checkpoints/sub_df_revs_ores_damaging_1.pickle')

In [None]:
sub_df = pd.read_pickle('checkpoints/sub_df_revs_ores_damaging_1.pickle')

In [None]:
sub_df['num_non_damaging_pre_treatment'] = sub_df['ores_damaging_data'].apply(lambda d: len([pred for pred in d['ores_predictions'] if pred==False]))

In [None]:
count = 0
sub_df['ores_goodfaith_data'] = sub_df.apply(lambda row: get_from_ores_with_cache(
    rev_ids=[d['rev_id'] for d in row['revisions_before_sim_treatment_50']],
    model='goodfaith',
    lang=row['lang']),
    axis=1)

In [None]:
sub_df['num_goodfaith_pre_treatment'] = sub_df['ores_goodfaith_data'].apply(lambda d: len([pred for pred in d['ores_predictions'] if pred==True]))

In [None]:
sub_df.to_pickle('checkpoints/sub_df_revs_ores_damaging_goodfaith_1.pickle')

In [None]:
sub_df['num_goodfaith_pre_treatment'].mean()

In [None]:
sub_df['num_non_damaging_pre_treatment'].mean()

In [None]:
sub_df.to_pickle('checkpoints/sub_df_revs_ores_damaging_goodfaith_1.pickle')

In [None]:
sub_df.to_pickle('checkpoints/sub_df_with_de_without_rev_data.pickle')

# make data for batches for julia and co 

## now we split into the german and non-german sets

### for the non-german,
+ we'll get the 50 most recent edits, 
+ and pull the ORES data for them
+ and continue

### for the german,
+ use the flagged revisions method

In [None]:
sub_df = pd.read_pickle('checkpoints/sub_df_with_de_without_rev_data.pickle')

In [None]:
sub_df_orig = sub_df.copy()

In [None]:
sub_df_de = sub_df[sub_df['lang'] == 'de']

sub_df = sub_df[sub_df['lang'] != 'de']

In [None]:
#only active users
batch_eligble = sub_df[sub_df['active_in_90_pre_treatment']==True]
batch_eligble_de = sub_df_de[sub_df_de['active_in_90_pre_treatment']==True]

In [None]:
batch_eligble.groupby('lang').agg(len)

In [None]:
batch_eligble['newcomer_experienced'] = batch_eligble['experience_level_pre_treatment'].apply(lambda x: 'newcomer' if x=='bin_0' else 'experienced_90_days_plus')
batch_eligble_de['newcomer_experienced'] = batch_eligble_de['experience_level_pre_treatment'].apply(lambda x: 'newcomer' if x=='bin_0' else 'experienced_90_days_plus')

In [None]:
batch_eligble.groupby(by=['lang','newcomer_experienced']).agg(len)

In [None]:
batch_eligble_de.groupby(by=['lang','newcomer_experienced']).agg(len)

In [None]:
batches = batch_eligble.groupby(by=['lang','newcomer_experienced']).apply(lambda df: df.sample(n=200))
batches_de = batch_eligble_de.groupby(by=['lang','newcomer_experienced']).apply(lambda df: df.sample(n=200))

In [None]:
batches.groupby('lang').agg(len)

In [None]:
batches['revisions_before_sim_treatment_50'] = batches.apply(lambda row: get_50_edits_before_sim_treatment(row['user_id'], row['lang']), axis=1)

In [None]:
count = 0
batches['ores_damaging_data'] = batches.apply(lambda row: get_from_ores_with_cache(
    rev_ids=[d['rev_id'] for d in row['revisions_before_sim_treatment_50']],
    model='damaging',
    lang=row['lang']),
    axis=1)
count = 0
batches['ores_goodfaith_data'] = batches.apply(lambda row: get_from_ores_with_cache(
    rev_ids=[d['rev_id'] for d in row['revisions_before_sim_treatment_50']],
    model='goodfaith',
    lang=row['lang']),
    axis=1)

## now we need to go to batches row-oriented

In [None]:
def max_17_edits_of_user(user_df):
    batch_edits_dfs = []
    rev_df = pd.DataFrame.from_dict(user_df['revisions_before_sim_treatment_50'].iloc[0])
    ores_damaging_scores = user_df['ores_damaging_data'].apply(lambda d:d['ores_scores']).values[0]
    ores_damaging_predictions = user_df['ores_damaging_data'].apply(lambda d:d['ores_predictions']).values[0]
    ores_goodfaith_scores = user_df['ores_goodfaith_data'].apply(lambda d:d['ores_scores']).values[0]
    ores_goodfaith_predictions = user_df['ores_goodfaith_data'].apply(lambda d:d['ores_predictions']).values[0]
    try:
        rev_df['ores_damaging_scores'] = ores_damaging_scores
        rev_df['ores_damaging_predictions'] = ores_damaging_predictions
        rev_df['ores_goodfaith_scores'] = ores_goodfaith_scores
        rev_df['ores_goodfaith_predictions'] = ores_goodfaith_predictions
    except ValueError: #occurs from some temporal mismatch where there are more ores data than revisions maybe some were deleted
        return None #skip this user

    rev_df = rev_df[rev_df['ores_damaging_predictions']==False]
    rev_df = rev_df[rev_df['ores_goodfaith_predictions']==True]
     
    # even namespaces
    # Limit to non-talk pages
    rev_df = rev_df[rev_df['page_namespace'].apply(lambda ns: ns%2==0)]
    # Limit to 17
    rev_df = rev_df.iloc[:17]
    
    if len(rev_df)==0:
        print('no nondamaging goodfaith for user: {}'.format(user_df['user_id'][0]))
        return None
    for row in rev_df.iterrows():
        edit_df = user_df.copy()
        edit_df['rev_id'] = row[1]['rev_id']
        edit_df['rev_timestamp'] = row[1]['rev_timestamp']
        edit_df['ores_damaging_score'] = row[1]['ores_damaging_scores']
        edit_df['ores_damaging_prediction'] = row[1]['ores_damaging_predictions']
        edit_df['ores_goodfaith_score'] = row[1]['ores_goodfaith_scores']
        edit_df['ores_goodfaith_prediction'] = row[1]['ores_goodfaith_predictions']
        edit_df['page_namespace'] = row[1]['page_namespace']

        batch_edits_dfs.append(edit_df)
    return pd.concat(batch_edits_dfs)

In [None]:
batches_edits = batches.groupby('user_id').apply(max_17_edits_of_user)

In [None]:
len(batches_edits), (len(batches)*17)/2 + (len(batches))

In [None]:
batches_edits.head()

In [None]:
batches_edits.to_pickle('checkpoints/batches_edits_non_de_1.pickle')

In [55]:
batches_edits = pd.read_pickle('checkpoints/batches_edits_non_de_1.pickle')

FileNotFoundError: [Errno 2] No such file or directory: 'checkpoints/batches_edits_non_de_1.pickle'

## german processing

In [None]:
con.execute('use dewiki_p;')

In [None]:
%%time
de_flagged_revs_users_d10 = [get_flagged_revs(user_id, sim_treatment_date) for user_id in batches_de.iloc[:10]['user_id']]

In [None]:
de_flagged_revs_users = [get_flagged_revs(user_id, sim_treatment_date) for user_id in batches_de['user_id']]

In [None]:
batches_edits_de = pd.concat(de_flagged_revs_users)

In [None]:
batches_edits_de.to_pickle('checkpoints/batches_edits_de_1.pickle')

In [None]:
batches_edits_de = pd.read_pickle('checkpoints/batches_edits_de_1.pickle')

## finally put german back together and  need to sample down to 100 users per lang-binexperience group

In [None]:
batches_edits_de['lang']='de'

In [None]:
de_user_id_experience = batches_de['user_id'].reset_index()[['user_id','newcomer_experienced']]

In [None]:
batches_edits_de['user_id'] = batches_edits_de['user_id'].apply(int)

In [None]:
batches_edits_de = batches_edits_de.merge(de_user_id_experience, how='left', on='user_id')

In [None]:
batches_edits_de.to_pickle('checkpoints/batches_edits_de_with_newcomer_experience1.pickle')

In [None]:
batches_edits_de = pd.read_pickle('checkpoints/batches_edits_de_with_newcomer_experience1.pickle')

In [None]:
common_cols = ['lang', 'user_id','rev_id', 'newcomer_experienced', 'rev_timestamp']
batches_edits_all = pd.concat([batches_edits[common_cols], batches_edits_de[common_cols]])

In [None]:
len(batches_edits), len(batches_edits_de), len(batches_edits_all)

In [None]:
#take the first 100 of each lang/experience group
be_user_ids = batches_edits_all.groupby(['lang','newcomer_experienced']).apply(lambda df: df['user_id'].unique()[:100]).reset_index().rename({0:'100_users'},axis=1)

In [None]:
batches_edits_all.head()

In [None]:
def is_lucky_100(user_id, lang, newcomer_experienced):
    lucky_uids = be_user_ids[(be_user_ids['lang']==lang) & (be_user_ids['newcomer_experienced']==newcomer_experienced)]['100_users'].iloc[0]
    return user_id in lucky_uids

batches_edits_all['lucky_100'] = batches_edits_all.apply(lambda row: is_lucky_100(row['user_id'], row['lang'], row['newcomer_experienced']) , axis=1)

In [None]:
# sample down
batches_edits_100per = batches_edits_all[batches_edits_all['lucky_100'] == True]

In [None]:
batches_edits_100per.groupby(['lang']).agg(len)

In [None]:
batches_edits_100per.to_pickle('checkpoints/batches_edits_100per.pickle')

In [None]:
from mwapi.errors import APIError

cnt = 0
def get_diff_html_dict(lang, revid):
    global cnt
    cnt += 1
    if cnt%100==0:
        print(cnt)
    try:
        mwapisession = mwapi.Session(host=f'https://{lang}.wikipedia.org', user_agent='civilservant datagathering <max@notconfusing.com>')
        ret = mwapisession.get(action='compare', fromrev=revid, torelative='prev', prop='diff|user|parsedcomment|comment|size|rel|title')
        ret_comp = ret['compare']
    except APIError:
        ret_comp = {'*':'Deleted revision','tosize':0,'toparsedcomment':'Deleted revision'}
    #keys are: tosize, toparsedcomment, *
    return ret_comp

In [None]:
%%capture
batches_edits_100per['diff_info'] = batches_edits_100per.apply(lambda row: get_diff_html_dict(row['lang'], row['rev_id']), axis=1)

In [None]:
batches_edits_100per['html_blob'] = batches_edits_100per['diff_info'].apply(lambda d: d['*'])
batches_edits_100per['diff_size'] = batches_edits_100per['diff_info'].apply(lambda d: d['tosize'])
batches_edits_100per['edit_comment'] = batches_edits_100per['diff_info'].apply(lambda d: d['toparsedcomment'])

In [None]:
def difflink(lang,rev_id):
    basestr = 'https://{lang}.wikipedia.org/w/index.php?oldid={rev_id}'.format(lang=lang, rev_id=rev_id)
    return basestr

In [None]:
batches_edits_100per['diff_link'] = batches_edits_100per.apply(lambda row: difflink(row['lang'], row['rev_id']), axis=1)

In [None]:
batches_edits_100per.to_pickle('checkpoints/batches_edits_2.pickle')

In [None]:
batches_edits_100per = pd.read_pickle('checkpoints/batches_edits_2.pickle')

In [None]:
be_save_columns = ['user_id', 'lang', 'newcomer_experienced',
                   'rev_id', 'edit_comment',
                   'diff_size', 'html_blob', 'diff_link',
                   'rev_timestamp'
                  ]

In [None]:
be_for_out = batches_edits_100per[be_save_columns]

In [None]:
be_for_out.head()

In [None]:
be_for_out.to_csv('outputs/user_edit_diffs_nondamaging_goodfaith_max17_evennamespaces_100_per_experiencelevel_per_lang_with_de_ts.csv', index=False, encoding='utf-8')

# Make labour sessions

In [None]:
def get_timestamps_90_before_after_sim_treatment(userid, lang, before_after_sim):
    '''this will get all the timestamps of edits for a user that occured before or after 90 days before, upto the  simulated treatment date'''
    if before_after_sim=='before':
        start, end = sim_observation_start_date, sim_treatment_date
    elif before_after_sim=='after':
        start, end = sim_treatment_date, sim_experiment_end_date
        
    con.execute('use {lang}wiki_p;'.format(lang=lang))
    revsql = '''select rev_timestamp from revision_userindex where rev_user = {userid}
                and rev_timestamp >= {datestr_start} and rev_timestamp < {datestr_end} 
                order by rev_timestamp
                '''.format(datestr_end=end.strftime('%Y%m%d%H%M%S'), 
                           datestr_start=start.strftime('%Y%m%d%H%M%S'),
                           userid=userid)
    udf = pd.read_sql(revsql, con)
    udf['rev_timestamp'] = udf['rev_timestamp'].apply(wmftimestamp)
    return list(udf['rev_timestamp'])

In [None]:
sub_df['timestamps_90_before_sim_treatment'] = sub_df.apply(lambda row: get_timestamps_90_before_after_sim_treatment(row['user_id'], row['lang'], 'before'), axis=1) 
sub_df['timestamps_90_after_sim_treatment'] = sub_df.apply(lambda row: get_timestamps_90_before_after_sim_treatment(row['user_id'], row['lang'], 'after'), axis=1) 

## more german power data finishing

In [None]:
sub_df_de = pd.read_pickle('checkpoints/sub_df_de_with_thanks.pickle')

In [None]:
sub_df_de['timestamps_90_before_sim_treatment'] = sub_df_de.apply(lambda row: get_timestamps_90_before_after_sim_treatment(row['user_id'], row['lang'], 'before'), axis=1) 
sub_df_de['timestamps_90_after_sim_treatment'] = sub_df_de.apply(lambda row: get_timestamps_90_before_after_sim_treatment(row['user_id'], row['lang'], 'after'), axis=1) 

In [None]:
flagged_count = 0 
def num_flagged_in_last_50(user_id, sim_treatment_date):
    global flagged_count
    if flagged_count % 100 == 0:
        print(flagged_count)
    flagged_count += 1
    deuser_pickle_fname = f'data/deusers/{user_id}.pickle'
    if not os.path.exists(deuser_pickle_fname):
        deuser = get_flagged_revs(user_id, sim_treatment_date, return_all=True)
        deuser.to_pickle(deuser_pickle_fname)
    else:
        deuser = pd.read_pickle(deuser_pickle_fname)    
    
    num_considerations = 50
    deuser_short = deuser.sort_values('rev_timestamp', ascending=False).iloc[:num_considerations]
    num_flagged = len(deuser_short[deuser_short['flagged']==True])
    return num_flagged

In [None]:
sub_df_de['num_flagged_revisions_pre_treatment'] = sub_df_de['user_id'].apply(lambda u: num_flagged_in_last_50(u, sim_treatment_date))

In [None]:
def make_sessions(ts_list):
    #these structures store the timestamps
    edit_sessions = []
    curr_edit_session = []

    #initialize prev to the earliest data possible
    prev_timestamp = datetime.datetime(year=2001, month=1, day=1)

    for index, ts in enumerate(ts_list):
#         print('index:', index)
        curr_timestamp = ts
        #if curr timestamp within 1 hour of last then append
        if curr_timestamp - prev_timestamp < datetime.timedelta(hours=1):
            curr_edit_session.append(curr_timestamp)
        # else start a new edit session
        else:
            #if there's a pre-existing session save it to the return
            if curr_edit_session:
                edit_sessions.append(curr_edit_session)
            # and start a new session
            curr_edit_session = [curr_timestamp]
        # this is before
        if index < len(ts_list)-1:
            prev_timestamp = curr_timestamp
        # this is the last item save this session too.
        else:
#             print('this is the end')
            edit_sessions.append(curr_edit_session)
        
    return edit_sessions

def labour_hours(ts_list):
    sessions = make_sessions(ts_list)
    total_labour_hours = 0
    for session in sessions:
        if len(session) == 1:
            total_labour_hours += 1
        else:
            session_duration = max(session) - min(session)
            session_seconds = session_duration.seconds
            session_hours = session_seconds / (60*60)
            session_hours += 1 # for this session
            total_labour_hours += session_hours
    return total_labour_hours

def ts_in_week(ts_list, date_start, date_end):
    in_week = []
    for ts in ts_list:
        if ts > date_start and ts <= date_end:
            in_week.append(ts)
    return in_week

In [None]:
sub_df_de['labour_hours_90_pre_treatment'] = sub_df_de['timestamps_90_before_sim_treatment'].apply(labour_hours)

sub_df_de['num_edits_90_pre_treatment'] = sub_df_de['timestamps_90_before_sim_treatment'].apply(len)

sub_df_de['labour_hours_90_post_treatment'] = sub_df_de['timestamps_90_after_sim_treatment'].apply(labour_hours)

sub_df_de['num_edits_90_post_treatment'] = sub_df_de['timestamps_90_after_sim_treatment'].apply(len)

sub_df_de[['num_edits_90_pre_treatment', 'num_edits_90_post_treatment']].plot()

sub_df_de[['labour_hours_90_pre_treatment', 'labour_hours_90_post_treatment']].plot()

sub_df_de.to_pickle('checkpoints/sub_df_de_revs_ts_labour_hours_1.pickle')

sub_df_de = pd.read_pickle('checkpoints/sub_df_de_revs_ts_labour_hours_1.pickle')

In [None]:
sub_df['labour_hours_90_pre_treatment'] = sub_df['timestamps_90_before_sim_treatment'].apply(labour_hours)

sub_df['num_edits_90_pre_treatment'] = sub_df['timestamps_90_before_sim_treatment'].apply(len)

sub_df['labour_hours_90_post_treatment'] = sub_df['timestamps_90_after_sim_treatment'].apply(labour_hours)

sub_df['num_edits_90_post_treatment'] = sub_df['timestamps_90_after_sim_treatment'].apply(len)

sub_df[['num_edits_90_pre_treatment', 'num_edits_90_post_treatment']].plot()

sub_df[['labour_hours_90_pre_treatment', 'labour_hours_90_post_treatment']].plot()

sub_df.to_pickle('checkpoints/sub_df_revs_ts_labour_hours_1.pickle')

sub_df = pd.read_pickle('checkpoints/sub_df_revs_ts_labour_hours_1.pickle')

inactive = sub_df[sub_df['active_in_90_pre_treatment']==False]
active = sub_df[sub_df['active_in_90_pre_treatment']==True]

active['num_edits_90_pre_treatment'].mean()

active['num_edits_90_post_treatment'].mean()

In [None]:
edit_col_f = 'num_edits_week_{ind}_post_treatment'
anye_col_f = 'any_edits_week_{ind}_post_treatment'
labh_col_f = 'labour_hours_week_{ind}_post_treatment'
anyl_col_f = 'any_labour_hours_week_{ind}_post_treatment'

col_fn ={edit_col_f: len,
         anye_col_f: lambda x: len(x)>0,
         labh_col_f: lambda x: labour_hours(x),
         anyl_col_f: lambda x: labour_hours(x)>0}

sub_df_list = []
try:
    sub_df_list.append(sub_df)
except NameError:
    pass
try:
    sub_df_list.append(sub_df_de)
except NameError:
    pass


for col_f, fn in col_fn.items():
    for ind, (week_n_start, week_n_end) in enumerate(window_seq(range(13))):
        ind = ind+1
        days_after_treat_start = week_n_start * 7 
        days_after_treat_end = week_n_end * 7
        date_start = sim_treatment_date + td(days=days_after_treat_start)
        date_end = sim_treatment_date + td(days=days_after_treat_end)

        col = col_f.format(ind=ind)
        print(col)

        ts_week = f'ts_in_week_{ind}'
        
        for sub_df in sub_df_list:
            sub_df[ts_week] = sub_df['timestamps_90_after_sim_treatment'].apply(lambda x: ts_in_week(x, date_start, date_end))
            sub_df[col] = sub_df[ts_week].apply(fn)


            del sub_df[ts_week]

In [61]:
@make_cached_df('disablemail')
def get_user_disablemail_properties(lang, user_id):
    con.execute(f"use {lang}wiki_p;")
    user_prop_sql = f"""select * from user_properties where up_user = {user_id}
                        and up_property = 'disablemail';"""
    df = pd.read_sql(user_prop_sql, con)
    return df


def add_has_email_currently(df):
    user_prop_dfs = []
    for lang in langs:
        user_ids =  user_ids = df[df['lang']==lang]['user_id'].values
        for user_id in user_ids:
            user_prop_df = get_user_disablemail_properties(lang, user_id)
            has_email = False if len(user_prop_df) >=1 else True #the property disables email, if it doesn't exist the default its that it's on
            user_prop_dfs.append(pd.DataFrame.from_dict({'has_email':[has_email],
                                           'user_id':[user_id], 
                                           'lang':[lang]}, orient='columns'))
    
    users_prop_df = pd.concat(user_prop_dfs)
    df = pd.merge(df, users_prop_df, how='left', on=['lang', 'user_id'])
    return df

In [85]:
@make_cached_df('total_edits')
def get_total_user_edits(lang, user_id, start_date, end_date):
    con.execute(f"use {lang}wiki_p;")
    user_edit_sql = f"""select count(*) as edits_pre_treatment from revision_userindex 
                where rev_user = {user_id} 
                and {to_wmftimestamp(start_date)} <= rev_timestamp <= {to_wmftimestamp(end_date)};
                """
    df = pd.read_sql(user_edit_sql, con)
    return df


def add_total_edits(df, start_date, end_date):
    user_edit_dfs = []
    for lang in langs:
        user_ids =  user_ids = df[df['lang']==lang]['user_id'].values
        for user_id in user_ids:
            user_edit_df = get_total_user_edits(lang, user_id, start_date, end_date)
            user_edit_df['user_id'] = user_id
            user_edit_df['lang'] = lang
            user_edit_dfs.append(user_edit_df)
    
    users_edit_df = pd.concat(user_edit_dfs)
    df = pd.merge(df, users_edit_df, how='left', on=['lang', 'user_id'])
    return df

def add_total_edits_pre_treatment(df):
    return add_total_edits(df, start_date=wikipedia_start_date, end_date=sim_treatment_date)

In [87]:
def make_data(subsample=None):
    print('starting to make data')
    df = make_populations()
    df = remove_inactive_users(df)
    if subsample:
        print(f'subsetting to {subsample} samples')
        df = df.sample(n=subsample, random_state=1854)
    
    df = add_thanks_pre_treatment(df)
    df = add_experience_bin(df)
    if not subsample:
        output_bin_stats(df)
    
    df = add_total_edits_pre_treatment(df)
    df = add_has_email_currently(df)

    return df

if __name__ == "__main__":
    subsample = os.getenv('subsample', 100)
    df = make_data(subsample)

starting to make data
subsetting to 100 samples


In [None]:
def refresh_edits(user_df):

In [88]:
df.head()

Unnamed: 0,lang,user_id,user_name,user_registration,first_edit,last_edit,num_thanks_received_pre_treatment,experience_level_pre_treatment,edits_pre_treatment,has_email
0,de,2844726,Tvaktuell,2018-02-01 15:08:52,2018-02-01 15:29:22,2018-02-01 16:02:31,0,bin_0,6,0
1,ar,1108371,Sohaib el hidouri,2015-12-08 20:57:18,2015-12-21 13:18:17,2016-01-14 21:02:47,0,bin_730,30,0
2,de,2748865,Clarundius,2017-10-05 10:24:33,2017-10-12 13:36:35,2017-10-13 12:35:21,0,bin_90,25,0
3,pl,867217,Jinowolski,2017-12-29 23:13:57,2017-12-29 23:19:16,2017-12-29 23:19:16,0,bin_0,1,0
4,de,2805566,Jdfisher,2017-12-13 08:13:31,2006-11-27 17:16:11,2009-06-27 15:14:41,0,bin_0,4,0


In [89]:
df['has_email'].min()

0

In [47]:
def get_a_minimum_rev_id(lang, start_date):
    con.execute(f'use {lang}wiki_p;')
    min_sql = f"""select * from revision where rev_timestamp <= {to_wmftimestamp(start_date)} order by rev_timestamp desc limit 1;"""
    min_df = pd.read_sql(min_sql, con)
    return min_df['rev_id'].iloc[0]

In [49]:
min_rev_ids = {}
for lang in langs:
    min_rev_ids[lang] = get_a_minimum_rev_id(lang, sim_observation_start_date)

In [51]:
min_rev_ids

{'ar': 25877148, 'fa': 21737205, 'pl': 51165195, 'de': 171711383}

###  add has email to another dataset

In [56]:
pa_dir = '/home/paprika/Tresors/CivilServant/projects/wikipedia-integration/gratitude-study/datasets/power_analysis/'
fa_ar_pl_csv = 'gratitude_power-analysis_dataset_sim_date_20180306_v1.csv'
de_csv = 'de_gratitude_power-analysis_dataset_sim_date_20180306_v1.csv'
thankees = pd.read_csv(os.path.join(pa_dir, fa_ar_pl_csv), index_col=0)
thankees_de = pd.read_csv(os.path.join(pa_dir, de_csv), index_col=0)

thankees = add_has_email_currently(thankees)

thankees_de = add_has_email_currently(thankees_de)

In [65]:
thankees['has_email'].value_counts()

True     8416
False     171
Name: has_email, dtype: int64

In [66]:
thankees_de['has_email'].value_counts()

True     1759
False      41
Name: has_email, dtype: int64

In [69]:
thankees.to_csv(os.path.join(pa_dir, fa_ar_pl_csv.split('v1.csv')[0]+"with_email.csv"))

In [70]:
thankees_de.to_csv(os.path.join(pa_dir, de_csv.split('v1.csv')[0]+"with_email.csv"))