# 5.1 Pair RDD Examples

In [1]:
import findspark 
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Build (or reuse) a local SparkSession running on 4 threads.
# NOTE: the variable name `pyspark` shadows the `pyspark` package; it is kept
# unchanged so any cell that refers to it keeps working.
pyspark = SparkSession.builder \
.master("local[4]")\
.appName("PairRDD")\
.config("spark.executor.memory","2g")\
.config("spark.driver.memory","2g")\
.getOrCreate()

# Every RDD cell in this notebook reads files through `sc`; bind it to the
# session's SparkContext so the notebook runs from a fresh kernel.
sc = pyspark.sparkContext

In [4]:
# Load the raw CSV as an RDD of text lines; the header line is still part of it.
retailData = sc.textFile("data/OnlineRetail.csv")

In [5]:
retailData.take(10)

['InvoiceNo;StockCode;Description;Quantity;InvoiceDate;UnitPrice;CustomerID;Country',
 '536365;85123A;WHITE HANGING HEART T-LIGHT HOLDER;6;1.12.2010 08:26;2,55;17850;United Kingdom',
 '536365;71053;WHITE METAL LANTERN;6;1.12.2010 08:26;3,39;17850;United Kingdom',
 '536365;84406B;CREAM CUPID HEARTS COAT HANGER;8;1.12.2010 08:26;2,75;17850;United Kingdom',
 '536365;84029G;KNITTED UNION FLAG HOT WATER BOTTLE;6;1.12.2010 08:26;3,39;17850;United Kingdom',
 '536365;84029E;RED WOOLLY HOTTIE WHITE HEART.;6;1.12.2010 08:26;3,39;17850;United Kingdom',
 '536365;22752;SET 7 BABUSHKA NESTING BOXES;2;1.12.2010 08:26;7,65;17850;United Kingdom',
 '536365;21730;GLASS STAR FROSTED T-LIGHT HOLDER;6;1.12.2010 08:26;4,25;17850;United Kingdom',
 '536366;22633;HAND WARMER UNION JACK;6;1.12.2010 08:28;1,85;17850;United Kingdom',
 '536366;22632;HAND WARMER RED POLKA DOT;6;1.12.2010 08:28;1,85;17850;United Kingdom']

### Method 1: Removing the header row (column names) with subtract()

In [6]:
# Wrap the header line in a one-element list so it can be parallelized into an RDD.
firstline = [retailData.first()]

In [7]:
firstlineRdd = sc.parallelize(firstline)

In [8]:
# Remove the header by subtracting the one-line header RDD from the data.
# As the output of the next cell shows, rows no longer come back in file order.
retailWithoutHeader = retailData.subtract(firstlineRdd)

In [9]:
retailWithoutHeader.take(5)

['536366;22632;HAND WARMER RED POLKA DOT;6;1.12.2010 08:28;1,85;17850;United Kingdom',
 '536367;22749;FELTCRAFT PRINCESS CHARLOTTE DOLL;8;1.12.2010 08:34;3,75;13047;United Kingdom',
 '536367;22310;IVORY KNITTED MUG COSY;6;1.12.2010 08:34;1,65;13047;United Kingdom',
 '536370;22900;SET 2 TEA TOWELS I LOVE LONDON;24;1.12.2010 08:45;2,95;12583;France',
 '536372;22633;HAND WARMER UNION JACK;6;1.12.2010 09:01;1,85;17850;United Kingdom']

### Method 2: Removing the header row (column names) with filter()

In [10]:
# Drop every line that contains the text "InvoiceNo" (i.e. the header line).
# NOTE(review): this is a substring test, so a data row containing "InvoiceNo"
# would be dropped too; it also rebinds `retailData`, replacing the raw RDD.
retailData = retailData.filter(lambda x: "InvoiceNo" not in x)

In [11]:
retailData.take(5)

['536365;85123A;WHITE HANGING HEART T-LIGHT HOLDER;6;1.12.2010 08:26;2,55;17850;United Kingdom',
 '536365;71053;WHITE METAL LANTERN;6;1.12.2010 08:26;3,39;17850;United Kingdom',
 '536365;84406B;CREAM CUPID HEARTS COAT HANGER;8;1.12.2010 08:26;2,75;17850;United Kingdom',
 '536365;84029G;KNITTED UNION FLAG HOT WATER BOTTLE;6;1.12.2010 08:26;3,39;17850;United Kingdom',
 '536365;84029E;RED WOOLLY HOTTIE WHITE HEART.;6;1.12.2010 08:26;3,39;17850;United Kingdom']

### [Example]: Filtering products whose quantity is greater than 20

In [12]:
# Keep rows whose Quantity (4th ';'-separated field) is above 20.
retailWithoutHeader.filter(lambda row: 20 < int(row.split(";")[3])).take(5)

