In [1]:
pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Note: you may need to restart the kernel to use updated packages.


In [1]:
import findspark

In [2]:
# Set up access to the Spark installation for this kernel.
# Alternatively, pass the path of the Spark installation as the argument.
findspark.init()

In [3]:
import pyspark

In [4]:
# Reuse the running SparkContext when one exists. Constructing a second
# SparkContext in the same kernel raises ValueError, so the plain constructor
# breaks as soon as this cell is executed twice.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName("MyApp"))

In [5]:
# Echo the SparkContext so the notebook renders it
sc

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [7]:
# Obtain the SparkSession: returns the existing session or builds a new one
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [10]:
# Employee records: (eno, name, dno, age)
emp = [
    (1, "aa", 101, 34),
    (2, "gg", 102, 45),
    (3, "ty", 101, 32),
    (4, "sa", 102, 23),
]
# Department records: (dno, dname)
dept = [
    (101, "cs"),
    (102, "ds"),
]

In [11]:
# Build one DataFrame per record list, naming the columns explicitly
emp_df = spark.createDataFrame(emp, schema=["eno", "name", "dno", "age"])
dept_df = spark.createDataFrame(dept, schema=["dno", "dname"])

# Print both frames
emp_df.show()
dept_df.show()

+---+----+---+---+
|eno|name|dno|age|
+---+----+---+---+
|  1|  aa|101| 34|
|  2|  gg|102| 45|
|  3|  ty|101| 32|
|  4|  sa|102| 23|
+---+----+---+---+

+---+-----+
|dno|dname|
+---+-----+
|101|   cs|
|102|   ds|
+---+-----+



In [12]:
# Retrieve basic DataFrame details.
# NOTE: a notebook cell renders only the value of its last expression, so of the
# three lines below only `dtypes` appears in the output; the row count and the
# column list are evaluated and discarded.
emp_df.count()
emp_df.columns
emp_df.dtypes

[('eno', 'bigint'), ('name', 'string'), ('dno', 'bigint'), ('age', 'bigint')]

In [13]:
# Number of rows in emp_df
emp_df.count()

4

In [16]:
# Schema as a StructType object (field name, type, nullability)
emp_df.schema

StructType([StructField('eno', LongType(), True), StructField('name', StringType(), True), StructField('dno', LongType(), True), StructField('age', LongType(), True)])

In [17]:
# Same schema, printed as a tree
emp_df.printSchema()

root
 |-- eno: long (nullable = true)
 |-- name: string (nullable = true)
 |-- dno: long (nullable = true)
 |-- age: long (nullable = true)



In [19]:
# Retrieve specific columns only
emp_df.select(emp_df["eno"], emp_df["name"]).show()

+---+----+
|eno|name|
+---+----+
|  1|  aa|
|  2|  gg|
|  3|  ty|
|  4|  sa|
+---+----+



In [21]:
# Keep rows with eno == 1; the column is referenced by attribute access
emp_df.filter(emp_df.eno==1).show()

+---+----+---+---+
|eno|name|dno|age|
+---+----+---+---+
|  1|  aa|101| 34|
+---+----+---+---+



In [22]:
# Keep rows with eno == 1; the column is referenced by bracket (item) access
emp_df.filter(emp_df["eno"]==1).show()

+---+----+---+---+
|eno|name|dno|age|
+---+----+---+---+
|  1|  aa|101| 34|
+---+----+---+---+



In [23]:
# Number of employees in each department
emp_df.groupBy(emp_df["dno"]).count().show()

+---+-----+
|dno|count|
+---+-----+
|101|    2|
|102|    2|
+---+-----+



In [24]:
# Average age in each department
emp_df.groupBy("dno").agg({"age": "avg"}).show()

+---+--------+
|dno|avg(age)|
+---+--------+
|101|    33.0|
|102|    34.0|
+---+--------+



