In [1]:
import pandas as pd

In [2]:
import os
import sys
import re

from pyspark import SparkContext, SparkConf

from pyspark.sql.types import *
from pyspark.sql import Row
import pyspark.sql.functions as func

In [3]:
spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, spark_home + "/python")

# Add the py4j to the path
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))

# Initialize PySpark to predefine the SparkContext variable 'sc'
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 2.7.10 (default, Nov  7 2015 13:18:40)
SparkContext available as sc, HiveContext available as sqlContext.


In [4]:
comments_json = './data/results-20151009-103553.json'  # reddit comments
with open(comments_json) as cmnts:              # open the original file
    all_entries = cmnts.readlines()             # get all lines

parsable_data = "[" + ','.join(all_entries).replace('\n','') + "]" # join all entries with comma, 
                                                                   # replace newline chars 
                                                                   # and set [] around them

In [5]:
comments = pd.read_json(parsable_data,orient='columns',typ='frame',convert_dates=['created'])

In [6]:
comments.describe()

Unnamed: 0,avg_score,controversiality,downs,ups
count,10000.0,10000.0,10000,10000.0
mean,8.6828,0.0324,0,8.6828
std,81.068016,0.177069,0,81.068016
min,-180.0,0.0,0,-180.0
25%,1.0,0.0,0,1.0
50%,1.0,0.0,0,1.0
75%,3.0,0.0,0,3.0
max,3877.0,1.0,0,3877.0


In [7]:
sparkDF = sqlContext.createDataFrame(comments)

In [8]:
cached = sparkDF.persist()

In [9]:
cached.count()

10000

In [10]:
cached.columns

['author',
 'avg_score',
 'comment',
 'controversiality',
 'created',
 'distinguished',
 'downs',
 'name',
 'subr',
 'ups',
 'url']

In [11]:
cached.printSchema()