['536370;22900;SET 2 TEA TOWELS I LOVE LONDON;24;1.12.2010 08:45;2,95;12583;France',
 '536390;85123A;WHITE HANGING HEART T-LIGHT HOLDER;64;1.12.2010 10:19;2,55;17511;United Kingdom',
 '536394;22866;HAND WARMER SCOTTY DOG DESIGN;96;1.12.2010 10:39;1,85;13408;United Kingdom',
 '536405;20914;SET/5 RED RETROSPOT LID GLASS BOWLS;128;1.12.2010 11:32;2,55;14045;United Kingdom',
 '536409;16238;PARTY TIME PENCIL ERASERS;28;1.12.2010 11:45;0,21;17908;United Kingdom']

### [Example]: Filtering of products which contain COFFEE

In [13]:
# Keep rows whose Description (3rd ';'-separated field) contains "COFFEE" (case-sensitive).
retailWithoutHeader.filter(lambda x: "COFFEE" in x.split(";")[2]).take(10)

['537051;22305;COFFEE MUG PINK PAISLEY DESIGN;6;5.12.2010 11:12;2,55;15708;United Kingdom',
 '537137;22300;COFFEE MUG DOG + BALL DESIGN;1;5.12.2010 12:43;2,55;16327;United Kingdom',
 '537231;22304;COFFEE MUG BLUE PAISLEY DESIGN;6;6.12.2010 09:21;2,55;13652;United Kingdom',
 '537463;22301;COFFEE MUG CAT + BIRD DESIGN;18;7.12.2010 10:08;2,55;12681;France',
 '537595;21216;SET 3 RETROSPOT TEA,COFFEE,SUGAR;1;7.12.2010 12:29;4,95;13569;United Kingdom',
 '537604;22304;COFFEE MUG BLUE PAISLEY DESIGN;12;7.12.2010 13:01;2,55;13488;United Kingdom',
 '537638;37370;RETRO COFFEE MUGS ASSORTED;1;7.12.2010 15:28;16,13;000000;United Kingdom',
 '537641;37370;RETRO COFFEE MUGS ASSORTED;1;7.12.2010 15:32;16,13;000000;United Kingdom',
 '537643;22301;COFFEE MUG CAT + BIRD DESIGN;2;7.12.2010 15:34;5,06;000000;United Kingdom',
 '537666;37370;RETRO COFFEE MUGS ASSORTED;2;7.12.2010 18:36;16,13;000000;United Kingdom']

### [Example]: Filtering of products by Quantity and COFFEE

#### Without using Function

In [14]:
# Both conditions must hold. Logical `and` is used instead of the bitwise `&`:
# it states the intent, has no operator-precedence trap, and short-circuits,
# so the quantity is only parsed for rows that already match "COFFEE".
retailWithoutHeader.filter(lambda x: "COFFEE" in x.split(";")[2] and int(x.split(";")[3]) > 20).take(5)

['538656;37370;RETRO COFFEE MUGS ASSORTED;30;13.12.2010 15:20;1,25;17371;United Kingdom',
 '540316;37342;POLKADOT COFFEE CUP & SAUCER PINK;24;6.01.2011 13:00;0,85;16676;United Kingdom',
 '540459;22971;QUEENS GUARD COFFEE MUG;36;7.01.2011 12:35;2,1;16191;United Kingdom',
 '541220;22305;COFFEE MUG PINK PAISLEY DESIGN;216;14.01.2011 14:11;2,1;14156;EIRE',
 '543594;22303;COFFEE MUG APPLES DESIGN;36;10.02.2011 12:15;2,1;18093;United Kingdom']

#### By using function

In [15]:
def filtering(x):
    """Return True when a retail line has Quantity > 20 and "COFFEE" in its Description.

    Args:
        x: one ';'-separated line of the retail data. Field 2 is read as the
           description and field 3 is parsed as an integer quantity.

    Returns:
        bool: True only if both conditions hold.

    Raises:
        IndexError: if the line has fewer than four fields.
        ValueError: if the quantity field is not an integer.
    """
    fields = x.split(";")  # split once and reuse
    quantity = int(fields[3])
    description = fields[2]

    # Logical `and`, not bitwise `&`: this is a boolean test.
    return quantity > 20 and "COFFEE" in description

In [16]:
# `filtering` already takes one line and returns a bool, so it can be passed directly.
retailWithoutHeader.filter(filtering).take(5)

['538656;37370;RETRO COFFEE MUGS ASSORTED;30;13.12.2010 15:20;1,25;17371;United Kingdom',
 '540316;37342;POLKADOT COFFEE CUP & SAUCER PINK;24;6.01.2011 13:00;0,85;16676;United Kingdom',
 '540459;22971;QUEENS GUARD COFFEE MUG;36;7.01.2011 12:35;2,1;16191;United Kingdom',
 '541220;22305;COFFEE MUG PINK PAISLEY DESIGN;216;14.01.2011 14:11;2,1;14156;EIRE',
 '543594;22303;COFFEE MUG APPLES DESIGN;36;10.02.2011 12:15;2,1;18093;United Kingdom']

### [Example]: Cancelled total order prices

#### Without using function

In [17]:
# Cancelled orders are the rows whose InvoiceNo (first field) starts with "C".
cancelledOrder = retailWithoutHeader.filter(
    lambda row: row.split(";", 1)[0].startswith("C")
)
cancelledOrder.take(5)

