In [1]:
import pandas as pd
import numpy as np
import logging
import time
from pathlib import Path

In [2]:
RES_PATH = "/dlabdata1/turkish_wiki"

In [3]:
DUMPS_PATH = '/dlabdata1/turkish_wiki'

In [4]:
DATA_PATH = '/dlabdata1/turkish_wiki'

In [5]:
column_names = pd.read_csv('column_names.csv')

## Combine the dumps of each year into one dataset

In [6]:
def combine_yearly_dumps(column_names, dtypes, lang ='tr',  path=DUMPS_PATH, ending='tsv.bz2', years= list(range(2002, 2022)) ):
    df_lang = pd.DataFrame()
    for year in years:
        start = time.time()
        try:
            df_lang = pd.concat([df_lang, pd.read_csv(f'{path}/{lang}-{year}.{ending}', sep='\t', names=list(column_names), dtype=dtypes, warn_bad_lines=True, error_bad_lines=False)])
            logging.warning(f'Loaded {lang}-{year} in {time.time() - start}')
        except:
            traceback.print_exc()
            logging.error(f'Error when processing {lang}-{year}')
    return df_lang

In [None]:
df_tr = combine_yearly_dumps(column_names, dtypes=str)

In [None]:
Path(f'{RES_PATH}').mkdir(parents=True, exist_ok=True)
df_tr.to_csv(f'{RES_PATH}/aggregated.tsv.gz', index=False, sep="\t", compression="gzip")

## Read the aggregated (raw) dumps

In [42]:
df = pd.read_csv(f'{RES_PATH}/aggregated.tsv.gz', sep="\t", dtype=str, error_bad_lines=False, warn_bad_lines=True, usecols= column_names.col_name[1:-1].values, compression = 'gzip')

KeyboardInterrupt: 

In [30]:
df.head()

Unnamed: 0,event_entity,event_type,event_timestamp,event_comment,event_user_id,event_user_text_historical,event_user_text,event_user_blocks_historical,event_user_blocks,event_user_groups_historical,...,revision_text_sha1,revision_content_model,revision_content_format,revision_is_deleted_by_page_deletion,revision_deleted_by_page_deletion_timestamp,revision_is_identity_reverted,revision_first_identity_reverting_revision_id,revision_seconds_to_identity_revert,revision_is_identity_revert,revision_is_from_before_page_creation
0,revision,create,2002-12-05 22:51:28.0,(moved from tr.wikipedia.com),,209.162.17.70,209.162.17.70,,,,...,8h2s3vbsvhk0xfyymbit06i0ef6j26s,,,False,,False,,,False,True
1,user,create,2002-12-05 22:54:39.0,,,,,,,,...,,,,,,,,,,
2,revision,create,2002-12-05 22:54:39.0,,1.0,Brion VIBBER,Brion VIBBER,,,,...,jevuozi5divb9m74s5x4gr3xrr12ch4,,,True,2016-10-07 18:22:17.0,False,,,False,False
3,revision,create,2002-12-05 23:39:38.0,"language links added - good luck, turkish wiki...",,80.128.44.46,80.128.44.46,,,,...,7s919m6k15itrhd1v3wwr8b1ci069on,,,False,,False,,,False,True
4,revision,create,2002-12-13 17:59:34.0,,,193.140.196.133,193.140.196.133,,,,...,8sqxjw60e25kh1siv8e19jozs6tj52s,,,False,,False,,,False,True


## Get column datatypes from scraped Wikipedia table.
https://wikitech.wikimedia.org/wiki/Analytics/Data_Lake/Edits/Mediawiki_history_dumps

In [6]:
def transform_data_type_to_dtype(col_name, data_type):
    if (col_name == 'event_entity') or (col_name == 'event_type'):
        return 'category'
    elif ('timestamp' in col_name):
        return 'datetime64[ns, UTC]'
    elif (data_type == 'string') or (data_type == 'array<string>'):
        return 'object'
    elif (data_type == 'bigint') or (data_type == 'int') :
        return 'Int64'
    elif (data_type == 'boolean'):
        return 'boolean'
    else:
        return 'object'

