In [2]:
from __future__ import division, print_function
from bz2 import BZ2File
import ujson

with BZ2File('./RC_2015-01.bz2') as f:
    line = f.readline()
ujson.loads(line)

{'archived': False,
 'author': 'YoungModern',
 'author_flair_css_class': None,
 'author_flair_text': None,
 'body': 'Most of us have some family members like this. *Most* of my family is like this. ',
 'controversiality': 0,
 'created_utc': '1420070400',
 'distinguished': None,
 'downs': 0,
 'edited': False,
 'gilded': 0,
 'id': 'cnas8zv',
 'link_id': 't3_2qyr1a',
 'name': 't1_cnas8zv',
 'parent_id': 't3_2qyr1a',
 'retrieved_on': 1425124282,
 'score': 14,
 'score_hidden': False,
 'subreddit': 'exmormon',
 'subreddit_id': 't5_2r0gj',
 'ups': 14}

In [3]:
from pandas import Timestamp, NaT, DataFrame
from toolz import dissoc


def to_json(line):
    """Convert a line of json into a cleaned up dict."""
    blob = ujson.loads(line)
    
    # Convert timestamps into Timestamp objects
    date = blob['created_utc']
    blob['created_utc'] = Timestamp.utcfromtimestamp(int(date))
    edited = blob['edited']
    blob['edited'] = Timestamp.utcfromtimestamp(int(edited)) if edited else NaT
    
    # Convert deleted posts into `None`s (missing text data)
    if blob['author'] == '[deleted]':
        blob['author'] = None
    if blob['body'] == '[deleted]':
        blob['body'] = None
        
    # Remove 'id', and 'subreddit_id' as they're redundant
    # Remove 'retrieved_on' as it's irrelevant
    return dissoc(blob, 'id', 'subreddit_id', 'retrieved_on')


columns = ['archived', 'author', 'author_flair_css_class', 'author_flair_text',
           'body', 'controversiality', 'created_utc', 'distinguished', 'downs',
           'edited', 'gilded', 'link_id', 'name', 'parent_id',
           'removal_reason', 'score', 'score_hidden', 'subreddit', 'ups']


def to_df(batch):
    """Convert a list of json strings into a dataframe"""
    blobs = map(to_json, batch)
    df = DataFrame.from_records(blobs, columns=columns)
    return df.set_index('created_utc')

In [5]:
from castra import Castra
from toolz import peek, partition_all

categories = ['distinguished', 'subreddit', 'removal_reason']

with BZ2File('RC_2015-01.bz2') as f:
    batches = partition_all(200000, f)
    df, frames = peek(map(to_df, batches))
    castra = Castra('reddit_data.castra', template=df, categories=categories)
    castra.extend_sequence(frames, freq='3h')

ValueError: Opening a castra with a template, yet this castra
already exists.  Filename: reddit_data.castra

In [6]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

# Start a progress bar for all computations
pbar = ProgressBar()
pbar.register()

# Load data into a dask dataframe:
df = dd.from_castra('reddit_data.castra/')
df.head(3)

[########################################] | 100% Completed |  0.6s


Unnamed: 0_level_0,archived,author,author_flair_css_class,author_flair_text,body,controversiality,distinguished,downs,edited,gilded,link_id,name,parent_id,removal_reason,score,score_hidden,subreddit,ups
created_utc,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
2015-01-01,False,YoungModern,,,Most of us have some family members like this....,0.0,,0.0,NaT,0.0,t3_2qyr1a,t1_cnas8zv,t3_2qyr1a,,14.0,False,exmormon,14.0
2015-01-01,False,RedCoatsForever,on,Ontario,But Mill's career was way better. Bentham is l...,0.0,,0.0,NaT,0.0,t3_2qv6c6,t1_cnas8zw,t1_cnas2b6,,3.0,False,CanadaPolitics,3.0
2015-01-01,False,vhisic,,,"Mine uses a strait razor, and as much as i lov...",0.0,,0.0,NaT,0.0,t3_2qxefp,t1_cnas8zx,t3_2qxefp,,1.0,False,AdviceAnimals,1.0


In [7]:
df.ups.count().compute()

[########################################] | 100% Completed |  1.7s


53851542