<a href="https://colab.research.google.com/github/abdelilah-bouslama/Data-Science/blob/master/Apache_Spark/cca175/RDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Install a headless OpenJDK 8 package; -qq and the redirect to /dev/null keep the cell output quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [0]:
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz

In [0]:
# Unpack the downloaded Spark tarball into the current working directory.
!tar xf spark-2.4.5-bin-hadoop2.7.tgz

In [0]:
# Install findspark quietly (-q); it is imported by the session-setup cell below.
# NOTE(review): the version is not pinned, so a fresh run may pick up a different release.
!pip install -q findspark

In [0]:
import os

# Export the Java and Spark install locations as environment variables for this process.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
})

In [0]:
import findspark
# Called before the pyspark import below — presumably this is what makes the Spark
# install importable, so keep the order of these statements.
findspark.init()
from pyspark.sql import SparkSession
# Reuse an existing session or create one, with "local[*]" as the master URL.
spark = SparkSession.builder.master("local[*]").getOrCreate()
# SparkContext of that session; the cells below use it to read text files as RDDs.
sc = spark.sparkContext

# **Loading Data**

In [0]:
# Read the orders file as an RDD of text lines.
# Assumes Google Drive is already mounted at /content/drive — TODO confirm, no mount step is visible here.
ordersRdd = sc.textFile(
    "/content/drive/My Drive/BIG DATA/pyspark/CCA-175/dataset/retail_db/orders/part-00000"
)

In [0]:
# Read the order_items file as an RDD of text lines.
# Assumes Google Drive is already mounted at /content/drive — TODO confirm, no mount step is visible here.
orderItemsRdd = sc.textFile(
    "/content/drive/My Drive/BIG DATA/pyspark/CCA-175/dataset/retail_db/order_items/part-00000"
)

In [0]:
# Read the products file as an RDD of text lines.
# Assumes Google Drive is already mounted at /content/drive — TODO confirm, no mount step is visible here.
productsRdd = sc.textFile(
    "/content/drive/My Drive/BIG DATA/pyspark/CCA-175/dataset/retail_db/products/part-00000"
)

# **Working on orders dataset**

In [0]:
# Keep only the orders whose status column (index 3) is CLOSED, then preview three of them.
def _is_closed_order(line):
    """Return True when the fourth comma-separated field of the line is 'CLOSED'."""
    return line.split(",")[3] == "CLOSED"

orderPerStatus = ordersRdd.filter(_is_closed_order)
for closed_order in orderPerStatus.take(3):
    print(closed_order)

In [0]:
# Get the COMPLETE orders whose date column (index 1) falls in the year 2013.
def _is_complete_order_of_2013(line):
    """Return True when the year part of the date is '2013' and the status is 'COMPLETE'."""
    fields = line.split(",")
    # The year is the text before the first '-' of the date field.
    return fields[1].split("-")[0] == "2013" and fields[3] == "COMPLETE"

ordersPerYear = ordersRdd.filter(_is_complete_order_of_2013)
for completed_order in ordersPerYear.take(3):
    print(completed_order)

In [0]:
# Get the PENDING_PAYMENT orders placed between the 25th and the 31st of a month,
# grouped by the month part of the order date.
def _order_day(line):
    """Return the day of the month of an order line as an int.

    Assumes the date column (index 1) looks like 'YYYY-MM-DD 00:00:00.0', so the
    third '-'-separated piece is 'DD 00:00:00.0' and the day is the text before the space.
    """
    return int(line.split(",")[1].split("-")[2].split(" ")[0])

pendingOrders = (
    ordersRdd
    .filter(lambda line: line.split(",")[3] == "PENDING_PAYMENT")
    # The lower bound must read 25 <= day: a chained `25 >= day <= 31` means
    # `day <= 25 and day <= 31` and would keep the start of the month instead.
    .filter(lambda line: 25 <= _order_day(line) <= 31)
    # Key each order by the month text ('01'..'12') taken from the date column.
    .map(lambda line: (line.split(",")[1].split("-")[1], line))
    .groupByKey()
)
for month_group in pendingOrders.take(3):
    print(month_group)