['C536548;22631;CIRCUS PARADE LUNCH BOX;-1;1.12.2010 14:33;1,95;12472;Germany',
 'C536548;22580;ADVENT CALENDAR GINGHAM SACK;-4;1.12.2010 14:33;5,95;12472;Germany',
 'C536606;20914;SET/5 RED RETROSPOT LID GLASS BOWLS;-2;2.12.2010 09:10;2,95;14092;United Kingdom',
 'C536807;22778;GLASS CLOCHE SMALL;-1;2.12.2010 16:45;3,95;15834;United Kingdom',
 'C536825;22197;SMALL POPCORN HOLDER;-2;2.12.2010 17:27;0,85;15384;United Kingdom']

#### By using function
<font size=3>cancelled_price returns, for each line, whether the order is cancelled and the line's total price (quantity × unit price)</font>

In [18]:
def cancelled_price(line):
    """Map one retail line to ``(is_cancelled, total)``.

    Args:
        line: one ';'-separated line of the retail data. Field 0 is the
              invoice number, field 3 the quantity, field 5 the unit price
              written with a decimal comma (e.g. "2,55").

    Returns:
        tuple: ``(is_cancelled, total)`` where ``is_cancelled`` is True when
        the invoice number starts with "C" and ``total`` is
        ``quantity * price`` as a float.

    Raises:
        IndexError: if the line has fewer than six fields.
        ValueError: if quantity or price cannot be parsed as a number.
    """
    fields = line.split(";")  # split once and reuse
    # startswith() already yields a bool; no conditional expression needed.
    is_cancelled = fields[0].startswith("C")
    quantity = float(fields[3])
    # Convert the decimal comma to a dot before parsing.
    price = float(fields[5].replace(",", "."))
    return (is_cancelled, quantity * price)

#### Every order line is mapped to its cancellation status and total price


In [19]:
cancelled_total = retailWithoutHeader.map(cancelled_price)

In [20]:
cancelled_total.take(5)

[(False, 11.100000000000001),
 (False, 30.0),
 (False, 9.899999999999999),
 (False, 70.80000000000001),
 (False, 11.100000000000001)]

#### Finding summation of cancelled costs by using reduceByKey()

In [21]:
# Sum the line totals per key: True for cancelled lines, False for the rest.
cancelled_total_reduce = cancelled_total.reduceByKey(lambda x,y: x+y)

In [22]:
cancelled_total_reduce.take(5)

[(False, 10644560.424000185), (True, -896812.4899999995)]

#### Finding Total Cancelled Cost

In [23]:
# Pull out the summed total stored under the key True (the cancelled lines).
total = (
    cancelled_total_reduce
    .filter(lambda pair: pair[0] == True)
    .map(lambda pair: pair[1])
    .take(1)
)
print("Total Cancelled Cost: ", total[0])

Total Cancelled Cost:  -896812.4899999995


### [Example]: Calculation of Average Film Length according to Genre by using mapValues() and reduceByKey()

In [24]:
filmRdd = sc.textFile("data/film_data.csv")

In [25]:
filmRdd.take(3)

['Name,Genre,Length,Score,Country,Year,Budget',
 'stand by Me,Adventure,89,8.1,USA,1986,8000000',
 "ferris Bueller's Day Off,Comedy,103,7.8,USA,1986,6000000"]

In [26]:
# Drop only the header row by matching its exact prefix. A substring test on
# "Name" would also discard every film whose line happens to contain that word.
filmRdd = filmRdd.filter(lambda x: not x.startswith("Name,Genre,"))

In [27]:
filmRdd.take(3)

['stand by Me,Adventure,89,8.1,USA,1986,8000000',
 "ferris Bueller's Day Off,Comedy,103,7.8,USA,1986,6000000",
 'Top Gun,Action,110,6.9,USA,1986,15000000']

#### --> By using function

In [28]:
def length_genre(line):
    """Turn one ','-separated film line into a ``(genre, length)`` pair.

    Field 1 is returned as the genre string and field 2 is parsed as a float.
    """
    columns = line.split(",")
    return (columns[1], float(columns[2]))

In [29]:
genre_length_pairRdd = filmRdd.map(length_genre)

In [30]:
genre_length_pairRdd.take(5)

[('Adventure', 89.0),
 ('Comedy', 103.0),
 ('Action', 110.0),
 ('Action', 137.0),
 ('Adventure', 90.0)]

#### --> Without using function

In [31]:
# Same (genre, length) pairing as length_genre(), written inline.
genre_length_pairRdd = filmRdd.map(
    lambda row: (row.split(",")[1], float(row.split(",")[2]))
)

In [32]:
genre_length_pairRdd.take(5)

[('Adventure', 89.0),
 ('Comedy', 103.0),
 ('Action', 110.0),
 ('Action', 137.0),
 ('Adventure', 90.0)]

### --> Finding total length and movie count by Genre

In [33]:
# Attach a count of 1 to every length so sums and counts can be reduced together.
genre_length = genre_length_pairRdd.mapValues(lambda length: (length, 1))
genre_length.take(5)

[('Adventure', (89.0, 1)),
 ('Comedy', (103.0, 1)),
 ('Action', (110.0, 1)),
 ('Action', (137.0, 1)),
 ('Adventure', (90.0, 1))]