In [25]:
# Several aggregates per department in one pass, each with a readable alias
emp_df.groupBy("dno").agg(
    count("age").alias("age_count"),
    avg("age").alias("avg_age"),
).show()

+---+---------+-------+
|dno|age_count|avg_age|
+---+---------+-------+
|101|        2|   33.0|
|102|        2|   34.0|
+---+---------+-------+



In [26]:
# Sort by age; the output shows the result is in ascending order
emp_df.sort("age").show()
# TODO: also check descending order

+---+----+---+---+
|eno|name|dno|age|
+---+----+---+---+
|  4|  sa|102| 23|
|  3|  ty|101| 32|
|  1|  aa|101| 34|
|  2|  gg|102| 45|
+---+----+---+---+



In [28]:
# Use of expressions: selectExpr() takes SQL expression strings;
# here it keeps `name` and derives new_age = age + 5
emp_df_new = emp_df.selectExpr("name","age + 5 as new_age")
emp_df_new.show()

+----+-------+
|name|new_age|
+----+-------+
|  aa|     39|
|  gg|     50|
|  ty|     37|
|  sa|     28|
+----+-------+



In [30]:
# Employees older than 30
emp_df.where(emp_df["age"] > 30).show()

+---+----+---+---+
|eno|name|dno|age|
+---+----+---+---+
|  1|  aa|101| 34|
|  2|  gg|102| 45|
|  3|  ty|101| 32|
+---+----+---+---+



In [31]:
# Show emp_df_new with `name` renamed to `emp_name` (emp_df_new itself is not reassigned)
emp_df_new.withColumnRenamed(existing="name", new="emp_name").show()

+--------+-------+
|emp_name|new_age|
+--------+-------+
|      aa|     39|
|      gg|     50|
|      ty|     37|
|      sa|     28|
+--------+-------+



In [33]:
# withColumns() takes a {name: expression} mapping to add several columns in one call
emp_df.withColumns(
    {
        "eno_new": emp_df["eno"] + 2,
        "eno_new_2": emp_df["eno"] + 3,
    }
).show()

+---+----+---+---+-------+---------+
|eno|name|dno|age|eno_new|eno_new_2|
+---+----+---+---+-------+---------+
|  1|  aa|101| 34|      3|        4|
|  2|  gg|102| 45|      4|        5|
|  3|  ty|101| 32|      5|        6|
|  4|  sa|102| 23|      6|        7|
+---+----+---+---+-------+---------+



In [34]:
# Add a constant-valued column: lit() wraps the Python value as a column expression
emp_city = emp_df.withColumn(colName="city", col=lit("Pune"))
emp_city.show()

+---+----+---+---+----+
|eno|name|dno|age|city|
+---+----+---+---+----+
|  1|  aa|101| 34|Pune|
|  2|  gg|102| 45|Pune|
|  3|  ty|101| 32|Pune|
|  4|  sa|102| 23|Pune|
+---+----+---+---+----+



In [35]:
# Joins
# With no join condition every employee row is paired with every department row
# (Cartesian product). crossJoin() states that intent explicitly instead of
# relying on join() being called without an `on` argument.
emp_df.crossJoin(dept_df).show()

+---+----+---+---+---+-----+
|eno|name|dno|age|dno|dname|
+---+----+---+---+---+-----+
|  1|  aa|101| 34|101|   cs|
|  1|  aa|101| 34|102|   ds|
|  2|  gg|102| 45|101|   cs|
|  2|  gg|102| 45|102|   ds|
|  3|  ty|101| 32|101|   cs|
|  3|  ty|101| 32|102|   ds|
|  4|  sa|102| 23|101|   cs|
|  4|  sa|102| 23|102|   ds|
+---+----+---+---+---+-----+



In [36]:
# Inner equi-join on department number; both `dno` columns appear in the result
emp_df.join(dept_df, on=emp_df.dno == dept_df.dno, how="inner").show()

