In [101]:
'''
  This is a rough workbook, used only for exploring the data.
  To print the actual report, run the sales_by_state.py script instead,
  like so:
  spark-submit sales_by_state.py
'''
import findspark
findspark.init()
from pyspark import SparkContext
from collections import namedtuple
from datetime import datetime
from datetime import timezone
# Single-threaded local Spark context shared by every cell of this notebook.
sc = SparkContext(master='local', appName='sales_by_state')

In [193]:
# Field names for a parsed customer; parseCustomers returns a plain tuple in this order.
CustomerState = namedtuple('CustomerState', ('cid', 'state'), rename=False)

def parseCustomers(line):
    """Turn one customers.dat line into a ``(cid, state)`` tuple.

    Example line (fields separated by '#', address ends with the state code):
        123#AAA Inc#1 First Ave	Mountain View CA#94040

    The customer id is the first field; the state is the last two characters
    of the second-to-last field, so extra '#' inside the name are tolerated.
    """
    fields = line.split('#')
    customerId = int(fields[0])
    return (customerId, fields[-2][-2:])

def printList(rows):
    """Write each item of ``rows`` to stdout, one item per line.

    Args:
        rows: any iterable; each item is rendered by ``print`` (i.e. via str()).
    """
    for item in iter(rows):
        print(item)

# Parse customers.dat once and cache the resulting (cid, state) pair RDD.
# It is reused by every join further down, so without cache() each of those
# actions would re-read and re-parse the file.
customersRdd = sc.textFile('customers.dat').map(parseCustomers).cache()

# Collect to the driver and broadcast -- only sensible while the customer
# table is small enough to fit in driver/executor memory.
# NOTE(review): customersBC is only printed here; the joins later in the
# notebook use customersRdd, not this broadcast.
customersDat = customersRdd.collect()
customersBC = sc.broadcast(customersDat)
printList(customersBC.value)

(123, 'CA')
(456, 'AK')
(789, 'AL')
(101112, 'OR')
(124, 'CA')


In [103]:
import datetime as dtmod
# NOTE(review): Sales is not used by parseSales, which returns plain tuples.
Sales = namedtuple('Sales', ('cid', 'dttm', 'amount'), rename=False)

def parseSales(line, tz=None):
    """Parse one sales.dat line into a ``(cid, (dttm, amount))`` pair.

    Input format, '#'-separated: ``<epoch seconds>#<cid>#<amount>``,
    e.g. ``1454313600#123#123456``.

    ``dttm`` is the sale time truncated to the hour and then moved forward one
    hour, so every sale in ``[h:00, h+1:00)`` is labelled ``h+1:00`` (a sale at
    exactly ``h:00`` is labelled ``h+1:00`` too).

    Args:
        line: one raw line of sales.dat.
        tz: optional ``tzinfo`` such as ``timezone.utc``. The default ``None``
            keeps the original behavior: the timestamp is converted to the
            local time zone of whatever process runs this function, which can
            differ between machines. Pass an explicit zone for reproducible
            hour buckets; ``dttm`` is then timezone-aware.

    Returns:
        ``(cid, (dttm, amount))`` with ``cid`` and ``amount`` as ints, keyed by
        customer id so it can be joined with the customers RDD.

    Raises:
        ValueError: if the timestamp, cid or amount field is not an integer.
        IndexError: if the line has fewer than three '#'-separated fields.
    """
    parts = line.split('#')
    saleTime = datetime.fromtimestamp(int(parts[0]), tz)
    # Truncate to the start of the hour, then label the bucket by its end.
    dttm = saleTime.replace(minute=0, second=0, microsecond=0) + dtmod.timedelta(hours=1)
    cid = int(parts[1])
    amount = int(parts[2])
    return (cid, (dttm, amount))

# (cid, (dttm, amount)) pairs, keyed by customer id for the joins below.
salesRdd = (sc.textFile('sales.dat')
              .map(parseSales))
salesRdd.take(1)

[(123, (datetime.datetime(2016, 2, 1, 9, 0), 123456))]

In [104]:
# Inner join on customer id: each record is (cid, ((dttm, amount), state)).
joinedRdd = salesRdd.join(customersRdd)
joinedRdd.take(1)

