In [0]:
entities=['categories', 'employee_territories',  'products',  'suppliers', 'customers', 'order_details', 'regions', 'territories', 'employees',  'orders',  'shippers']
dataframes={}
#load all csv files as dataframes and create tables for them
#By this approch we can load multiples files efficently and fastly
for entity in entities:
    print("Creating DataFrames for ", entity)
    dataframes[entity]=spark.read.csv('/FileStore/tables/'+entity+'.csv', header=True,inferSchema=True)
    #dataframes[entity].cache()
    dataframes[entity].createOrReplaceTempView(entity)   
    

  
for entity in entities:
  dataframes[entity].write.format("parquet").saveAsTable(entity)
  


In [0]:
%sql
/*Write a query to get Product list (id, name, unit price) where the unit price products cost less than $20. Filter out discontinued products. */
SELECT ProductID, ProductName, UnitPrice 
from products 
WHERE Discontinued = 0 AND UnitPrice < 20;

ProductID,ProductName,UnitPrice
1,Chai,18.0
2,Chang,19.0
3,Aniseed Syrup,10.0
13,Konbu,6.0
15,Genen Shouyu,15.5
16,Pavlova,17.45
19,Teatime Chocolate Biscuits,9.2
21,Sir Rodney's Scones,10.0
23,Tunnbröd,9.0
25,NuNuCa Nuß-Nougat-Creme,14.0


In [0]:
%sql

/* Query Make a listing of all categories of products in the order of decreasing number of products in that category. Filter out discontinued products.*/
SELECT CategoryName, count(ProductId) AS count_of_products
FROM categories
JOIN products
ON categories.CategoryID=products.CategoryID
WHERE Discontinued = 0 
Group By CategoryName
ORDER BY count_of_products DESC ;


CategoryName,count_of_products
Confections,13
Seafood,12
Condiments,11
Beverages,11
Dairy Products,10
Grains/Cereals,6
Produce,4
Meat/Poultry,2


In [0]:
%sql

/* Query Make a list of customers who have not made any orders in the months of July - September. */
SELECT ContactName
FROM customers
JOIN orders
ON customers.CustomerID=orders.CustomerID
WHERE orders.CustomerID NOT IN (SELECT orders.CustomerID from orders where EXTRACT(MONTH FROM OrderDate) BETWEEN 7 AND 9);

ContactName
Guillermo Fernández
Martín Sommer
Isabel de Castro
Guillermo Fernández
Thomas Hardy
Hari Kumar
Sven Ottlieb
Ann Devon
Eduardo Saavedra
Hari Kumar


In [0]:
# CSV options
file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

#Write a query to get Product list (id, name, unit price) where products cost between $15 and $25.

# The applied options are for CSV files. For other file types, these will be ignored.
df1 = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load("/FileStore/tables/products.csv")

df1.select(df1.ProductID,df1.ProductName,df1.UnitPrice).filter((df1.UnitPrice > 15) & (df1.UnitPrice < 25)).show()

In [0]:
#Write a query to get Product list (name, unit price) of ten most expensive products.
df1.select(df1.ProductName,df1.UnitPrice).orderBy(df1.UnitPrice, ascending=False).show(10)

In [0]:
%sql
/* Give the names of employees who sell the products of more than 7 suppliers using SparkSql.*/
SELECT EmployeeID,LastName,FirstName FROM employees WHERE EmployeeID in (SELECT EmployeeID FROM (SELECT employees.EmployeeID, products.SupplierID, COUNT(*) as num_ordelines
      FROM employees JOIN
           orders
           ON employees.EmployeeID = orders.EmployeeID JOIN
           order_details
           ON orders.OrderID = order_details.OrderID JOIN
           products
           ON order_details.ProductID = products.ProductID 
      GROUP BY employees.EmployeeID, products.SupplierId
     ) GROUP BY EmployeeID
HAVING COUNT(SupplierID) > 7);

EmployeeID,LastName,FirstName
1,Davolio,Nancy
6,Suyama,Michael
3,Leverling,Janet
5,Buchanan,Steven
9,Dodsworth,Anne
4,Peacock,Margaret
8,Callahan,Laura
7,King,Robert
2,Fuller,Andrew


In [0]:

# Give the names of employees who sell the products of more than 7 suppliers using SparkSql.

# CSV options
file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_products = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load("/FileStore/tables/products.csv")

file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_order_details = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load("/FileStore/tables/order_details.csv")



file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_orders = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load("/FileStore/tables/orders.csv")


file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_employees = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load("/FileStore/tables/employees.csv")


          
df = (((df_employees.join(df_orders, on=['EmployeeID'],how='inner')).join(df_order_details, on = ['OrderID'],how="inner")).join(df_products,on=['ProductID'], how='inner'))

df2=df.select(df_employees.EmployeeID, df_products.SupplierID).groupby(df_employees.EmployeeID, df_products.SupplierID ).count()

df2.select(df_employees.EmployeeID).groupBy(df_employees.EmployeeID).agg(count(df_products.SupplierID).alias('count')).filter(column('SupplierID')>7).show()

  
  
 


EmployeeID,SupplierID,num_ordelines
3,22,5
3,15,14
7,21,4
9,10,4
4,10,2
6,20,3
9,16,2
5,16,2
6,1,11
3,1,15


In [0]:


#Write a query to get count of current and discontinued products.

from pyspark.sql.functions import count
df1.groupby(df1.Discontinued).count().show()
Discontinued_products = df1.groupby(df1.Discontinued).count().collect()[0][1]
Current_products = df1.groupby(df1.Discontinued).count().collect()[1][1]

print("Count of Current_products : ",Current_products, "  Discontinued_products : ",Discontinued_products);

In [0]:
#Write a query to get Product list (name, unit price) whose prices are above the average price.

from pyspark.sql.functions import avg
average_of_UnitPrice = df1.select(avg(df1.UnitPrice)).collect()[0][0]
print("Average of UnitPrice is ",average_of_UnitPrice)
df1.select(df1.ProductName,df1.UnitPrice).filter(df1.UnitPrice > average_of_UnitPrice).show()