+---+----+---+---+---+-----+
|eno|name|dno|age|dno|dname|
+---+----+---+---+---+-----+
|  1|  aa|101| 34|101|   cs|
|  3|  ty|101| 32|101|   cs|
|  2|  gg|102| 45|102|   ds|
|  4|  sa|102| 23|102|   ds|
+---+----+---+---+---+-----+



### Assignment 1:

In [2]:
import findspark
# Set up access to the Spark installation before pyspark is imported below
findspark.init()

In [3]:
import pyspark

In [4]:
# Reuse the running SparkContext when one exists. Calling the SparkContext
# constructor while another context is alive in this kernel raises ValueError.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName("MyApp"))
sc

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Obtain the SparkSession: returns the existing session or builds a new one
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

#### Question 1

Create a student dataframe with following details 

student(roll number, name, address, age, gender)

class(class id, class name)

- Add relevant records to both frames (minimum 5)
  
- display frames

- display student names with gender = M and gender = F separately

- display students of specific class

- display students whose age > 22

- add new column grade to student df

- display classnames and its associated students

In [37]:
# Student records: (roll_number, name, address, age, gender, class_id)
student = [
    (1, "aa", "Pune", 23, "F", 100),
    (2, "gg", "Mumbai", 23, "M", 100),
    (3, "ty", "Pune", 21, "F", 101),
    (4, "sa", "Nagpur", 22, "M", 100),
    (5, "ab", "Pune", 22, "M", 101),
]
# Class records: (class_id, class_name)
class_details = [
    (100, "cs"),
    (101, "ds"),
]

In [38]:
# Build the student and class DataFrames with explicit column names
student_df = spark.createDataFrame(
    student,
    schema=["roll_number", "name", "address", "age", "gender", "class_id"],
)
class_df = spark.createDataFrame(class_details, schema=["class_id", "class_name"])

# Display both frames
student_df.show()
class_df.show()

+-----------+----+-------+---+------+--------+
|roll_number|name|address|age|gender|class_id|
+-----------+----+-------+---+------+--------+
|          1|  aa|   Pune| 23|     F|     100|
|          2|  gg| Mumbai| 23|     M|     100|
|          3|  ty|   Pune| 21|     F|     101|
|          4|  sa| Nagpur| 22|     M|     100|
|          5|  ab|   Pune| 22|     M|     101|
+-----------+----+-------+---+------+--------+

+--------+----------+
|class_id|class_name|
+--------+----------+
|     100|        cs|
|     101|        ds|
+--------+----------+



In [39]:
# Display students with gender = M and gender = F separately: male students
student_df.filter(student_df.gender == "M").show()

+-----------+----+-------+---+------+--------+
|roll_number|name|address|age|gender|class_id|
+-----------+----+-------+---+------+--------+
|          2|  gg| Mumbai| 23|     M|     100|
|          4|  sa| Nagpur| 22|     M|     100|
|          5|  ab|   Pune| 22|     M|     101|
+-----------+----+-------+---+------+--------+



In [40]:
# Female students
student_df.filter(student_df.gender == "F").show()

+-----------+----+-------+---+------+--------+
|roll_number|name|address|age|gender|class_id|
+-----------+----+-------+---+------+--------+
|          1|  aa|   Pune| 23|     F|     100|
|          3|  ty|   Pune| 21|     F|     101|
+-----------+----+-------+---+------+--------+



In [42]:
# Display students of a specific class (class_id 100)
student_df.where(student_df["class_id"] == 100).show()

+-----------+----+-------+---+------+--------+
|roll_number|name|address|age|gender|class_id|
+-----------+----+-------+---+------+--------+
|          1|  aa|   Pune| 23|     F|     100|
|          2|  gg| Mumbai| 23|     M|     100|
|          4|  sa| Nagpur| 22|     M|     100|
+-----------+----+-------+---+------+--------+



In [43]:
# Display students whose age > 22
student_df.where(student_df["age"] > 22).show()

