In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
from db_utils import query_hive_ssh
import re
import copy
from diff_utils import *
import time
import numpy as np
import os
import multiprocessing as mp

In [10]:
datasets = [  
    {
        'table':'blocked_talk_diff_no_admin',
        'partition':'ns=article', 
        'ns': 'article',
        'name': 'all_blocked_user',
    },
    {
        'table':'blocked_talk_diff_no_admin',
        'partition':'ns=user', 
        'ns': 'user',
        'name': 'all_blocked_user',
    },
    {
        'table':'user_talk_diff_no_admin_sample',
        'partition':'', 
        'ns': 'user',
        'name': 'talk_diff_no_admin_sample',
    },
    {
        'table':'article_talk_diff_no_admin_sample',
        'partition':'', 
        'ns': 'article',
        'name': 'talk_diff_no_admin_sample',
    },
    {
        'table':'talk_diff_no_admin',
        'partition':'ns=article/year=2015', 
        'ns': 'article',
        'name': 'talk_diff_no_admin_2015',
    },
    {
        'table':'talk_diff_no_admin',
        'partition':'ns=user/year=2015', 
        'ns': 'user',
        'name': 'talk_diff_no_admin_2015',
    }
]

In [11]:
def transfer_partition(params, dry = False):
    hdfs_path = '/user/hive/warehouse/enwiki.db/%(table)s/%(partition)s' % params
    stat2_path = '/home/ellery/talk_page_abuse/wikipedia/data/samples/%(ns)s/%(name)s' % params
    local_path = '/Users/ellerywulczyn/talk_page_abuse/wikipedia/data/samples/%(ns)s/%(name)s/' % params

    if not dry:
        # transfer from HDFS to stat2
        cmd = "ssh stat1002.eqiad.wmnet 'rm -rf %s'" % stat2_path
        os.system(cmd)
        cmd = "ssh stat1002.eqiad.wmnet 'hadoop fs -copyToLocal %s %s '" % (hdfs_path, stat2_path)
        os.system(cmd)
        #transfer from stat2 to local
        cmd = 'rm -rf %s' % local_path
        os.system(cmd)
        cmd = 'rsync -avz  stat1002.eqiad.wmnet:%s/* %s' % (stat2_path, local_path)
        os.system(cmd)
    
    return local_path

In [12]:
dirs = []
for p in datasets:
    dirs.append(transfer_partition(p, dry = True))

__Cleaning and Filtering__

In [13]:
def cf_helper(path, k = 5):
    names = ['rev_comment', 'insertion', 'insert_only', 'rev_id', 'page_id', 'page_title', 'rev_timestamp', 'user_id', 'user_text']
    df = pd.read_csv(path, sep = '\t', quoting = 3, encoding = 'utf-8', header = None, usecols=range(len(names)))
    if df.shape[0] ==0:
        return pd.DataFrame(columns = names)
    if df.shape[1] != len(names):
        print(path)
        print(df.shape)
        return pd.DataFrame(columns = names)
    df.columns = names
    df = df.assign(key = lambda x: np.random.randint(0, high=5*k, size=x.shape[0]))
    dfs = [e[1] for e in df.groupby('key')]
    p = mp.Pool(k)
    dfs = p.map(clean_and_filter, dfs)
    p.close()
    p.join()
    return pd.concat(dfs)

In [17]:
def clean_and_filter_parallel(path, k = 7):
    files = []
    for root, dirnames, filenames in os.walk(path):
        for filename in filenames:
            if '_0' in filename:
                files.append(os.path.join(root, filename))
    dfs = [cf_helper(p, k = k) for p in files]
    df = pd.concat(dfs)
    del df['key']
    return df

In [18]:
for path in dirs:
    clean_and_filter_parallel(path).to_csv(path[:-1] +'.tsv', sep = '\t', index = False)

/Users/ellerywulczyn/talk_page_abuse/wikipedia/data/v4/user/talk_diff_no_admin_2015/000008_0
/Users/ellerywulczyn/talk_page_abuse/wikipedia/data/v4/user/talk_diff_no_admin_2015/000017_0
/Users/ellerywulczyn/talk_page_abuse/wikipedia/data/v4/user/talk_diff_no_admin_2015/000055_0
/Users/ellerywulczyn/talk_page_abuse/wikipedia/data/v4/user/talk_diff_no_admin_2015/000056_0
/Users/ellerywulczyn/talk_page_abuse/wikipedia/data/v4/user/talk_diff_no_admin_2015/000066_0
/Users/ellerywulczyn/talk_page_abuse/wikipedia/data/v4/user/talk_diff_no_admin_2015/000068_0
/Users/ellerywulczyn/talk_page_abuse/wikipedia/data/v4/user/talk_diff_no_admin_2015/000103_0
/Users/ellerywulczyn/talk_page_abuse/wikipedia/data/v4/user/talk_diff_no_admin_2015/000113_0
/Users/ellerywulczyn/talk_page_abuse/wikipedia/data/v4/user/talk_diff_no_admin_2015/000137_0
/Users/ellerywulczyn/talk_page_abuse/wikipedia/data/v4/user/talk_diff_no_admin_2015/000141_0
/Users/ellerywulczyn/talk_page_abuse/wikipedia/data/v4/user/talk_diff_

