In [1]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark import SparkContext

sc = SparkContext()
sqlContext = SQLContext(sc)

hc = HiveContext(sc)

In [2]:
"""
Load all articles
"""
schema_articles = StructType([
    StructField("id", IntegerType(), False), 
    StructField("title", StringType(), False)
])

data_articles = sqlContext.read.format("csv") \
    .schema(schema_articles) \
    .option("header", True) \
    .option("mode","DROPMALFORMED") \
    .load("data/web_data_management_public_article.csv")
    
data_articles.show()
print((data_articles.count(), len(data_articles.columns)))

data_articles.write.format("orc").mode('overwrite').saveAsTable("articles")

+------+--------------------+
|    id|               title|
+------+--------------------+
|   722|         Talk:Animal|
|  9972|           Talk:Evil|
| 19984|          Talk:Mr._T|
| 24726|    Talk:Pan-Slavism|
| 34510|         Talk:Zombie|
| 39384|       Talk:Distance|
| 47799|           Talk:Dice|
| 66742|         Talk:Taipei|
| 69917|Talk:Infant_educa...|
| 74559|  Talk:Stefano_Benni|
| 94060|          Talk:Somme|
| 94708|     Talk:Tulse_Hill|
| 99251| Talk:Dutch_language|
|101572|   Talk:Neoplatonism|
|102941|          Talk:Truth|
|140331|          Talk:Force|
|143566| Talk:Around_the_Fur|
|148085|  User_talk:Barootch|
|154420|          Talk:Coati|
|186660|     User_talk:Erich|
+------+--------------------+
only showing top 20 rows

(86511, 2)


In [3]:
"""
Load all user
"""
data_users = sqlContext.read.format("csv") \
    .option("header", True) \
    .option("mode","DROPMALFORMED") \
    .load("data/web_data_management_public_user.csv")

data_users = data_users.drop('edit_count', 'minor_edit_count')

data_users.show()
data_users.printSchema()
print((data_users.count(), len(data_users.columns)))

hc = HiveContext(sc)
data_users.write.format("orc").mode('overwrite').saveAsTable("users")

+-------+---------+
|     id| username|
+-------+---------+
|1011434|   AlainD|
| 103836|  MrSmart|
|    108|        0|
|1119645|  Vjochim|
| 118486|Downchuck|
|1227207|   Lekogm|
|1279957|   Agne27|
|  13016|     Punk|
|1305485|  RFabian|
|1315408|    Ab8uu|
| 137488|   Delboy|
| 142494| Tommycw1|
| 142971| Chuximus|
| 146359| Reflecks|
| 146802|   Cvalda|
| 174960| Nataliya|
| 178752|    Fooby|
|  18364|Gohmifune|
| 188357| JohnJohn|
|1910627|   Hilldu|
+-------+---------+
only showing top 20 rows

root
 |-- id: string (nullable = true)
 |-- username: string (nullable = true)

(251023, 2)


In [4]:
"""
Load all elections
"""
schema_elections = StructType([
    StructField("id", IntegerType(), False), 
    StructField("user_id", IntegerType(), False),
    StructField("promoted", BooleanType(), False)
])

data_elections = sqlContext.read.format("csv") \
    .option("header", True) \
    .option("mode","DROPMALFORMED") \
    .load("data/web_data_management_public_elections.csv")

data_elections.show()
data_elections.printSchema()
print((data_elections.count(), len(data_elections.columns)))

hc = HiveContext(sc)
data_elections.write.format("orc").mode('overwrite').saveAsTable("elections")

+---+-------+--------+
| id|user_id|promoted|
+---+-------+--------+
|  1|     30|   false|
|  2|     54|    true|
|  3|     61|    true|
|  4|      6|    true|
|  5|     38|    true|
|  6|      8|   false|
|  7|     33|    true|
|  8|     93|   false|
|  9|     10|   false|
| 10|     95|   false|
| 11|    130|    true|
| 12|    144|   false|
| 13|    147|    true|
| 14|    151|   false|
| 15|     50|    true|
| 16|    153|   false|
| 17|     89|    true|
| 18|    163|    true|
| 19|    167|   false|
| 20|    168|    true|
+---+-------+--------+
only showing top 20 rows

