# Install Java, Spark, and Findspark
This installs Apache Spark 2.4, Java 8, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy for Python to find Spark.

In [126]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar -xvf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark

spark-2.4.6-bin-hadoop2.7/
spark-2.4.6-bin-hadoop2.7/bin/
spark-2.4.6-bin-hadoop2.7/bin/pyspark.cmd
spark-2.4.6-bin-hadoop2.7/bin/spark-submit
spark-2.4.6-bin-hadoop2.7/bin/spark-submit.cmd
spark-2.4.6-bin-hadoop2.7/bin/spark-class2.cmd
spark-2.4.6-bin-hadoop2.7/bin/spark-shell2.cmd
spark-2.4.6-bin-hadoop2.7/bin/pyspark2.cmd
spark-2.4.6-bin-hadoop2.7/bin/docker-image-tool.sh
spark-2.4.6-bin-hadoop2.7/bin/run-example.cmd
spark-2.4.6-bin-hadoop2.7/bin/spark-submit2.cmd
spark-2.4.6-bin-hadoop2.7/bin/beeline.cmd
spark-2.4.6-bin-hadoop2.7/bin/beeline
spark-2.4.6-bin-hadoop2.7/bin/spark-shell
spark-2.4.6-bin-hadoop2.7/bin/find-spark-home
spark-2.4.6-bin-hadoop2.7/bin/sparkR2.cmd
spark-2.4.6-bin-hadoop2.7/bin/find-spark-home.cmd
spark-2.4.6-bin-hadoop2.7/bin/sparkR
spark-2.4.6-bin-hadoop2.7/bin/spark-class
spark-2.4.6-bin-hadoop2.7/bin/spark-sql2.cmd
spark-2.4.6-bin-hadoop2.7/bin/load-spark-env.cmd
spark-2.4.6-bin-hadoop2.7/bin/run-example
spark-2.4.6-bin-hadoop2.7/bin/spark-sql
spark-2.4.6-b

# Set Environment Variables
Set the locations where Spark and Java are installed.

In [127]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"
from collections import namedtuple

# Start a SparkSession
This will start a local Spark session.


In [128]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
from pyspark.sql import Row
from pyspark import *

In [129]:
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3)

+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows





In [130]:
def print_top10(rdd):
  for i in rdd.take(10):
    print(i)

# Data Download

The data of https://github.com/dgadiraju/retail_db.git will be used for demo.

In [131]:
!git clone https://github.com/dgadiraju/retail_db.git

fatal: destination path 'retail_db' already exists and is not an empty directory.


In [132]:
!ls ./retail_db/products -l

total 172
-rw-r--r-- 1 root root 174155 Jul  3 08:31 part-00000


In [133]:
sc=spark.sparkContext

# Data Preparation
<Font color=red>**Please don't spend too much time on RDD, which is not required for the exam CCA Spark and Hadoop Developer Exam (CCA175).**</Font>

## Data Preparation - Products

In [134]:
products_txt=sc.textFile("././retail_db/products/part-00000")

In [135]:
products_txt.count()

1345

In [136]:
for i in products_txt.take(10):
  print(i)
  for j in enumerate(i.split(",")):
   print(j[0],":", j[1])

1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
0 : 1
1 : 2
2 : Quest Q64 10 FT. x 10 FT. Slant Leg Instant U
3 : 
4 : 59.98
5 : http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
0 : 2
1 : 2
2 : Under Armour Men's Highlight MC Football Clea
3 : 
4 : 129.99
5 : http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
0 : 3
1 : 2
2 : Under Armour Men's Renegade D Mid Football Cl
3 : 
4 : 89.99
5 : http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sp

In [137]:
def isInt(s):
    try: 
        int(s)
        return True
    except ValueError:
        return False

In [138]:
def isValidP(p):
  try:
    product_id=int(p.split(",")[0])
    product_category_id=int(p.split(",")[1])
    product_name="".join(p.split(",")[2:-2])
    # product_description= p.split(",")[3]
    product_price=float(p.split(",")[-2])
    return True
  except ValueError:
      return None

In [139]:
# Just in case some special situations, such as the following
#
# 685,31,"TaylorMade SLDR Irons - (Steel) 4-PW, AW",,899.99,http://images.acmesports.sports/TaylorMade+SLDR+Irons+-+%28Steel%29+4-PW%2C+AW
#
# Where a comma in a string, which will cause a lot of troubles, which are difficult to find.

def safe2Product(p):
  try:
    product_id=int(p.split(",")[0])
    product_category_id=int(p.split(",")[1])
    product_name="".join(p.split(",")[2:-2])
    # product_description= p.split(",")[3]
    product_price=float(p.split(",")[-2])
    return Product(product_id=product_id, product_category_id=product_category_id,product_name=product_name, product_description="",product_price=product_price )
  except ValueError:
      return None

In [140]:
for p in products_txt.take(10):
    product_id=int(p.split(",")[0])
    product_category_id=int(p.split(",")[1])
    product_name="".join(p.split(",")[2:-2])
    # product_description= p.split(",")[3]
    product_price=float(p.split(",")[-2])
    print(product_id,product_category_id,product_name, product_price)

