In [1]:
import pyspark as ps
import json
from pyspark.sql import HiveContext

In [2]:
sc = ps.SparkContext(master='spark://127.0.0.1:7077', conf=ps.SparkConf().setAppName("Performance Tuning"))
hive_context = HiveContext(sc)

In [1]:
import os

# need to get local path since we are reading local files
cwd = os.getcwd()

In [3]:
file_rdd = sc.textFile('file://' + cwd + '/data/toy_data.txt')

In [4]:
# default OSX block size is 4kb
file_rdd.getNumPartitions()

2

In [5]:
cat data/toy_data.txt

{"Jane": "2"}
{"Jane": "1"}
{"Pete": "20"}
{"Tyler": "3"}
{"Duncan": "4"}
{"Yuki": "5"}
{"Duncan": "6"}
{"Duncan": "4"}
{"Duncan": "5"}


In [6]:
file_rdd.mapPartitionsWithIndex(lambda i, iterator: (i, list(iterator))).collect()

[0,
 [u'{"Jane": "2"}',
  u'{"Jane": "1"}',
  u'{"Pete": "20"}',
  u'{"Tyler": "3"}',
  u'{"Duncan": "4"}'],
 1,
 [u'{"Yuki": "5"}',
  u'{"Duncan": "6"}',
  u'{"Duncan": "4"}',
  u'{"Duncan": "5"}']]

In [7]:
file_rdd = sc.textFile('file:///Users/jonathandinu/spark-ds-applications/data/toy_data.txt', 10)

In [8]:
# some partitions are empty, can't split a single line
file_rdd.mapPartitionsWithIndex(lambda i, iterator: (i, list(iterator))).collect()

[0,
 [u'{"Jane": "2"}'],
 1,
 [u'{"Jane": "1"}'],
 2,
 [u'{"Pete": "20"}'],
 3,
 [u'{"Tyler": "3"}'],
 4,
 [u'{"Duncan": "4"}'],
 5,
 [u'{"Yuki": "5"}'],
 6,
 [u'{"Duncan": "6"}'],
 7,
 [u'{"Duncan": "4"}'],
 8,
 [],
 9,
 [u'{"Duncan": "5"}'],
 10,
 []]

In [9]:
tup_rdd = file_rdd.map(lambda line: json.loads(line)) \
                  .map(lambda d: (d.keys()[0], int(d.values()[0])))

out = tup_rdd.groupByKey().mapValues(lambda tup: max(tup.data)) 

In [10]:
tup_rdd.setName('toy_data_tuples')

toy_data_tuples PythonRDD[10] at RDD at PythonRDD.scala:43

In [11]:
tup_rdd.collect()

[(u'Jane', 2),
 (u'Jane', 1),
 (u'Pete', 20),
 (u'Tyler', 3),
 (u'Duncan', 4),
 (u'Yuki', 5),
 (u'Duncan', 6),
 (u'Duncan', 4),
 (u'Duncan', 5)]

In [12]:
print out.toDebugString()

(11) PythonRDD[11] at RDD at PythonRDD.scala:43 []
 |   MapPartitionsRDD[9] at mapPartitions at PythonRDD.scala:346 []
 |   ShuffledRDD[8] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(11) PairwiseRDD[7] at groupByKey at <ipython-input-9-5dd19bb9f251>:3 []
    |   PythonRDD[6] at groupByKey at <ipython-input-9-5dd19bb9f251>:3 []
    |   MapPartitionsRDD[4] at textFile at NativeMethodAccessorImpl.java:-2 []
    |   file:///Users/jonathandinu/Repositories/pearson/data-applications-sprk/clean_notebooks/data/toy_data.txt HadoopRDD[3] at textFile at NativeMethodAccessorImpl.java:-2 []


## Readychef

In [13]:
meals_rdd = sc.textFile('file:///Users/jonathandinu/spark-ds-applications/data/readychef/meals.txt')
events_rdd = sc.textFile('file:///Users/jonathandinu/spark-ds-applications/data/readychef/events.txt')

In [14]:
header_meals = meals_rdd.first()
header_events = events_rdd.first()

In [15]:
meals_no_header = meals_rdd.filter(lambda row: row != header_meals)
events_no_header = events_rdd.filter(lambda row: row != header_events)

In [16]:
meals_json = meals_no_header.map(lambda row: row.split(';')) \
                            .map(lambda row_list: dict(zip(header_meals.split(';'), row_list)))
    
events_json = events_no_header.map(lambda row: row.split(';')) \
                              .map(lambda row_list: dict(zip(header_events.split(';'), row_list)))

In [32]:
meals_json.cache()
events_json.cache()

PythonRDD[124] at RDD at PythonRDD.scala:43

In [33]:
def type_conversion(d, columns):
    for c in columns:
        d[c] = int(d[c])
        
    return d