In [0]:
# Count the order lines per value of column index 2 (cast to int).
# NOTE(review): this RDD is later joined against product ids; confirm that
# column 2 of the orders file really identifies a product.
ordersMap = (
    ordersRdd
    .map(lambda line: (int(line.split(",")[2]), 1))
    .reduceByKey(lambda left, right: left + right)
)
for key_count in ordersMap.take(2):
    print(key_count)

In [0]:
# Revenue of order 2: add up the non-empty subtotal column (index 4) of the
# order items whose order-id column (index 1) equals 2.
orderidRevenue = (
    orderItemsRdd
    .filter(lambda item: int(item.split(",")[1]) == 2 and item.split(",")[4] != "")
    .map(lambda item: float(item.split(",")[4]))
    .reduce(lambda total, subtotal: total + subtotal)
)
orderidRevenue

In [0]:
# Smallest subtotal (column index 4) among the order items of order 2.
minOrderItemForOrder = (
    orderItemsRdd
    .filter(lambda item: int(item.split(",")[1]) == 2 and item.split(",")[4] != "")
    .map(lambda item: float(item.split(",")[4]))
    # Pairwise minimum: keep the left value only when it is strictly smaller.
    .reduce(lambda smallest, candidate: smallest if smallest < candidate else candidate)
)

minOrderItemForOrder

In [0]:
# Largest subtotal per order id: pair each item as (order id, subtotal) and
# keep the bigger value whenever two subtotals of the same order meet.
maxOrderItemForOrder = (
    orderItemsRdd
    .filter(lambda item: item.split(",")[4] != "")
    .map(lambda item: (int(item.split(",")[1]), float(item.split(",")[4])))
    .reduceByKey(lambda largest, candidate: largest if largest > candidate else candidate)
)

for order_max in maxOrderItemForOrder.take(5):
    print(order_max)

In [0]:
# Count the orders per status (column index 3).
orderMap = (
    ordersRdd
    .map(lambda line: (line.split(",")[3], 1))
    .reduceByKey(lambda left, right: left + right)
)
for status_count in orderMap.take(10):
    print(status_count)

In [0]:
# For each order id, list its order-item lines sorted by subtotal, highest first.
def _item_subtotal(item_line):
    """Return the subtotal column (index 4) of an order-item line as a float."""
    return float(item_line.split(",")[4])

orderItemsMap = (
    orderItemsRdd
    .filter(lambda item_line: item_line.split(",")[4] != "")
    .map(lambda item_line: (int(item_line.split(",")[1]), item_line))
    .groupByKey()
    .map(lambda grouped: (grouped[0], sorted(grouped[1], key=_item_subtotal, reverse=True)))
)
for order_items in orderItemsMap.take(3):
    print(order_items)

In [0]:
# Build (order id, subtotal) pairs from the order items; this cell only builds the
# pairs, no grouping or summing happens here.
orderItemMap = orderItemsRdd.map(
    lambda item: (int(item.split(",")[1]), float(item.split(",")[4]))
)

In [0]:
# Preview the first three (order id, subtotal) pairs.
for order_subtotal in orderItemMap.take(3):
    print(order_subtotal)

(1, 299.98)
(2, 199.99)
(2, 250.0)


In [0]:
# Get the item count and the average subtotal of each order: (orderId, (count, avg)).
orderItemAggregate = (
    orderItemMap
    .aggregateByKey(
        (0, 0.0),  # accumulator is (item count, running sum of subtotals)
        # Within a partition: one more item, subtotal added to the running sum.
        seqFunc=lambda acc, subtotal: (acc[0] + 1, acc[1] + subtotal),
        # Across partitions: counts AND sums are simply added. The sums must not be
        # halved here, otherwise the average is wrong for any key that is spread
        # over more than one partition.
        combFunc=lambda left, right: (left[0] + right[0], left[1] + right[1]),
    )
    # Turn (count, sum) into (count, sum / count).
    .map(lambda order: (order[0], (order[1][0], order[1][1] / order[1][0])))
)
for order_stats in orderItemAggregate.take(3):
    print(order_stats)

(2, (3, 193.32666666666668))
(4, (4, 174.9625))
(8, (4, 182.45999999999998))


# **Working on products dataset**

