In [2]:
from pyspark import SparkConf, SparkContext

In [3]:
# Build the Spark configuration: local master, a readable application name,
# and a fixed web-UI port. The port key must be "spark.ui.port"; the previous
# "conf.ui.port" is not a Spark property, so the requested port was never applied.
conf = (
    SparkConf()
    .setMaster("local")
    .setAppName("daily revenue")
    .set("spark.ui.port", "12908")
)

In [4]:
# Reuse the running context if this cell is executed again: calling the
# SparkContext constructor a second time in the same kernel raises ValueError.
# Note: `conf` is only applied when a new context actually gets created.
sc = SparkContext.getOrCreate(conf=conf)

In [5]:
# Load the raw 'orders' file from local storage as an RDD of text lines.
orders = sc.textFile(
    "file:///C:/samp_db/retail_db-master/orders/part-00000"
)

In [6]:
# Preview the first three raw lines of the orders file.
orders.take(3)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE']

In [7]:
# List the distinct values of the fourth comma-separated field (the order status).
(
    orders
    .map(lambda line: line.split(",")[3])
    .distinct()
    .take(20)
)

['CLOSED',
 'PENDING_PAYMENT',
 'COMPLETE',
 'PROCESSING',
 'PAYMENT_REVIEW',
 'PENDING',
 'ON_HOLD',
 'CANCELED',
 'SUSPECTED_FRAUD']

In [8]:
# Keep only the orders whose status (fourth field) is CLOSED or COMPLETE.
ordersfilt = orders.filter(
    lambda line: line.split(",")[3] in ("CLOSED", "COMPLETE")
)

In [9]:
# Spot-check that only CLOSED / COMPLETE rows survived the filter.
ordersfilt.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE']

In [10]:
# Daily revenue only needs the order id and the order date, so project each
# filtered line down to an (order_id, order_date) pair keyed by the integer id.
ordersByDate = (
    ordersfilt
    .map(lambda line: line.split(","))
    .map(lambda fields: (int(fields[0]), fields[1]))
)

In [11]:
# Number of (order_id, order_date) pairs kept after the status filter.
ordersByDate.count()

30455

In [12]:
# Confirm the pair layout: (int order id, date string).
ordersByDate.take(2)

[(1, '2013-07-25 00:00:00.0'), (3, '2013-07-25 00:00:00.0')]

In [13]:
# The successfully placed orders are ready. Daily revenue also needs the
# 'order_items' table, so load it from local storage as an RDD of text lines.
orderItems = sc.textFile(
    "file:///C:/samp_db/retail_db-master/order_items/part-00000"
)

In [14]:
# Look at one raw order_items line to see the field layout.
orderItems.first()

'1,1,957,1,299.98,299.98'

In [15]:
# Key each order item by the integer in field 1 and keep the float in field 4
# (presumably order id and line subtotal -- verify against the table schema).
orderItemsById = (
    orderItems
    .map(lambda line: line.split(","))
    .map(lambda fields: (int(fields[1]), float(fields[4])))
)

In [16]:
# Total number of order-item pairs (no status filtering applied on this side).
orderItemsById.count()

172198

In [17]:
# Confirm the pair layout: (int key, float amount).
orderItemsById.take(2)

[(1, 299.98), (2, 199.99)]

In [18]:
# Inner join on the shared integer key; each result element has the form
# (key, (order_date, amount)), one per matching order item.
ordersJoin= ordersByDate.join(orderItemsById)

In [19]:
# Inspect a few joined records; an order with several items appears once per item.
ordersJoin.take(5)

[(4, ('2013-07-25 00:00:00.0', 49.98)),
 (4, ('2013-07-25 00:00:00.0', 299.95)),
 (4, ('2013-07-25 00:00:00.0', 150.0)),
 (4, ('2013-07-25 00:00:00.0', 199.92)),
 (12, ('2013-07-25 00:00:00.0', 299.98))]

In [20]:
# Drop the join key and keep only the (order_date, amount) value of each record.
ordersJoinMap = ordersJoin.values()

In [21]:
# Confirm the records are now (order_date, amount) pairs.
ordersJoinMap.take(3)

[('2013-07-25 00:00:00.0', 49.98),
 ('2013-07-25 00:00:00.0', 299.95),
 ('2013-07-25 00:00:00.0', 150.0)]

In [22]:
# Add up all amounts that share the same date key to get revenue per day.
ordersRevenuePerDay = ordersJoinMap.reduceByKey(
    lambda running_total, amount: running_total + amount
)

In [23]:
# Show five (date, revenue) results. The RDD is not sorted here, so the dates
# are not in calendar order, and the totals are unrounded float sums.
ordersRevenuePerDay.take(5)

[('2013-07-26 00:00:00.0', 54713.23000000001),
 ('2013-07-27 00:00:00.0', 48411.479999999996),
 ('2013-07-28 00:00:00.0', 35672.029999999984),
 ('2013-07-29 00:00:00.0', 54579.70000000004),
 ('2013-07-31 00:00:00.0', 59212.490000000005)]

In [24]:
# Number of distinct date keys that have revenue.
ordersRevenuePerDay.count()

364