In [30]:
import pandas as pd
import numpy as np

import pyspark.sql.functions as f
from pyspark.sql.types import *
from pyspark.sql import Window
from functools import reduce
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import udf


In [32]:
import mwcomments

In [33]:
from project_settings import *

In [34]:
cutoffs = pd.read_csv(os.path.join(data_dir,"ores_rcfilters_cutoffs.csv"))

In [35]:
cutoffs

Unnamed: 0,commitsha,damaging_hard,damaging_likelybad_max,damaging_likelybad_min,damaging_likelygood_max,damaging_likelygood_min,damaging_maybebad_max,damaging_maybebad_min,damaging_soft,damaging_softest,...,extension_status_default,rcfilters_watchlist_enabled_default,commitsha_default,has_extension,has_beta_extension,has_rcfilters,has_rcfilters_watchlist,commit_dt,deploy_dt,deploy_gap
0,c19a7d1dd4f8b720ef99cd5a33a9853a0555fa1c,,1.0,maximum recall @ prevision >= 0.45,maximum recall @ precision >= 0.997,0.0,,,,,...,,True,22f6ec8967a3b66019881f27bb37515eaae855bc,False,False,True,True,2018-05-09 16:56:42,2018-05-09 18:04:00,0 days 01:07:18.000000000
1,cb38f9b175d3effa7f6f8e058343592231f8f7f0,,,,,,,,,,...,,True,22f6ec8967a3b66019881f27bb37515eaae855bc,False,False,True,True,2018-07-09 11:45:34,2018-07-09 11:48:00,0 days 00:02:26.000000000
2,3904582dfdc4dbca73b9c9d1676915e40fe624c2,,,,,,,,,,...,,True,22f6ec8967a3b66019881f27bb37515eaae855bc,False,False,True,True,2018-05-09 16:13:28,2018-05-09 18:04:00,0 days 01:50:32.000000000
3,c1a19f963223599ed485f5282a5f25f1a1503b1a,0.45,,,,,,,0.80,0.88,...,,,83721dc27d3ecadb550a759e9004c794ad0cb792,False,False,False,False,2017-02-21 16:49:30,2017-02-22 00:33:00,0 days 07:43:30.000000000
4,487dbe5cc51da5a1b6c9b35ab80c43d1f092ce9c,0.45,,,,,,,0.80,0.88,...,on,,bfc85146c44bb7396fdda631c8eb9ea97e9691bf,True,False,True,False,2017-05-09 16:03:15,2017-05-09 23:54:00,0 days 07:50:45.000000000
5,5dfa70bd7e90d254e7aeb41055e428ad73ea8ef5,,,,,,,,,,...,,True,22f6ec8967a3b66019881f27bb37515eaae855bc,False,False,True,True,2018-03-14 16:09:06,2018-03-14 17:18:00,0 days 01:08:54.000000000
6,25d42eedf2f086480643810fd8c65a818eae7fd3,0.14,,,,,,,0.43,,...,,,83721dc27d3ecadb550a759e9004c794ad0cb792,False,False,False,False,2016-08-22 10:50:01,2016-08-22 13:09:00,0 days 02:18:59.000000000
7,868fbd422cd1f9a8be9b4b4c85f3a0981036a851,0.17,,,recall_at_precision(min_precision=0.99),0.0,1.0,recall_at_precision(min_precision=0.15),0.49,0.96,...,on,,bfc85146c44bb7396fdda631c8eb9ea97e9691bf,True,False,True,False,2017-04-27 16:13:51,2017-04-27 18:41:00,0 days 02:27:09.000000000
8,5dfa70bd7e90d254e7aeb41055e428ad73ea8ef5,,1.0,maximum recall @ precision >= 0.45,maximum recall @ precision >= 0.99,0.0,1.0,maximum recall @ precision >= 0.15,,,...,,True,22f6ec8967a3b66019881f27bb37515eaae855bc,False,False,True,True,2018-03-14 16:09:06,2018-03-14 17:18:00,0 days 01:08:54.000000000
9,27d82450bdba45256e571d0c791fd62663b0c91c,,,,,,,,,,...,,True,22f6ec8967a3b66019881f27bb37515eaae855bc,False,False,True,True,2018-02-28 16:10:27,2018-02-28 18:19:00,0 days 02:08:33.000000000


In [36]:
# compare 14 days before and after the cutoff
# unless there's another cutoff less than 28 days away, in which case split the difference
by_wiki = cutoffs.groupby('wiki_db')

In [37]:
cutoffs['date'] = pd.to_datetime(cutoffs.deploy_dt)
cutoffs = cutoffs.drop("deploy_gap",1)
cutoffs = cutoffs.drop("deploy_dt",1)
cutoffs = cutoffs.drop("commit_dt",1)


