In [59]:
#Invoke/initialize the spark session ( point of entry into the Sparkverse)
from pyspark.sql import *
spark = SparkSession \
.builder.master("local") \
.appName("Word Count") \
.getOrCreate()

print (spark)

<pyspark.sql.session.SparkSession object at 0x7f624824ac88>


In [61]:
#Loading a file to spark creating an RDD (orders data)
ordersRDD = spark.sparkContext.textFile('/home/veepee555/retail_db/orders')
type(ordersRDD)

pyspark.rdd.RDD

In [60]:
#Loading a file to spark creating an RDD (order items data)
order_itemsRDD = spark.sparkContext.textFile('/home/veepee555/retail_db/order_items')

type(order_itemsRDD)

pyspark.rdd.RDD

In [62]:
#Display RDD data , take(10)
for rec in ordersRDD.take(10):
    print (rec)


1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE
6,2013-07-25 00:00:00.0,7130,COMPLETE
7,2013-07-25 00:00:00.0,4530,COMPLETE
8,2013-07-25 00:00:00.0,2911,PROCESSING
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT


In [63]:
#Display RDD data , take(10)
for rec in order_itemsRDD.take(10):
    print (rec)


1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
5,4,897,2,49.98,24.99
6,4,365,5,299.95,59.99
7,4,502,3,150.0,50.0
8,4,1014,4,199.92,49.98
9,5,957,1,299.98,299.98
10,5,365,5,299.95,59.99


In [65]:
#RDD data can be saved as a file with compression 
path='/home/veepee555/ordersRDDCompressed'
codec = "org.apache.hadoop.io.compress.GzipCodec"
ordersRDD.saveAsTextFile(path,codec)

In [66]:
# to check if the file was infact save and also compressed with gzip codec
!ls -ltr /home/veepee555/ordersRDDCompressed

total 464
-rw-r--r-- 1 veepee555 veepee555 471106 Jul 20 03:18 part-00000.gz
-rw-r--r-- 1 veepee555 veepee555      0 Jul 20 03:18 _SUCCESS


In [70]:
# Developing word count program
# Create a file and type few lines and save it as wordcount.txt  

dataRDD = spark.sparkContext.textFile('/home/veepee555/Downloads/data-master/wordcount.txt')
dataRDDflatmapped=dataRDD.flatMap(lambda x: x.split(" "))
dataRDDMap = dataRDDflatmapped.map(lambda x: (x, 1))
dataReduceByKey = dataRDDMap.reduceByKey(lambda x,y: x + y)
for rec in dataReduceByKey.take(10):
    print (rec)
  


('', 1352)
('Cookie', 1)
('Notice', 1)
('By', 1)
('using', 1)
('this', 3)
('site,', 1)
('you', 3)
('agree', 1)
('to', 38)


In [75]:
# Join disparate datasets together using PySpark

# Problem statement, get the revenue from order_items on daily basis

#order_item_id int,
#order_item_order_id int,
#order_item_product_id int,
#order_item_quantity int,
#order_item_subtotal float,
#order_item_product_price float

#order_id int,
#order_date string,
#order_customer_id int,
#order_status string
#(Both RDD element have the above structures)


ordersRDD = spark.sparkContext.textFile('/home/veepee555/retail_db/orders')
order_itemsRDD = spark.sparkContext.textFile('/home/veepee555/retail_db/order_items')


ordersParsedRDD = ordersRDD.map(lambda rec: (int(rec.split(",")[0]), rec))
orderItemsParsedRDD = order_itemsRDD.map(lambda rec: (int(rec.split(",")[1]),rec))


ordersJoinOrderItems = orderItemsParsedRDD.join(ordersParsedRDD)

revenuePerOrderPerDay = ordersJoinOrderItems.map(lambda t: (t[1][1].split(",")[1], float(t[1][0].split(",")[4])))

for rec in revenuePerOrderPerDay.take(10):
    print (rec)


('2013-07-25 00:00:00.0', 199.99)
('2013-07-25 00:00:00.0', 250.0)
('2013-07-25 00:00:00.0', 129.99)
('2013-07-25 00:00:00.0', 49.98)
('2013-07-25 00:00:00.0', 299.95)
('2013-07-25 00:00:00.0', 150.0)
('2013-07-25 00:00:00.0', 199.92)
('2013-07-25 00:00:00.0', 179.97)
('2013-07-25 00:00:00.0', 299.95)
('2013-07-25 00:00:00.0', 199.92)


In [77]:
# Get order count per day
ordersRDD = spark.sparkContext.textFile('/home/veepee555/retail_db/orders')
ordersPerDay = ordersJoinOrderItems.map(lambda rec: rec[1][1].split(",")[1] + "," + str(rec[0])).distinct()
ordersPerDayParsedRDD = ordersPerDay.map(lambda rec: (rec.split(",")[0], 1))
totalOrdersPerDay = ordersPerDayParsedRDD.reduceByKey(lambda x, y: x + y)
for rec in revenuePerOrderPerDay.take(10):
    print (rec)

('2013-07-25 00:00:00.0', 199.99)
('2013-07-25 00:00:00.0', 250.0)
('2013-07-25 00:00:00.0', 129.99)
('2013-07-25 00:00:00.0', 49.98)
('2013-07-25 00:00:00.0', 299.95)
('2013-07-25 00:00:00.0', 150.0)
('2013-07-25 00:00:00.0', 199.92)
('2013-07-25 00:00:00.0', 179.97)
('2013-07-25 00:00:00.0', 299.95)
('2013-07-25 00:00:00.0', 199.92)