In [7]:
col_to_dtype = column_names[['col_name', 'data_type']].set_index('col_name').to_dict()['data_type']

In [8]:
col_to_dtype = {k: transform_data_type_to_dtype(k, v) for k, v in col_to_dtype.items()}

In [9]:
category_cols = list({k  for k, v in col_to_dtype.items() if v == 'category' })
timestamp_cols = list({k  for k, v in col_to_dtype.items() if v == 'datetime64[ns, UTC]' })
numerical_cols = list({k  for k, v in col_to_dtype.items() if v == 'Int64' })
boolean_cols = list({k  for k, v in col_to_dtype.items() if v == 'boolean' })

## Preprocess the dataset to reduce size in memory

#### Convert low cardinality categorical columns

In [None]:
df[category_cols] = df[category_cols].astype("category")

#### Convert Timestamps

In [None]:
df[timestamp_cols] = df[timestamp_cols].apply(pd.to_datetime, utc =True, errors='coerce')

#### Convert numerical columns

In [None]:
df[numerical_cols] = df[numerical_cols].apply(pd.to_numeric, errors='coerce').convert_dtypes()

#### Convert booleans

In [62]:
df[boolean_cols] = df[boolean_cols].replace({'true':True,'false':False})
df[boolean_cols] = df[boolean_cols].where(df[boolean_cols].applymap(type) == bool)
df[boolean_cols] = df[boolean_cols].convert_dtypes()

In [63]:
df.to_csv(f'{RES_PATH}/cleaned_trwiki_1.tsv.gz', index=False, sep="\t", compression="gzip")

## Separate DataFrame for user, revision and page

In [85]:
user_df = df[df['event_entity'] == 'user'][['event_entity', 'event_type', 'event_timestamp', 'event_comment',
       'event_user_id', 'event_user_text_historical', 'event_user_text',
       'event_user_blocks_historical', 'event_user_blocks',
       'event_user_groups_historical', 'event_user_groups',
       'event_user_is_bot_by_historical', 'event_user_is_bot_by',
       'event_user_is_created_by_self', 'event_user_is_created_by_system',
       'event_user_is_created_by_peer', 'event_user_is_anonymous',
       'event_user_registration_timestamp', 'event_user_creation_timestamp',
       'event_user_first_edit_timestamp', 'event_user_revision_count',
       'event_user_seconds_since_previous_revision',
       'user_id', 'user_text_historical', 'user_text',
       'user_blocks_historical', 'user_blocks', 'user_groups_historical',
       'user_groups', 'user_is_bot_by_historical', 'user_is_bot_by',
       'user_is_created_by_self', 'user_is_created_by_system',
       'user_is_created_by_peer', 'user_is_anonymous',
       'user_registration_timestamp', 'user_creation_timestamp',
       'user_first_edit_timestamp']]

In [None]:
user_df.to_csv(f'{RES_PATH}/user_events.tsv.gz', index=False, sep="\t", compression="gzip")

In [None]:
page_df = df[df['event_entity'] == 'page'][['event_entity', 'event_type', 'event_timestamp', 'event_comment',
       'event_user_id', 'event_user_text_historical', 'event_user_text',
       'event_user_blocks_historical', 'event_user_blocks',
       'event_user_groups_historical', 'event_user_groups',
       'event_user_is_bot_by_historical', 'event_user_is_bot_by',
       'event_user_is_created_by_self', 'event_user_is_created_by_system',
       'event_user_is_created_by_peer', 'event_user_is_anonymous',
       'event_user_registration_timestamp', 'event_user_creation_timestamp',
       'event_user_first_edit_timestamp', 'event_user_revision_count',
       'event_user_seconds_since_previous_revision', 'page_id',
       'page_title_historical', 'page_title', 'page_namespace_historical',
       'page_namespace_is_content_historical', 'page_namespace',
       'page_namespace_is_content', 'page_is_redirect', 'page_is_deleted',
       'page_creation_timestamp', 'page_first_edit_timestamp',
       'page_revision_count', 'page_seconds_since_previous_revision']]