In [34]:
# Per genre, add the (length, count) pairs element-wise, giving (total length, film count).
genre_length = genre_length.reduceByKey(lambda x,y: (x[0] + y[0] , x[1] + y[1]))

In [35]:
genre_length.collect()

[('Action', (2329.0, 22)),
 ('Drama', (1689.0, 15)),
 ('Thriller', (97.0, 1)),
 ('Horror', (643.0, 7)),
 ('Biography', (195.0, 2)),
 ('Sci-Fi', (117.0, 1)),
 ('Adventure', (1154.0, 11)),
 ('Comedy', (2982.0, 30)),
 ('Crime', (543.0, 5)),
 ('Animation', (318.0, 4))]

### [Example]: Finding average movie duration by Genre

In [36]:
# Average length per genre = total length / film count from the (total, count) pairs.
average = genre_length.mapValues(lambda x: x[0] / x[1])
average.collect()

[('Action', 105.86363636363636),
 ('Drama', 112.6),
 ('Thriller', 97.0),
 ('Horror', 91.85714285714286),
 ('Biography', 97.5),
 ('Sci-Fi', 117.0),
 ('Adventure', 104.9090909090909),
 ('Comedy', 99.4),
 ('Crime', 108.6),
 ('Animation', 79.5)]

### [Example]: Genres ranked by average movie duration (longest first)

In [37]:
# Rank genres by their average length, longest first. Sorting on the value
# directly avoids the swap / sortByKey / swap-back round trip and, importantly,
# does not rebind `average`, so re-running this cell gives the same result.
average.sortBy(lambda x: x[1], ascending=False).take(10)

[('Sci-Fi', 117.0),
 ('Drama', 112.6),
 ('Crime', 108.6),
 ('Action', 105.86363636363636),
 ('Adventure', 104.9090909090909),
 ('Comedy', 99.4),
 ('Biography', 97.5),
 ('Thriller', 97.0),
 ('Horror', 91.85714285714286),
 ('Animation', 79.5)]

#  Joining two RDDs by primary key and foreign key
## [Example]: Joining of OrderItems and Products tables

In [38]:
order_items = sc.textFile("data/retail_db/order_items.csv")
products = sc.textFile("data/retail_db/products.csv")

### --> Data with column names

In [39]:
order_items.take(5)

['orderItemName,orderItemOrderId,orderItemProductId,orderItemQuantity,orderItemSubTotal,orderItemProductPrice',
 '1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99']

In [40]:
products.take(5)

['productId,productCategoryId,productName,productDescription,productPrice,productImage',
 '1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy',
 "2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat",
 "3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat",
 "4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat"]

### --> Removing column names by using filter()

In [41]:
# Build header-free RDDs by dropping any line that contains a header column name.
# The original `order_items` / `products` RDDs are not modified and still include the header.
order_items_rdd = order_items.filter(lambda x: "orderItemName" not in x)
products_rdd = products.filter(lambda x: "productCategoryId" not in x)

In [42]:
# Inspect the header-free RDD (not the raw `products`) to confirm the header row is gone.
products_rdd.take(3)

['1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy',
 "2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat",
 "3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat"]

In [43]:
# Inspect the header-free RDD (not the raw `order_items`) to confirm the header row is gone.
order_items_rdd.take(3)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0']

## --> Converting loaded data to Pair RDD

In [44]:
def makeOrderItemsPairRddFunc(line):
    """Convert one ','-separated order-item line into a pair keyed by product id.

    Returns ``(orderItemProductId, (orderItemName, orderItemOrderId,
    orderItemQuantity, orderItemSubTotal, orderItemProductPrice))``.
    All values are kept as strings.
    """
    cols = line.split(",")
    product_id = cols[2]
    remaining = (cols[0], cols[1], cols[3], cols[4], cols[5])
    return (product_id, remaining)

In [45]:
order_items_pairRDD = order_items_rdd.map(makeOrderItemsPairRddFunc)

In [46]:
# Inline equivalent of makeOrderItemsPairRddFunc: key = field 2 (product id),
# value = the remaining five fields in their original order.
orderItemRddWithoutFunction = order_items_rdd.map(
    lambda line: (
        line.split(",")[2],
        tuple(line.split(",")[i] for i in (0, 1, 3, 4, 5)),
    )
)

In [47]:
order_items_pairRDD.take(10)

[('957', ('1', '1', '1', '299.98', '299.98')),
 ('1073', ('2', '2', '1', '199.99', '199.99')),
 ('502', ('3', '2', '5', '250.0', '50.0')),
 ('403', ('4', '2', '1', '129.99', '129.99')),
 ('897', ('5', '4', '2', '49.98', '24.99')),
 ('365', ('6', '4', '5', '299.95', '59.99')),
 ('502', ('7', '4', '3', '150.0', '50.0')),
 ('1014', ('8', '4', '4', '199.92', '49.98')),
 ('957', ('9', '5', '1', '299.98', '299.98')),
 ('365', ('10', '5', '5', '299.95', '59.99'))]

In [48]:
orderItemRddWithoutFunction.take(10)

