# This notebook is about Assignment 2.<br>
It consists of 4 solutions.

### Section 1: Environment setup and data loading.
* Done (created an account and loaded the data; the code is in the loading_dataset.ipynb file)

### Section 2: Analyzing the data model.
Question : <br>

1. Due to some changes in the business processes and market forces, the company has decided to change the categories of various products. Your manager has asked you to analyse the impact on the database. Specifically, he is concerned if existing orders will be impacted. He is also worried about what impact it may have on the downstream data warehouse.

  a. Please go through the ER diagram (northwind-er-relationship.png), understand the relationships between the entities

  b. Identify the entities/attributes that will be impacted.

  c. Answer your manager’s question regarding rows in the order table and the downstream data warehouse.

Solution : 

Suppose we change the categories of various products. This affects both the database and the real-world business.<br>

* Impact on the database: <br>
   - If the category values change, the CategoryID that the Products table references as a foreign key can cause errors, because the database enforces referential integrity constraints.
* Impact on sales: <br>
   - From a sales point of view, recategorising can change how well a product sells. For example, suppose a comb is categorised as a plastic product and sold accordingly. If we move it to the cosmetics category, which fits the product better, its sales are likely to increase.
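
The referential integrity point can be tried out on a small scale. The sketch below is not part of the assignment code: it uses Python's built-in `sqlite3` as a stand-in database, with trimmed-down tables and made-up rows, and it assumes the usual Northwind layout in which order lines reference products and products reference categories. Under that assumption, moving a product to another existing category leaves the order rows unchanged, while pointing it at a missing category is rejected.

```python
import sqlite3

# In-memory stand-in for the tables involved; columns are trimmed and the
# sample rows are made up for illustration.
conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.executescript("""
CREATE TABLE categories (CategoryID INTEGER PRIMARY KEY, CategoryName TEXT);
CREATE TABLE products (
    ProductID INTEGER PRIMARY KEY,
    ProductName TEXT,
    CategoryID INTEGER REFERENCES categories(CategoryID)
);
CREATE TABLE order_details (
    OrderID INTEGER,
    ProductID INTEGER REFERENCES products(ProductID),
    Quantity INTEGER
);
INSERT INTO categories VALUES (1, 'Plastic'), (2, 'Cosmetics');
INSERT INTO products VALUES (10, 'Comb', 1);
INSERT INTO order_details VALUES (500, 10, 3);
""")

order_rows_before = conn.execute("SELECT * FROM order_details").fetchall()

# Moving the product to an existing category succeeds.
conn.execute("UPDATE products SET CategoryID = 2 WHERE ProductID = 10")

# Pointing the product at a category that does not exist is rejected.
try:
    conn.execute("UPDATE products SET CategoryID = 99 WHERE ProductID = 10")
    fk_violation = False
except sqlite3.IntegrityError:
    fk_violation = True

order_rows_after = conn.execute("SELECT * FROM order_details").fetchall()
print(fk_violation, order_rows_before == order_rows_after)
```

A downstream data warehouse that stores the category name next to each sale would still need its own check, since it may keep a copy of the old category values.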

### Section 3: Working with data using Spark SQL.

Question : <br>

1. For this task you must use Spark SQL and create a separate dataframe for each of the below tasks: <br>

   a. By mistake, some data has been duplicated in the product table and is causing issues with order reconciliation. You have been asked to check the data and remove the duplicates. Create a dataframe (using Spark SQL) containing de-duped rows from the product table.

In [6]:
%sql
select p.ProductID, p.ProductName, count(*)
from products_csv as p
group by p.ProductName, p.ProductID
having count(*)>1;

ProductID,ProductName,count(1)
61,Sirop d'érable,2
56,Gnocchi di nonna Alice,2
70,Outback Lager,2
3,Aniseed Syrup,2
41,Jack's New England Clam Chowder,2
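
The query above lists the duplicated products but does not yet produce the de-duplicated rows. One possible way to get them is `SELECT DISTINCT *`, which keeps a single copy of rows that are identical in every column. The sketch below runs that statement with Python's built-in `sqlite3` on a few hand-typed rows, as a stand-in for Spark SQL; the two-column table is an assumption made to keep the example short.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products_csv (ProductID INTEGER, ProductName TEXT)")
# Hand-typed sample rows: ProductID 3 appears twice.
conn.executemany(
    "INSERT INTO products_csv VALUES (?, ?)",
    [(3, "Aniseed Syrup"), (3, "Aniseed Syrup"), (1, "Chai"), (2, "Chang")],
)

# Keep one copy of each fully identical row.
deduped = conn.execute(
    "SELECT DISTINCT * FROM products_csv ORDER BY ProductID"
).fetchall()

# Re-run the duplicate check against the de-duplicated rows.
remaining = conn.execute(
    """
    SELECT ProductID, COUNT(*)
    FROM (SELECT DISTINCT * FROM products_csv)
    GROUP BY ProductID
    HAVING COUNT(*) > 1
    """
).fetchall()

print(deduped)
print(remaining)
```

In the notebook, the same `SELECT DISTINCT` statement could be passed to `spark.sql(...)`, or `dropDuplicates()` could be called on the products DataFrame. Both only remove rows that match in every column, so duplicates that differ in any field would need a different rule.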


b. The company wants to recognize employees who are doing a great job pushing more orders. Create a dataframe of employees (display their Id, Name) and their total sales. Filter out only employees that have sold more than 70 products. Order the employees in descending order of sales.<br>

    i. Sales Amount needs to be calculated based on ordered quantity, unit price and discount.

In [8]:
%sql
select emp.FirstName, emp.LastName, emp.EmployeeID, sale.sales
from employees_csv as emp inner join 
(  select e.EmployeeID, sum(temp.price) as sales
   from orders_csv as e inner join
      ( select OrderID, sum(UnitPrice * Quantity - Discount/ (UnitPrice * Quantity) *100)  as price from order_details_csv 
        group by OrderID
      ) as temp
      on temp.OrderID=e.OrderID
      group by e.EmployeeID
      having count(*)>70
) as sale 
  on sale.EmployeeID= emp.EmployeeID
  order by sale.sales desc;

FirstName,LastName,EmployeeID,sales
Margaret,Peacock,4,250174.79180101003
Janet,Leverling,3,213043.94145117849
Nancy,Davolio,1,202132.88312733785
Andrew,Fuller,2,177744.18629881673
Robert,King,7,141290.57678761
Laura,Callahan,8,133292.17817489768
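
The task defines the sales amount in terms of ordered quantity, unit price and discount. A common way to combine them, assuming `Discount` is stored as a fraction between 0 and 1 (an assumption about this dataset, not something the notebook confirms), is `UnitPrice * Quantity * (1 - Discount)`. The helper below is hypothetical and only illustrates that alternative on made-up order lines; the figures in the output above come from the notebook's own expression and would differ.

```python
def net_line_amount(unit_price, quantity, discount):
    """Net amount for one order line, assuming discount is a fraction in [0, 1]."""
    if not 0 <= discount <= 1:
        raise ValueError("discount must be a fraction between 0 and 1")
    return unit_price * quantity * (1 - discount)


# Made-up order lines: (unit_price, quantity, discount)
lines = [(14.0, 12, 0.0), (10.0, 10, 0.25)]
total = sum(net_line_amount(*line) for line in lines)
print(total)  # 168.0 + 75.0 = 243.0
```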


### Section 4: Working with data using Spark API.
Question :

1. For this task you must use Spark API and create a separate dataframe for each of the below tasks: <br>

    a. Some customer orders are not getting processed and are getting delayed due to insufficient stock. Your manager asks you to identify the suppliers who supply the products that have insufficient stock. Create the necessary dataframes for your analysis. The final dataframe should have a record for each product (that is out of stock) and an array of supplier names that supply the product.

In [10]:
%sql
select p.SupplierID, s.ContactName as SupplierName, p.ProductID, p.ProductName
from products_csv as p inner join suppliers_csv as s
on p.SupplierID=s.SupplierID
where p.UnitsInStock=0;

SupplierID,SupplierName,ProductID,ProductName
2,Shelley Burke,5,Chef Anton's Gumbo Mix
7,Ltd.,17,Alice Mutton
12,Martin Bein,29,Thüringer Rostbratwurst
14,Elio Rossi,31,Gorgonzola Telino
24,Mate,53,Perth Pasties


In [11]:
from pyspark.sql import SQLContext

file_location = "/FileStore/tables/suppliers.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
suppliers = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

file_location = "/FileStore/tables/products.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
products = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

In [12]:
suppliers.toPandas().head()

Unnamed: 0,SupplierID,CompanyName,ContactName,ContactTitle,Address,City,Region,PostalCode,Country,Phone,Fax,HomePage
0,1,Exotic Liquids,Charlotte Cooper,Purchasing Manager,49 Gilbert St.,London,,EC1 4SD,UK,(171) 555-2222,,
1,2,New Orleans Cajun Delights,Shelley Burke,Order Administrator,P.O. Box 78934,New Orleans,LA,70117,USA,(100) 555-4822,,#CAJUN.HTM#
2,3,Grandma Kelly's Homestead,Regina Murphy,Sales Representative,707 Oxford Rd.,Ann Arbor,MI,48104,USA,(313) 555-5735,(313) 555-3349,
3,4,Tokyo Traders,Yoshi Nagase,Marketing Manager,9-8 Sekimai Musashino-shi,Tokyo,,100,Japan,(03) 3555-5011,,
4,5,Cooperativa de Quesos 'Las Cabras',Antonio del Valle Saavedra,Export Administrator,Calle del Rosal 4,Oviedo,Asturias,33007,Spain,(98) 598 76 54,,


In [13]:
products.show(5)

In [14]:
# Products that are out of stock
supplier_with_zero_stock = products.filter(products["UnitsInStock"] == 0).select("ProductID", "ProductName", "SupplierID")
# Attach the supplier details to each out-of-stock product
final_suppliers = suppliers.join(supplier_with_zero_stock, on="SupplierID").select("SupplierID", "ContactName", "ProductID", "ProductName")

In [15]:
final_suppliers.show()
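
The task asks for one record per out-of-stock product with an array of supplier names, while the join above yields one row per supplier and product pair. The remaining step is a grouping. The sketch below shows that step in plain Python on made-up rows, so every name in it is hypothetical. In Spark, the analogous step would be a `groupBy` on the product columns with `pyspark.sql.functions.collect_list` on the supplier name, which is not run here.

```python
from collections import defaultdict

# Made-up joined rows: (ProductID, ProductName, SupplierName).
joined_rows = [
    (1, "Product A", "Supplier X"),
    (2, "Product B", "Supplier Y"),
    (1, "Product A", "Supplier Z"),
]

# Collect the supplier names of each product into a list.
suppliers_by_product = defaultdict(list)
for product_id, product_name, supplier_name in joined_rows:
    suppliers_by_product[(product_id, product_name)].append(supplier_name)

# One record per product: (ProductID, ProductName, [supplier names]).
result = [
    (product_id, product_name, names)
    for (product_id, product_name), names in sorted(suppliers_by_product.items())
]
print(result)
```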

Question : <br>
b. Your manager has asked you to identify our most valuable customers. You asked him how he defines the most valuable customers. He suggested a couple of different approaches: by the number of orders and by total order value. He believes that both approaches should give similar results. Create the top 10 customers based on both approaches and verify how many common customers are present.

In [17]:
from pyspark.sql import SQLContext

file_location = "/FileStore/tables/customers.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
customers = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

file_location = "/FileStore/tables/orders.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
orders = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

file_location = "/FileStore/tables/order_details.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
order_details = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

In [18]:
# One row per order line, with the customer attached
temp = customers.join(orders, on="CustomerID").select("CustomerID", "ContactName", "OrderID")
temp = temp.join(order_details, on="OrderID")
# Price of each order line, using this notebook's expression
temp = temp.withColumn("Price", temp["UnitPrice"]*temp["Quantity"] - temp["Discount"]/temp["Quantity"]*100)
temp = temp.drop("UnitPrice", "Quantity", "Discount")
temp.show(5)

In [19]:
t1 = temp.groupBy("CustomerID").sum("Price").toPandas().sort_values("sum(Price)", ascending=False).iloc[:10]
t2 = temp.groupby("CustomerID").count().toPandas().sort_values("count", ascending=False).iloc[:10]

In [20]:
import pandas as pd
final_dataframe = pd.merge(t1, t2, how ='inner', on ="CustomerID") 
final_dataframe

Unnamed: 0,CustomerID,sum(Price),count
0,QUICK,117465.809834,86
1,SAVEA,115635.206878,116
2,ERNSH,113209.014551,102
3,HUNGO,57280.094138,55
4,RATTC,52206.257341,71
5,FOLKO,32523.115476,45


This shows that 6 customers appear in both top-10 lists: they are among the highest contributors to sales and also among those who buy the most products.
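
The overlap check itself is a set intersection of the two top-N lists. One detail is worth noting: `count()` on the joined frame counts order lines, because each order was joined to its order details. Counting distinct `OrderID` values would measure the number of orders instead. The sketch below shows both ideas in plain Python on made-up order lines; the helper `top_n` and all of the data are hypothetical.

```python
from collections import Counter, defaultdict

# Made-up order lines: (CustomerID, OrderID, line_value).
order_lines = [
    ("A", 1, 100.0), ("A", 1, 50.0), ("A", 2, 20.0),
    ("B", 3, 500.0),
    ("C", 4, 10.0), ("C", 5, 10.0), ("C", 6, 10.0),
]


def top_n(scores, n):
    """Return the set of the n keys with the highest scores."""
    return {key for key, _ in Counter(scores).most_common(n)}


value_by_customer = defaultdict(float)
orders_by_customer = defaultdict(set)
for customer_id, order_id, line_value in order_lines:
    value_by_customer[customer_id] += line_value
    orders_by_customer[customer_id].add(order_id)  # distinct orders, not lines

order_count = {c: len(ids) for c, ids in orders_by_customer.items()}

top_by_value = top_n(value_by_customer, 2)
top_by_orders = top_n(order_count, 2)
common = top_by_value & top_by_orders
print(sorted(common))
```

With these made-up rows, customer B has the highest total value from a single order and customer C has the most orders at a low value, so only customer A appears in both lists.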