In [1]:
import pyspark as ps
from pyspark.sql import HiveContext

In [2]:
sc = ps.SparkContext()

In [3]:
hive_context = HiveContext(sc)

In [4]:
import os

# need to get local path since we are reading local files
cwd = os.getcwd()

In [5]:
meals_rdd = sc.textFile('file://' + cwd + '/data/readychef/meals.txt')

In [6]:
events_rdd = sc.textFile('file://' + cwd + '/data/readychef/events.txt')

In [7]:
meals_rdd.take(5)

[u'meal_id;type;dt;price',
 u'1;french;2013-01-01;10',
 u'2;chinese;2013-01-01;13',
 u'3;mexican;2013-01-02;9',
 u'4;italian;2013-01-03;9']

In [8]:
events_rdd.take(5)

[u'dt;userid;meal_id;event',
 u'2013-01-01;3;18;bought',
 u'2013-01-01;7;1;like',
 u'2013-01-01;10;29;bought',
 u'2013-01-01;11;19;share']

In [9]:
header_meals = meals_rdd.first()

In [10]:
header_events = events_rdd.first()

In [11]:
meals_no_header = meals_rdd.filter(lambda row: row != header_meals)

In [12]:
events_no_header = events_rdd.filter(lambda row: row != header_events)

In [13]:
meals_no_header.first()

u'1;french;2013-01-01;10'

In [14]:
events_no_header.first()

u'2013-01-01;3;18;bought'

In [15]:
meals_json = meals_no_header.map(lambda row: row.split(';')) \
                            .map(lambda row_list: dict(zip(header_meals.split(';'), row_list)))

In [16]:
meals_json.take(1)

[{u'dt': u'2013-01-01', u'meal_id': u'1', u'price': u'10', u'type': u'french'}]

In [17]:
events_json = events_no_header.map(lambda row: row.split(';')) \
                              .map(lambda row_list: dict(zip(header_events.split(';'), row_list)))

In [18]:
events_json.take(1)

[{u'dt': u'2013-01-01',
  u'event': u'bought',
  u'meal_id': u'18',
  u'userid': u'3'}]

## Why go through the trouble

* Schema = table + column + types

* column names to index

* leverage SQL and relational theory

## Why not Schemas (SQL)?

* they make your data structured

* Fragility

In [19]:
meals_dataframe = hive_context.jsonRDD(meals_json)

In [20]:
meals_dataframe.take(1)

[Row(_corrupt_record=u"{u'type': u'french', u'dt': u'2013-01-01', u'meal_id': u'1', u'price': u'10'}")]

In [21]:
import json

def type_conversion(d, columns):
    for c in columns:
        d[c] = int(d[c])
        
    return d

In [22]:
meals_typed = meals_json.map(lambda j: json.dumps(type_conversion(j, ['meal_id', 'price'])))

In [23]:
events_typed = events_json.map(lambda j: json.dumps(type_conversion(j, ['meal_id', 'userid'])))

In [24]:
meals_typed.take(1)

['{"type": "french", "dt": "2013-01-01", "meal_id": 1, "price": 10}']

In [25]:
meals_dataframe = hive_context.jsonRDD(meals_typed)

In [26]:
events_dataframe = hive_context.jsonRDD(events_typed)

In [27]:
meals_dataframe.head()

Row(dt=u'2013-01-01', meal_id=1, price=10, type=u'french')

In [28]:
events_dataframe.head()

Row(dt=u'2013-01-01', event=u'bought', meal_id=18, userid=3)

In [29]:
meals_dataframe.printSchema()

root
 |-- dt: string (nullable = true)
 |-- meal_id: long (nullable = true)
 |-- price: long (nullable = true)
 |-- type: string (nullable = true)



In [30]:
events_dataframe.printSchema()

root
 |-- dt: string (nullable = true)
 |-- event: string (nullable = true)
 |-- meal_id: long (nullable = true)
 |-- userid: long (nullable = true)



## Show me the SQL

In [32]:
meals_dataframe.registerTempTable('meals')

In [33]:
events_dataframe.registerTempTable('events')

In [34]:
hive_context.sql('SELECT * FROM meals LIMIT 5').collect()

[Row(dt=u'2013-01-01', meal_id=1, price=10, type=u'french'),
 Row(dt=u'2013-01-01', meal_id=2, price=13, type=u'chinese'),
 Row(dt=u'2013-01-02', meal_id=3, price=9, type=u'mexican'),
 Row(dt=u'2013-01-03', meal_id=4, price=9, type=u'italian'),
 Row(dt=u'2013-01-03', meal_id=5, price=12, type=u'chinese')]

In [35]:
meals_dataframe.take(5)

[Row(dt=u'2013-01-01', meal_id=1, price=10, type=u'french'),
 Row(dt=u'2013-01-01', meal_id=2, price=13, type=u'chinese'),
 Row(dt=u'2013-01-02', meal_id=3, price=9, type=u'mexican'),
 Row(dt=u'2013-01-03', meal_id=4, price=9, type=u'italian'),
 Row(dt=u'2013-01-03', meal_id=5, price=12, type=u'chinese')]

In [36]:
# which cuisine sells the most

hive_context.sql("""
    SELECT type, COUNT(type) as cnt FROM
        meals 
    INNER JOIN 
        events on meals.meal_id = events.meal_id
    WHERE
        event = 'bought'
    GROUP BY
        type
    ORDER BY cnt DESC
""").collect()

[Row(type=u'italian', cnt=22575),
 Row(type=u'french', cnt=16179),
 Row(type=u'mexican', cnt=8792),
 Row(type=u'japanese', cnt=6921),
 Row(type=u'chinese', cnt=6267),
 Row(type=u'vietnamese', cnt=3535)]