[('957', ('1', '1', '1', '299.98', '299.98')),
 ('1073', ('2', '2', '1', '199.99', '199.99')),
 ('502', ('3', '2', '5', '250.0', '50.0')),
 ('403', ('4', '2', '1', '129.99', '129.99')),
 ('897', ('5', '4', '2', '49.98', '24.99')),
 ('365', ('6', '4', '5', '299.95', '59.99')),
 ('502', ('7', '4', '3', '150.0', '50.0')),
 ('1014', ('8', '4', '4', '199.92', '49.98')),
 ('957', ('9', '5', '1', '299.98', '299.98')),
 ('365', ('10', '5', '5', '299.95', '59.99'))]

In [49]:
def makeProductPairRddFunc(line):
    """Convert one ','-separated product line into a pair keyed by product id.

    Returns ``(productId, (productCategoryId, productName, productDescription,
    productPrice, productImage))``. All values are kept as strings, and only
    the first six fields are used.
    """
    parts = line.split(",")
    # Plain split: a comma inside a field would shift the fields after it.
    details = (parts[1], parts[2], parts[3], parts[4], parts[5])
    return (parts[0], details)

In [50]:
products_pairRdd = products_rdd.map(makeProductPairRddFunc)

In [51]:
products_pairRdd.take(3)

[('1',
  ('2',
   'Quest Q64 10 FT. x 10 FT. Slant Leg Instant U',
   '',
   '59.98',
   'http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy')),
 ('2',
  ('2',
   "Under Armour Men's Highlight MC Football Clea",
   '',
   '129.99',
   'http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat')),
 ('3',
  ('2',
   "Under Armour Men's Renegade D Mid Football Cl",
   '',
   '89.99',
   'http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat'))]

## --> Join OrderItems and Product Pair RDDs

In [52]:
orderItemsProductsPairRdd = order_items_pairRDD.join(products_pairRdd)

In [53]:
orderItemsProductsPairRdd.take(3)

[('957',
  (('1', '1', '1', '299.98', '299.98'),
   ('43',
    "Diamondback Women's Serene Classic Comfort Bi",
    '',
    '299.98',
    'http://images.acmesports.sports/Diamondback+Women%27s+Serene+Classic+Comfort+Bike+2014'))),
 ('957',
  (('9', '5', '1', '299.98', '299.98'),
   ('43',
    "Diamondback Women's Serene Classic Comfort Bi",
    '',
    '299.98',
    'http://images.acmesports.sports/Diamondback+Women%27s+Serene+Classic+Comfort+Bike+2014'))),
 ('957',
  (('12', '5', '1', '299.98', '299.98'),
   ('43',
    "Diamondback Women's Serene Classic Comfort Bi",
    '',
    '299.98',
    'http://images.acmesports.sports/Diamondback+Women%27s+Serene+Classic+Comfort+Bike+2014')))]

### [Example]: Total Product count

In [54]:
count = products_pairRdd.count()
print("Total Products: ", count)

Total Products:  1345


###  [Example]: Total OrderItems count

In [55]:
count = orderItemsProductsPairRdd.count()
print("Total Orders Items: ", count)

Total Orders Items:  172198


In [56]:
# Flatten (key, (left, right)) from the join into a 3-tuple (key, left, right).
newOrderItemsProductsPairRdd = orderItemsProductsPairRdd.map(
    lambda joined: (joined[0], joined[1][0], joined[1][1])
)

In [57]:
newOrderItemsProductsPairRdd.take(3)

[('957',
  ('1', '1', '1', '299.98', '299.98'),
  ('43',
   "Diamondback Women's Serene Classic Comfort Bi",
   '',
   '299.98',
   'http://images.acmesports.sports/Diamondback+Women%27s+Serene+Classic+Comfort+Bike+2014')),
 ('957',
  ('9', '5', '1', '299.98', '299.98'),
  ('43',
   "Diamondback Women's Serene Classic Comfort Bi",
   '',
   '299.98',
   'http://images.acmesports.sports/Diamondback+Women%27s+Serene+Classic+Comfort+Bike+2014')),
 ('957',
  ('12', '5', '1', '299.98', '299.98'),
  ('43',
   "Diamondback Women's Serene Classic Comfort Bi",
   '',
   '299.98',
   'http://images.acmesports.sports/Diamondback+Women%27s+Serene+Classic+Comfort+Bike+2014'))]

###  [Example]: Selecting particular columns from the two joined RDDs

In [58]:
# Each input element is (productId, orderItemFields, productFields); pick and
# convert the columns of interest into one flat, typed tuple.
convertToTuple = newOrderItemsProductsPairRdd.map(
    lambda x: (
        int(x[1][1]),    # orderItemOrderId
        int(x[0]),       # productId
        x[2][1],         # productName
        float(x[2][3]),  # productPrice
        int(x[1][2]),    # orderItemQuantity
        float(x[1][3]),  # orderItemSubTotal
    )
)

In [59]:
# The first column is orderItemOrderId (the order id), not the order-item id.
print("(orderId, productId, productName, productPrice, quantity, subTotalPrice)")
convertToTuple.take(10)

(orderId, productId, productName, productPrice, quantity, subTotalPrice)