In [None]:
page_df.to_csv(f'{RES_PATH}/page_events.tsv.gz', index=False, sep="\t", compression="gzip")

In [None]:
revision_df = df[df['event_entity'] == 'revision'][['event_entity', 'event_type', 'event_timestamp', 'event_comment',
       'event_user_id', 'event_user_text_historical', 'event_user_text',
       'event_user_blocks_historical', 'event_user_blocks',
       'event_user_groups_historical', 'event_user_groups',
       'event_user_is_bot_by_historical', 'event_user_is_bot_by',
       'event_user_is_created_by_self', 'event_user_is_created_by_system',
       'event_user_is_created_by_peer', 'event_user_is_anonymous',
       'event_user_registration_timestamp', 'event_user_creation_timestamp',
       'event_user_first_edit_timestamp', 'event_user_revision_count',
       'event_user_seconds_since_previous_revision', 'page_id',
       'page_title_historical', 'page_title', 'page_namespace_historical',
       'page_namespace_is_content_historical', 'page_namespace',
       'page_namespace_is_content', 'page_is_redirect', 'page_is_deleted',
       'page_creation_timestamp', 'page_first_edit_timestamp',
       'page_revision_count', 'page_seconds_since_previous_revision',
       'revision_id', 'revision_parent_id',
       'revision_minor_edit', 'revision_deleted_parts',
       'revision_deleted_parts_are_suppressed', 'revision_text_bytes',
       'revision_text_bytes_diff', 'revision_text_sha1',
       'revision_content_model', 'revision_content_format',
       'revision_is_deleted_by_page_deletion',
       'revision_deleted_by_page_deletion_timestamp',
       'revision_is_identity_reverted',
       'revision_first_identity_reverting_revision_id',
       'revision_seconds_to_identity_revert', 'revision_is_identity_revert',
       'revision_is_from_before_page_creation']]

In [None]:
revision_df.to_csv(f'{RES_PATH}/revision_events.tsv.gz', index=False, sep="\t", compression="gzip")

## Get number of newcomers

In [11]:
user_df = pd.read_csv(f'{RES_PATH}/user_events.tsv.gz', sep="\t", error_bad_lines=False, warn_bad_lines=True, compression = 'gzip')

  interactivity=interactivity, compiler=compiler, result=result)


### Get all user creation events

In [13]:
try:
    
    user_df = user_df.convert_dtypes()

    user_timestamp_columns = [col for col in user_df.columns if 'timestamp' in col]
    user_df[user_timestamp_columns] = user_df[user_timestamp_columns].apply(pd.to_datetime, utc =True, errors='coerce')
    user_df["date"] = user_df.event_timestamp.dt.strftime("%Y-%m-%d")

    create_event_mask = (user_df.event_entity == 'user') & (user_df.event_type == 'create')
    no_bot_mask = (user_df['event_user_is_bot_by'].isna() | user_df['event_user_is_bot_by_historical'].isna())
    self_creation_mask = (user_df['event_user_is_created_by_self'] == True)
    no_anon_mask = (user_df['event_user_is_anonymous'] != True)

    # === get users by registration
    all_registrations = user_df[create_event_mask & no_anon_mask & no_bot_mask & self_creation_mask][['event_timestamp', 'event_user_id']]

    all_registrations.to_csv(f'{DATA_PATH}/processed_data/all_registrations.csv', index =False)
    
except Exception as e:
    logging.error(f'Error: {str(e)}')  


In [76]:
try:
    
    user_df = user_df.convert_dtypes()

    user_timestamp_columns = [col for col in user_df.columns if 'timestamp' in col]
    user_df[user_timestamp_columns] = user_df[user_timestamp_columns].apply(pd.to_datetime, utc =True, errors='coerce')
    user_df["date"] = user_df.event_timestamp.dt.strftime("%Y-%m-%d")

    create_event_mask = (user_df.event_entity == 'user') & (user_df.event_type == 'create')
    no_bot_mask = (user_df['event_user_is_bot_by'].isna() | user_df['event_user_is_bot_by_historical'].isna())
    self_creation_mask = (user_df['event_user_is_created_by_self'] == True)
    no_anon_mask = (user_df['event_user_is_anonymous'] != True)

    # === get users by registration
    group_creation = user_df[create_event_mask & no_anon_mask & no_bot_mask & self_creation_mask].groupby(['date'])['event_user_id'].size()

    group_creation = group_creation.reset_index()
    group_creation.columns = ['date', 'number_of_newcomers']
    group_creation.to_csv(f'{DATA_PATH}/processed_data/newcomers.csv', index =False)
    