In [0]:
# Group product ids (column index 0) by category id (column index 1),
# skipping products whose price column (index 4) is empty.
productsPerCategory = (
    productsRdd
    .filter(lambda line: line.split(",")[4] != "")
    .map(lambda line: (int(line.split(",")[1]), int(line.split(',')[0])))
    .groupByKey()
)

In [0]:
# Preview three groups; the grouped values print as ResultIterable objects.
for category_group in productsPerCategory.take(3):
    print(category_group)

(2, <pyspark.resultiterable.ResultIterable object at 0x7f92fd6ea9e8>)
(4, <pyspark.resultiterable.ResultIterable object at 0x7f92fd6ea8d0>)
(6, <pyspark.resultiterable.ResultIterable object at 0x7f92fd6eaa58>)


In [0]:
# Start of "products sorted by their joined count": read the products file again.
# Assumes Google Drive is already mounted at /content/drive — TODO confirm, no mount step is visible here.
productsRdd = sc.textFile(
    "/content/drive/My Drive/BIG DATA/pyspark/CCA-175/dataset/retail_db/products/part-00000"
)

In [0]:
# Key every product line by its id (column index 0, cast to int).
productsMap = productsRdd.map(lambda line: (int(line.split(",")[0]), line))
for keyed_product in productsMap.take(2):
    print(keyed_product)

In [0]:
# Join products (keyed by product id) with the per-key counts held in ordersMap.
# NOTE(review): ordersMap is keyed on column 2 of the orders file; confirm that
# column holds a product id, otherwise this join pairs unrelated ids.
productOrdersJoin = productsMap.join(ordersMap)
for joined_row in productOrdersJoin.take(3):
    print(joined_row)

In [0]:
# Re-key the joined rows as (count, product line) so the count becomes the key.
coutProducts = productOrdersJoin.map(
    lambda joined: (int(joined[1][1]), joined[1][0])
)
for count_product in coutProducts.take(3):
    print(count_product)

In [0]:
# Order the (count, product line) pairs by count, largest first.
countProductsSorted = coutProducts.sortByKey(ascending=False)
for count_product in countProductsSorted.take(3):
    print(count_product)

In [0]:
# Sort the products by price (column index 4), most expensive first;
# products with an empty price column are skipped.
productMap = (
    productsRdd
    .filter(lambda line: line.split(",")[4] != "")
    .map(lambda line: (float(line.split(",")[4]), line))
    .sortByKey(ascending=False)
)
for priced_product in productMap.take(4):
    print(priced_product)

(1999.99, '208,10,SOLE E35 Elliptical,,1999.99,http://images.acmesports.sports/SOLE+E35+Elliptical')
(1799.99, '66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill')
(1799.99, '199,10,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill')
(1799.99, '496,22,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill')


In [0]:
# For each category id (column index 1), list the product prices in descending
# order, then order the categories by id.
productMap = (
    productsRdd
    .filter(lambda line: line.split(",")[4] != "")
    .map(lambda line: (int(line.split(",")[1]), float(line.split(",")[4])))
    .groupByKey()
    .map(lambda grouped: (grouped[0], sorted(grouped[1], reverse=True)))
    .sortByKey()
)
for category_prices in productMap.take(3):
    print(category_prices)

(2, [299.99, 209.99, 199.99, 199.99, 139.99, 139.99, 134.99, 129.99, 129.99, 129.99, 129.99, 129.99, 124.99, 99.99, 89.99, 89.99, 89.99, 79.99, 59.99, 59.98, 54.99, 50.0, 29.99, 29.97])
(3, [199.99, 189.99, 159.99, 159.99, 149.99, 99.0, 90.0, 90.0, 90.0, 90.0, 90.0, 75.0, 59.99, 59.99, 59.98, 44.99, 39.99, 34.99, 29.99, 29.99, 28.0, 24.97, 21.99, 0.0])
(4, [1799.99, 999.99, 349.98, 309.99, 299.99, 299.99, 299.98, 249.97, 209.99, 199.99, 179.97, 179.97, 159.99, 149.99, 99.95, 99.0, 79.99, 69.99, 59.98, 39.99, 29.99, 29.99, 28.0, 21.99])