In [34]:
meals_typed = meals_json.map(lambda j: json.dumps(type_conversion(j, ['meal_id', 'price'])))

In [35]:
events_typed = events_json.map(lambda j: json.dumps(type_conversion(j, ['meal_id', 'userid'])))

In [20]:
# no pre-filter
sc.setJobGroup('PySpark -- nofilter', "PySpark nofilter join performance")

meals_json.keyBy(lambda x: x['meal_id']) \
            .join(events_json.keyBy(lambda x: x['meal_id'])) \
            .filter(lambda tup: tup[1][1]['event'] == 'bought') \
            .groupBy(lambda tup: tup[1][0]['type']) \
            .mapValues(lambda val: len(val)) \
            .sortBy(lambda g: g[1], ascending=False) \
            .collect()

[(u'italian', 22575),
 (u'french', 16179),
 (u'mexican', 8792),
 (u'japanese', 6921),
 (u'chinese', 6267),
 (u'vietnamese', 3535)]

In [37]:
# pre-filter before join
sc.setJobGroup('PySpark -- prefilter', "PySpark cached RDD join performance")

meals_json.keyBy(lambda x: x['meal_id']) \
            .join(events_json.filter(lambda x: x['event'] == 'bought') \
                             .keyBy(lambda x: x['meal_id']) \
                 ) \
            .filter(lambda tup: tup[1][1]['event'] == 'bought') \
            .groupBy(lambda tup: tup[1][0]['type']) \
            .mapValues(lambda val: len(val)) \
            .sortBy(lambda g: g[1], ascending=False) \
            .collect()

[(u'italian', 22575),
 (u'french', 16179),
 (u'mexican', 8792),
 (u'japanese', 6921),
 (u'chinese', 6267),
 (u'vietnamese', 3535)]

In [40]:
# reduceByKey
sc.setJobGroup('PySpark -- prefilter', "PySpark cached RDD join with reduce by key performance")

meals_json.keyBy(lambda x: x['meal_id']) \
            .join(events_json.filter(lambda x: x['event'] == 'bought') \
                             .keyBy(lambda x: x['meal_id']) \
                 ) \
            .filter(lambda tup: tup[1][1]['event'] == 'bought') \
            .map(lambda tup: (tup[1][0]['type'], 1)) \
            .reduceByKey(lambda a, b: a + b) \
            .sortBy(lambda g: g[1], ascending=False) \
            .collect()

[(u'italian', 22575),
 (u'french', 16179),
 (u'mexican', 8792),
 (u'japanese', 6921),
 (u'chinese', 6267),
 (u'vietnamese', 3535)]

In [60]:
joined_rdd.take(5)