root
 |-- id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- promoted: string (nullable = true)

(2794, 3)


In [5]:
"""
Load all votes
"""
schema_votes = StructType([
    StructField("id", IntegerType(), False), 
    StructField("election_id", IntegerType(), False),
    StructField("vote", IntegerType(), False),
    StructField("user_id", IntegerType(), False),
    StructField("vote_time", StringType(), False),
    StructField("screen_name", StringType(), False)
])

data_votes = sqlContext.read.format("csv") \
    .schema(schema_votes) \
    .option("header", True) \
    .option("mode","DROPMALFORMED") \
    .load("data/web_data_management_public_votes.csv")

data_votes.show()
data_votes.printSchema()
print((data_votes.count(), len(data_votes.columns)))

hc = HiveContext(sc)
data_votes.write.format("orc").mode('overwrite').saveAsTable("votes")

+---+-----------+----+-------+--------------------+------------+
| id|election_id|vote|user_id|           vote_time| screen_name|
+---+-----------+----+-------+--------------------+------------+
|  1|          1|   1|      3|2004-09-14 16:26:...|    ludraman|
|  2|          1|  -1|     25|2004-09-14 16:53:...|   blankfaze|
|  3|          1|   1|      4|2004-09-14 17:08:...|gzornenplatz|
|  4|          1|   1|      5|2004-09-14 17:37:...|  orthogonal|
|  5|          1|   1|      6|2004-09-14 19:28:...|    andrevan|
|  6|          1|   1|      7|2004-09-14 19:37:...|     texture|
|  7|          1|   1|      8|2004-09-14 21:04:...|       lst27|
|  8|          1|   1|      9|2004-09-14 21:30:...|        mirv|
|  9|          1|   1|     10|2004-09-14 22:13:...|    an\\rion|
| 10|          1|   0|     26|2004-09-14 22:18:...|       grunt|
| 11|          1|   0|     27|2004-09-15 03:19:...|    slowking|
| 12|          1|   0|     28|2004-09-15 03:20:...|  neutrality|
| 13|          1|   1|   

In [6]:
"""
Load all user_talk
"""
schema_user_talk = StructType([
    StructField("article_id", IntegerType(), False), 
    StructField("user_id", StringType(), False),
    StructField("comment", StringType(), False),
    StructField("timestamp", StringType(), False),
    StructField("word_count", IntegerType(), False),
    StructField("related_pages", StringType(), False),
    StructField("category", StringType(), True),
    StructField("id", IntegerType(), True),
    StructField("minor", IntegerType(), True)
])

data_user_talk = sqlContext.read.format("csv") \
    .schema(schema_user_talk) \
    .option("header", True) \
    .option("mode","DROPMALFORMED") \
    .load("data/web_data_management_public_revision_user_talk.csv")
    
data_user_talk.show()
data_user_talk.printSchema()
print((data_user_talk.count(), len(data_user_talk.columns)))

hc = HiveContext(sc)
data_user_talk.write.format("orc").mode('overwrite').saveAsTable("user_talk")

+----------+----------------+-------+--------------------+----------+----------------+--------+-----+-----+
|article_id|         user_id|comment|           timestamp|word_count|   related_pages|category|   id|minor|
+----------+----------------+-------+--------------------+----------+----------------+--------+-----+-----+
|     36501|              39|   null|2002-06-02 16:13:...|         1|            null|    null|  213|    0|
|     36539| ip:203.28.13.47|   null|2004-04-21 01:00:...|         1|            null|    null| 1608|    0|
|     37103|             188|   null|2002-08-21 05:51:...|         1|            null|    null| 8028|    0|
|     37103|             188|   null|2002-09-23 21:19:...|         1|            null|    null| 8066|    0|
|     37103|             188|   null|2003-04-14 21:58:...|        16|            null|    null| 8515|    0|
|     37103|             188|   null|2003-11-25 21:03:...|        37|         Ed_Poor|    null| 8884|    1|
|     37103|             188