[(456, ((datetime.datetime(2016, 8, 1, 11, 0), 123458), 'AK'))]

In [105]:
# Value part of the first joined record: ((dttm, amount), state).
firstJoined = salesRdd.join(customersRdd).take(1)[0]
firstJoined[1]

((datetime.datetime(2016, 8, 1, 11, 0), 123458), 'AK')

In [106]:
# Unpack the first joined record and show its hour-bucketed sale time.
firstJoined = salesRdd.join(customersRdd).take(1)[0]
(saleDttm, saleAmount), saleState = firstJoined[1]
saleDttm

datetime.datetime(2016, 8, 1, 11, 0)

In [107]:
# Unpack the first joined record and show the customer's state.
firstJoined = salesRdd.join(customersRdd).take(1)[0]
(saleDttm, saleAmount), saleState = firstJoined[1]
saleState

'AK'

In [73]:
# Unpack the first joined record and show the sale amount.
firstJoined = salesRdd.join(customersRdd).take(1)[0]
(saleDttm, saleAmount), saleState = firstJoined[1]
saleAmount

123458

In [184]:
def _stateHourKey(record):
    # Joined record (cid, ((dttm, amount), state)) -> ((state, dttm), amount).
    _cid, ((dttm, amount), state) = record
    return ((state, dttm), amount)

def _stateHourRow(entry):
    # ((state, dttm), total) -> flat (state, year, month, day, hour, total) row.
    (state, dttm), total = entry
    return (state, dttm.year, dttm.month, dttm.day, dttm.hour, total)

# Total sales per state per hour bucket.
hourRdd = (salesRdd.join(customersRdd)
           .map(_stateHourKey)
           .reduceByKey(lambda a, b: a + b)
           .map(_stateHourRow))
hourRdd.take(10)

[('AK', 2016, 8, 1, 11, 123458),
 ('CA', 2016, 2, 1, 9, 246912),
 ('AL', 2016, 8, 1, 12, 123459),
 ('OR', 2016, 2, 1, 9, 123456),
 ('AL', 2017, 8, 1, 10, 123457)]

In [185]:
def _asStateTotalRow(pair):
    # (state, total) -> 6-field row with 0 placeholders for year/month/day/hour.
    state, total = pair
    return (state, 0, 0, 0, 0, total)

# Grand total per state, summed over all hourly rows.
stateRdd = (hourRdd
            .map(lambda row: (row[0], row[5]))
            .reduceByKey(lambda a, b: a + b)
            .map(_asStateTotalRow))
stateRdd.take(10)

[('AK', 0, 0, 0, 0, 123458),
 ('CA', 0, 0, 0, 0, 246912),
 ('AL', 0, 0, 0, 0, 246916),
 ('OR', 0, 0, 0, 0, 123456)]

In [172]:
# Preview: hourly rows alongside the per-state totals, ordered by state.
hourAndStateRdd = hourRdd.union(stateRdd).sortBy(lambda row: row[0])
hourAndStateRdd.take(10)

[('AK', 2016, 8, 1, 11, 123458),
 ('AK', 0, 0, 0, 0, 123458),
 ('AL', 2016, 8, 1, 12, 123459),
 ('AL', 2017, 8, 1, 10, 123457),
 ('AL', 0, 0, 0, 0, 246916),
 ('CA', 2016, 2, 1, 9, 246912),
 ('CA', 0, 0, 0, 0, 246912),
 ('OR', 2016, 2, 1, 9, 123456),
 ('OR', 0, 0, 0, 0, 123456)]

In [186]:
# Roll hourly rows up to one row per (state, year, month, day); hour becomes 0.
# row[:4] is (state, year, month, day); the final map re-flattens key + total.
dayRdd = (hourRdd
          .map(lambda row: (row[:4] + (0,), row[5]))
          .reduceByKey(lambda a, b: a + b)
          .map(lambda kv: kv[0] + (kv[1],)))
dayRdd.take(10)