+-----------+----+-------+---+------+--------+
|roll_number|name|address|age|gender|class_id|
+-----------+----+-------+---+------+--------+
|          1|  aa|   Pune| 23|     F|     100|
|          2|  gg| Mumbai| 23|     M|     100|
+-----------+----+-------+---+------+--------+



In [45]:
# Add a new column `grade` to the student frame (constant 'A' for every row);
# the result is only displayed, student_df itself is not reassigned
student_df.withColumn(colName="grade", col=lit("A")).show()

+-----------+----+-------+---+------+--------+-----+
|roll_number|name|address|age|gender|class_id|grade|
+-----------+----+-------+---+------+--------+-----+
|          1|  aa|   Pune| 23|     F|     100|    A|
|          2|  gg| Mumbai| 23|     M|     100|    A|
|          3|  ty|   Pune| 21|     F|     101|    A|
|          4|  sa| Nagpur| 22|     M|     100|    A|
|          5|  ab|   Pune| 22|     M|     101|    A|
+-----------+----+-------+---+------+--------+-----+



In [46]:
# Display class names with their associated students (inner join on class_id)
student_df.join(
    class_df,
    on=student_df.class_id == class_df.class_id,
    how="inner",
).show()

+-----------+----+-------+---+------+--------+--------+----------+
|roll_number|name|address|age|gender|class_id|class_id|class_name|
+-----------+----+-------+---+------+--------+--------+----------+
|          1|  aa|   Pune| 23|     F|     100|     100|        cs|
|          2|  gg| Mumbai| 23|     M|     100|     100|        cs|
|          4|  sa| Nagpur| 22|     M|     100|     100|        cs|
|          3|  ty|   Pune| 21|     F|     101|     101|        ds|
|          5|  ab|   Pune| 22|     M|     101|     101|        ds|
+-----------+----+-------+---+------+--------+--------+----------+



### Assignment 1
#### Question2
Create dataframes

emp (eno, ename, gender, designation, city, salary, dno)

dept (dno, dname)

- Insert 5 records in each frame
- Print the schema for both dataframes
- Filter emp dataframe based on designation, salary respectively
- Show data of departments for female employees
- Increase salary of employee whose designation is manager
- Add 3 more records to each dataframe
- Show the print schema for both dataframes
- Show the use of join in order to fetch unique records
- Show department wise list of employees
- List of employees whose salary is less than 20000 and designation is “___”


In [10]:
# Insert 5 records in each frame
# Employee records: (eno, ename, gender, designation, city, salary, dno)
employee = [
    (1, "ram", "M", "Manager", "Pune", 100000, 100),
    (2, "rama", "F", "Senior developer", "Pune", 150000, 100),
    (3, "neha", "F", "Developer", "Mumbai", 80000, 101),
    (4, "parth", "M", "Analyst", "Pune", 100000, 102),
    (5, "raj", "M", "Analyst", "Mumbai", 80000, 103),
]
# Department records: (dno, dname)
department = [
    (100, "sales"),
    (101, "IT"),
    (102, "HR"),
    (103, "Accounting"),
]

In [11]:
# Build the employee and department DataFrames with explicit column names
emp = spark.createDataFrame(
    employee,
    schema=["eno", "ename", "gender", "designation", "city", "salary", "dno"],
)
dept = spark.createDataFrame(department, schema=["dno", "dname"])

# Display both frames
emp.show()
dept.show()

+---+-----+------+----------------+------+------+---+
|eno|ename|gender|     designation|  city|salary|dno|
+---+-----+------+----------------+------+------+---+
|  1|  ram|     M|         Manager|  Pune|100000|100|
|  2| rama|     F|Senior developer|  Pune|150000|100|
|  3| neha|     F|       Developer|Mumbai| 80000|101|
|  4|parth|     M|         Analyst|  Pune|100000|102|
|  5|  raj|     M|         Analyst|Mumbai| 80000|103|
+---+-----+------+----------------+------+------+---+

