In [1]:
%pylab inline
sc.version

Populating the interactive namespace from numpy and matplotlib


u'1.3.1'

In [1]:
from xmltodict import parse as parse_xml
from pyspark.sql import SQLContext, Row
from dateutil.parser import parse as parse_date
import re
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from collections import Counter

In [8]:
def xml_to_rdd(file_name):
    def infer_type(input):
        int_re = re.compile('^[\-]?[0-9]+$')
        date_re = re.compile('^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3}$')
        int_type = (int_re.match, int)
        date_type = (date_re.match, parse_date)
        default_type = (lambda x: True, lambda x: x)

        for f, c in [int_type, date_type, default_type]:
            if f(input):
                return c(input)

    def clean_xml_dict(xd):
        return {
                key[1:]: infer_type(value.strip())
                for key, value in xd.items() if key.startswith('@')
            }

    return sc.textFile(file_name)\
    .filter(lambda line: line.strip().startswith('<row'))\
    .map(lambda line: parse_xml(line))\
    .map(lambda xml_dict: xml_dict['row'])\
    .map(clean_xml_dict)

def extract_fields(row, wanted):
    return {
        k: row.get(k, None)
        for k in wanted
    }

def rdd_to_df(rdd, columns, sql_context, schema=None):
    rows = rdd.map(lambda r: extract_fields(r, columns))\
    .map(lambda r: Row(**r))
    
    return sql_context.createDataFrame(rows, schema, 0.1)

def add_to_dict(input, key, value):
    input[key] = value
    return input

data_folder = '/Users/ivoeverts/data/goto2015/se/'

In [7]:
sql_context = SQLContext(sc)
post_age = udf(lambda dt: (datetime(2014,9,14) - dt).days, IntegerType())

In [9]:
posts = xml_to_rdd(data_folder + 'Posts.xml')
posts.take(1)

[{u'AcceptedAnswerId': 393,
  u'AnswerCount': 4,
  u'Body': u"<p>My fianc\xe9e and I are looking for a good Caribbean cruise in October and were wondering which islands are best to see and which Cruise line to take?</p>\n\n<p>It seems like a lot of the cruises don't run in this month due to Hurricane season so I'm looking for other good options.</p>\n\n<p><strong>EDIT</strong> We'll be travelling in 2012.</p>",
  u'ClosedDate': datetime.datetime(2013, 2, 25, 23, 52, 47, 953000),
  u'CommentCount': 4,
  u'CreationDate': datetime.datetime(2011, 6, 21, 20, 19, 34, 730000),
  u'FavoriteCount': 1,
  u'Id': 1,
  u'LastActivityDate': datetime.datetime(2012, 5, 24, 14, 52, 14, 760000),
  u'LastEditDate': datetime.datetime(2011, 12, 28, 21, 36, 43, 910000),
  u'LastEditorUserId': 101,
  u'OwnerUserId': 9,
  u'PostTypeId': 1,
  u'Score': 8,
  u'Tags': u'<caribbean><cruising><vacation>',
  u'Title': u'What are some Caribbean cruises for October?',
  u'ViewCount': 309}]

In [10]:
posts_df = rdd_to_df(posts,
                     ['Id', 'Score', 'CreationDate'],
                     sql_context)

In [11]:
votes = xml_to_rdd(data_folder + 'Votes.xml')
votes.take(1)

[{u'CreationDate': datetime.datetime(2011, 6, 21, 0, 0),
  u'Id': 1,
  u'PostId': 1,
  u'VoteTypeId': 2}]

In [12]:
votes_df = rdd_to_df(votes,
                     ['VoteTypeId', 'CreationDate', 'PostId'],
                     sql_context).withColumnRenamed('CreationDate', 'VoteDate')

In [13]:
posts_votes_df = posts_df.join(votes_df, posts_df.Id == votes_df.PostId, 'inner')

In [14]:
posts_votes_df.show()

CreationDate         Id  Score VoteDate             PostId VoteTypeId
2011-06-21 20:48:... 31  8     2011-06-21 00:00:... 31     2         
2011-06-21 20:48:... 31  8     2011-06-21 00:00:... 31     2         
2011-06-21 20:48:... 31  8     2011-06-21 00:00:... 31     16        
2011-06-21 20:48:... 31  8     2011-06-22 00:00:... 31     2         
2011-06-21 20:48:... 31  8     2011-06-27 00:00:... 31     2         
2011-06-21 20:48:... 31  8     2011-10-31 00:00:... 31     2         
2011-06-21 20:48:... 31  8     2011-11-25 00:00:... 31     2         
2011-06-21 20:48:... 31  8     2011-12-01 00:00:... 31     2         
2011-06-21 20:48:... 31  8     2013-03-16 00:00:... 31     2         
2011-06-21 20:48:... 31  8     2013-12-06 00:00:... 31     2         
2011-06-21 20:48:... 31  8     2013-12-06 00:00:... 31     3         
2011-06-22 17:43:... 231 4     2011-06-22 00:00:... 231    2         
2011-06-22 17:43:... 231 4     2011-06-23 00:00:... 231    2         
2011-06-22 17:43:...