root
 |-- author: string (nullable = true)
 |-- avg_score: long (nullable = true)
 |-- comment: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created: long (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- downs: long (nullable = true)
 |-- name: string (nullable = true)
 |-- subr: string (nullable = true)
 |-- ups: long (nullable = true)
 |-- url: string (nullable = true)



In [12]:
cached.write.parquet("./data/results-20151009-103553.parquet")

In [13]:
parquet = sqlContext.read.parquet("./data/results-20151009-103553.parquet")

In [14]:
parquet.columns

['author',
 'avg_score',
 'comment',
 'controversiality',
 'created',
 'distinguished',
 'downs',
 'name',
 'subr',
 'ups',
 'url']

In [15]:
parquet.printSchema()

root
 |-- author: string (nullable = true)
 |-- avg_score: long (nullable = true)
 |-- comment: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created: long (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- downs: long (nullable = true)
 |-- name: string (nullable = true)
 |-- subr: string (nullable = true)
 |-- ups: long (nullable = true)
 |-- url: string (nullable = true)



In [16]:
employeeSchema = StructType([
                       StructField('id', IntegerType(), nullable=False),
                       StructField('first_name', StringType(), nullable=False),
                       StructField('last_name', StringType(), nullable=False),
                       StructField('email', StringType(), nullable=False),
                       StructField('phone', StringType(), nullable=False),
                       StructField('department', StringType(), nullable=False)
                      ])

In [17]:
employeeData = sc.parallelize([
                                Row(1,'John','Doe','john.doe@company.com','555-12345', 'Sales'),
                                Row(2,'Johnny','Wishbone','johnny.wishbone@company.com','555-01534','IT'),
                                Row(3,'Max','Mustermann','max.mustermann@company.com','555-45678','IT'),
                                Row(4,'Helga','Musterfrau','helga.musterfrau@company.com','555-0963','Business'),
                                Row(5,'George','Schmidt','george.schmidt@company.com','555-67232','Business'),
                                Row(6,'Bob','Moore','bob.moore@company.com','555-78123','Sales'),
                                Row(6,'Alex','Warny','alex.warny@company.com','555-4567','Operations'),
                                Row(6,'Fred','Bernoulli','fred.bernoulli@company.com','555-0984','Operations'),
                                Row(6,'Martha','Richards','martha.richards@company.com','555-45123','Acquisitions'),
                                Row(6,'Helena','Cartier','helena.cartier@company.com','555-34908','Acquisitions')
                             ])

In [18]:
employeeDF = sqlContext.createDataFrame(employeeData, employeeSchema)

In [19]:
employeeDF.count()

10

In [20]:
employeeDF.first()

Row(id=1, first_name=u'John', last_name=u'Doe', email=u'john.doe@company.com', phone=u'555-12345', department=u'Sales')

In [21]:
selected = employeeDF.select('first_name','last_name')
selected.show()

+----------+----------+
|first_name| last_name|
+----------+----------+
|      John|       Doe|
|    Johnny|  Wishbone|
|       Max|Mustermann|
|     Helga|Musterfrau|
|    George|   Schmidt|
|       Bob|     Moore|
|      Alex|     Warny|
|      Fred| Bernoulli|
|    Martha|  Richards|
|    Helena|   Cartier|
+----------+----------+



In [22]:
employeeDF.select('email').take(2)

[Row(email=u'john.doe@company.com'), Row(email=u'johnny.wishbone@company.com')]

In [23]:
names = employeeDF.filter(func.col('last_name') == 'Musterfrau')

In [24]:
names.show()

+---+----------+----------+--------------------+--------+----------+
| id|first_name| last_name|               email|   phone|department|
+---+----------+----------+--------------------+--------+----------+
|  4|     Helga|Musterfrau|helga.musterfrau@...|555-0963|  Business|
+---+----------+----------+--------------------+--------+----------+



In [25]:
employeeDF.select(func.col('department').alias('group')).show()

+------------+
|       group|
+------------+
|       Sales|
|          IT|
|          IT|
|    Business|
|    Business|
|       Sales|
|  Operations|
|  Operations|
|Acquisitions|
|Acquisitions|
+------------+



In [26]:
employeeDF.filter(func.col('department') == 'Business').groupBy('email').count().show()

+--------------------+-----+
|               email|count|
+--------------------+-----+
|george.schmidt@co...|    1|
|helga.musterfrau@...|    1|
+--------------------+-----+



In [27]:
pandas_df = parquet.where(func.col('avg_score') > 10).toPandas()

In [28]:
pandas_df

Unnamed: 0,author,avg_score,comment,controversiality,created,distinguished,downs,name,subr,ups,url
0,Keundrum,3877,I don't know why they would possibly tell peop...,0,1439958069000000000,,0,t1_cu7x0ut,worldnews,3877,http://reddit.com/r/worldnews/comments/3hidw9/...
1,McBjure,13,He wont earn shit because it's against youtube...,0,1439230955000000000,,0,t1_cty7slu,videos,13,http://reddit.com/r/videos/comments/3ggt2v/c/c...
2,Egsession,37,They weren't dating that long though -- not ev...,0,1438453918000000000,,0,t1_cto3vte,movies,37,http://reddit.com/r/movies/comments/3fe8ru/c/c...
3,fresh_prince_,13,kek,0,1439654076000000000,,0,t1_cu3ync9,DotA2,13,http://reddit.com/r/DotA2/comments/3h30x7/c/cu...
4,djohn_14,11,"I think DS2's DLC had AMAZING music, especiall...",0,1438713706000000000,,0,t1_ctrdvzs,Games,11,http://reddit.com/r/Games/comments/3fr6dt/c/ct...
5,MissyPie,30,"Thank you! ^ ^\n\nI'm sorry, my heart belongs ...",0,1440596771000000000,,0,t1_cug8byb,anime,30,http://reddit.com/r/anime/comments/3igs0f/c/cu...
6,Nilwx,23,"""Spaceship""",0,1439068042000000000,,0,t1_ctw8zz5,GlobalOffensive,23,http://reddit.com/r/GlobalOffensive/comments/3...
7,Mike_Gainer,78,Mastodon: Pendulous Skin\nhttps://www.youtube....,0,1440167085000000000,,0,t1_cuao760,Music,78,http://reddit.com/r/Music/comments/3hu1mf/c/cu...
8,nopooq,11,Yes! This happens SO often. Thank you for shar...,0,1440174263000000000,,0,t1_cuaswna,AskReddit,11,http://reddit.com/r/AskReddit/comments/3hu28g/...
9,LutherDingle,11,The article characterizes Trump as an outsider...,0,1438699354000000000,,0,t1_ctr482t,politics,11,http://reddit.com/r/politics/comments/3fqr96/c...


In [29]:
pandas_df.describe()

Unnamed: 0,avg_score,controversiality,created,downs,ups
count,976.0,976.0,976.0,976,976.0
mean,73.846311,0.003074,1.439703e+18,0,73.846311
std,250.121285,0.055385,773806400000000.0,0,250.121285
min,11.0,0.0,1.438387e+18,0,11.0
25%,15.0,0.0,1.439068e+18,0,15.0
50%,23.0,0.0,1.439686e+18,0,23.0
75%,50.0,0.0,1.440365e+18,0,50.0
max,3877.0,1.0,1.441064e+18,0,3877.0


In [30]:
def dt_conversion(timestamp):
    return pd.to_datetime(timestamp)

In [31]:
dt_conv = func.udf(dt_conversion, TimestampType())
new_df = parquet.withColumn('created', dt_conv(parquet.created))

In [32]:
new_df.registerTempTable("test")

In [33]:
rows = sqlContext.table('test').show()

+--------------------+---------+--------------------+----------------+--------------------+-------------+-----+----------+---------------+----+--------------------+
|              author|avg_score|             comment|controversiality|             created|distinguished|downs|      name|           subr| ups|                 url|
+--------------------+---------+--------------------+----------------+--------------------+-------------+-----+----------+---------------+----+--------------------+
|            Keundrum|     3877|I don't know why ...|               0|2015-08-19 04:21:...|         null|    0|t1_cu7x0ut|      worldnews|3877|http://reddit.com...|
|  _My_Angry_Account_|        1|¯\\_(ツ)_/¯ "¿Porq...|               0|2015-08-19 04:21:...|         null|    0|t1_cu7x18u|  todayilearned|   1|http://reddit.com...|
|        amandadebbie|        3|This is unprofess...|               0|2015-08-17 01:48:...|         null|    0|t1_cu5fyau|TwoXChromosomes|   3|http://reddit.com...|
|         

In [34]:
sqlContext.udf.register('dt_conv_udf', dt_conversion, TimestampType())

In [35]:
sqlContext.sql("select dt_conv_udf(created) from test").count()

10000