[(u'164',
  ({u'dt': u'2013-01-31',
    u'meal_id': u'164',
    u'price': u'15',
    u'type': u'italian'},
   {u'dt': u'2013-01-27',
    u'event': u'bought',
    u'meal_id': u'164',
    u'userid': u'76'})),
 (u'164',
  ({u'dt': u'2013-01-31',
    u'meal_id': u'164',
    u'price': u'15',
    u'type': u'italian'},
   {u'dt': u'2013-01-29',
    u'event': u'bought',
    u'meal_id': u'164',
    u'userid': u'31'})),
 (u'164',
  ({u'dt': u'2013-01-31',
    u'meal_id': u'164',
    u'price': u'15',
    u'type': u'italian'},
   {u'dt': u'2013-01-29',
    u'event': u'bought',
    u'meal_id': u'164',
    u'userid': u'88'})),
 (u'164',
  ({u'dt': u'2013-01-31',
    u'meal_id': u'164',
    u'price': u'15',
    u'type': u'italian'},
   {u'dt': u'2013-01-29',
    u'event': u'bought',
    u'meal_id': u'164',
    u'userid': u'203'})),
 (u'164',
  ({u'dt': u'2013-01-31',
    u'meal_id': u'164',
    u'price': u'15',
    u'type': u'italian'},
   {u'dt': u'2013-01-30',
    u'event': u'bought',
    u'meal_id

In [39]:
print ordered_list.toDebugString()

(4) PythonRDD[90] at collect at <ipython-input-37-1cdbd71232ce>:1 []
 |  MapPartitionsRDD[89] at mapPartitions at PythonRDD.scala:346 []
 |  ShuffledRDD[88] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(4) PairwiseRDD[87] at sortBy at <ipython-input-36-8c75a2437747>:1 []
    |  PythonRDD[86] at sortBy at <ipython-input-36-8c75a2437747>:1 []
    |  MapPartitionsRDD[83] at mapPartitions at PythonRDD.scala:346 []
    |  ShuffledRDD[82] at partitionBy at NativeMethodAccessorImpl.java:-2 []
    +-(4) PairwiseRDD[81] at groupBy at <ipython-input-35-cfb2ef5806c3>:1 []
       |  PythonRDD[80] at groupBy at <ipython-input-35-cfb2ef5806c3>:1 []
       |  MapPartitionsRDD[79] at mapPartitions at PythonRDD.scala:346 []
       |  ShuffledRDD[78] at partitionBy at NativeMethodAccessorImpl.java:-2 []
       +-(4) PairwiseRDD[77] at join at <ipython-input-34-5071b7146a50>:4 []
          |  PythonRDD[76] at join at <ipython-input-34-5071b7146a50>:4 []
          |  UnionRDD[75] at union at N

### Dataframes

In [22]:
sc.setJobGroup('Spark SQL', "spark sql performance")

In [23]:
meals_dataframe = hive_context.jsonRDD(meals_typed)
events_dataframe = hive_context.jsonRDD(events_typed)

In [24]:
meals_dataframe.registerTempTable('meals')
events_dataframe.registerTempTable('events')

In [25]:
# which cuisine sells the most

hive_context.sql("""
    SELECT type, COUNT(type) as cnt FROM
        meals 
    INNER JOIN 
        events on meals.meal_id = events.meal_id
    WHERE
        event = 'bought'
    GROUP BY
        type
    ORDER BY cnt DESC
""").collect()

[Row(type=u'italian', cnt=22575),
 Row(type=u'french', cnt=16179),
 Row(type=u'mexican', cnt=8792),
 Row(type=u'japanese', cnt=6921),
 Row(type=u'chinese', cnt=6267),
 Row(type=u'vietnamese', cnt=3535)]

## Airlines

In [26]:
import numpy as np

sc.setJobGroup('Airline Data', "Airline Dataset")

link = 's3n://[AWS_ACCESS_KEY]:[AWS_SECRET]@mortar-example-data/airline-data'
airline = sc.textFile(link)

airline_no_quote = airline.map(lambda line: line.replace('\'', '').replace('\"', '').strip(','))

header_line = airline_no_quote.first()
header_list = header_line.split(',')

airline_no_header = airline_no_quote.filter(lambda row: row != header_line)

def make_row(row):
    row_list = row.split(',')
    
    d = dict(zip(header_list, row_list))
    
    return d

airline_rows = airline_no_header.map(make_row)

In [27]:
sc.setJobGroup('Airline Data', "no filter")

destination_rdd = airline_rows.map(lambda row: (row['DEST_AIRPORT_ID'], row))
origin_rdd = airline_rows.map(lambda row: (row['ORIGIN_AIRPORT_ID'], row))

mean_delays_dest = destination_rdd.groupByKey() \
                                  .mapValues(lambda delays: \
                                             np.mean(map(lambda row: \
                                                             float(row['ARR_DELAY']) if row['ARR_DELAY'] else 0, \
                                                         delays.data)))
    
mean_delays_origin = origin_rdd.groupByKey()
                               .mapValues(lambda delays: \
                                          np.mean(map(lambda row: \
                                                          float(row['DEP_DELAY']) if row['DEP_DELAY'] else 0, \
                                                      delays.data)))

print mean_delays_origin.sortBy(lambda t: t[1], ascending=True).take(10)

[(u'12129', -6.7547169811320753), (u'15991', -6.0978441127694856), (u'12888', -5.9056603773584904), (u'14113', -5.3462002412545235), (u'10779', -5.1457627118644069), (u'13127', -5.0891265597147948), (u'14633', -4.9087677725118484), (u'10739', -4.666666666666667), (u'15897', -4.6107142857142858), (u'11274', -4.6034482758620694)]


In [28]:
sc.setJobGroup('Airline Data', "filtered first")

destination_rdd = airline_rows.map(lambda row: (row['DEST_AIRPORT_ID'], \
                                               float(row['ARR_DELAY'] if row['ARR_DELAY'] else 0)))

origin_rdd = airline_rows.map(lambda row: (row['ORIGIN_AIRPORT_ID'], \
                                          float(row['DEP_DELAY']) if row['DEP_DELAY'] else 0))

mean_delays_dest = destination_rdd.groupByKey().mapValues(lambda delays: np.mean(delays.data))
mean_delays_origin = origin_rdd.groupByKey().mapValues(lambda delays: np.mean(delays.data))

print mean_delays_origin.sortBy(lambda t: t[1], ascending=True).take(10)

[(u'12129', -6.7547169811320753), (u'15991', -6.0978441127694856), (u'12888', -5.9056603773584904), (u'14113', -5.3462002412545235), (u'10779', -5.1457627118644069), (u'13127', -5.0891265597147948), (u'14633', -4.9087677725118484), (u'10739', -4.666666666666667), (u'15897', -4.6107142857142858), (u'11274', -4.6034482758620694)]


In [29]:
airline_rows.getNumPartitions()

11

In [31]:
print mean_delays_origin.toDebugString()

(11) PythonRDD[122] at RDD at PythonRDD.scala:43 []
 |   MapPartitionsRDD[114] at mapPartitions at PythonRDD.scala:346 []
 |   ShuffledRDD[113] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(11) PairwiseRDD[112] at groupByKey at <ipython-input-28-824c2b202e95>:6 []
    |   PythonRDD[111] at groupByKey at <ipython-input-28-824c2b202e95>:6 []
    |   MapPartitionsRDD[90] at textFile at NativeMethodAccessorImpl.java:-2 []
    |   s3n://AKIAJRN6IWDBI5XT3M6Q:3iOWT533UlP1gzzJeIYiixCZLDcYeUA9g0wAWGET@mortar-example-data/airline-data HadoopRDD[89] at textFile at NativeMethodAccessorImpl.java:-2 []


In [41]:
sc.setJobGroup('Airline Data -- filtered', "reduceByKey + filtered")

destination_rdd = airline_rows.map(lambda row: (row['DEST_AIRPORT_ID'], \
                                               float(row['ARR_DELAY'] if row['ARR_DELAY'] else 0)))

origin_rdd = airline_rows.map(lambda row: (row['ORIGIN_AIRPORT_ID'], \
                                          float(row['DEP_DELAY']) if row['DEP_DELAY'] else 0))

mean_delays_dest = destination_rdd.mapValues(lambda row: (row, 1)) \
                                  .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    
mean_delays_origin = origin_rdd.mapValues(lambda row: (row, 1)) \
                               .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

print mean_delays_origin.sortBy(lambda t: t[1], ascending=True).take(10)

[(u'12402', (-6601.0, 6001)), (u'11898', (-5898.0, 1510)), (u'14113', (-4432.0, 829)), (u'14633', (-4143.0, 844)), (u'11648', (-3880.0, 1832)), (u'15991', (-3677.0, 603)), (u'12280', (-3646.0, 2664)), (u'10779', (-3036.0, 590)), (u'13127', (-2855.0, 561)), (u'11695', (-2795.0, 1892))]


In [43]:
airline_rows.cache()

PythonRDD[218] at RDD at PythonRDD.scala:43

In [44]:
sc.setJobGroup('Airline Data -- filtered', "cache first run")

destination_rdd = airline_rows.map(lambda row: (row['DEST_AIRPORT_ID'], \
                                               float(row['ARR_DELAY'] if row['ARR_DELAY'] else 0)))

origin_rdd = airline_rows.map(lambda row: (row['ORIGIN_AIRPORT_ID'], \
                                          float(row['DEP_DELAY']) if row['DEP_DELAY'] else 0))

mean_delays_dest = destination_rdd.mapValues(lambda row: (row, 1)) \
                                  .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    
mean_delays_origin = origin_rdd.mapValues(lambda row: (row, 1)) \
                               .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

print mean_delays_origin.sortBy(lambda t: t[1], ascending=True).take(10)

[(u'12402', (-6601.0, 6001)), (u'11898', (-5898.0, 1510)), (u'14113', (-4432.0, 829)), (u'14633', (-4143.0, 844)), (u'11648', (-3880.0, 1832)), (u'15991', (-3677.0, 603)), (u'12280', (-3646.0, 2664)), (u'10779', (-3036.0, 590)), (u'13127', (-2855.0, 561)), (u'11695', (-2795.0, 1892))]


In [45]:
sc.setJobGroup('Airline Data -- filtered', "cache second run")

destination_rdd = airline_rows.map(lambda row: (row['DEST_AIRPORT_ID'], \
                                               float(row['ARR_DELAY'] if row['ARR_DELAY'] else 0)))

origin_rdd = airline_rows.map(lambda row: (row['ORIGIN_AIRPORT_ID'], \
                                          float(row['DEP_DELAY']) if row['DEP_DELAY'] else 0))

mean_delays_dest = destination_rdd.mapValues(lambda row: (row, 1)) \
                                  .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    
mean_delays_origin = origin_rdd.mapValues(lambda row: (row, 1)) \
                               .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

print mean_delays_origin.sortBy(lambda t: t[1], ascending=True).take(10)

[(u'12402', (-6601.0, 6001)), (u'11898', (-5898.0, 1510)), (u'14113', (-4432.0, 829)), (u'14633', (-4143.0, 844)), (u'11648', (-3880.0, 1832)), (u'15991', (-3677.0, 603)), (u'12280', (-3646.0, 2664)), (u'10779', (-3036.0, 590)), (u'13127', (-2855.0, 561)), (u'11695', (-2795.0, 1892))]