In [7]:
"""
Load all talk
"""
schema_talk = StructType([
    StructField("id", IntegerType(), False), 
    StructField("article_id", IntegerType(), False), 
    StructField("user_id", StringType(), False),
    StructField("comment", StringType(), False),
    StructField("timestamp", StringType(), False),
    StructField("word_count", IntegerType(), False),
    StructField("related_pages", StringType(), False),
    StructField("category", StringType(), True),
    StructField("minor", IntegerType(), True)
])

data_user_talk = sqlContext.read.format("csv") \
    .schema(schema_talk) \
    .option("header", True) \
    .option("mode","DROPMALFORMED") \
    .load("data/web_data_management_public_revision_talk.csv")
    
data_user_talk.show()
data_user_talk.printSchema()
print((data_user_talk.count(), len(data_user_talk.columns)))

hc = HiveContext(sc)
data_user_talk.write.format("orc").mode('overwrite').saveAsTable("talk")

+-----+----------+------------------+-------+--------------------+----------+--------------------+--------+-----+
|   id|article_id|           user_id|comment|           timestamp|word_count|       related_pages|category|minor|
+-----+----------+------------------+-------+--------------------+----------+--------------------+--------+-----+
| 2825|      1128|               100|   null|2001-11-06 09:16:...|        84|                null|    null|    0|
| 3061|      1172|               153|   null|2001-07-22 21:55:...|        33|                null|    null|    1|
| 3436|      1263|ip:209.168.192.225|   null|2005-05-23 16:00:...|      2252|                null|    null|    0|
| 4455|      1263|           2663665|   null|2007-05-17 21:54:...|         4|                null|    null|    1|
| 5345|      1390|  ip:216.185.69.65|   null|2005-04-13 15:55:...|         7|                null|    null|    0|
| 6174|      1454|  ip:195.194.166.1|   null|2006-10-12 13:17:...|         1|           

In [8]:
sqlContext.sql('show databases').show()

+------------+
|databaseName|
+------------+
|     default|
+------------+



In [9]:
sqlContext.sql('show tables').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| articles|      false|
| default|elections|      false|
| default|     talk|      false|
| default|user_talk|      false|
| default|    users|      false|
| default|    votes|      false|
+--------+---------+-----------+



In [259]:
"""
Can you become an admin by making only minor edits?
"""

query_total = sqlContext.sql("SELECT t.user_id, u.username, SUM(if(minor = 1, 1, 0)) as `minor_count`, SUM(if(minor = 0, 1, 0)) as `major_count`, COUNT(*) as `total`, SUM(if(minor = 0, 1, 0)) / SUM(if(minor = 1, 1, 0)) as `minor_major_ratio` " +
                             "FROM users as u " +
                             "INNER JOIN talk as t ON t.user_id = u.id " +
                             "WHERE user_id IN (SELECT users.id FROM users INNER JOIN elections ON users.id = elections.user_id WHERE promoted = TRUE) " + 
                             "GROUP BY t.user_id, u.username " +
                             "HAVING SUM(if(minor = 1, 1, 0)) > 0 " + 
                             "ORDER BY minor_major_ratio")
query_total.show(100)

query_admin = sqlContext.sql("SELECT AVG(minor_major_ratio) as `admin_avg` " + 
                       "FROM (" +
                           "SELECT t.user_id, u.username, SUM(if(minor = 1, 1, 0)) as `minor_count`, SUM(if(minor = 0, 1, 0)) as `major_count`, COUNT(*) as `total`, SUM(if(minor = 0, 1, 0)) / SUM(if(minor = 1, 1, 0)) as `minor_major_ratio` " +
                           "FROM users as u " +
                           "INNER JOIN talk as t ON t.user_id = u.id " +
                           "WHERE user_id IN (SELECT users.id FROM users INNER JOIN elections ON users.id = elections.user_id WHERE promoted = TRUE) " + 
                           "GROUP BY t.user_id, u.username " +
                           "HAVING SUM(if(minor = 1, 1, 0)) > 0 " +
                       ")")
query_admin.show()