In [17]:
posts_votes_df.groupBy('VoteTypeId').count().show()

VoteTypeId count 
1          4661  
2          143734
3          3033  
5          4207  
6          407   
7          35    
8          409   
9          404   
10         29    
11         58    
12         1     
15         952   
16         2800  


In [15]:
posts_votes_df.rdd.take(5)

[Row(CreationDate=datetime.datetime(2011, 6, 21, 20, 48, 29, 23000), Id=31, Score=8, VoteDate=datetime.datetime(2011, 6, 21, 0, 0), PostId=31, VoteTypeId=2),
 Row(CreationDate=datetime.datetime(2011, 6, 21, 20, 48, 29, 23000), Id=31, Score=8, VoteDate=datetime.datetime(2011, 6, 21, 0, 0), PostId=31, VoteTypeId=2),
 Row(CreationDate=datetime.datetime(2011, 6, 21, 20, 48, 29, 23000), Id=31, Score=8, VoteDate=datetime.datetime(2011, 6, 21, 0, 0), PostId=31, VoteTypeId=16),
 Row(CreationDate=datetime.datetime(2011, 6, 21, 20, 48, 29, 23000), Id=31, Score=8, VoteDate=datetime.datetime(2011, 6, 22, 0, 0), PostId=31, VoteTypeId=2),
 Row(CreationDate=datetime.datetime(2011, 6, 21, 20, 48, 29, 23000), Id=31, Score=8, VoteDate=datetime.datetime(2011, 6, 27, 0, 0), PostId=31, VoteTypeId=2)]

In [18]:
def prepare_post_ft( (post_id, rows) ):
    cnt = Counter()
    cnt.update([
            row.VoteTypeId
            for row in rows if (row.VoteDate - row.CreationDate).days <= 7
        ])
    
    return add_to_dict(dict(cnt), 'PostId', post_id)
    

ft = posts_votes_df.rdd.groupBy(lambda r: r.PostId).map(prepare_post_ft)

In [20]:
ft.take(10)

[{2: 5, 'PostId': 12800},
 {2: 4, 'PostId': 25600},
 {2: 3, 'PostId': 21000},
 {2: 1, 'PostId': 16400},
 {2: 1, 'PostId': 600},
 {1: 1, 2: 1, 'PostId': 24600},
 {2: 10, 'PostId': 7200},
 {2: 4, 'PostId': 7600},
 {2: 4, 'PostId': 2600},
 {2: 9, 'PostId': 5200}]

In [23]:
ft_fields = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13','15','16', 'PostId']
ft_df = rdd_to_df(ft,
                 ft_fields,
                 sql_context,
                 [StructField(f, LongType(), True) for f in ft_fields]).na.fill(0)
ft_df.show()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 1 times, most recent failure: Lost task 0.0 in stage 69.0 (TID 1033, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/ivoeverts/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/worker.py", line 101, in main
    process()
  File "/Users/ivoeverts/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/ivoeverts/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/serializers.py", line 236, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/ivoeverts/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1220, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-8-d017e36e3f87>", line 28, in <lambda>
TypeError: __new__() keywords must be strings

	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
	at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:744)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


In [100]:
?sql_context.createDataFrame

In [79]:
votes_ft = sql_context.sql("""
select PostId, VoteTypeId, count(*) as Cnt, max(CreationDate) as MaxVoteDate
from votes group by PostId, VoteTypeId""")

In [80]:
posts_votes_ft = posts_df.join(votes_ft, posts_df.Id == votes_ft.PostId, 'inner')

In [78]:
posts_votes_ft.show()

CreationDate         Id   Score PostId VoteTypeId Cnt MaxDate             
2011-06-21 20:48:... 31   8     31     2          9   2013-12-06 00:00:...
2011-06-21 20:48:... 31   8     31     3          1   2013-12-06 00:00:...
2011-06-21 20:48:... 31   8     31     16         1   2011-06-21 00:00:...
2011-06-22 17:43:... 231  4     231    2          4   2013-01-18 00:00:...
2011-06-24 20:10:... 431  4     431    2          5   2011-10-30 00:00:...
2011-06-24 20:10:... 431  4     431    3          1   2011-06-26 00:00:...
2011-06-29 13:35:... 631  3     631    1          1   2011-06-29 00:00:...
2011-06-29 13:35:... 631  3     631    2          3   2011-07-15 00:00:...
2011-07-06 10:45:... 831  7     831    2          7   2014-08-11 00:00:...
2011-07-16 11:49:... 1031 11    1031   2          11  2014-05-20 00:00:...
2011-07-16 11:49:... 1031 11    1031   5          2   2012-01-19 00:00:...
2011-07-24 16:24:... 1231 5     1231   1          1   2011-08-15 00:00:...
2011-07-24 16:24:... 1231