+---+----------+
|dno|     dname|
+---+----------+
|100|     sales|
|101|        IT|
|102|        HR|
|103|Accounting|
+---+----------+



In [14]:
# Display schema for both tables

# Schema for employee, shown as a StructType object
emp.schema

StructType([StructField('eno', LongType(), True), StructField('ename', StringType(), True), StructField('gender', StringType(), True), StructField('designation', StringType(), True), StructField('city', StringType(), True), StructField('salary', LongType(), True), StructField('dno', LongType(), True)])

In [15]:
# Schema for department, shown as a StructType object
dept.schema

StructType([StructField('dno', LongType(), True), StructField('dname', StringType(), True)])

In [17]:
# Filter emp dataframe based on designation, salary respectively: designation first
emp.filter(emp.designation == "Analyst").show()

+---+-----+------+-----------+------+------+---+
|eno|ename|gender|designation|  city|salary|dno|
+---+-----+------+-----------+------+------+---+
|  4|parth|     M|    Analyst|  Pune|100000|102|
|  5|  raj|     M|    Analyst|Mumbai| 80000|103|
+---+-----+------+-----------+------+------+---+



In [18]:
# Filter on salary
emp.filter(emp.salary == 100000).show()

+---+-----+------+-----------+----+------+---+
|eno|ename|gender|designation|city|salary|dno|
+---+-----+------+-----------+----+------+---+
|  1|  ram|     M|    Manager|Pune|100000|100|
|  4|parth|     M|    Analyst|Pune|100000|102|
+---+-----+------+-----------+----+------+---+



In [20]:
# Show data of departments for female employees:
# attach department details to each employee, then keep the female rows
emp.join(dept, on=emp.dno == dept.dno, how="inner").filter(emp.gender == "F").show()

+---+-----+------+----------------+------+------+---+---+-----+
|eno|ename|gender|     designation|  city|salary|dno|dno|dname|
+---+-----+------+----------------+------+------+---+---+-----+
|  2| rama|     F|Senior developer|  Pune|150000|100|100|sales|
|  3| neha|     F|       Developer|Mumbai| 80000|101|101|   IT|
+---+-----+------+----------------+------+------+---+---+-----+



In [23]:
# Increase salary of employees whose designation is Manager.
# Only displays the raised salary (salary + 5000); `emp` itself is not modified.
emp.where(emp["designation"] == "Manager").selectExpr(
    "ename",
    "designation",
    "salary + 5000 as updated_sal",
).show()

+-----+-----------+-----------+
|ename|designation|updated_sal|
+-----+-----------+-----------+
|  ram|    Manager|     105000|
+-----+-----------+-----------+



In [30]:
# Add 3 more records to the employee DataFrame.
# The rows are created with emp's schema so they carry the same column names and
# types; without a schema the columns are auto-named (_1 ... _7) and only line up
# with `emp` by position.
new_rows = spark.createDataFrame(
    [
        (6, "ekta", "F", "Analyst", "Mumbai", 80000, 100),
        (7, "raja", "M", "Senior Analyst", "Mumbai", 80000, 100),
        (8, "sona", "F", "Analyst", "Mumbai", 80000, 101),
    ],
    schema=emp.schema,
)

In [31]:
# Append the new rows to emp. union() pairs columns by position, not by name.
# NOTE: this rebinds emp_df, which referred to a different DataFrame earlier in the notebook.
emp_df = emp.union(new_rows)

In [32]:
# Employee frame after appending the new rows
emp_df.show()

+---+-----+------+----------------+------+------+---+
|eno|ename|gender|     designation|  city|salary|dno|
+---+-----+------+----------------+------+------+---+
|  1|  ram|     M|         Manager|  Pune|100000|100|
|  2| rama|     F|Senior developer|  Pune|150000|100|
|  3| neha|     F|       Developer|Mumbai| 80000|101|
|  4|parth|     M|         Analyst|  Pune|100000|102|
|  5|  raj|     M|         Analyst|Mumbai| 80000|103|
|  6| ekta|     F|         Analyst|Mumbai| 80000|100|
|  7| raja|     M|  Senior Analyst|Mumbai| 80000|100|
|  8| sona|     F|         Analyst|Mumbai| 80000|101|
+---+-----+------+----------------+------+------+---+