query_normal = sqlContext.sql("SELECT AVG(minor_major_ratio) as `normal_avg` " + 
                       "FROM (" +
                           "SELECT t.user_id, u.username, SUM(if(minor = 1, 1, 0)) as `minor_count`, SUM(if(minor = 0, 1, 0)) as `major_count`, COUNT(*) as `total`, SUM(if(minor = 0, 1, 0)) / SUM(if(minor = 1, 1, 0)) as `minor_major_ratio` " +
                           "FROM users as u " +
                           "INNER JOIN talk as t ON t.user_id = u.id " +
                           "WHERE user_id IN (SELECT users.id FROM users INNER JOIN elections ON users.id = elections.user_id WHERE promoted = FALSE) " + 
                           "GROUP BY t.user_id, u.username " +
                           "HAVING SUM(if(minor = 1, 1, 0)) > 0 " +
                       ")")
query_normal.show()



+-------+----------------+-----------+-----------+-----+--------------------+
|user_id|        username|minor_count|major_count|total|   minor_major_ratio|
+-------+----------------+-----------+-----------+-----+--------------------+
|   1190|        Dlloader|          3|          0|    3|                 0.0|
|   8237|           Anobo|          4|          0|    4|                 0.0|
|    794|        Joeygoey|          1|          0|    1|                 0.0|
|   2597|        Nonenmac|          1|          0|    1|                 0.0|
|    813|           Tommy|          1|          0|    1|                 0.0|
|   7694|          TomK32|          1|          0|    1|                 0.0|
|   7414|        Blimpguy|         24|          1|   25|0.041666666666666664|
|    135|  Malcolm_Farmer|         71|          3|   74| 0.04225352112676056|
|   5753|       Vera_Cruz|        300|         16|  316| 0.05333333333333334|
|   4191|        Nevilley|        304|         17|  321| 0.05592

+----------------+
|      normal_avg|
+----------------+
|6.52105982736047|
+----------------+



In [260]:
"""
Does an increasing amount of interactions (through user talk pages) with an admin lead to an 
increased chance to become an admin?
"""

query_detailed = sqlContext.sql("SELECT users.*, total_talk, ROUND(u.admin_normal_avg, 2) as admin_normal_avg, ROUND(u.admin_normal_std, 2) as admin_normal_std, ROUND(u.admin_normal_min, 2) as admin_normal_min, ROUND(u.admin_normal_max, 2) as admin_normal_max, elections.promoted " + 
                       "FROM users " + 
                       "INNER JOIN ("
                           "SELECT user_id, AVG(admin_normal_ratio) as admin_normal_avg, MIN(admin_normal_ratio) as admin_normal_min, MAX(admin_normal_ratio) as admin_normal_max, STD(admin_normal_ratio) as admin_normal_std, COUNT(*) as total_talk " +
                           "FROM (" + 
                               "SELECT user_talk.article_id, user_talk.user_id, conv_name, admin_talk, normal_talk, admin_talk/normal_talk as admin_normal_ratio " +
                               "FROM user_talk " + 
                               "INNER JOIN (" +
                                   "SELECT " +
                                       "article_id, " +
                                       "REPLACE(articles.title, 'User_talk:', '') as conv_name, " + 
                                       "SUM(IF(elections.promoted = TRUE, 1, 0)) as admin_talk, " +
                                       "SUM(IF(elections.promoted = TRUE, 0, 1)) as normal_talk " + 
                                   "FROM user_talk " +
                                   "INNER JOIN elections ON user_talk.user_id = elections.user_id " +
                                   "INNER JOIN articles ON user_talk.article_id = articles.id " +
                                   "GROUP BY article_id, articles.title" +
                               ") t1 ON user_talk.article_id = t1.article_id " +      
                           ") " +
                           "GROUP BY user_id " +
                       ") u ON users.id = u.user_id " +
                       "INNER JOIN elections ON u.user_id = elections.user_id " +
                       "ORDER BY admin_normal_avg DESC"
                      )
# query_detailed.show(20, False)