1 2 Quest Q64 10 FT. x 10 FT. Slant Leg Instant U 59.98
2 2 Under Armour Men's Highlight MC Football Clea 129.99
3 2 Under Armour Men's Renegade D Mid Football Cl 89.99
4 2 Under Armour Men's Renegade D Mid Football Cl 89.99
5 2 Riddell Youth Revolution Speed Custom Footbal 199.99
6 2 Jordan Men's VI Retro TD Football Cleat 134.99
7 2 Schutt Youth Recruit Hybrid Custom Football H 99.99
8 2 Nike Men's Vapor Carbon Elite TD Football Cle 129.99
9 2 Nike Adult Vapor Jet 3.0 Receiver Gloves 50.0
10 2 Under Armour Men's Highlight MC Football Clea 129.99


In [141]:
products_invalid=products_txt.filter(lambda p: not isValidP(p))
for i in products_invalid.take(10):
  print(i)
  for j in enumerate(i.split(",")):
   print(j[0],":", j[1])
# print_top10(products_invalid)

In [142]:
Product=namedtuple("Product", "product_id product_category_id product_name product_description product_price")

In [143]:
products=products_txt.map(lambda p: safe2Product(p))

In [144]:
products.count()

1345

In [145]:
print_top10(products.filter(lambda p: p.product_id==975))

Product(product_id=975, product_category_id=44, product_name='Eureka! Tetragon 5 Five Person Tent', product_description='', product_price=129.99)


## Data Preparation - Orders

In [146]:
orders_txt=sc.textFile("././retail_db/orders/part-00000")

In [147]:
from datetime import datetime
for i in orders_txt.take(10):
  print(i)
  for j in enumerate(i.split(",")):
    if (j[0]==1):
      print(j[0],":", datetime.strptime(j[1][:-2], "%Y-%m-%d %H:%M:%S"))

1,2013-07-25 00:00:00.0,11599,CLOSED
1 : 2013-07-25 00:00:00
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
1 : 2013-07-25 00:00:00
3,2013-07-25 00:00:00.0,12111,COMPLETE
1 : 2013-07-25 00:00:00
4,2013-07-25 00:00:00.0,8827,CLOSED
1 : 2013-07-25 00:00:00
5,2013-07-25 00:00:00.0,11318,COMPLETE
1 : 2013-07-25 00:00:00
6,2013-07-25 00:00:00.0,7130,COMPLETE
1 : 2013-07-25 00:00:00
7,2013-07-25 00:00:00.0,4530,COMPLETE
1 : 2013-07-25 00:00:00
8,2013-07-25 00:00:00.0,2911,PROCESSING
1 : 2013-07-25 00:00:00
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
1 : 2013-07-25 00:00:00
10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT
1 : 2013-07-25 00:00:00


In [148]:
from collections import namedtuple
Order=namedtuple("Order", "order_id order_date order_customer_id order_status")

In [149]:
orders=orders_txt.map(lambda p: Order(order_id=int(p.split(",")[0]), order_date=datetime.strptime(p.split(",")[1][:-2], "%Y-%m-%d %H:%M:%S"),\
                  order_customer_id=int(p.split(",")[2]),\
                  order_status= p.split(",")[3]))

In [150]:
print_top10(orders)