In [33]:
# New department rows, created with dept's schema so column names and types match
# (without a schema the columns would be auto-named _1, _2).
# NOTE: reuses the name `new_rows`, replacing the employee rows bound to it earlier.
new_rows = spark.createDataFrame([(104, "aa"), (105, "bb")], schema=dept.schema)

In [34]:
# Append the new rows to dept. union() pairs columns by position, not by name.
# NOTE: this rebinds dept_df, which referred to a different DataFrame earlier in the notebook.
dept_df = dept.union(new_rows)

In [36]:
# Department frame after appending the new rows
dept_df.show()

+---+----------+
|dno|     dname|
+---+----------+
|100|     sales|
|101|        IT|
|102|        HR|
|103|Accounting|
|104|        aa|
|105|        bb|
+---+----------+



In [37]:
# Print the schema of the extended employee frame
emp_df.printSchema()

root
 |-- eno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- dno: long (nullable = true)



In [38]:
# Print the schema of the extended department frame
dept_df.printSchema()

root
 |-- dno: long (nullable = true)
 |-- dname: string (nullable = true)



In [39]:
# List of employees whose salary is less than 100000 and designation is "Analyst"
emp_df.filter(
    (emp_df["salary"] < 100000) & (emp_df["designation"] == "Analyst")
).show()

+---+-----+------+-----------+------+------+---+
|eno|ename|gender|designation|  city|salary|dno|
+---+-----+------+-----------+------+------+---+
|  5|  raj|     M|    Analyst|Mumbai| 80000|103|
|  6| ekta|     F|    Analyst|Mumbai| 80000|100|
|  8| sona|     F|    Analyst|Mumbai| 80000|101|
+---+-----+------+-----------+------+------+---+



### Assignment 1
#### Question 3

Consider daily product revenue data 
Product - product_id, pname, ptype, price

Customer - cust_id, cname, mobileno, city

orders – order_id, order_date, order_customer_id, order_status

order_items –  order_item_order_id, order_item_product_id, order_item_quantity, 
order_item_subtotal


In [6]:
# Products: (product_id, product_name, p_type, price)
product_data = [(1, "ProductA", "Type1", 100.0),
                (2, "ProductB", "Type2", 150.0),
                (3, "ProductC", "Type1", 200.0)]

# Customers: (cust_id, cust_name, mobile, city)
customer_data = [(1, "CustomerA", "1234567890", "Pune"),
                 (2, "CustomerB", "0987654321", "Mumbai"),
                 (3, "CustomerC", "1122334455", "Pune")]

# Orders: (order_id, order_date, order_customer_id, status)
orders_data = [(1, "2013-08-01", 1, "COMPLETE"),
               (2, "2013-08-01", 2, "CLOSED"),
               (3, "2013-09-01", 3, "PENDING")]

# Order items: (order_item_order_id, order_item_product_id,
#               order_item_quantity, order_item_subtotal)
order_items_data = [(1, 1, 2, 200.0),
                    (2, 2, 1, 150.0),
                    (3, 3, 3, 600.0)]

In [7]:
# Product frame with explicit column names
products_df = spark.createDataFrame(
    product_data,
    schema=["product_id", "product_name", "p_type", "price"],
)

In [8]:
# Customer, order and order-item frames with explicit column names
customers_df = spark.createDataFrame(
    customer_data,
    schema=["cust_id", "cust_name", "mobile", "city"],
)
orders_df = spark.createDataFrame(
    orders_data,
    schema=["order_id", "order_date", "order_customer_id", "status"],
)
order_items_df = spark.createDataFrame(
    order_items_data,
    schema=[
        "order_item_order_id",
        "order_item_product_id",
        "order_item_quantity",
        "order_item_subtotal",
    ],
)