[(1, 957, "Diamondback Women's Serene Classic Comfort Bi", 299.98, 1, 299.98),
 (5, 957, "Diamondback Women's Serene Classic Comfort Bi", 299.98, 1, 299.98),
 (5, 957, "Diamondback Women's Serene Classic Comfort Bi", 299.98, 1, 299.98),
 (7, 957, "Diamondback Women's Serene Classic Comfort Bi", 299.98, 1, 299.98),
 (12, 957, "Diamondback Women's Serene Classic Comfort Bi", 299.98, 1, 299.98),
 (19, 957, "Diamondback Women's Serene Classic Comfort Bi", 299.98, 1, 299.98),
 (23, 957, "Diamondback Women's Serene Classic Comfort Bi", 299.98, 1, 299.98),
 (28, 957, "Diamondback Women's Serene Classic Comfort Bi", 299.98, 1, 299.98),
 (28, 957, "Diamondback Women's Serene Classic Comfort Bi", 299.98, 1, 299.98),
 (34, 957, "Diamondback Women's Serene Classic Comfort Bi", 299.98, 1, 299.98)]

### [Example]: The most common products in the shopping cart.

In [60]:
# Count the joined order-item rows per product name, most frequent first.
(
    convertToTuple
    .map(lambda row: (row[2], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: pair[1], ascending=False)
    .take(10)
)

[('Perfect Fitness Perfect Rip Deck', 24515),
 ("Nike Men's CJ Elite 2 TD Football Cleat", 22246),
 ("Nike Men's Dri-FIT Victory Golf Polo", 21035),
 ("O'Brien Men's Neoprene Life Vest", 19298),
 ('Field & Stream Sportsman 16 Gun Fire Safe', 17325),
 ('Pelican Sunstream 100 Kayak', 15500),
 ("Diamondback Women's Serene Classic Comfort Bi", 13729),
 ("Nike Men's Free 5.0+ Running Shoe", 12169),
 ("Under Armour Girls' Toddler Spine Surge Runni", 10617),
 ("Nike Men's Comfort 2 Slide", 328)]

### [Example]: The least common products in the shopping cart.

In [61]:
# Count the joined order-item rows per product name, least frequent first.
(
    convertToTuple
    .map(lambda row: (row[2], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: pair[1], ascending=True)
    .take(10)
)

[('SOLE E25 Elliptical', 10),
 ('Bowflex SelectTech 1090 Dumbbells', 10),
 ('Bushnell Pro X7 Jolt Slope Rangefinder', 11),
 ('SOLE E35 Elliptical', 15),
 ('Stiga Master Series ST3100 Competition Indoor', 27),
 ("Diamondback Girls' Clarity 24 Hybrid Bike 201", 28),
 ("Diamondback Boys' Insight 24 Performance Hybr", 29),
 ('GoPro HERO3+ Black Edition Camera', 32),
 ('Titleist Club Glove Travel Cover', 34),
 ('Garmin Forerunner 910XT GPS Watch', 35)]

### [Example]: The most sold product (best-seller)

In [62]:
# Sum the ordered quantity per product name, largest total first.
print("(productName, quantity)")
(
    convertToTuple
    .map(lambda row: (row[2], row[4]))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: pair[1], ascending=False)
    .take(10)
)

(productName, quantity)


[('Perfect Fitness Perfect Rip Deck', 73698),
 ("Nike Men's Dri-FIT Victory Golf Polo", 62956),
 ("O'Brien Men's Neoprene Life Vest", 57803),
 ("Nike Men's Free 5.0+ Running Shoe", 36680),
 ("Under Armour Girls' Toddler Spine Surge Runni", 31735),
 ("Nike Men's CJ Elite 2 TD Football Cleat", 22246),
 ('Field & Stream Sportsman 16 Gun Fire Safe', 17325),
 ('Pelican Sunstream 100 Kayak', 15500),
 ("Diamondback Women's Serene Classic Comfort Bi", 13729),
 ('ENO Atlas Hammock Straps', 998)]

### [Example]: The least sold product 

In [63]:
# Sum the ordered quantity per product name, smallest total first.
print("(productName, quantity)")
(
    convertToTuple
    .map(lambda row: (row[2], row[4]))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: pair[1], ascending=True)
    .take(10)
)

(productName, quantity)


[('SOLE E25 Elliptical', 10),
 ('Bowflex SelectTech 1090 Dumbbells', 10),
 ('Bushnell Pro X7 Jolt Slope Rangefinder', 11),
 ('SOLE E35 Elliptical', 15),
 ('Stiga Master Series ST3100 Competition Indoor', 27),
 ("Diamondback Girls' Clarity 24 Hybrid Bike 201", 28),
 ("Diamondback Boys' Insight 24 Performance Hybr", 29),
 ('GoPro HERO3+ Black Edition Camera', 32),
 ('Titleist Club Glove Travel Cover', 34),
 ('Garmin Forerunner 910XT GPS Watch', 35)]

### [Example]: The highest 10 SubTotal Prices