In [38]:
def set_cutoff_period(df):
    """Annotate one wiki's cutoffs with the gaps to the neighbouring cutoffs.

    Intended to be applied per wiki via ``groupby("wiki_db").apply``.

    Parameters
    ----------
    df : pandas.DataFrame
        Cutoff rows with a datetime ``date`` column. Not modified.

    Returns
    -------
    pandas.DataFrame
        A copy of ``df`` sorted by ``date`` with two added Timedelta columns:
        ``time_since_last_cutoff`` (NaT on the first row) and
        ``time_till_next_cutoff`` (NaT on the last row). ``reset_index()`` moves the
        previous index into an ``index`` column and renumbers rows from 0.
    """
    df = df.sort_values(by=['date'])
    dates = df['date']
    # diff() is date[i] - date[i-1]; the shifted difference is date[i+1] - date[i].
    df['time_since_last_cutoff'] = dates.diff()
    df['time_till_next_cutoff'] = dates.shift(-1) - dates
    df = df.reset_index()
    return df

In [39]:
cutoffs = cutoffs.groupby("wiki_db").apply(set_cutoff_period)

cutoffs = cutoffs.drop('wiki_db',1).reset_index()
cutoffs = cutoffs.drop("level_1",1)

select =[ 'wiki_db','has_ores','has_rcfilters','has_rcfilters_watchlist','time_since_last_cutoff','time_till_next_cutoff','date']

In [40]:
# We special case wikis where some issues lead to changes and deployments that we don't want to analyze. 
# fawiki: bug leads to cutoff disabling ores for 2 days. These won't show up in any other interval, so ignore them. 
cutoffs = cutoffs.loc[~((cutoffs.wiki_db == 'fawiki') & ( (cutoffs.date == pd.to_datetime("2017-12-09 11:19:00")) | (cutoffs.date == pd.to_datetime("2017-12-11 18:56:00"))))]

In [41]:
# etwiki, frwiki, and hewiki apparently turned on rcfilters 50 days after enabling ORES. This is OK. The periods overlap.

#frwiki and ruwiki experienced a bug on the deployment of rcfilters to watchlist. So let's ignore them for those messages.   

cutoffs = cutoffs.loc[~((cutoffs.wiki_db == 'frwiki') & (cutoffs.date >= pd.to_datetime("2017-11-09 14:35:00")))]

cutoffs = cutoffs.loc[~((cutoffs.wiki_db == 'ruwiki') & (cutoffs.date >= pd.to_datetime("2017-11-20 19:22:00") ))]

In [42]:
cutoffs.loc[(cutoffs.time_since_last_cutoff<=pd.Timedelta(60,'D')) | (cutoffs.time_till_next_cutoff<=pd.Timedelta(60,'D')), select]

Unnamed: 0,wiki_db,has_ores,has_rcfilters,has_rcfilters_watchlist,time_since_last_cutoff,time_till_next_cutoff,date
12,etwiki,True,False,False,NaT,50 days 05:26:00,2017-03-20 18:28:00
13,etwiki,True,True,False,50 days 05:26:00,308 days 17:24:00,2017-05-09 23:54:00
22,frwiki,False,False,False,NaT,55 days 07:02:00,2017-04-11 11:09:00
23,frwiki,True,True,False,55 days 07:02:00,156 days 20:24:00,2017-06-05 18:11:00
27,hewiki,True,False,False,NaT,29 days 10:38:00,2017-04-10 13:16:00
28,hewiki,True,True,False,29 days 10:38:00,308 days 17:24:00,2017-05-09 23:54:00
32,kowiki,False,True,False,NaT,0 days 00:00:00,2019-03-04 16:27:00
33,kowiki,True,True,True,0 days 00:00:00,NaT,2019-03-04 16:27:00
65,wikidatawiki,False,False,False,169 days 13:27:00,3 days 23:58:00,2017-10-26 13:21:00
66,wikidatawiki,True,True,True,3 days 23:58:00,28 days 05:52:00,2017-10-30 13:19:00


In [43]:
# A wiki with a single cutoff: both neighbour gaps come out as NaT.
cutoffs[cutoffs['wiki_db'] == 'arwiki']