In [9]:
# Display the products frame
products_df.show()

+----------+------------+------+-----+
|product_id|product_name|p_type|price|
+----------+------------+------+-----+
|         1|    ProductA| Type1|100.0|
|         2|    ProductB| Type2|150.0|
|         3|    ProductC| Type1|200.0|
+----------+------------+------+-----+



In [10]:
# Display the orders frame
orders_df.show()

+--------+----------+-----------------+--------+
|order_id|order_date|order_customer_id|  status|
+--------+----------+-----------------+--------+
|       1|2013-08-01|                1|COMPLETE|
|       2|2013-08-01|                2|  CLOSED|
|       3|2013-09-01|                3| PENDING|
+--------+----------+-----------------+--------+



In [13]:
# Display the order-items frame
order_items_df.show()

+-------------------+---------------------+-------------------+-------------------+
|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|
+-------------------+---------------------+-------------------+-------------------+
|                  1|                    1|                  2|              200.0|
|                  2|                    2|                  1|              150.0|
|                  3|                    3|                  3|              600.0|
+-------------------+---------------------+-------------------+-------------------+



In [14]:
# Display the customers frame
customers_df.show()

+-------+---------+----------+------+
|cust_id|cust_name|    mobile|  city|
+-------+---------+----------+------+
|      1|CustomerA|1234567890|  Pune|
|      2|CustomerB|0987654321|Mumbai|
|      3|CustomerC|1122334455|  Pune|
+-------+---------+----------+------+



In [14]:
# 1. Get details of all customers from Pune city
customers_df.filter(customers_df["city"] == "Pune").show()

+-------+---------+----------+----+
|cust_id|cust_name|    mobile|city|
+-------+---------+----------+----+
|      1|CustomerA|1234567890|Pune|
|      3|CustomerC|1122334455|Pune|
+-------+---------+----------+----+



In [17]:
# 2. Get details of orders with subtotal > `value` placed in month `month_number`

from pyspark.sql.functions import month, year
value = 200  # Example value
month_number = 8  # Example for August

# Attach the items to each order, then apply both conditions
# (two chained filters are equivalent to AND-ing them).
(
    orders_df
    .join(order_items_df, on=orders_df.order_id == order_items_df.order_item_order_id)
    .filter(order_items_df.order_item_subtotal > value)
    .filter(month(orders_df.order_date) == month_number)
    .show()
)

+--------+----------+-----------------+------+-------------------+---------------------+-------------------+-------------------+
|order_id|order_date|order_customer_id|status|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|
+--------+----------+-----------------+------+-------------------+---------------------+-------------------+-------------------+
+--------+----------+-----------------+------+-------------------+---------------------+-------------------+-------------------+



In [18]:
# 3. Print orders in ascending order of subtotal
order_items_df.orderBy(order_items_df["order_item_subtotal"].asc()).show()

+-------------------+---------------------+-------------------+-------------------+
|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|
+-------------------+---------------------+-------------------+-------------------+
|                  2|                    2|                  1|              150.0|
|                  1|                    1|                  2|              200.0|
|                  3|                    3|                  3|              600.0|
+-------------------+---------------------+-------------------+-------------------+



In [19]:
# 4. Print customer details with min order and max order amount
from pyspark.sql.functions import min, max

# Total amount per order; the aggregate column is named "sum(order_item_subtotal)"
order_totals = order_items_df.groupBy("order_item_order_id").sum("order_item_subtotal")

# Smallest and largest order total for each customer
customer_orders = (
    orders_df
    .join(order_totals, on=orders_df.order_id == order_totals.order_item_order_id)
    .groupBy("order_customer_id")
    .agg(
        min("sum(order_item_subtotal)").alias("min_order"),
        max("sum(order_item_subtotal)").alias("max_order"),
    )
)