Order(order_id=1, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=11599, order_status='CLOSED')
Order(order_id=2, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=256, order_status='PENDING_PAYMENT')
Order(order_id=3, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=12111, order_status='COMPLETE')
Order(order_id=4, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=8827, order_status='CLOSED')
Order(order_id=5, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=11318, order_status='COMPLETE')
Order(order_id=6, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=7130, order_status='COMPLETE')
Order(order_id=7, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=4530, order_status='COMPLETE')
Order(order_id=8, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=2911, order_status='PROCESSING')
Order(order_id=9, order_date=datetime.datetime(2013, 7, 25, 0, 0)

## Data Preparation - Order_Items


In [151]:
order_Items_txt=sc.textFile("././retail_db/order_items/part-00000")

In [152]:
for i in order_Items_txt.take(2):
  print(i)
  for j in enumerate(i.split(",")):
   print(j[0],":", j[1])

1,1,957,1,299.98,299.98
0 : 1
1 : 1
2 : 957
3 : 1
4 : 299.98
5 : 299.98
2,2,1073,1,199.99,199.99
0 : 2
1 : 2
2 : 1073
3 : 1
4 : 199.99
5 : 199.99


In [153]:
from collections import namedtuple

Order_Items=namedtuple("Order_Items", "order_item_id order_item_order_id order_item_product_id order_item_quantity order_item_subtotal order_item_product_price")

In [154]:
order_Items=order_Items_txt.map(lambda p: Order_Items(order_item_id=int(p.split(",")[0]), order_item_order_id=int(p.split(",")[1]),\
                  order_item_product_id=int(p.split(",")[2]),order_item_quantity=int(p.split(",")[3]), \
                  order_item_subtotal=float(p.split(",")[4]),\
                  order_item_product_price=float(p.split(",")[5])))


In [155]:
print_top10(order_Items)

Order_Items(order_item_id=1, order_item_order_id=1, order_item_product_id=957, order_item_quantity=1, order_item_subtotal=299.98, order_item_product_price=299.98)
Order_Items(order_item_id=2, order_item_order_id=2, order_item_product_id=1073, order_item_quantity=1, order_item_subtotal=199.99, order_item_product_price=199.99)
Order_Items(order_item_id=3, order_item_order_id=2, order_item_product_id=502, order_item_quantity=5, order_item_subtotal=250.0, order_item_product_price=50.0)
Order_Items(order_item_id=4, order_item_order_id=2, order_item_product_id=403, order_item_quantity=1, order_item_subtotal=129.99, order_item_product_price=129.99)
Order_Items(order_item_id=5, order_item_order_id=4, order_item_product_id=897, order_item_quantity=2, order_item_subtotal=49.98, order_item_product_price=24.99)
Order_Items(order_item_id=6, order_item_order_id=4, order_item_product_id=365, order_item_quantity=5, order_item_subtotal=299.95, order_item_product_price=59.99)
Order_Items(order_item_id=7

# Now we can practice Spark -- SQL

**Spark SQL is the primary topic covered by CCA Spark and Hadoop Developer Exam (CCA175).**

# Data preparation

At first, we create dataframe

**There is NO header here!**


In [156]:
print_top10(products_txt)

1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet
6,2,Jordan Men's VI Retro TD Football Cleat,,134.99,http://images.acmesports.sports/Jordan+Men%27s+VI+Retro+TD+Football+Cleat
7,2,Schutt Youth Recruit Hybrid Custom Football H,,99.99,http://images.acmesports.sports/Schutt+Youth+Recruit+Hybrid+Custom+Football+Helmet+2014
8,2,Nike Men's Vapor Ca

In [157]:
products_df=spark.read.csv("retail_db/products",sep=",", header=False, inferSchema=True)

products_df=products_df.toDF("product_id", "product_category_id","product_name", "product_description","product_price","product_image")
products_df.show()

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|         1|                  2|Quest Q64 10 FT. ...|               null|        59.98|http://images.acm...|
|         2|                  2|Under Armour Men'...|               null|       129.99|http://images.acm...|
|         3|                  2|Under Armour Men'...|               null|        89.99|http://images.acm...|
|         4|                  2|Under Armour Men'...|               null|        89.99|http://images.acm...|
|         5|                  2|Riddell Youth Rev...|               null|       199.99|http://images.acm...|
|         6|                  2|Jordan Men's VI R...|               null|       134.99|http://images.acm...|
|         7|       

In [158]:
products_df.select("product_id","product_name").show()

+----------+--------------------+
|product_id|        product_name|
+----------+--------------------+
|         1|Quest Q64 10 FT. ...|
|         2|Under Armour Men'...|
|         3|Under Armour Men'...|
|         4|Under Armour Men'...|
|         5|Riddell Youth Rev...|
|         6|Jordan Men's VI R...|
|         7|Schutt Youth Recr...|
|         8|Nike Men's Vapor ...|
|         9|Nike Adult Vapor ...|
|        10|Under Armour Men'...|
|        11|Fitness Gear 300 ...|
|        12|Under Armour Men'...|
|        13|Under Armour Men'...|
|        14|Quik Shade Summit...|
|        15|Under Armour Kids...|
|        16|Riddell Youth 360...|
|        17|Under Armour Men'...|
|        18|Reebok Men's Full...|
|        19|Nike Men's Finger...|
|        20|Under Armour Men'...|
+----------+--------------------+
only showing top 20 rows



In [159]:
products_df.createOrReplaceTempView("products")

spark.sql("select * from Products").show() # Not case sensitive in view Name and sql keyword

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|         1|                  2|Quest Q64 10 FT. ...|               null|        59.98|http://images.acm...|
|         2|                  2|Under Armour Men'...|               null|       129.99|http://images.acm...|
|         3|                  2|Under Armour Men'...|               null|        89.99|http://images.acm...|
|         4|                  2|Under Armour Men'...|               null|        89.99|http://images.acm...|
|         5|                  2|Riddell Youth Rev...|               null|       199.99|http://images.acm...|
|         6|                  2|Jordan Men's VI R...|               null|       134.99|http://images.acm...|
|         7|       

In [160]:
spark.sql(" select * from products where Product_Price<100").show() # Column Name is not case sensitive

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|         1|                  2|Quest Q64 10 FT. ...|               null|        59.98|http://images.acm...|
|         3|                  2|Under Armour Men'...|               null|        89.99|http://images.acm...|
|         4|                  2|Under Armour Men'...|               null|        89.99|http://images.acm...|
|         7|                  2|Schutt Youth Recr...|               null|        99.99|http://images.acm...|
|         9|                  2|Nike Adult Vapor ...|               null|         50.0|http://images.acm...|
|        13|                  2|Under Armour Men'...|               null|        89.99|http://images.acm...|
|        15|       

In [161]:
spark.sql("Select Product_id, length(Product_Name) from products").show() # length function!

+----------+--------------------+
|Product_id|length(Product_Name)|
+----------+--------------------+
|         1|                  45|
|         2|                  45|
|         3|                  45|
|         4|                  45|
|         5|                  45|
|         6|                  39|
|         7|                  45|
|         8|                  45|
|         9|                  40|
|        10|                  45|
|        11|                  38|
|        12|                  45|
|        13|                  45|
|        14|                  45|
|        15|                  45|
|        16|                  40|
|        17|                  45|
|        18|                  37|
|        19|                  39|
|        20|                  45|
+----------+--------------------+
only showing top 20 rows



In [162]:
sql1=spark.sql(" select * from products where Product_Price<100")

In [163]:
#dataframe to rdd

for i in sql1.rdd.map(lambda p: p.product_name).take(10): # column IS case sensitive 
  print(i)

Quest Q64 10 FT. x 10 FT. Slant Leg Instant U
Under Armour Men's Renegade D Mid Football Cl
Under Armour Men's Renegade D Mid Football Cl
Schutt Youth Recruit Hybrid Custom Football H
Nike Adult Vapor Jet 3.0 Receiver Gloves
Under Armour Men's Renegade D Mid Football Cl
Under Armour Kids' Highlight RM Alter Ego Sup
Reebok Men's Full Zip Training Jacket
Under Armour Kids' Highlight RM Football Clea
Kijaro Dual Lock Chair


In [164]:
orders_df=spark.read.csv("retail_db/orders", sep=",", header=False, inferSchema=True)
orders_df=orders_df.toDF("order_customer_id","order_date","order_id","order_status")
orders_df.show()

+-----------------+-------------------+--------+---------------+
|order_customer_id|         order_date|order_id|   order_status|
+-----------------+-------------------+--------+---------------+
|                1|2013-07-25 00:00:00|   11599|         CLOSED|
|                2|2013-07-25 00:00:00|     256|PENDING_PAYMENT|
|                3|2013-07-25 00:00:00|   12111|       COMPLETE|
|                4|2013-07-25 00:00:00|    8827|         CLOSED|
|                5|2013-07-25 00:00:00|   11318|       COMPLETE|
|                6|2013-07-25 00:00:00|    7130|       COMPLETE|
|                7|2013-07-25 00:00:00|    4530|       COMPLETE|
|                8|2013-07-25 00:00:00|    2911|     PROCESSING|
|                9|2013-07-25 00:00:00|    5657|PENDING_PAYMENT|
|               10|2013-07-25 00:00:00|    5648|PENDING_PAYMENT|
|               11|2013-07-25 00:00:00|     918| PAYMENT_REVIEW|
|               12|2013-07-25 00:00:00|    1837|         CLOSED|
|               13|2013-0

In [165]:
orders_df.createOrReplaceTempView("orders")
spark.sql("select * from orders").show()

+-----------------+-------------------+--------+---------------+
|order_customer_id|         order_date|order_id|   order_status|
+-----------------+-------------------+--------+---------------+
|                1|2013-07-25 00:00:00|   11599|         CLOSED|
|                2|2013-07-25 00:00:00|     256|PENDING_PAYMENT|
|                3|2013-07-25 00:00:00|   12111|       COMPLETE|
|                4|2013-07-25 00:00:00|    8827|         CLOSED|
|                5|2013-07-25 00:00:00|   11318|       COMPLETE|
|                6|2013-07-25 00:00:00|    7130|       COMPLETE|
|                7|2013-07-25 00:00:00|    4530|       COMPLETE|
|                8|2013-07-25 00:00:00|    2911|     PROCESSING|
|                9|2013-07-25 00:00:00|    5657|PENDING_PAYMENT|
|               10|2013-07-25 00:00:00|    5648|PENDING_PAYMENT|
|               11|2013-07-25 00:00:00|     918| PAYMENT_REVIEW|
|               12|2013-07-25 00:00:00|    1837|         CLOSED|
|               13|2013-0

In [166]:
order_item_df=spark.read.csv("retail_db/order_items", sep=",", header=False, inferSchema=True)
order_item_df=order_item_df.toDF("order_item_id","order_item_order_id","order_item_product_id","order_item_product_price","order_item_quantity","order_item_subtotal")
order_item_df.show()

+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|            1|                  1|                  957|                       1|             299.98|             299.98|
|            2|                  2|                 1073|                       1|             199.99|             199.99|
|            3|                  2|                  502|                       5|              250.0|               50.0|
|            4|                  2|                  403|                       1|             129.99|             129.99|
|            5|                  4|                  897|                       2|              49.98|              24.99|
|            6| 

In [167]:
order_item_df.createOrReplaceTempView("order_item")

spark.sql("select * from order_item").show()

+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|            1|                  1|                  957|                       1|             299.98|             299.98|
|            2|                  2|                 1073|                       1|             199.99|             199.99|
|            3|                  2|                  502|                       5|              250.0|               50.0|
|            4|                  2|                  403|                       1|             129.99|             129.99|
|            5|                  4|                  897|                       2|              49.98|              24.99|
|            6| 

## Join (as, alias work!)

In [168]:
# Vow! we can use SQL join directly
spark.sql("select  i.order_item_order_id as order_id, i.order_item_product_id as product_id, p.product_name from order_item i, products p where i.order_item_product_id=p.product_id and i.order_item_order_id=1 ").show()

+--------+----------+--------------------+
|order_id|product_id|        product_name|
+--------+----------+--------------------+
|       1|       957|Diamondback Women...|
+--------+----------+--------------------+



In [169]:
# Vow! we can use SQL join directly
spark.sql("select  i.order_item_order_id as order_id, i.order_item_product_id as product_id, p.product_name from order_item i JOIN products p on i.order_item_product_id=p.product_id where i.order_item_order_id=1 ").show()

+--------+----------+--------------------+
|order_id|product_id|        product_name|
+--------+----------+--------------------+
|       1|       957|Diamondback Women...|
+--------+----------+--------------------+



## Aggregate

In [170]:
spark.sql("select  i.order_item_order_id as order_id, count(1) as count_items from order_item i group by order_item_order_id").show()

+--------+-----------+
|order_id|count_items|
+--------+-----------+
|     148|          3|
|     463|          4|
|     471|          2|
|     496|          5|
|    1088|          2|
|    1580|          1|
|    1591|          3|
|    1645|          5|
|    2366|          1|
|    2659|          5|
|    2866|          4|
|    3175|          2|
|    3749|          1|
|    3794|          1|
|    3918|          4|
|    3997|          2|
|    4101|          1|
|    4519|          1|
|    4818|          1|
|    4900|          2|
+--------+-----------+
only showing top 20 rows



In [171]:
sql4=spark.sql("select  i.order_item_order_id as order_id, count(*) as count_items from order_item i group by order_item_order_id")

In [172]:
sql4.createOrReplaceTempView("i_by_o")

### Query on query

In [173]:
# Vow! we can use SQL join directly
spark.sql("select  * from i_by_o i JOIN orders o on i.order_id=o.order_id where o.order_id=2").show()

+--------+-----------+-----------------+-------------------+--------+---------------+
|order_id|count_items|order_customer_id|         order_date|order_id|   order_status|
+--------+-----------+-----------------+-------------------+--------+---------------+
|       2|          3|            15192|2013-10-29 00:00:00|       2|PENDING_PAYMENT|
|       2|          3|            33865|2014-02-18 00:00:00|       2|       COMPLETE|
|       2|          3|            57963|2013-08-02 00:00:00|       2|        ON_HOLD|
|       2|          3|            67863|2013-11-30 00:00:00|       2|       COMPLETE|
+--------+-----------+-----------------+-------------------+--------+---------------+



## Save/ Read
**We use SAME options for save and read**



In [174]:
sql_test2=spark.sql("select  i.order_item_order_id as order_id, i.order_item_product_id as product_id, p.product_name from order_item i JOIN products p on i.order_item_product_id=p.product_id where i.order_item_order_id=1 ")

In [191]:
sql_test2.coalesce(1).write.options(compression="gzip",header=True,sep="#").csv("sql_test5.csv") # coalesce(1) is important to put all contents in 1 file.

In [192]:
spark.read.options(compression="gzip",header=True,sep="#").csv("sql_test5.csv").show() # compression in options function!

+--------+----------+--------------------+
|order_id|product_id|        product_name|
+--------+----------+--------------------+
|       1|       957|Diamondback Women...|
+--------+----------+--------------------+



### Order by asc/desc

In [193]:
spark.sql("select * from products order by product_price desc").show()

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|       208|                 10| SOLE E35 Elliptical|               null|      1999.99|http://images.acm...|
|       199|                 10|  SOLE F85 Treadmill|               null|      1799.99|http://images.acm...|
|        66|                  4|  SOLE F85 Treadmill|               null|      1799.99|http://images.acm...|
|       496|                 22|  SOLE F85 Treadmill|               null|      1799.99|http://images.acm...|
|      1048|                 47|"Spalding Beast 6...|               null|      1099.99|http://images.acm...|
|        60|                  4| SOLE E25 Elliptical|               null|       999.99|http://images.acm...|
|       694|       

# CCA 175 Exam Prepare

## Common imports

```Python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
from pyspark.sql import Row

```

## Common Commands

### sqoop

sqoop import --conect jdbc://gateway/problem1 --table customer --username cloudera --password cloudera --target-dir /user/cert/problem1/solution/

### pyspark

![alt text](https://img2018.cnblogs.com/blog/724315/201809/724315-20180921223109361-1426293188.jpg)

### spark-submit

### hdfs dfs 

![alt text](https://linoxide.com/images/hadoop-hdfs-commands-cheatsheet-900x1500.png)

### impala-shell

impala-shell -i worker1

create database problem3;

use problem3;

create external table solution (
id int,
fname string,
)
row format delimited
fields terminated by ‘\t’
location ‘/user…’



CREATE [EXTERNAL] TABLE [IF NOT EXISTS] db_name.]table_name
  [COMMENT 'table_comment']
  [WITH SERDEPROPERTIES ('key1'='value1', 'key2'='value2', ...)]
  [
   [ROW FORMAT row_format] [STORED AS ctas_file_format]
  ]
  [LOCATION 'hdfs_path']
  [TBLPROPERTIES ('key1'='value1', 'key2'='value2', ...)]
  [CACHED IN 'pool_name' [WITH REPLICATION = integer] | UNCACHED]
AS
  select_statement

primitive_type:
    TINYINT
  | SMALLINT
  | INT
  | BIGINT
  | BOOLEAN
  | FLOAT
  | DOUBLE
  | DECIMAL
  | STRING
  | CHAR
  | VARCHAR
  | TIMESTAMP

complex_type:
    struct_type
  | array_type
  | map_type

struct_type: STRUCT < name : primitive_or_complex_type [COMMENT 'comment_string'], ... >

array_type: ARRAY < primitive_or_complex_type >

map_type: MAP < primitive_type, primitive_or_complex_type >

row_format:
  DELIMITED [FIELDS TERMINATED BY 'char' [ESCAPED BY 'char']]
  [LINES TERMINATED BY 'char']

file_format:
    PARQUET
  | TEXTFILE
  | AVRO
  | SEQUENCEFILE
  | RCFILE

ctas_file_format:
    PARQUET
  | TEXTFILE


### vi

![alt text](https://image.slidesharecdn.com/vicheatsheet-190416230629/95/vi-cheat-sheet-1-638.jpg?cb=1555456011)

## Study points

### text/csv file read/write

In [195]:
!wget --output-document=Crimes_-_2019.csv https://data.cityofchicago.org/api/views/w98m-zvie/rows.csv?accessType=DOWNLOAD

--2020-07-03 09:45:04--  https://data.cityofchicago.org/api/views/w98m-zvie/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.199, 52.206.68.26, 52.206.140.205
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.199|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘Crimes_-_2019.csv’

Crimes_-_2019.csv       [          <=>       ]  58.98M  3.08MB/s    in 20s     

2020-07-03 09:45:25 (2.99 MB/s) - ‘Crimes_-_2019.csv’ saved [61844522]



In [196]:
df=spark.read.csv("Crimes_-_2019.csv",header=True, inferSchema=True,sep=",")
df.show()

+--------+-----------+--------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|                Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+--------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|11761076|   JC326952|06/29/2019 08:25:...|   042XX W WILCOX ST|2014|         NARCOTICS|MANUFACTURE / DEL...|VEHICLE NON-COMME...|

In [197]:
df.createOrReplaceTempView("crime")

In [198]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: integer (nullable = true)
 |-- Y Coordinate: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



### Concat, concat_ws work!

Don't use "+"!

In [199]:
spark.sql("select concat_ws('###',`Case Number`, Date) as t from crime").show()

+--------------------+
|                   t|
+--------------------+
|JC326952###06/29/...|
|JD279923###12/01/...|
|JD280968###11/14/...|
|JC285931###05/31/...|
|JC375483###08/02/...|
|JC370906###07/30/...|
|JC322986###06/26/...|
|JD278760###09/30/...|
|JC368168###07/28/...|
|JC301932###06/11/...|
|JC325517###06/28/...|
|JC345576###07/12/...|
|JD278275###07/09/...|
|JC369446###07/29/...|
|JD276690###11/26/...|
|JC315872###06/21/...|
|JC324471###05/22/...|
|JC367890###07/28/...|
|JC345563###07/12/...|
|JC369182###07/29/...|
+--------------------+
only showing top 20 rows



In [201]:
result1=spark.sql("select date_format(to_date(date,'MM/dd/yyyy'),'yyyyMM') as month, `primary type`, count(1) as counts from crime group by month, `primary type` order by month, counts desc")

result1.rdd.map(lambda rec: '\t'.join([str(r) for r in rec])).coalesce(1).saveAsTextFile("text1.gzip",compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")


In [202]:
result1=spark.sql("select date_format(to_date(date,'MM/dd/yyyy'),'yyyyMM') as month, `primary type`, count(*) as counts from crime group by month, `primary type` order by month, counts desc")

result1_rdd=result1.rdd.map(lambda rec: '\t'.join([str(r) for r in rec]))

result1_rdd.coalesce(1).saveAsTextFile("text6.gzip",compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")


### sql write text does work TOO

In [203]:
result1.createOrReplaceTempView("result1")
result2=spark.sql("select concat_ws('\t',month, `primary type`, CAST(counts as string)) as value from result1")

In [204]:
result2.show()

+--------------------+
|               value|
+--------------------+
|   201901	THEFT	4554|
| 201901	BATTERY	3519|
|201901	CRIMINAL D...|
|201901	DECEPTIVE ...|
| 201901	ASSAULT	1458|
|201901	OTHER OFFE...|
|201901	NARCOTICS	...|
| 201901	BURGLARY	797|
|201901	MOTOR VEHI...|
|  201901	ROBBERY	658|
|201901	CRIMINAL T...|
|201901	WEAPONS VI...|
|201901	OFFENSE IN...|
|201901	CRIM SEXUA...|
|201901	INTERFEREN...|
|201901	SEX OFFENS...|
|201901	PUBLIC PEA...|
|201901	PROSTITUTI...|
|201901	CRIMINAL S...|
|  201901	HOMICIDE	23|
+--------------------+
only showing top 20 rows



In [205]:
result2.coalesce(1).write.options(compression="gzip").text("sql_test3.gzip")


In [208]:
result2.coalesce(1).write.text("text1.txt")

In [206]:
spark.sql("select `primary type`, count(*) as counts from crime group by `primary type` having counts<100 order by counts limit 2").show()

# limit 2 at the ending to select top 2
# top 2 at the beginning is NOT valid

+--------------------+------+
|        primary type|counts|
+--------------------+------+
|        NON-CRIMINAL|     4|
|OTHER NARCOTIC VI...|     8|
+--------------------+------+



In [207]:
sql5=spark.sql("select `primary type`, count(*) as counts from crime group by `primary type` having counts<100 order by counts limit 2")
#sql5.write.csv("sql6.csv", sep="\t", header=True) # Sep header

# Now we can practice Spark -- RDD
<Font color=red>**Please don't spend too much time on RDD, which is not required for the exam CCA Spark and Hadoop Developer Exam (CCA175).**</Font>

## Data - Order-OrderItems-Products

In [209]:
print_top10(products)

Product(product_id=1, product_category_id=2, product_name='Quest Q64 10 FT. x 10 FT. Slant Leg Instant U', product_description='', product_price=59.98)
Product(product_id=2, product_category_id=2, product_name="Under Armour Men's Highlight MC Football Clea", product_description='', product_price=129.99)
Product(product_id=3, product_category_id=2, product_name="Under Armour Men's Renegade D Mid Football Cl", product_description='', product_price=89.99)
Product(product_id=4, product_category_id=2, product_name="Under Armour Men's Renegade D Mid Football Cl", product_description='', product_price=89.99)
Product(product_id=5, product_category_id=2, product_name='Riddell Youth Revolution Speed Custom Footbal', product_description='', product_price=199.99)
Product(product_id=6, product_category_id=2, product_name="Jordan Men's VI Retro TD Football Cleat", product_description='', product_price=134.99)
Product(product_id=7, product_category_id=2, product_name='Schutt Youth Recruit Hybrid Cust

In [210]:
print_top10(orders)

Order(order_id=1, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=11599, order_status='CLOSED')
Order(order_id=2, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=256, order_status='PENDING_PAYMENT')
Order(order_id=3, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=12111, order_status='COMPLETE')
Order(order_id=4, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=8827, order_status='CLOSED')
Order(order_id=5, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=11318, order_status='COMPLETE')
Order(order_id=6, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=7130, order_status='COMPLETE')
Order(order_id=7, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=4530, order_status='COMPLETE')
Order(order_id=8, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=2911, order_status='PROCESSING')
Order(order_id=9, order_date=datetime.datetime(2013, 7, 25, 0, 0)

In [211]:
print_top10(order_Items)

Order_Items(order_item_id=1, order_item_order_id=1, order_item_product_id=957, order_item_quantity=1, order_item_subtotal=299.98, order_item_product_price=299.98)
Order_Items(order_item_id=2, order_item_order_id=2, order_item_product_id=1073, order_item_quantity=1, order_item_subtotal=199.99, order_item_product_price=199.99)
Order_Items(order_item_id=3, order_item_order_id=2, order_item_product_id=502, order_item_quantity=5, order_item_subtotal=250.0, order_item_product_price=50.0)
Order_Items(order_item_id=4, order_item_order_id=2, order_item_product_id=403, order_item_quantity=1, order_item_subtotal=129.99, order_item_product_price=129.99)
Order_Items(order_item_id=5, order_item_order_id=4, order_item_product_id=897, order_item_quantity=2, order_item_subtotal=49.98, order_item_product_price=24.99)
Order_Items(order_item_id=6, order_item_order_id=4, order_item_product_id=365, order_item_quantity=5, order_item_subtotal=299.95, order_item_product_price=59.99)
Order_Items(order_item_id=7

## Spark RDD

In [212]:
#Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

 However, in cluster mode, the output to stdout being called by the executors is now writing to the executor’s stdout instead, not the one on the driver, so stdout on the driver won’t show these! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: **rdd.collect().foreach(println).**

## Transformations
The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.


### map(func)	
Return a new distributed dataset formed by passing each element of the source through a function func.

In [213]:
products_short=products.map(lambda p: (p.product_id, p.product_name))

print_top10(products_short)

(1, 'Quest Q64 10 FT. x 10 FT. Slant Leg Instant U')
(2, "Under Armour Men's Highlight MC Football Clea")
(3, "Under Armour Men's Renegade D Mid Football Cl")
(4, "Under Armour Men's Renegade D Mid Football Cl")
(5, 'Riddell Youth Revolution Speed Custom Footbal')
(6, "Jordan Men's VI Retro TD Football Cleat")
(7, 'Schutt Youth Recruit Hybrid Custom Football H')
(8, "Nike Men's Vapor Carbon Elite TD Football Cle")
(9, 'Nike Adult Vapor Jet 3.0 Receiver Gloves')
(10, "Under Armour Men's Highlight MC Football Clea")


### filter(func)
Return a new dataset formed by selecting those elements of the source on which func returns true.

In [214]:
filtered=order_Items.filter(lambda o: o.order_item_product_id==957)

print_top10(filtered)

Order_Items(order_item_id=1, order_item_order_id=1, order_item_product_id=957, order_item_quantity=1, order_item_subtotal=299.98, order_item_product_price=299.98)
Order_Items(order_item_id=9, order_item_order_id=5, order_item_product_id=957, order_item_quantity=1, order_item_subtotal=299.98, order_item_product_price=299.98)
Order_Items(order_item_id=12, order_item_order_id=5, order_item_product_id=957, order_item_quantity=1, order_item_subtotal=299.98, order_item_product_price=299.98)
Order_Items(order_item_id=15, order_item_order_id=7, order_item_product_id=957, order_item_quantity=1, order_item_subtotal=299.98, order_item_product_price=299.98)
Order_Items(order_item_id=34, order_item_order_id=12, order_item_product_id=957, order_item_quantity=1, order_item_subtotal=299.98, order_item_product_price=299.98)
Order_Items(order_item_id=59, order_item_order_id=19, order_item_product_id=957, order_item_quantity=1, order_item_subtotal=299.98, order_item_product_price=299.98)
Order_Items(orde

### flatMap(func)	
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

In [215]:
print_top10( products_txt.flatMap(lambda p: p.split(",")))

1
2
Quest Q64 10 FT. x 10 FT. Slant Leg Instant U

59.98
http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
2
2
Under Armour Men's Highlight MC Football Clea



### union(otherDataset)	intersection(otherDataset) distinct([numPartitions]))


union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.

intersection(otherDataset)	Return a new RDD that contains the intersection of elements in the source dataset and the argument.

distinct([numPartitions]))	Return a new dataset that contains the distinct elements of the source dataset.

In [216]:
print_top10(order_Items)

Order_Items(order_item_id=1, order_item_order_id=1, order_item_product_id=957, order_item_quantity=1, order_item_subtotal=299.98, order_item_product_price=299.98)
Order_Items(order_item_id=2, order_item_order_id=2, order_item_product_id=1073, order_item_quantity=1, order_item_subtotal=199.99, order_item_product_price=199.99)
Order_Items(order_item_id=3, order_item_order_id=2, order_item_product_id=502, order_item_quantity=5, order_item_subtotal=250.0, order_item_product_price=50.0)
Order_Items(order_item_id=4, order_item_order_id=2, order_item_product_id=403, order_item_quantity=1, order_item_subtotal=129.99, order_item_product_price=129.99)
Order_Items(order_item_id=5, order_item_order_id=4, order_item_product_id=897, order_item_quantity=2, order_item_subtotal=49.98, order_item_product_price=24.99)
Order_Items(order_item_id=6, order_item_order_id=4, order_item_product_id=365, order_item_quantity=5, order_item_subtotal=299.95, order_item_product_price=59.99)
Order_Items(order_item_id=7

In [217]:
print("P1=============")
p1=order_Items.filter(lambda i: i.order_item_order_id==1).map(lambda i: i.order_item_product_id)
print_top10(p1)
print("P2=============")
p2=order_Items.filter(lambda i: i.order_item_order_id==5).map(lambda i: i.order_item_product_id)
print_top10(p2)
print("p1.union(p2).distinct()=============")
print_top10(p1.union(p2).distinct()) # distinct is needed to avoid duplicates
print("p1.intersection(p2).distinct()=============")
print_top10(p1.intersection(p2).distinct())

957
957
365
1014
957
403
957
365
1014
403
957


### groupByKey([numPartitions])	

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.

In [218]:
Order_Items.order_item_order_id

<property at 0x7f94acec1368>

In [219]:
print_top10(order_Items.map(lambda o: (o.order_item_order_id, \
            o.order_item_product_id)).groupByKey())

(2, <pyspark.resultiterable.ResultIterable object at 0x7f94ace8d390>)
(4, <pyspark.resultiterable.ResultIterable object at 0x7f94ace8d2b0>)
(8, <pyspark.resultiterable.ResultIterable object at 0x7f94ace8d5c0>)
(10, <pyspark.resultiterable.ResultIterable object at 0x7f94ace8d438>)
(12, <pyspark.resultiterable.ResultIterable object at 0x7f94ace8d3c8>)
(14, <pyspark.resultiterable.ResultIterable object at 0x7f94ace8dcf8>)
(16, <pyspark.resultiterable.ResultIterable object at 0x7f94ace8d160>)
(18, <pyspark.resultiterable.ResultIterable object at 0x7f94ace8df28>)
(20, <pyspark.resultiterable.ResultIterable object at 0x7f94adf161d0>)
(24, <pyspark.resultiterable.ResultIterable object at 0x7f94acf99080>)


### reduceByKey(func, [numPartitions])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

In [220]:
print_top10(order_Items.filter(lambda o: (o.order_item_order_id==2)))

Order_Items(order_item_id=2, order_item_order_id=2, order_item_product_id=1073, order_item_quantity=1, order_item_subtotal=199.99, order_item_product_price=199.99)
Order_Items(order_item_id=3, order_item_order_id=2, order_item_product_id=502, order_item_quantity=5, order_item_subtotal=250.0, order_item_product_price=50.0)
Order_Items(order_item_id=4, order_item_order_id=2, order_item_product_id=403, order_item_quantity=1, order_item_subtotal=129.99, order_item_product_price=129.99)


In [221]:
print_top10(order_Items.map(lambda o: (o.order_item_order_id, \
            o.order_item_product_id)).reduceByKey(lambda x, y: max(x,y)))

(2, 1073)
(4, 1014)
(8, 1014)
(10, 1073)
(12, 1014)
(14, 1014)
(16, 365)
(18, 1073)
(20, 1014)
(24, 1073)


### aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])	

When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

In [222]:
print_top10(order_Items.map(lambda o: (o.order_item_order_id, \
            o.order_item_product_id)).aggregateByKey(0,(lambda x,y: x+1), (lambda x,y: x+1)))

(2, 3)
(4, 4)
(8, 4)
(10, 5)
(12, 5)
(14, 3)
(16, 2)
(18, 3)
(20, 4)
(24, 5)


### sortByKey([ascending], [numPartitions])	
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

In [223]:
print_top10(order_Items.map(lambda o: (o.order_item_order_id, \
            o.order_item_product_id)).sortByKey(False))

(68883, 208)
(68883, 502)
(68882, 365)
(68882, 502)
(68881, 403)
(68880, 1014)
(68880, 502)
(68880, 1073)
(68880, 1014)
(68880, 1014)


### join(otherDataset, [numPartitions])	
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

In [224]:
o_by_k=order_Items.map(lambda o: (o.order_item_product_id, o.order_item_order_id)).filter(lambda x: x[0]==957)
print_top10(o_by_k)


(957, 1)
(957, 5)
(957, 5)
(957, 7)
(957, 12)
(957, 19)
(957, 23)
(957, 28)
(957, 28)
(957, 34)


In [225]:
p_by_k=products.filter(lambda p: p.product_id==957).map(lambda p: (p.product_id, p.product_name))
print_top10(p_by_k)

(957, "Diamondback Women's Serene Classic Comfort Bi")


In [226]:
j_by_k=o_by_k.join(p_by_k)
# print(j_by_k.first())

print_top10(j_by_k.\
  map(lambda l: (l[0],l[1][0], l[1][1])))

(957, 1, "Diamondback Women's Serene Classic Comfort Bi")
(957, 5, "Diamondback Women's Serene Classic Comfort Bi")
(957, 5, "Diamondback Women's Serene Classic Comfort Bi")
(957, 7, "Diamondback Women's Serene Classic Comfort Bi")
(957, 12, "Diamondback Women's Serene Classic Comfort Bi")
(957, 19, "Diamondback Women's Serene Classic Comfort Bi")
(957, 23, "Diamondback Women's Serene Classic Comfort Bi")
(957, 28, "Diamondback Women's Serene Classic Comfort Bi")
(957, 28, "Diamondback Women's Serene Classic Comfort Bi")
(957, 34, "Diamondback Women's Serene Classic Comfort Bi")
