In [1]:
%load_ext autoreload
%autoreload 2

import sys
# Absolute project root; every other path in this notebook is built from it.
HOME = '/srv/home/christinedk/wp_internship/'
DATA_DIR = HOME + 'data/'
# Extend the import path so the `config` import below resolves
# (presumably config.py lives in HOME/collaboration/ - verify).
sys.path.append(HOME + 'collaboration/')
from config import TEMPLATES

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pylab as plt
from time import time
from math import log2
import json

import dateutil 
from datetime import datetime
from dateutil.relativedelta import relativedelta

from pyspark.sql import functions as f
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType, IntegerType, DoubleType, FloatType

In [3]:
def talk_article_ratio(page_namespace_list):
    """Return the add-one smoothed ratio of namespace-0 to namespace-1 edits.

    Args:
        page_namespace_list: iterable of namespace ids, one per revision.

    Returns:
        float: (count of 0s + 1) / (count of 1s + 1). An empty list gives 1.0.

    NOTE(review): despite the name, the value is article/talk (namespace 0
    over namespace 1), not talk/article - confirm which was intended.
    """
    namespaces = np.asarray(page_namespace_list)
    # +1 on both sides keeps the ratio finite when either count is zero.
    n_article = np.count_nonzero(namespaces == 0) + 1
    n_talk = np.count_nonzero(namespaces == 1) + 1
    return float(n_article / n_talk)
# Spark UDF wrapper around talk_article_ratio, declared to return a FloatType column.
udf_page_talk_ratio = udf(talk_article_ratio,FloatType())

def entropy(p):
    """Shannon entropy, in bits, of a discrete distribution.

    Args:
        p: iterable of probabilities (not checked to sum to 1).

    Returns:
        -sum(p_i * log2(p_i)) over the non-zero entries; 0 for empty input.

    Raises:
        ValueError: if any entry is negative (log2 is undefined there).
    """
    # Zero-probability outcomes contribute nothing (0 * log 0 := 0); skipping
    # them avoids the math-domain error that log2(0) would raise.
    return -sum(p_i * log2(p_i) for p_i in p if p_i != 0)

def contribution_fracs(page_ids):
    """Entropy (bits) of how an editor's revisions are spread across pages.

    Args:
        page_ids: iterable of page ids, one entry per revision.

    Returns:
        float: entropy of the per-page share of entries in `page_ids`.
    """
    _, edits_per_page = np.unique(page_ids, return_counts=True)
    # Normalise the counts into the fraction of all edits each page received.
    shares = edits_per_page / edits_per_page.sum()
    return float(entropy(shares))
# Spark UDF wrapper around contribution_fracs, declared to return a FloatType column.
udf_contribution_frac = udf(contribution_fracs,FloatType())

def read_revisions(filename, rename=False):
    """Load a JSON-lines revision dump into a DataFrame sorted by time.

    Args:
        filename: path to a file with one JSON object per line.
        rename: when True, rename `revision_timestamp` -> `event_timestamp`
            and `user_id` -> `event_user_id` before further processing.

    Returns:
        pandas.DataFrame with `event_timestamp` parsed to datetimes and rows
        ordered by it, oldest first (the original index is kept).
    """
    frame = pd.read_json(filename, lines=True)
    if rename:
        column_map = {'revision_timestamp':'event_timestamp','user_id':'event_user_id'}
        frame = frame.rename(columns=column_map)
    parsed = frame.assign(event_timestamp=pd.to_datetime(frame['event_timestamp']))
    return parsed.sort_values(by='event_timestamp', ascending=True)

def np_encoder(object):
    """`default=` hook for json.dump that makes numpy values serialisable.

    Args:
        object: a value the json encoder could not serialise by itself.
            (The name shadows the builtin; kept for call compatibility.)

    Returns:
        The plain-Python equivalent: `.item()` for numpy scalars and
        `.tolist()` for numpy arrays.

    Raises:
        TypeError: for any other type, as the json `default` contract
            requires. Returning None here would silently write `null`.
    """
    if isinstance(object, np.generic):
        return object.item()
    if isinstance(object, np.ndarray):
        # Array-valued cells would otherwise be written as null.
        return object.tolist()
    raise TypeError('Object of type {} is not JSON serializable'.format(type(object).__name__))

In [4]:
def get_editor_features(tag_user_histories):
    """Aggregate one feature row per editor from their revision history.

    Args:
        tag_user_histories: Spark DataFrame with one row per revision. The
            columns read here are event_user_id, event_timestamp, page_id,
            revision_id, page_namespace, num_groups, num_blocks_historical,
            num_curr_blocks, is_revert_bool, is_reverted_bool and
            days_since_registration.

    Returns:
        Spark DataFrame keyed by event_user_id with columns num_groups,
        num_articles, num_edits, num_past_blocks, num_curr_blocks,
        num_reverts_by_others, num_reverts_of_others,
        time_since_registration, talk_article_ratio and
        contribution_frac_entropy.

    Changes from the previous version:
      * The two revert counters were swapped. `is_revert_bool` marks edits in
        which this editor reverted someone else (-> num_reverts_of_others);
        `is_reverted_bool` marks this editor's edits that were later reverted
        (-> num_reverts_by_others). Files written before this fix carry the
        swapped values and should be regenerated rather than appended to.
      * "Latest value" columns no longer use f.last, whose result depends on
        row order and is not deterministic after the groupBy shuffle.
    """

    def _latest(column_name):
        # Max over (timestamp, value) structs selects the value attached to the
        # most recent event, independent of how rows are partitioned.
        # Assumes the value column has an orderable type - TODO confirm.
        newest = f.max(f.struct(col('event_timestamp'), col(column_name)))
        return newest[column_name]

    tag_features = tag_user_histories.groupby('event_user_id').agg(
        _latest('num_groups').alias('num_groups'),
        f.countDistinct('page_id').alias('num_articles'),
        f.count('revision_id').alias('num_edits'),
        _latest('num_blocks_historical').alias('num_past_blocks'),
        _latest('num_curr_blocks').alias('num_curr_blocks'),
        f.sum(col('is_reverted_bool')).alias('num_reverts_by_others'),
        f.sum(col('is_revert_bool')).alias('num_reverts_of_others'),
        _latest('days_since_registration').alias('time_since_registration'),
        udf_page_talk_ratio(f.collect_list('page_namespace')).alias('talk_article_ratio'),
        udf_contribution_frac(f.collect_list('page_id')).alias('contribution_frac_entropy'),
    )

    return tag_features

def get_user_interactions(user_histories_1year):
    """Pair up editors who revised the same page.

    Args:
        user_histories_1year: Spark DataFrame of revisions with at least
            page_id, event_user_id, revision_id and page_namespace.

    Returns:
        Spark DataFrame with one row per (page, editor, other editor): the
        left editor's columns (page_id, event_user_id, revisions_count,
        page_namespace) plus the other editor's copies suffixed `_r`
        (page_id_r is dropped). Each pair appears in both directions.
    """
    # Revision count per (page, editor).
    per_editor_page = (
        user_histories_1year
        .select('page_id', 'event_user_id', 'revision_id', 'page_namespace')
        .groupBy("page_id", "event_user_id")
        .agg(f.count("revision_id").alias("revisions_count"),
             f.first("page_namespace").alias('page_namespace'))
    )

    # Same table with every column suffixed `_r`, so the self-join below has
    # unambiguous column names.
    mirrored = per_editor_page.toDF(*['{}_r'.format(name) for name in per_editor_page.columns])

    same_page = per_editor_page.page_id == mirrored.page_id_r
    other_editor = per_editor_page.event_user_id != mirrored.event_user_id_r
    return per_editor_page.join(mirrored, [same_page, other_editor]).drop('page_id_r')

def calculate_concentration_ratios(user_histories):
    """Attach per-page editor/revision statistics to every revision row.

    Args:
        user_histories: Spark DataFrame with page_id and event_user_id.

    Returns:
        The input joined (inner, on page_id) with three per-page columns:
        num_editors (distinct event_user_id), num_revisions (non-null
        event_user_id count) and concentration_ratio = num_editors / num_revisions.
    """
    per_page = user_histories.groupby('page_id').agg(
        f.countDistinct('event_user_id').alias('num_editors'),
        f.count('event_user_id').alias('num_revisions'),
    )
    per_page = per_page.withColumn('concentration_ratio',
                                   col('num_editors') / col('num_revisions'))
    return user_histories.join(per_page, on='page_id')

def get_directed_features(paired_interactions):
    """Compute, per ordered editor pair, the share of the first editor's
    co-editing that involves the second editor.

    Args:
        paired_interactions: Spark DataFrame with event_user_id,
            event_user_id_r, num_common_articles and num_revisions_articles.

    Returns:
        Spark DataFrame with event_user_id, event_user_id_r,
        coedit_ratio (num_common_articles / that editor's summed
        num_common_articles) and coedit_revisions_ratio (the same for
        num_revisions_articles).
    """
    # Per-editor totals over all of that editor's partners (the denominators).
    editor_totals = paired_interactions.groupby('event_user_id').agg(
        f.sum('num_common_articles').alias('editor_pages_total'),
        f.sum('num_revisions_articles').alias('editor_revisions_total'),
    )
    totals_for_join = editor_totals.select('event_user_id', 'editor_pages_total',
                                           'editor_revisions_total')

    with_totals = paired_interactions.join(totals_for_join, on="event_user_id")
    with_ratios = (
        with_totals
        .withColumn("coedit_ratio",
                    col("num_common_articles") / col("editor_pages_total"))
        .withColumn('coedit_revisions_ratio',
                    col('num_revisions_articles') / col('editor_revisions_total'))
    )
    return with_ratios.select('event_user_id', 'event_user_id_r',
                              'coedit_ratio', 'coedit_revisions_ratio')


def get_undirected_features(paired_interactions, paired_interactions_articles):
    """Collapse directed editor pairs into one row per unordered pair.

    Args:
        paired_interactions: Spark DataFrame with event_user_id,
            event_user_id_r and num_common_pages.
        paired_interactions_articles: Spark DataFrame with event_user_id,
            event_user_id_r and num_common_articles.

    Returns:
        Spark DataFrame with columns pair (sorted two-element array of the
        editor ids), num_common_pages and num_common_articles. The join is
        an inner join on pair, so only pairs present in both inputs remain.
    """
    def _one_row_per_pair(frame, value_column):
        # Sorting the two ids makes (a, b) and (b, a) the same key, so
        # drop_duplicates keeps a single row per unordered pair.
        ordered_pair = f.array_sort(f.array(col('event_user_id'), col('event_user_id_r')))
        return (frame.withColumn('pair', ordered_pair)
                     .drop_duplicates(subset=['pair'])
                     .select('pair', value_column))

    all_pages = _one_row_per_pair(paired_interactions, 'num_common_pages')
    articles_only = _one_row_per_pair(paired_interactions_articles, 'num_common_articles')
    return all_pages.join(articles_only, on='pair')

def calculate_collaboration_features(editor_interactions):
    """Derive directed and undirected collaboration features for editor pairs.

    Args:
        editor_interactions: Spark DataFrame with event_user_id,
            event_user_id_r, page_id, page_namespace and revisions_count
            (the shape produced by get_user_interactions).

    Returns:
        tuple: (directed_features, undirected_features), the results of
        get_directed_features and get_undirected_features respectively.
    """
    pair_keys = ('event_user_id', 'event_user_id_r')

    # Namespace-0 pages only. Cached because this frame feeds both feature
    # sets below; note that it is never unpersisted inside this function.
    # (A mean concentration ratio and a >=5 common-articles filter were
    # previously sketched here but are not applied.)
    article_pairs = (
        editor_interactions
        .filter(col('page_namespace') == 0)
        .groupby(*pair_keys)
        .agg(f.count("page_id").alias('num_common_articles'),
             f.sum('revisions_count').alias('num_revisions_articles'))
        .cache()
    )

    # Shared pages across every namespace.
    all_pairs = editor_interactions.groupby(*pair_keys).agg(
        f.count("page_id").alias('num_common_pages'))

    directed = get_directed_features(article_pairs)
    undirected = get_undirected_features(all_pairs, article_pairs)
    return directed, undirected

In [6]:
# Load the editor revision histories, parse the two timestamp columns, expose
# the revert flags as integer columns (so they can be summed), and order the
# rows by event time. `spark` is not defined in this notebook; it is assumed
# to be provided by the kernel.
user_histories = (
    spark.read.parquet('editors/user_histories.parquet')
    .withColumn('event_timestamp', f.to_timestamp(col('event_timestamp')))
    .withColumn('event_user_registration_timestamp',
                f.to_timestamp(col('event_user_registration_timestamp')))
    .withColumn('is_revert_bool', col("revision_is_identity_revert").cast("long"))
    .withColumn('is_reverted_bool', col("revision_is_identity_reverted").cast("long"))
    .orderBy(col('event_timestamp'))
)

In [None]:
# For every labelled (tag date, page) pair of each template, build editor and
# collaboration features from the histories of the editors who touched the
# page up to the tag date, then write one JSON file per template.
for template in ['peacock']:
    print(template)
    t1=time()
    labels = pd.read_csv(DATA_DIR + 'labels/{}.csv'.format(template))
    labels['event_timestamp'] = pd.to_datetime(labels['event_timestamp'])

    revisions = read_revisions(DATA_DIR + 'page_history/page_history-{}-meta-info.json'.format(template))

    features = []
    pages = revisions.groupby('page_id')

    print('total: ',len(labels))
    try:
        # The unpacking assumes `labels` has exactly two columns, in the
        # order (event_timestamp, page_id).
        for i, (tag_date, page_id) in enumerate(labels.values):
            print('processing :',i)

            page_revisions = pages.get_group(page_id)

            # Editors who revised this page up to (and including) the tag date.
            # I think this can be done in a better way
            tag_users = page_revisions[page_revisions.event_timestamp <= tag_date]\
                                        .event_user_id.dropna().unique().tolist()
            tag_user_histories = user_histories.filter(col('event_timestamp')<=tag_date)\
                                               .filter(col('event_user_id').isin(tag_users))\
                                               .withColumn('days_since_registration',f.datediff(f.lit(tag_date),col('event_user_registration_timestamp')))
            #tag_user_histories = calculate_concentration_ratios(tag_user_histories)

            editor_features = get_editor_features(tag_user_histories).toPandas().to_dict('records')

            # Collaboration features only look at the year before the tag.
            start_date = tag_date - relativedelta(years=1)
            user_histories_1year = tag_user_histories.filter(col("event_timestamp").between(start_date,tag_date))
            editor_interactions = get_user_interactions(user_histories_1year)

            directed_features, undirected_features = calculate_collaboration_features(editor_interactions)
            collaboration = {'directed':directed_features.toPandas().to_dict('records'),
                             'undirected':undirected_features.toPandas().to_dict('records')}

            features.append({'date':str(tag_date),'page_id':page_id,'editor':editor_features,
                             'collaboration':collaboration})

            # calculate_collaboration_features caches a fresh DataFrame on every
            # call; release it once its results are collected so cached data does
            # not pile up over thousands of iterations. This clears *all* cached
            # data in the session (nothing else in this notebook is cached).
            spark.catalog.clearCache()
    finally:
        # Written even when the loop is interrupted, so a crash part-way
        # through does not throw away the records computed so far.
        with open(HOME +'features/editors'+template+'.json','w') as file:
            json.dump(features,file,default=np_encoder)

    print('time: ',int(time()-t1))

In [None]:
# Number of feature records collected so far.
len(features)

In [None]:
# Manual save of the current `features` list (uses the `template` value left
# over from the loop cell); np_encoder handles numpy values during the dump.
with open(HOME +'features/editors'+template+'.json','w') as file:
    json.dump(features,file,default=np_encoder)

In [None]:
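# Restart point: the cells below reload the inputs and the saved features, then resume the loop where it stopped.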

In [5]:
# Reload the editor revision histories (same preparation as the earlier load
# cell): parse the two timestamp columns, expose the revert flags as integer
# columns, and order the rows by event time. `spark` is assumed to be
# provided by the kernel.
user_histories = (
    spark.read.parquet('editors/user_histories.parquet')
    .withColumn('event_timestamp', f.to_timestamp(col('event_timestamp')))
    .withColumn('event_user_registration_timestamp',
                f.to_timestamp(col('event_user_registration_timestamp')))
    .withColumn('is_revert_bool', col("revision_is_identity_revert").cast("long"))
    .withColumn('is_reverted_bool', col("revision_is_identity_reverted").cast("long"))
    .orderBy(col('event_timestamp'))
)

In [6]:
# Template whose partially computed features are being resumed.
template='peacock'

# Load the records saved so far from the `_v2` output file.
with open(HOME +'features/editors'+template+'_v2.json','rb') as file:
    features = json.load(file)
    
# How many records were already computed.
len(features)

4286

In [7]:
# Re-create the inputs the resume loop needs: the label table, the page
# revision history, and the per-page grouping of that history.
print(template)
t1=time()
labels = pd.read_csv(DATA_DIR + 'labels/{}.csv'.format(template))
labels['event_timestamp'] = pd.to_datetime(labels['event_timestamp'])

revisions = read_revisions(DATA_DIR + 'page_history/page_history-{}-meta-info.json'.format(template))
pages = revisions.groupby('page_id')

peacock


In [8]:
# Total number of labelled (tag date, page) pairs to process.
len(labels)

5174

In [None]:
# Resume feature extraction after the records already in `features`.
# Using len(features) as the offset assumes the saved records correspond, in
# order, to the first rows of `labels` - verify before resuming.
restart = len(features)

try:
    # The unpacking assumes `labels` has exactly two columns, in the order
    # (event_timestamp, page_id).
    for i, (tag_date, page_id) in enumerate(labels.values[restart:], start=restart):
        print('processing :',i)

        page_revisions = pages.get_group(page_id)

        # Editors who revised this page up to (and including) the tag date.
        # I think this can be done in a better way
        tag_users = page_revisions[page_revisions.event_timestamp <= tag_date]\
                                    .event_user_id.dropna().unique().tolist()
        tag_user_histories = user_histories.filter(col('event_timestamp')<=tag_date)\
                                           .filter(col('event_user_id').isin(tag_users))\
                                           .withColumn('days_since_registration',f.datediff(f.lit(tag_date),col('event_user_registration_timestamp')))
        #tag_user_histories = calculate_concentration_ratios(tag_user_histories)

        editor_features = get_editor_features(tag_user_histories).toPandas().to_dict('records')

        # Collaboration features only look at the year before the tag.
        start_date = tag_date - relativedelta(years=1)
        user_histories_1year = tag_user_histories.filter(col("event_timestamp").between(start_date,tag_date))
        editor_interactions = get_user_interactions(user_histories_1year)

        directed_features, undirected_features = calculate_collaboration_features(editor_interactions)
        collaboration = {'directed':directed_features.toPandas().to_dict('records'),
                         'undirected':undirected_features.toPandas().to_dict('records')}

        features.append({'date':str(tag_date),'page_id':page_id,'editor':editor_features,
                         'collaboration':collaboration})

        # calculate_collaboration_features caches a fresh DataFrame on every
        # call; release it once its results are collected so cached data does
        # not pile up across iterations. This clears *all* cached data in the
        # session (nothing else in this notebook is cached).
        spark.catalog.clearCache()
finally:
    # Written even when the loop is interrupted, so the records computed in
    # this run are not lost and the next resume starts from the right offset.
    with open(HOME +'features/editors'+template+'_v2.json','w') as file:
        json.dump(features,file,default=np_encoder)

processing : 4286
processing : 4287
processing : 4288
processing : 4289
processing : 4290
processing : 4291
processing : 4292
processing : 4293
processing : 4294
processing : 4295
processing : 4296
processing : 4297
processing : 4298


In [12]:
# Number of feature records currently held in memory.
len(features)

4286

In [13]:
# Manual save of the current `features` list to the `_v2` output file;
# np_encoder handles numpy values during the dump.
with open(HOME +'features/editors'+template+'_v2.json','w') as file:
    json.dump(features,file,default=np_encoder)