except Exception as e:
    logging.error(f'Error: {str(e)}')  


## Get number of edits

In [14]:
revision_df = pd.read_csv(f'{RES_PATH}/revision_events.tsv.gz', sep="\t", usecols= ['event_type', 'page_namespace',  'event_entity', 'event_type', 'event_timestamp', 
       'event_user_id', 'event_user_text_historical', 'page_id', 'revision_minor_edit',
       'revision_is_identity_revert', 'revision_is_identity_reverted'], error_bad_lines=False, warn_bad_lines=True, compression = 'gzip')

  interactivity=interactivity, compiler=compiler, result=result)


In [83]:
try:
    
    revision_df = revision_df.convert_dtypes()

    create_revision_mask = (revision_df.event_entity=='revision') & (revision_df.event_type == 'create')
    ns_mask = revision_df.page_namespace == 0
    
    revision_df = revision_df[create_revision_mask & ns_mask]

    revision_df['revision_text_bytes'] = pd.to_numeric(revision_df['revision_text_bytes'], errors='coerce').fillna(0)
    revision_df['event_timestamp'] = pd.to_datetime(revision_df['event_timestamp'],  utc = True, errors = 'coerce')
    
    revision_df["date"] = revision_df.event_timestamp.dt.strftime("%Y-%m-%d")
    
    revision_df['user_kind'] = revision_df.apply(lambda row: 'anonymous' if pd.isna(row.event_user_id) else 'bot' if not pd.isna(row.event_user_is_bot_by) else 'account', axis=1)

    # group by date, page_id, user_kind

    dict_edits_byid = revision_df.groupby(['date', 'page_id', 'user_kind']).agg(
        {'event_user_id': 'size', 'revision_text_bytes': 'sum'})

except Exception as e:
    logging.error(f'Error: {str(e)}')  

In [84]:
dict_edits_byid.to_csv(f'{DATA_PATH}/processed_data/edits.csv')

In [17]:
try:
    
    revision_df = revision_df.convert_dtypes()

    create_revision_mask = (revision_df.event_entity=='revision') & (revision_df.event_type == 'create')
    ns_mask = revision_df.page_namespace == 0
    
    account_mask = (~revision_df.event_user_id.isna()) & (revision_df.event_user_is_bot_by.isna())
    
    revision_df = revision_df[create_revision_mask & ns_mask & account_mask]

    revision_df['revision_text_bytes'] = pd.to_numeric(revision_df['revision_text_bytes'], errors='coerce').fillna(0)
    revision_df['event_timestamp'] = pd.to_datetime(revision_df['event_timestamp'],  utc = True, errors = 'coerce')
    
    revision_df = revision_df[['event_type', 'event_timestamp', 
       'event_user_id', 'event_user_groups', 'event_user_revision_count',
       'event_user_seconds_since_previous_revision', 'page_id',
       'page_title', 'page_revision_count', 'revision_minor_edit',
       'revision_text_bytes', 'revision_text_bytes_diff','revision_is_identity_revert']]
    
    revision_df.to_csv(f'{DATA_PATH}/processed_data/account_edits.csv')

except Exception as e:
    logging.error(f'Error: {str(e)}')  

In [15]:
try:
    
    revision_df = revision_df.convert_dtypes()

    create_revision_mask = (revision_df.event_entity=='revision') & (revision_df.event_type == 'create')
    ns_mask = revision_df.page_namespace == 0
    
    
    revision_df = revision_df[create_revision_mask & ns_mask]

    revision_df['event_timestamp'] = pd.to_datetime(revision_df['event_timestamp'],  utc = True, errors = 'coerce')
    
    revision_df = revision_df[['event_type', 'event_timestamp', 
       'event_user_id', 'event_user_text_historical', 'page_id', 'revision_minor_edit',
       'revision_is_identity_revert', 'revision_is_identity_reverted']]
    
    revision_df.to_csv(f'{DATA_PATH}/processed_data/all_edits.csv')