### Download `block_events` and `blocked_users`

In [19]:
query = """
SELECT 
      *
FROM
    enwiki.block_events
"""

block_events_df = query_hive_ssh(query, '../../data/block_events.tsv', priority = True, quoting=3, delete=False)
block_events_df.columns = [c.split('.')[1] for c in block_events_df.columns]

In [20]:
query = """
SELECT 
      *
FROM
    enwiki.blocked_user
"""

blocked_user_df = query_hive_ssh(query, '../../data/blocked_user.tsv', priority = True, quoting=3, delete=False)
blocked_user_df.columns = [c.split('.')[1] for c in blocked_user_df.columns]

### Download NPA warnings

In [7]:
query = """
SELECT 
      *
FROM
    enwiki.npa_warnings
"""

npa_warnings_df = query_hive_ssh(query, '../../data/npa_warnings.tsv', priority = True, quoting=3, delete=False)
npa_warnings_df.columns = [c.split('.')[1] for c in npa_warnings_df.columns]

### Download Long term Users

In [3]:
query = """
SELECT
    user_text,
    COUNT(*) AS num_days
FROM
    (SELECT
        user_text,
        day
    FROM
        (SELECT 
            rev_user_text AS user_text,
            SUBSTR(rev_timestamp,0,8) AS day
        FROM
            enwiki.revision
        WHERE
            rev_user != 0
            AND rev_timestamp <= '2015-01-01'
        ) a
    GROUP BY
        user_text,
        day ) b
GROUP BY
    user_text
HAVING
    COUNT(*) > 7 
"""

long_term_users_df = query_hive_ssh(query, '../../data/long_term_users.tsv', priority = True, quoting=3, delete=False)

### Download Gender

In [None]:
## Annotate users by gender
query = """
SELECT
    user_id,
    user_name as user_text,
    up_value as gender
FROM
    enwiki.user_properties p,
    enwiki.user u
WHERE 
    p.up_user = u.user_id
    AND up_property = 'gender'
"""
d_gender = query_analytics_store(query, {})
d_gender.to_csv('../../data/genders.tsv', sep = '\t', index = False)

# Onionize `all_blocked_user`

We want to get the k posts before and after each block event for different values of [k1, k2, ..kn]. In order for us to grow k as we please without labeling headaches, we will create a file containing the k_i-1 through k_i posts for each block event that we have not yet labeled.

In [29]:
ns = 'article'
rel_path = '../../data/samples'
infile = os.path.join(rel_path, ns, 'all_blocked_user.tsv')
out_dir = os.path.join(rel_path, ns, 'blocked_user_onion')
df = pd.read_csv(infile, sep = '\t')

In [31]:
users = list(set(df['user_text']))
print(len(users))

k_prev = 0
ks = [5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 150, 200, 250, 300, 500, 1000]
dfs = {k:[] for k in ks}

t1 = time.time()
for i, user in enumerate(users):
    if i % 1000 ==0:
        print (i)
        print(time.time()-t1)
        t1 = time.time()
    df_user = df[df['user_text'] == user].sort_values(by='rev_timestamp')
    if df_user.shape[0] == 0:
        continue

    block_events_df_user = block_events_df[block_events_df['user_text']==user]
    seen_ids = set()


    for i,r in block_events_df_user.iterrows():
        ts = r['timestamp']
        for k in ks:
            df_user_pre = df_user[df_user['rev_timestamp'] <= ts][-k:]

            if df_user_pre.shape[0] > 0:
                df_user_pre = df_user_pre[df_user_pre['rev_id'].apply(lambda x: x not in seen_ids )]
                if df_user_pre.shape[0] > 0:
                    seen_ids.update(tuple(df_user_pre['rev_id']))
                    dfs[k].append(df_user_pre)

            df_user_post = df_user[df_user['rev_timestamp'] > ts][:k]
            if df_user_post.shape[0] > 0:
                df_user_post = df_user_post[df_user_post['rev_id'].apply(lambda x: x not in seen_ids ) ]
                if df_user_post.shape[0] > 0:
                    seen_ids.update(tuple(df_user_post['rev_id']))
                    dfs[k].append(df_user_post)

dfs = {k: pd.concat(v) for k,v in dfs.items()}

sizes = [(k, len(v)) for k,v in dfs.items()]
sizes.sort(key=lambda x: x[0])
print(sizes)


os.system('rm -rf %s' % out_dir)
os.system('mkdir  %s' % out_dir)

for k, v in dfs.items():
    v.iloc[np.random.permutation(len(v))].to_csv(out_dir +'/%d.tsv' % k, sep = '\t', index=False)

6437
0
0.0004057884216308594
1000
95.09861302375793
2000
91.08967995643616
3000
102.37120795249939
4000
94.57161092758179
5000
88.98756098747253
6000
93.17013096809387
[(5, 33047), (10, 22033), (20, 34280), (30, 28379), (40, 24505), (50, 22110), (60, 20376), (70, 18933), (80, 17751), (90, 16756), (100, 15943), (150, 71089), (200, 60239), (250, 52713), (300, 46206), (500, 145013), (1000, 228234)]


FileNotFoundError: [Errno 2] No such file or directory: '../../data/v4/article/blocked_user_onion/100.tsv'