In [0]:
# Print the names of the 3 products with the highest joined count.
# NOTE(review): the counts are keyed on column 2 of the orders file and joined
# against product ids; confirm that column really identifies the ordered product.

# (product id, product name) for every product with a non-empty price column.
productsMap = (
    productsRdd
    .filter(lambda line: line.split(",")[4] != "")
    .map(lambda line: (int(line.split(",")[0]), line.split(",")[2]))
)

# (column-2 value, number of order lines carrying it).
ordersMap = (
    ordersRdd
    .map(lambda line: (int(line.split(",")[2]), 1))
    .reduceByKey(lambda left, right: left + right)
)

# Joined rows look like (key, (name, count)); negating the count makes
# takeOrdered return the largest counts first.
# An alternative is .top(num=3, key=lambda row: row[1][1]) on the same join.
productJoinOrders = productsMap.join(ordersMap).takeOrdered(
    num=3, key=lambda row: -row[1][1]
)

for joined_row in productJoinOrders:
    print(joined_row[1][0])

adidas Men's Germany Home Soccer Jersey
Fitness Gear Pro Olympic Bench
Hirzl Women's Trust Feel Golf Glove


In [0]:
# Top 3 products by price within each category: (category id, [product lines]).
def _product_price(product_line):
    """Return the price column (index 4) of a product line as a float."""
    return float(product_line.split(",")[4])

productsMap = (
    productsRdd
    .filter(lambda line: line.split(",")[4] != "")
    .map(lambda line: (int(line.split(",")[1]), line))
    .groupByKey()
    # Sort each category's products by price, highest first, and keep the first three.
    .map(lambda grouped: (grouped[0], sorted(grouped[1], key=_product_price, reverse=True)[:3]))
)
for category_top in productsMap.take(3):
    print(category_top)

(2, ['16,2,Riddell Youth 360 Custom Football Helmet,,299.99,http://images.acmesports.sports/Riddell+Youth+360+Custom+Football+Helmet', '11,2,Fitness Gear 300 lb Olympic Weight Set,,209.99,http://images.acmesports.sports/Fitness+Gear+300+lb+Olympic+Weight+Set', '5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet'])
(4, ['66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill', '60,4,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical', '71,4,Diamondback Adult Response XE Mountain Bike 2,,349.98,http://images.acmesports.sports/Diamondback+Adult+Response+XE+Mountain+Bike+2014'])
(6, ['117,6,YETI Tundra 65 Chest Cooler,,399.99,http://images.acmesports.sports/YETI+Tundra+65+Chest+Cooler', '106,6,Teeter Hang Ups NXT-S Inversion Table,,299.99,http://images.acmesports.sports/Teeter+Hang+Ups+NXT-S+Inversion+Table', '100,6,Quik Shade Summit SX170 1

In [0]:
# Get the 5 most expensive products.
def getTopProduct(x):
    """Return the price column (index 4) of product line x as a float, used as the ranking key."""
    fields = x.split(",")
    return float(fields[4])

productsMap = (
    productsRdd
    .filter(lambda line: line.split(",")[4] != "")
    .top(5, key=getTopProduct)
)
for product_line in productsMap:
    print(product_line)

208,10,SOLE E35 Elliptical,,1999.99,http://images.acmesports.sports/SOLE+E35+Elliptical
66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
199,10,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
496,22,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
1048,47,"Spalding Beast 60"" Glass Portable Basketball ",,1099.99,http://images.acmesports.sports/Spalding+Beast+60%22+Glass+Portable+Basketball+Hoop


In [0]:
# From the COMPLETE orders dated 2013-12 or 2014-01, take column index 2 (kept as text).
# NOTE(review): the cell presents this column as a product id; confirm against
# the layout of the orders file.
def _is_complete_in_target_months(line):
    """Return True when the first 7 characters of the date are '2013-12' or '2014-01' and the status is 'COMPLETE'."""
    fields = line.split(",")
    return fields[1][:7] in ("2013-12", "2014-01") and fields[3] == "COMPLETE"

ordersMap = (
    ordersRdd
    .filter(_is_complete_in_target_months)
    .map(lambda line: line.split(",")[2])
)
for column_value in ordersMap.take(3):
    print(column_value)

383
9720
10118