except Exception as e:
    logging.error(f'Error: {str(e)}')  

In [16]:
revision_df

Unnamed: 0,event_type,event_timestamp,event_user_id,event_user_text_historical,page_id,revision_minor_edit,revision_is_identity_revert,revision_is_identity_reverted
0,create,2002-12-05 22:51:28+00:00,,209.162.17.70,2740662,True,False,False
1,create,2002-12-05 22:54:39+00:00,1,Brion VIBBER,5,False,False,False
2,create,2002-12-05 23:39:38+00:00,,80.128.44.46,2740662,False,False,False
3,create,2002-12-13 17:59:34+00:00,,193.140.196.133,2740662,False,False,False
4,create,2002-12-13 18:01:20+00:00,,193.140.196.133,2740662,False,False,False
...,...,...,...,...,...,...,...,...
24670417,create,2021-02-01 15:55:06+00:00,1175541,Abdullah Göçük,3075,False,False,False
24670418,create,2021-02-01 15:55:14+00:00,466504,HastaLaVi2,2908622,False,False,False
24670419,create,2021-02-01 15:56:02+00:00,466504,HastaLaVi2,2861149,False,False,False
24670420,create,2021-02-01 15:56:08+00:00,1221913,Foldereid,747973,True,False,False


revision_dfts

In [100]:
try:
    
#   get reverts per day as well as reverted
    df_reverted = revision_df[revision_df['revision_is_identity_reverted'] == True].groupby(['date', 'user_kind'])['revision_is_identity_reverted'].size()
    df_reverts = revision_df[revision_df['revision_is_identity_revert'] == True].groupby(['date', 'user_kind'])['revision_is_identity_revert'].size()

    # reindex so all dates are filled
    df_reverted = df_reverted.reindex(
        pd.MultiIndex.from_product([revision_df.date.unique(), df_reverted.index.levels[1]], names=['date', 'user_kind']), fill_value=0)
    df_reverts = df_reverts.reindex(
        pd.MultiIndex.from_product([revision_df.date.unique(), df_reverts.index.levels[1]], names=['date', 'user_kind']), fill_value=0)


except Exception as e:
    logging.error(f'Error: {str(e)}')  

In [101]:
df_reverted.to_csv(f'{DATA_PATH}/processed_data/df_reverted.csv')
df_reverts.to_csv(f'{DATA_PATH}/processed_data/df_reverts.csv')

In [102]:
try:
    
    # get reverts per day as well as reverted
    df_reverted_pid = revision_df[revision_df['revision_is_identity_reverted'] == True].groupby(['date','page_id', 'user_kind'])['revision_is_identity_reverted'].size()
    df_reverts_pid = revision_df[revision_df['revision_is_identity_revert'] == True].groupby(['date', 'page_id','user_kind'])['revision_is_identity_revert'].size()

    # reindex so all dates are filled
    df_reverted_pid = df_reverted_pid.reindex(
        pd.MultiIndex.from_product([revision_df.date.unique(), df_reverted_pid.index.levels[1]], names=['date', 'page_id', 'user_kind']), fill_value=0)
    df_reverts_pid = df_reverts_pid.reindex(
        pd.MultiIndex.from_product([revision_df.date.unique(), df_reverts_pid.index.levels[1]], names=['date','page_id', 'user_kind']), fill_value=0)


except Exception as e:
    logging.error(f'Error: {str(e)}')  

ERROR:root:Error: Length of names must match number of levels in MultiIndex.


In [103]:
df_reverted_pid.to_csv(f'{DATA_PATH}/processed_data/df_reverted_by_pageid.csv')
df_reverts_pid.to_csv(f'{DATA_PATH}/processed_data/df_reverts_by_pageid.csv')