Unnamed: 0,wiki_db,index,commitsha,damaging_hard,damaging_likelybad_max,damaging_likelybad_min,damaging_likelygood_max,damaging_likelygood_min,damaging_maybebad_max,damaging_maybebad_min,...,extension_status_default,rcfilters_watchlist_enabled_default,commitsha_default,has_extension,has_beta_extension,has_rcfilters,has_rcfilters_watchlist,date,time_since_last_cutoff,time_till_next_cutoff
0,arwiki,0,c19a7d1dd4f8b720ef99cd5a33a9853a0555fa1c,,1.0,maximum recall @ prevision >= 0.45,maximum recall @ precision >= 0.997,0.0,,,...,,True,22f6ec8967a3b66019881f27bb37515eaae855bc,False,False,True,True,2018-05-09 18:04:00,NaT,NaT


In [44]:
# kowiki was enabled over two commits with the same deploy time
cutoffs = cutoffs.drop(32)

In [45]:
cutoffs.loc[cutoffs.wiki_db == 'wikidatawiki',select]

Unnamed: 0,wiki_db,has_ores,has_rcfilters,has_rcfilters_watchlist,time_since_last_cutoff,time_till_next_cutoff,date
63,wikidatawiki,True,False,False,NaT,321 days 08:41:00,2016-06-22 15:13:00
64,wikidatawiki,True,True,False,321 days 08:41:00,169 days 13:27:00,2017-05-09 23:54:00
65,wikidatawiki,False,False,False,169 days 13:27:00,3 days 23:58:00,2017-10-26 13:21:00
66,wikidatawiki,True,True,True,3 days 23:58:00,28 days 05:52:00,2017-10-30 13:19:00
67,wikidatawiki,False,False,False,28 days 05:52:00,0 days 00:00:00,2017-11-27 19:11:00
68,wikidatawiki,True,True,True,0 days 00:00:00,NaT,2017-11-27 19:11:00


In [46]:
# wikidatawiki had an issue on 2017-10-30 and 2017-11-27 with the move to default on watchlist so we'll ignore that cutoff

In [47]:
cutoffs = cutoffs.loc[~((cutoffs.wiki_db == 'wikidatawiki') & (cutoffs.date >= pd.to_datetime("2017-10-26 13:21:00")))]


In [48]:
cutoffs.loc[(cutoffs.time_since_last_cutoff<=pd.Timedelta(60,'D')) | (cutoffs.time_till_next_cutoff<=pd.Timedelta(60,'D')), select]

Unnamed: 0,wiki_db,has_ores,has_rcfilters,has_rcfilters_watchlist,time_since_last_cutoff,time_till_next_cutoff,date
12,etwiki,True,False,False,NaT,50 days 05:26:00,2017-03-20 18:28:00
13,etwiki,True,True,False,50 days 05:26:00,308 days 17:24:00,2017-05-09 23:54:00
22,frwiki,False,False,False,NaT,55 days 07:02:00,2017-04-11 11:09:00
23,frwiki,True,True,False,55 days 07:02:00,156 days 20:24:00,2017-06-05 18:11:00
27,hewiki,True,False,False,NaT,29 days 10:38:00,2017-04-10 13:16:00
28,hewiki,True,True,False,29 days 10:38:00,308 days 17:24:00,2017-05-09 23:54:00
33,kowiki,True,True,True,0 days 00:00:00,NaT,2019-03-04 16:27:00


In [49]:
# build a table of date intervals before and after cutoffs

In [50]:
cutoffs['period_start'] = cutoffs.date - pd.Timedelta(14,'d')
cutoffs['period_end'] = cutoffs.date + pd.Timedelta(14,'d')

In [51]:
# take a stratified sample of edits in the cutoffs
# stratify by wiki_db, is_newcomer, is_anon, is_reverted, and revert_tool
wmhist = spark.read.table("wmf.mediawiki_history")

wmhist = wmhist.filter(f.col("snapshot") == "2019-07")
# ok we're ready to fire up spark and make a stratified sample
# we only need the latest snapshot

wmhist = wmhist.filter((f.col("event_entity") == "revision"))

In [52]:
import pyspark.sql.functions as f

In [53]:
from spark_functions import build_wmhist_step1, process_reverts, broadcast_match_comment, add_revert_types

In [54]:
# NOTE(review): the call returns a `my_match_comment` function (see the repr below), but the
# result is discarded. Presumably the call is made for a side effect on the SparkContext
# (e.g. broadcasting the comment matcher) that add_revert_types relies on. Confirm in spark_functions.
broadcast_match_comment(spark.sparkContext)

<function spark_functions.broadcast_match_comment.<locals>.my_match_comment>

In [55]:
wmhist = wmhist.filter(wmhist.page_namespace == 0)

In [56]:
wmhist = build_wmhist_step1(wmhist)

In [57]:
??process_reverts

In [58]:
#wmhist.show()

In [59]:
reverts = process_reverts(wmhist,spark)