In [64]:
# Order the rows by sub-total (last tuple element), highest first.
print("(OrderItemsOrderId,ProductId,ProductName,ProductPrice,OrderItemQuantity,OrderItemSubTotal)")
convertToTuple.sortBy(lambda row: row[5], ascending=False).take(10)

(OrderItemsOrderId,ProductId,ProductName,ProductPrice,OrderItemQuantity,OrderItemSubTotal)


[(68703, 208, 'SOLE E35 Elliptical', 1999.99, 1, 1999.99),
 (68722, 208, 'SOLE E35 Elliptical', 1999.99, 1, 1999.99),
 (68724, 208, 'SOLE E35 Elliptical', 1999.99, 1, 1999.99),
 (68736, 208, 'SOLE E35 Elliptical', 1999.99, 1, 1999.99),
 (68766, 208, 'SOLE E35 Elliptical', 1999.99, 1, 1999.99),
 (68778, 208, 'SOLE E35 Elliptical', 1999.99, 1, 1999.99),
 (68806, 208, 'SOLE E35 Elliptical', 1999.99, 1, 1999.99),
 (68809, 208, 'SOLE E35 Elliptical', 1999.99, 1, 1999.99),
 (68821, 208, 'SOLE E35 Elliptical', 1999.99, 1, 1999.99),
 (68837, 208, 'SOLE E35 Elliptical', 1999.99, 1, 1999.99)]

### [Example]: The lowest 10 SubTotal Prices

In [65]:
# Order the rows by sub-total (last tuple element), lowest first.
print("(OrderItemsOrderId,ProductId,ProductName,ProductPrice,OrderItemQuantity,OrderItemSubTotal)")
convertToTuple.sortBy(lambda row: row[5], ascending=True).take(10)

(OrderItemsOrderId,ProductId,ProductName,ProductPrice,OrderItemQuantity,OrderItemSubTotal)


[(234, 775, 'Clicgear 8.0 Shoe Brush', 9.99, 1, 9.99),
 (1944, 775, 'Clicgear 8.0 Shoe Brush', 9.99, 1, 9.99),
 (2023, 775, 'Clicgear 8.0 Shoe Brush', 9.99, 1, 9.99),
 (2277, 775, 'Clicgear 8.0 Shoe Brush', 9.99, 1, 9.99),
 (5404, 775, 'Clicgear 8.0 Shoe Brush', 9.99, 1, 9.99),
 (5557, 775, 'Clicgear 8.0 Shoe Brush', 9.99, 1, 9.99),
 (5597, 775, 'Clicgear 8.0 Shoe Brush', 9.99, 1, 9.99),
 (6177, 775, 'Clicgear 8.0 Shoe Brush', 9.99, 1, 9.99),
 (7361, 775, 'Clicgear 8.0 Shoe Brush', 9.99, 1, 9.99),
 (7387, 775, 'Clicgear 8.0 Shoe Brush', 9.99, 1, 9.99)]

### [Example]: Calculating Total Price from SubTotals according to OrderId

In [66]:
# Reduce each row to an (order id, sub-total) pair.
SubTotalByOrderId = convertToTuple.map(lambda row: (row[0], row[5]))
SubTotalByOrderId.take(10)

[(1, 299.98),
 (5, 299.98),
 (5, 299.98),
 (7, 299.98),
 (12, 299.98),
 (19, 299.98),
 (23, 299.98),
 (28, 299.98),
 (28, 299.98),
 (34, 299.98)]

In [67]:
# Build a (running total, running count) accumulator per order id.
sumCount = SubTotalByOrderId.combineByKey(
    # first value seen for a key -> start the accumulator
    lambda first: (first, 1),
    # fold one more value into an existing accumulator
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    # merge two accumulators for the same key
    lambda left, right: (left[0] + right[0], left[1] + right[1]),
)

In [68]:
print("(OrderId,(TotalPrice, CountSubTotal))")
sumCount.take(10)

(OrderId,(TotalPrice, CountSubTotal))


[(12, (1299.8700000000001, 5)),
 (28, (1159.9, 5)),
 (36, (799.96, 3)),
 (56, (699.89, 3)),
 (68, (299.98, 1)),
 (96, (529.9300000000001, 3)),
 (104, (549.95, 3)),
 (116, (795.9100000000001, 5)),
 (136, (299.98, 1)),
 (140, (1249.9, 5))]

In [69]:
sumCount.count()

57431

### [Example]: Average Price by OrderId

In [70]:
sumCount.mapValues(lambda x: x[0]/x[1]).take(10)

[(12, 259.97400000000005),
 (28, 231.98000000000002),
 (36, 266.65333333333336),
 (56, 233.29666666666665),
 (68, 299.98),
 (96, 176.64333333333335),
 (104, 183.3166666666667),
 (116, 159.18200000000002),
 (136, 299.98),
 (140, 249.98000000000002)]

### [Example]: Order Table Loaded
Load and Convert to Pair RDD

In [71]:
# Read the orders file and drop any line containing the header column name "orderId".
orders = sc.textFile("data/retail_db/orders.csv")
order_rdd = orders.filter(lambda row: "orderId" not in row)
order_rdd.take(10)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE',
 '8,2013-07-25 00:00:00.0,2911,PROCESSING',
 '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT',
 '10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT']