# Attach the customer attributes to the per-customer figures
customer_details = customers_df.join(
    customer_orders,
    on=customers_df.cust_id == customer_orders.order_customer_id,
)
customer_details.show()

+-------+---------+----------+------+-----------------+---------+---------+
|cust_id|cust_name|    mobile|  city|order_customer_id|min_order|max_order|
+-------+---------+----------+------+-----------------+---------+---------+
|      1|CustomerA|1234567890|  Pune|                1|    200.0|    200.0|
|      2|CustomerB|0987654321|Mumbai|                2|    150.0|    150.0|
|      3|CustomerC|1122334455|  Pune|                3|    600.0|    600.0|
+-------+---------+----------+------+-----------------+---------+---------+



In [21]:
# 5. Get orders which are either COMPLETE or CLOSED
orders_df.filter(orders_df.status.isin("COMPLETE", "CLOSED")).show()

+--------+----------+-----------------+--------+
|order_id|order_date|order_customer_id|  status|
+--------+----------+-----------------+--------+
|       1|2013-08-01|                1|COMPLETE|
|       2|2013-08-01|                2|  CLOSED|
+--------+----------+-----------------+--------+



In [23]:
# 6. Get orders which are either COMPLETE or CLOSED and placed in August 2013.
# `&` binds tighter than `|`, so the status alternatives must be grouped before
# the date conditions are AND-ed on. Written as `A | B & C & D`, the expression
# means `A | (B & C & D)` and every COMPLETE order matches whatever its date.
orders_df.filter(
    orders_df.status.isin("COMPLETE", "CLOSED")
    & (month(orders_df.order_date) == 8)
    & (year(orders_df.order_date) == 2013)
).show()

+--------+----------+-----------------+--------+
|order_id|order_date|order_customer_id|  status|
+--------+----------+-----------------+--------+
|       1|2013-08-01|                1|COMPLETE|
|       2|2013-08-01|                2|  CLOSED|
+--------+----------+-----------------+--------+



In [26]:
#7. Get order items where order_item_subtotal is not equal to product of order_item_quantity and order_item_product_price
# TODO: no code for #7 yet; this cell only answers #8.
#8. Get all the orders which are placed on the first of every month
from pyspark.sql.functions import dayofmonth
orders_df.filter(dayofmonth(orders_df.order_date) == 1).show()

+--------+----------+-----------------+--------+
|order_id|order_date|order_customer_id|  status|
+--------+----------+-----------------+--------+
|       1|2013-08-01|                1|COMPLETE|
|       2|2013-08-01|                2|  CLOSED|
|       3|2013-09-01|                3| PENDING|
+--------+----------+-----------------+--------+



In [28]:
# 9. Get count by status from orders
orders_df.groupBy(orders_df["status"]).count().show()

+--------+-----+
|  status|count|
+--------+-----+
|COMPLETE|    1|
|  CLOSED|    1|
| PENDING|    1|
+--------+-----+



In [29]:
# 10. Get revenue for each order id from order items
order_items_df.groupBy("order_item_order_id").sum("order_item_subtotal").show()

+-------------------+------------------------+
|order_item_order_id|sum(order_item_subtotal)|
+-------------------+------------------------+
|                  1|                   200.0|
|                  2|                   150.0|
|                  3|                   600.0|
+-------------------+------------------------+



In [31]:
#11. Get daily product revenue (order_date and order_item_product_id are part of keys, order_item_subtotal is used for aggregation)
orders_df.join(order_items_df, orders_df.order_id == order_items_df.order_item_order_id) \
    .groupBy("order_date", "order_item_product_id").agg({"order_item_subtotal": "sum"}).show()

+----------+---------------------+------------------------+
|order_date|order_item_product_id|sum(order_item_subtotal)|
+----------+---------------------+------------------------+
|2013-08-01|                    2|                   150.0|
|2013-08-01|                    1|                   200.0|
|2013-09-01|                    3|                   600.0|
+----------+---------------------+------------------------+