query = sqlContext.sql("SELECT promoted, ROUND(AVG(admin_normal_avg), 2) as admin_normal_avg, ROUND(STD(admin_normal_avg), 2) as admin_normal_std, ROUND(MIN(admin_normal_avg), 2) as admin_normal_min, ROUND(MAX(admin_normal_avg), 2) as admin_normal_max " + 
                       "FROM users " + 
                       "INNER JOIN ("
                           "SELECT user_id, AVG(admin_normal_ratio) as admin_normal_avg, MIN(admin_normal_ratio) as admin_normal_min, MAX(admin_normal_ratio) as admin_normal_max, STD(admin_normal_ratio) as admin_normal_std, COUNT(*) as total_talk " +
                           "FROM (" + 
                               "SELECT user_talk.article_id, user_talk.user_id, conv_name, admin_talk, normal_talk, admin_talk/normal_talk as admin_normal_ratio " +
                               "FROM user_talk " + 
                               "INNER JOIN (" +
                                   "SELECT " +
                                       "article_id, " +
                                       "REPLACE(articles.title, 'User_talk:', '') as conv_name, " + 
                                       "SUM(IF(elections.promoted = TRUE, 1, 0)) as admin_talk, " +
                                       "SUM(IF(elections.promoted = TRUE, 0, 1)) as normal_talk " + 
                                   "FROM user_talk " +
                                   "INNER JOIN elections ON user_talk.user_id = elections.user_id " +
                                   "INNER JOIN articles ON user_talk.article_id = articles.id " +
                                   "GROUP BY article_id, articles.title" +
                               ") t1 ON user_talk.article_id = t1.article_id " +      
                           ") " +
                           "GROUP BY user_id " +
                       ") u ON users.id = u.user_id " +
                       "INNER JOIN elections ON u.user_id = elections.user_id " +
                       "GROUP BY promoted" 
                      )
query.show(20, False)


+--------+----------------+----------------+----------------+----------------+
|promoted|admin_normal_avg|admin_normal_std|admin_normal_min|admin_normal_max|
+--------+----------------+----------------+----------------+----------------+
|false   |0.44            |0.59            |0.0             |5.78            |
|true    |3.32            |6.62            |0.07            |50.87           |
+--------+----------------+----------------+----------------+----------------+



In [215]:
"""
Does interacting with a certain admin lead to them voting in favour of your request for adminship?
"""

query_old = sqlContext.sql("SELECT s.user_id as user_id, conv_user_id, conv_count, vote " +
                       "FROM votes " + 
                       "INNER JOIN (" +
                           "SELECT u.user_id AS user_id, users.id AS conv_user_id, MAX(conversation_count) AS conv_count " +
                           "FROM users " +
                           "INNER JOIN votes ON users.id = votes.user_id " +
                           "INNER JOIN ( " +
                               "SELECT user_id, REPLACE(title, 'User_talk:', '') AS conv_username, COUNT(title) AS conversation_count " +
                               "FROM user_talk " + 
                               "INNER JOIN articles ON user_talk.article_id = articles.id " + 
                               "GROUP BY user_id, title " + 
                               "ORDER BY user_id ASC, conversation_count DESC " + 
                           ") u ON u.conv_username = votes.screen_name " + 
                           "GROUP BY u.user_id, conv_user_id " + 
                           "ORDER BY user_id ASC " +
                       ") s ON s.conv_user_id = votes.user_id " +
                       "INNER JOIN elections ON elections.id = votes.election_id " +
                       "WHERE elections.user_id = s.user_id AND votes.user_id = s.conv_user_id " +
                       "ORDER BY user_id ASC")

query = sqlContext.sql("SELECT 'votes (admin interaction)', SUM(IF(vote = 1, 1, 0)) as favor_count, SUM(IF(vote = -1, 1, 0)) AS no_favor_count " + 
                       "FROM votes " +
                       "INNER JOIN talk ON votes.user_id = talk.user_id " +
                       "INNER JOIN (" + 
                           "SELECT "
                               "DISTINCT (article_id, elections.id), " + 
                               "article_id, " + 
                               "elections.id AS election " +
                           "FROM talk " + 
                           "INNER JOIN elections ON elections.user_id = talk.user_id " +
                           "WHERE elections.promoted = TRUE " +
                       ") t ON t.article_id = talk.article_id AND votes.election_id = election " + 
                       "UNION " + 
                       "SELECT 'votes (all)', SUM(IF(vote = 1, 1, 0)) as favor_count, SUM(IF(vote = -1, 1, 0)) AS no_favor_count " + 
                       "FROM votes"
                      )

query.show(20, False)