In [60]:
# select only the columns we need from reverts
reverts = reverts.select(['wiki_db_l','revert_timestamp','reverted_revision_id',f.col('role_type').alias("revert_role_type"),f.col('anon_new_established').alias('reverted_anon_new_established'),'is_damage','time_to_revert','revert_comment','revert_user_Nreverts_past_month','revert_user_text','revert_user_id'])


In [61]:
# for the time to revert analysis, we only want damaging ones, but for is_reverted we want all reverts
wmhist = wmhist.join(reverts, on =[wmhist.wiki_db == reverts.wiki_db_l, wmhist.revision_id == reverts.reverted_revision_id],how='left_outer')

In [62]:
# Earlier approach (commented out): join the pandas cutoffs frame directly. The live code
# below uses a broadcast Spark DataFrame instead.
#wmhist = wmhist.join(cutoffs, on=[wmhist.wiki_db==cutoffs.wiki_db_l, f.unix_timestamp(wmhist.event_timestamp) >= f.unix_timestamp(cutoffs.period_start), f.unix_timestamp(wmhist.event_timestamp) <= f.unix_timestamp(cutoffs.period_end)],how='right_outer')

In [63]:
#wmhist.show()

In [64]:
# NOTE(review): f.unix_timestamp already returns seconds, so the "/ 1000" below would make
# sec_to_cutoff 1000x too small if this line is ever revived.
#wmhist = wmhist.withColumn("sec_to_cutoff", (f.unix_timestamp(f.col("event_timestamp")) - f.unix_timestamp(f.col("date"))) / 1000)

In [65]:
# Derive revert-type columns from the revert comment (presumably including revert_tool,
# which is used for the strata below -- confirm in spark_functions.add_revert_types).
wmhist = add_revert_types(wmhist, comment_column='revert_comment')

In [66]:
# Cache the revision + revert data; the following cells build on it.
wmhist = wmhist.cache()

In [67]:
cutoffs_df = spark.createDataFrame(cutoffs[['wiki_db','period_start','period_end','date']])
cutoffs_df = cutoffs_df.withColumnRenamed('wiki_db','wiki_db_l')
cutoffs_df = f.broadcast(cutoffs_df)

join_cond = (wmhist.wiki_db == cutoffs_df.wiki_db_l) & f.unix_timestamp(wmhist.event_timestamp).between(f.unix_timestamp(cutoffs_df.period_start),f.unix_timestamp(cutoffs_df.period_end))

In [68]:
#wmhist = wmhist.repartition(10000)

In [69]:
wmhist = wmhist.join(cutoffs_df,on=join_cond, how='inner')

In [70]:
# Earlier approach (commented out, kept for reference): build one window condition per cutoff
# type in a Python loop and label rows with a cutoff_type. The live code uses the range join
# above instead and does not produce a cutoff_type column.
# NOTE(review): this code reads cutoff.ores_cutoff / rcfilters_cutoff / watchlist_cutoff, while
# the live code uses has_ores / has_rcfilters / has_rcfilters_watchlist -- verify the column
# names before reviving it.
# ores_cutoff_cond = None
# rcfilters_cutoff_cond = None
# rcfilters_watchlist_cutoff_cond = None
# for _, cutoff in cutoffs.iterrows():
#     cond = ((wmhist.wiki_db == cutoff.wiki_db) & (f.unix_timestamp(wmhist.event_timestamp) >= cutoff.period_start.timestamp()) & (f.unix_timestamp(wmhist.event_timestamp) <= cutoff.period_end.timestamp()))
            
#     if ores_cutoff_cond is None:
#         ores_cutoff_cond = (cond & f.lit(cutoff.ores_cutoff == True))
#         rcfilters_cutoff_cond = (cond & f.lit(cutoff.rcfilters_cutoff == True))
#         rcfilters_watchlist_cutoff_cond = (cond & f.lit(cutoff.watchlist_cutoff == True))
#     else:
#         ores_cutoff_cond = ores_cutoff_cond | (cond & f.lit(cutoff.ores_cutoff == True))
#         rcfilters_cutoff_cond = rcfilters_cutoff_cond | (cond & f.lit(cutoff.rcfilters_cutoff == True))
#         rcfilters_watchlist_cutoff_cond = rcfilters_watchlist_cutoff_cond | (cond & f.lit(cutoff.watchlist_cutoff == True))


In [71]:
# wmhist = wmhist.withColumn("cutoff_type", f.when(ores_cutoff_cond,'has_ores').otherwise(f.when(rcfilters_cutoff_cond,'has_rcfilters').otherwise(f.when(rcfilters_watchlist_cutoff_cond,'has_rcfilters_watchlist').otherwise(None))))