[('AK', 2016, 8, 1, 0, 123458),
 ('CA', 2016, 2, 1, 0, 246912),
 ('AL', 2017, 8, 1, 0, 123457),
 ('AL', 2016, 8, 1, 0, 123459),
 ('OR', 2016, 2, 1, 0, 123456)]

In [187]:
# Roll daily rows up to one row per (state, year, month); day and hour become 0.
# row[:3] is (state, year, month); the final map re-flattens key + total.
monthRdd = (dayRdd
            .map(lambda row: (row[:3] + (0, 0), row[5]))
            .reduceByKey(lambda a, b: a + b)
            .map(lambda kv: kv[0] + (kv[1],)))
monthRdd.take(10)

[('AL', 2016, 8, 0, 0, 123459),
 ('OR', 2016, 2, 0, 0, 123456),
 ('AK', 2016, 8, 0, 0, 123458),
 ('CA', 2016, 2, 0, 0, 246912),
 ('AL', 2017, 8, 0, 0, 123457)]

In [188]:
# Roll up to one row per (state, year); month, day and hour become 0.
# Built from monthRdd (the next finer level), matching dayRdd <- hourRdd and
# monthRdd <- dayRdd. Summing the monthly totals gives the same integer sums
# as summing the daily rows, but fewer records go through the shuffle.
yearRdd = monthRdd.map(lambda x: ((x[0], x[1], 0, 0, 0), x[5])) \
    .reduceByKey(lambda a, b: a + b) \
    .map(lambda x: (x[0][0], x[0][1], x[0][2], x[0][3], x[0][4], x[1]))
yearRdd.take(10)

[('AL', 2016, 0, 0, 0, 123459),
 ('OR', 2016, 0, 0, 0, 123456),
 ('AK', 2016, 0, 0, 0, 123458),
 ('CA', 2016, 0, 0, 0, 246912),
 ('AL', 2017, 0, 0, 0, 123457)]

In [192]:
def _reportSortKey(row):
    # State ascending, then the most recent year/month/day/hour first. The 0
    # placeholders in rolled-up rows make each subtotal sort after its details.
    # NOTE(review): an hourly row for hour 0 has the same (state, year, month,
    # day, 0) prefix as that day's subtotal row, so the two cannot be told
    # apart in this layout and their relative order is not defined.
    state, year, month, day, hour = row[:5]
    return (state, -year, -month, -day, -hour)

# Every aggregation level in one report-ordered RDD.
combinedRdd = (hourRdd
               .union(dayRdd)
               .union(monthRdd)
               .union(yearRdd)
               .union(stateRdd)
               .sortBy(_reportSortKey, ascending=True))
combinedRdd.take(100)

[('AK', 2016, 8, 1, 11, 123458),
 ('AK', 2016, 8, 1, 0, 123458),
 ('AK', 2016, 8, 0, 0, 123458),
 ('AK', 2016, 0, 0, 0, 123458),
 ('AK', 0, 0, 0, 0, 123458),
 ('AL', 2017, 8, 1, 10, 123457),
 ('AL', 2017, 8, 1, 0, 123457),
 ('AL', 2017, 8, 0, 0, 123457),
 ('AL', 2017, 0, 0, 0, 123457),
 ('AL', 2016, 8, 1, 12, 123459),
 ('AL', 2016, 8, 1, 0, 123459),
 ('AL', 2016, 8, 0, 0, 123459),
 ('AL', 2016, 0, 0, 0, 123459),
 ('AL', 0, 0, 0, 0, 246916),
 ('CA', 2016, 2, 1, 9, 246912),
 ('CA', 2016, 2, 1, 0, 246912),
 ('CA', 2016, 2, 0, 0, 246912),
 ('CA', 2016, 0, 0, 0, 246912),
 ('CA', 0, 0, 0, 0, 246912),
 ('OR', 2016, 2, 1, 9, 123456),
 ('OR', 2016, 2, 1, 0, 123456),
 ('OR', 2016, 2, 0, 0, 123456),
 ('OR', 2016, 0, 0, 0, 123456),
 ('OR', 0, 0, 0, 0, 123456)]

In [100]:
# Stop the local SparkContext created at the top of the notebook; the cells
# above need that context recreated before they can run again.
sc.stop()