+-------------------------+-----------+--------------+
|votes (admin interaction)|favor_count|no_favor_count|
+-------------------------+-----------+--------------+
|votes (admin interaction)|2177       |166           |
|votes (all)              |83962      |23118         |
+-------------------------+-----------+--------------+



In [229]:
"""
Does your chance of becoming an admin increase if you’re making edits across multiple categories?
"""

query_detailed = sqlContext.sql("SELECT elections.id, elections.user_id, t.count_categories, t.count_edits, elections.promoted, CAST(t.count_edits AS FLOAT) / CAST((t.count_categories + 1) AS FLOAT) AS ratio " +
                   "FROM elections " +
                   "INNER JOIN (" +
                       "SELECT s.user_id, LENGTH(categories) - LENGTH(REPLACE(categories, ' ', '')) AS count_categories, edits AS count_edits " + 
                       "FROM (" + 
                           "SELECT user_id, concat_ws(' ', COLLECT_LIST(DISTINCT category)) AS categories, COUNT(id) AS edits " +
                           "FROM talk " + 
                           "GROUP BY user_id " +
                       ") s " +
                   ") t ON elections.user_id = t.user_id " +
                   "ORDER BY count_edits DESC, ratio ASC")

query = sqlContext.sql("SELECT promoted, AVG(count_categories) as categories_avg, AVG(count_edits) as edits_avg, AVG(t.count_edits/t.count_categories) as ratio_avg " +
                       "FROM elections " +
                       "INNER JOIN (" +
                           "SELECT s.user_id, LENGTH(categories) - LENGTH(REPLACE(categories, ' ', '')) AS count_categories, edits AS count_edits " + 
                           "FROM (" + 
                               "SELECT user_id, concat_ws(' ', COLLECT_LIST(DISTINCT category)) AS categories, COUNT(id) AS edits " +
                               "FROM talk " + 
                               "GROUP BY user_id " +
                           ") s " +
                       ") t ON elections.user_id = t.user_id " +
                       "GROUP BY promoted "
                       )
query.show()
                   

+--------+------------------+------------------+-----------------+
|promoted|    categories_avg|         edits_avg|        ratio_avg|
+--------+------------------+------------------+-----------------+
|   false| 1.517391304347826|100.00434782608696| 95.6013297486595|
|    true|1.4186046511627908| 107.4186046511628|62.37871701312104|
+--------+------------------+------------------+-----------------+



In [262]:
"""
If you edit more actively maintained pages, does your chance to become an admin increase?
"""

query = sqlContext.sql("SELECT SUM(if(promoted = TRUE, 1, 0)) as `promoted`, SUM(if(promoted = FALSE, 1, 0)) as `not_promoted`, ROUND(SUM(if(promoted = TRUE, 1, 0)) / COUNT(*) ,2) as `promoted_prob` " +
                       "FROM elections " + 
                       "WHERE user_id IN (" +
                           "SELECT DISTINCT user_id " + 
                           "FROM talk " + 
                           "INNER JOIN (" + 
                               "SELECT article_id, COUNT(article_id) AS edits " + 
                               "FROM talk " + 
                               "GROUP BY article_id " + 
                               "ORDER BY edits DESC " + 
                               "LIMIT 100 " +
                           ") art ON art.article_id = talk.article_id " + 
                       ") "
                       "UNION " + 
                       "SELECT SUM(if(promoted = TRUE, 1, 0)) as `promoted`, SUM(if(promoted = FALSE, 1, 0)) as `not_promoted`, ROUND(SUM(if(promoted = TRUE, 1, 0)) / COUNT(*), 2) as `promoted_prob` " + 
                       "FROM elections " + 
                       "WHERE user_id NOT IN (" +
                           "SELECT DISTINCT user_id " + 
                           "FROM talk " + 
                           "INNER JOIN (" +
                               "SELECT article_id, COUNT(article_id) AS edits " + 
                               "FROM talk " + 
                               "GROUP BY article_id " + 
                               "ORDER BY edits DESC " + 
                               "LIMIT 100 " + 
                           ") art ON art.article_id = talk.article_id " +
                       ")")

query.show()

+--------+------------+-------------+
|promoted|not_promoted|promoted_prob|
+--------+------------+-------------+
|      50|          57|         0.47|
|    1197|        1490|         0.45|
+--------+------------+-------------+