In [72]:
wmhist = wmhist.withColumn("pre_cutoff",f.unix_timestamp(wmhist.event_timestamp) <= f.unix_timestamp(wmhist.date))

In [73]:
wmhist = wmhist.withColumn('strata',f.concat_ws('_',wmhist.wiki_db,wmhist.date,wmhist.pre_cutoff,wmhist.revert_tool,wmhist.reverted_anon_new_established,wmhist.revert_role_type,wmhist.revision_is_identity_reverted))

In [74]:
wmhist = wmhist.cache()

In [75]:
#wmhist = wmhist.repartition(1200,f.col("strata"))

In [76]:
wmhist_out = wmhist.select(['wiki_db','event_timestamp','page_id','page_title','user_id','user_text','event_user_isbot1','event_user_isbot2','revision_id','revision_is_identity_reverted','anon_new_established','event_user_is_newcomer','revert_tool','time_to_revert','period_start','period_end','date','pre_cutoff'])

In [79]:
wmhist_out = wmhist_out.repartition(1)

In [None]:
wmhist_out.write.csv("/user/nathante/ores_bias_data/cutoff_revisions.csv",mode='overwrite',compression="None",header=True)

In [88]:
# get the proportion of observations in each strata
strata_count = wmhist.groupby(f.col('strata')).count()
#all_count = wmhist.count()


In [85]:
#strata_count = strata_count.collect()

In [89]:
strata_count

DataFrame[strata: string, count: bigint]

In [90]:
strata_count = strata_count.withColumn("fraction", f.when( 5000 < f.col("count"),5000/f.col("count")).otherwise(1))
strata_count = strata_count.withColumn("weight",1/strata_count.fraction)


In [91]:
samp_design = strata_count.collect()

In [92]:
#fractions = samp_design.locj['strata','fraction']
fractions = {r.strata:r.fraction for r in samp_design}

In [93]:
sample = wmhist.sampleBy("strata",fractions=fractions)

In [94]:
sample = sample.join(strata_count,on='strata')

In [95]:
sample = sample.select(['wiki_db','event_timestamp','page_id','page_title','user_id','user_text','event_user_isbot1','event_user_isbot2','revision_id','revision_is_identity_reverted','time_to_revert','anon_new_established','event_user_is_newcomer','revert_tool','period_start','period_end','date','pre_cutoff','fraction','weight'])

In [96]:
sample = sample.repartition(1)

In [97]:
sample.write.csv("/user/nathante/ores_bias_data/cutoff_revisions_sample.csv",mode='overwrite',compression='None',header=True)

In [98]:
strata_count.write.csv("/user/nathante/ores_bias_data/threshhold_strata_counts.csv",mode='overwrite',compression="None",header=True)

In [None]:
conf = spark.sparkContext.getConf()

In [None]:
conf.getAll()

In [None]:
sc.

In [None]:
edits2 = edits.select(['wiki_db','event_timestamp','event_user_is_anonymous','event_user_is_anonymous','revision_id','revision_is_identity_reverted','rcfilters_cutoff','week','sec_to_cutoff'])

In [None]:
edits2.show()

In [None]:
spark.catalog.listDatabases()

In [None]:
ores_scores = spark.read.table("ores.revision_score_public")

In [None]:
ores_scores = ores_scores.filter((f.col("model")=="damaging") & f.col("model_version") == "0.3.2")

In [None]:
edits = edits.join(ores_scores,on=[edits.wiki_db == ores_scores.wiki, edits.revision_id == ores_scores.rev_id])

In [None]:
edits2 = edits2.withColumn("wikiweek",f.concat_ws(' ', edits.wiki_db, f.date_format(edits.week,'MM-dd-yyyy')))
by_wiki_week = edits.groupby('wikiweek')

In [None]:
by_wiki_week = edits2.groupby(['wikiweek'])


In [None]:
# take a sample stratified of N = 5000 by wiki and week

samp_design = by_wiki_week.count()
samp_design = samp_design.withColumn("fraction", f.when( 5000 < f.col("count"),5000/f.col("count")).otherwise(1))
samp_design = samp_design.withColumn("weight",1/samp_design.fraction)


In [None]:
samp_design

In [None]:
fractions = samp_design.select(['wikiweek','fraction']).collect()

In [None]:
fractions = {r.wikiweek:r.fraction for r in fractions}

In [None]:
fractions

In [None]:
sample = edits2.sampleBy("wikiweek",fractions=fractions)

In [30]:
# Collect the stratified sample (roughly <= 5000 rows per wiki-week) to the driver as pandas.
pddf_sample = sample.toPandas()