In [72]:
def makeOrderPairRddFunc(line):
    """Convert one ','-separated order line into a pair keyed by order id.

    Returns ``(orderId, (orderDate, orderCustomerId, orderStatus))`` where the
    order id and customer id are parsed as ints and the date and status stay
    strings.
    """
    parts = line.split(",")
    order_id = int(parts[0])
    order_date = parts[1]
    customer_id = int(parts[2])
    return (order_id, (order_date, customer_id, parts[3]))

order_pairRdd = order_rdd.map(makeOrderPairRddFunc)
order_pairRdd.take(10)

[(1, ('2013-07-25 00:00:00.0', 11599, 'CLOSED')),
 (2, ('2013-07-25 00:00:00.0', 256, 'PENDING_PAYMENT')),
 (3, ('2013-07-25 00:00:00.0', 12111, 'COMPLETE')),
 (4, ('2013-07-25 00:00:00.0', 8827, 'CLOSED')),
 (5, ('2013-07-25 00:00:00.0', 11318, 'COMPLETE')),
 (6, ('2013-07-25 00:00:00.0', 7130, 'COMPLETE')),
 (7, ('2013-07-25 00:00:00.0', 4530, 'COMPLETE')),
 (8, ('2013-07-25 00:00:00.0', 2911, 'PROCESSING')),
 (9, ('2013-07-25 00:00:00.0', 5657, 'PENDING_PAYMENT')),
 (10, ('2013-07-25 00:00:00.0', 5648, 'PENDING_PAYMENT'))]

### [Example]: Daily number of orders

In [73]:
# Count orders per order date (first element of the value tuple).
(
    order_pairRdd
    .map(lambda order: (order[1][0], 1))
    .reduceByKey(lambda a, b: a + b)
    .take(10)
)

[('2013-07-26 00:00:00.0', 269),
 ('2013-07-27 00:00:00.0', 202),
 ('2013-07-28 00:00:00.0', 187),
 ('2013-07-29 00:00:00.0', 253),
 ('2013-07-31 00:00:00.0', 252),
 ('2013-08-05 00:00:00.0', 153),
 ('2013-08-06 00:00:00.0', 258),
 ('2013-08-08 00:00:00.0', 154),
 ('2013-08-09 00:00:00.0', 125),
 ('2013-08-10 00:00:00.0', 270)]

### [Example]: The 10 days with the highest number of orders

In [74]:
# Count orders per order date and list the dates with the most orders first.
(
    order_pairRdd
    .map(lambda order: (order[1][0], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: pair[1], ascending=False)
    .take(10)
)

[('2013-11-03 00:00:00.0', 347),
 ('2013-11-24 00:00:00.0', 292),
 ('2013-11-14 00:00:00.0', 287),
 ('2013-10-04 00:00:00.0', 287),
 ('2013-12-26 00:00:00.0', 286),
 ('2014-07-20 00:00:00.0', 285),
 ('2014-01-11 00:00:00.0', 281),
 ('2013-11-05 00:00:00.0', 278),
 ('2014-02-01 00:00:00.0', 278),
 ('2013-09-25 00:00:00.0', 277)]

### [Example]: The 10 days with the lowest number of orders

In [75]:
# Count orders per order date and list the dates with the fewest orders first.
(
    order_pairRdd
    .map(lambda order: (order[1][0], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: pair[1], ascending=True)
    .take(10)
)

[('2013-08-13 00:00:00.0', 73),
 ('2014-05-27 00:00:00.0', 85),
 ('2014-03-17 00:00:00.0', 91),
 ('2014-06-21 00:00:00.0', 92),
 ('2013-08-19 00:00:00.0', 93),
 ('2013-09-04 00:00:00.0', 96),
 ('2014-03-29 00:00:00.0', 96),
 ('2014-05-19 00:00:00.0', 97),
 ('2013-12-02 00:00:00.0', 99),
 ('2014-05-08 00:00:00.0', 99)]

### [Example]: Number of orders per status

#### --> Mapping OrderStatus and 1

In [76]:
# Pair every order's status (third element of the value tuple) with a count of 1.
order_status = order_pairRdd.map(lambda order: (order[1][2], 1))
order_status.take(10)

[('CLOSED', 1),
 ('PENDING_PAYMENT', 1),
 ('COMPLETE', 1),
 ('CLOSED', 1),
 ('COMPLETE', 1),
 ('COMPLETE', 1),
 ('COMPLETE', 1),
 ('PROCESSING', 1),
 ('PENDING_PAYMENT', 1),
 ('PENDING_PAYMENT', 1)]

In [77]:
# Add up the 1s per status. Both lambda arguments are partial counts for the
# same status (the key itself is never passed to the reducer).
(
    order_status
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: pair[1], ascending=False)
    .take(10)
)

[('COMPLETE', 22899),
 ('PENDING_PAYMENT', 15030),
 ('PROCESSING', 8275),
 ('PENDING', 7610),
 ('CLOSED', 7556),
 ('ON_HOLD', 3798),
 ('SUSPECTED_FRAUD', 1558),
 ('CANCELED', 1428),
 ('PAYMENT_REVIEW', 729)]