In [0]:
#Source Datasets:
#Order.json=https://easyupload.io/bft757
#Customer.xlsx= https://easyupload.io/fmrx1r
#Product.csv=https://easyupload.io/9ihgl1

In [0]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
  .appName("Customer_product_App") \
  .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:3.5.0_0.20.3").getOrCreate() 
order_cols=['CustomerID',
 'Discount',
 'OrderDate',
 'OrderID',
 'Price',
 'ProductID',
 'Profit',
 'Quantity',
 'RowID',
 'ShipDate',
 'ShipMode']
df_order=spark.read.option("multiline","true").option('inferschema','true').json("/FileStore/tables/Order.json").toDF(*order_cols)

In [0]:
df_order.printSchema()

root
 |-- CustomerID: string (nullable = true)
 |-- Discount: double (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- OrderID: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- RowID: long (nullable = true)
 |-- ShipDate: string (nullable = true)
 |-- ShipMode: string (nullable = true)



In [0]:
df_order=df_order.withColumn('OrderDate',date_format(to_date('OrderDate',"d/M/yyyy"), "yyyy-MM-dd"))

In [0]:
df_order.show(3)

+----------+--------+----------+--------------+-------+---------------+-------+--------+-----+---------+--------------+
|CustomerID|Discount| OrderDate|       OrderID|  Price|      ProductID| Profit|Quantity|RowID| ShipDate|      ShipMode|
+----------+--------+----------+--------------+-------+---------------+-------+--------+-----+---------+--------------+
|  JK-15370|     0.3|2016-08-21|CA-2016-122581|573.174|FUR-CH-10002961| 63.686|       7|    1|25/8/2016|Standard Class|
|  BD-11320|     0.0|2017-09-23|CA-2017-117485| 291.96|TEC-AC-10004659|102.186|       4|    2|29/9/2017|Standard Class|
|  LB-16795|     0.7|2016-10-06|US-2016-157490|     17|OFF-BI-10002824| -14.92|       4|    3|7/10/2016|   First Class|
+----------+--------+----------+--------------+-------+---------------+-------+--------+-----+---------+--------------+
only showing top 3 rows



In [0]:
df_order.select("OrderDate").distinct().show(3)

+----------+
| OrderDate|
+----------+
|2017-09-23|
|2016-10-06|
|2016-08-21|
+----------+
only showing top 3 rows



In [0]:

customer_cols=['CustomerID',
 'CustomerName',
 'email',
 'phone',
 'address',
 'Segment',
 'Country',
 'City',
 'State',
 'PostalCode',
 'Region']
df_customer = spark.read.format("com.crealytics.spark.excel")\
  .option("header", "true") \
  .option("inferSchema", "true")\
  .load("/FileStore/tables/Customer.xlsx").toDF(*customer_cols)

In [0]:
df_customer=df_customer.withColumn('CustomerName',regexp_replace(trim(col('CustomerName')),  "[^a-zA-Z' ]",  ""))

In [0]:
df_customer.select('CustomerName').distinct().show(10,truncate=False)

+------------------+
|CustomerName      |
+------------------+
|Rob Lucas         |
|Sung Pak          |
|Ann Blume         |
|Sally Knutson     |
|Aaron Bergman     |
|Craig Reiter      |
|Matt Collins      |
|Sonia Sunley      |
|Lauren Leatherbury|
|Allen Rosenblatt  |
+------------------+
only showing top 10 rows



In [0]:
df_customer.printSchema()

root
 |-- CustomerID: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- PostalCode: double (nullable = true)
 |-- Region: string (nullable = true)



In [0]:
df_customer.show(3,truncate=False)

+----------+-------------+---------------------------+--------------------+-------------------------------------------------+-----------+-------------+----------------+--------+----------+-------+
|CustomerID|CustomerName |email                      |phone               |address                                          |Segment    |Country      |City            |State   |PostalCode|Region |
+----------+-------------+---------------------------+--------------------+-------------------------------------------------+-----------+-------------+----------------+--------+----------+-------+
|PW-19240  |Pierre Wener |bettysullivan808@gmail.com |421.580.0902x9815   |001 Jones Ridges Suite 338\nJohnsonfort, FL 95462|Consumer   |United States|Louisville      |Colorado|80027.0   |West   |
|GH-14410  |Gary Hansen  |austindyer948@gmail.com    |001-542-415-0246x314|00347 Murphy Unions\nAshleyton, IA 29814         |Home Office|United States|Chicago         |Illinois|60653.0   |Central|
|KL-16555  |Kel

In [0]:
df_product.columns

['ProductID',
 'Category',
 'Sub-Category',
 'ProductName',
 'State',
 'Price_per_product']

In [0]:
prodcut_cols=['ProductID',
 'Category',
 'Sub-Category',
 'ProductName',
 'State',
 'Price_per_product']
df_product=spark.read.option("header","true").option('inferschema','true').csv("/FileStore/tables/Product.csv").toDF(*prodcut_cols)

In [0]:
df_product.show(3)

+---------------+---------------+------------+--------------------+--------+-----------------+
|      ProductID|       Category|Sub-Category|         ProductName|   State|Price_per_product|
+---------------+---------------+------------+--------------------+--------+-----------------+
|FUR-CH-10002961|      Furniture|      Chairs|Leather Task Chai...|New York|           81.882|
|TEC-AC-10004659|     Technology| Accessories|Imation Secure+ H...|Oklahoma|            72.99|
|OFF-BI-10002824|Office Supplies|     Binders|Recycled Easel Ri...|Colorado|             4.25|
+---------------+---------------+------------+--------------------+--------+-----------------+
only showing top 3 rows



In [0]:
%sql
--create schema PEI


In [0]:
df_order.write.saveAsTable('PEI.Raw_Order')
df_customer.write.saveAsTable('PEI.Raw_customer')
df_product.write.saveAsTable('PEI.Raw_product')

In [0]:
%sql

select *  from pei.Raw_Order limit 5

CustomerID,Discount,OrderDate,OrderID,Price,ProductID,Profit,Quantity,RowID,ShipDate,ShipMode
JK-15370,0.3,2016-08-21,CA-2016-122581,573.174,FUR-CH-10002961,63.686,7,1,25/8/2016,Standard Class
BD-11320,0.0,2017-09-23,CA-2017-117485,291.96,TEC-AC-10004659,102.186,4,2,29/9/2017,Standard Class
LB-16795,0.7,2016-10-06,US-2016-157490,17.0,OFF-BI-10002824,-14.92,4,3,7/10/2016,First Class
KB-16315,0.2,2015-07-02,CA-2015-111703,15.552,OFF-PA-10003349,5.6376,3,4,9/7/2015,Standard Class
DO-13435,0.2,2014-10-03,CA-2014-108903,142.488,TEC-AC-10003023,-3.0,3,5,3/10/2014,Same Day


In [0]:
%sql
select * from pei.Raw_customer limit 5

CustomerID,CustomerName,email,phone,address,Segment,Country,City,State,PostalCode,Region
PW-19240,Pierre Wener,bettysullivan808@gmail.com,421.580.0902x9815,"001 Jones Ridges Suite 338 Johnsonfort, FL 95462",Consumer,United States,Louisville,Colorado,80027.0,West
GH-14410,Gary Hansen,austindyer948@gmail.com,001-542-415-0246x314,"00347 Murphy Unions Ashleyton, IA 29814",Home Office,United States,Chicago,Illinois,60653.0,Central
KL-16555,Kelly Lampkin,clarencehughes280@gmail.com,7185624866,"007 Adams Lane Suite 176 East Amyberg, IN 34581",Corporate,United States,Colorado Springs,Colorado,80906.0,West
AH-10075,Ad am Hart,angelabryant256@gmail.com,265.101.5569x1098,"01454 Christopher Turnpike North Ryanstad, MI 36226",Corporate,United States,Columbus,Ohio,43229.0,East
PF-19165,Philip Fox,kristinereynolds576@gmail.com,001-473-645-2141x9154,"0158 Harris Ways Suite 085 East Laceyside, SD 35649",Consumer,United States,San Diego,California,92105.0,West


In [0]:
%sql
select * from pei.Raw_product limit 5

ProductID,Category,Sub-Category,ProductName,State,Price_per_product
FUR-CH-10002961,Furniture,Chairs,"Leather Task Chair, Black",New York,81.882
TEC-AC-10004659,Technology,Accessories,Imation Secure+ Hardware Encrypted USB 2.0 Flash Drive; 16GB,Oklahoma,72.99
OFF-BI-10002824,Office Supplies,Binders,Recycled Easel Ring Binders,Colorado,4.25
OFF-PA-10003349,Office Supplies,Paper,Xerox 1957,Florida,5.184
TEC-AC-10003023,Technology,Accessories,Logitech G105 Gaming Keyboard,Ohio,47.496


In [0]:
df=spark.sql('''select distinct o.*, p.Category, p.`Sub-Category`,p.ProductName,p.State,p.Price_per_product from pei.raw_order o  left join pei.raw_product p on o.ProductID=p.ProductID
order by o.CustomerID''')

In [0]:
df.write.saveAsTable('PEI.ProductOrdered')

In [0]:
%sql
select * from  PEI.ProductOrdered limit 5

CustomerID,Discount,OrderDate,OrderID,Price,ProductID,Profit,Quantity,RowID,ShipDate,ShipMode,Category,Sub-Category,ProductName,State,Price_per_product
AA-10315,0.2,2016-03-03,CA-2016-103982,41.72,TEC-AC-10002857,5.7365,7,2273,8/3/2016,Standard Class,Technology,Accessories,"Verbatim 25 GB 6x Blu-ray Single Layer Recordable Disc, 1/Pack",Texas,5.96
AA-10315,0.0,2017-06-29,CA-2017-147039,362.94,OFF-AP-10000576,90.735,3,2024,4/7/2017,Standard Class,Office Supplies,Appliances,"Belkin 325VA UPS Surge Protector, 6'",Florida,96.75
AA-10315,0.0,2014-03-31,CA-2014-128055,52.98,OFF-AP-10002765,14.8344,2,7528,5/4/2014,Standard Class,Office Supplies,Appliances,Fellowes Advanced Computer Series Surge Protectors,New Jersey,26.49
AA-10315,0.0,2015-10-04,CA-2015-121391,26.96,OFF-ST-10001590,7.0,2,8011,7/10/2015,First Class,Office Supplies,Storage,"Tenex Personal Project File with Scoop Front Design, Black",Indiana,13.48
AA-10315,0.0,2017-06-29,CA-2017-147039,362.94,OFF-AP-10000576,90.735,3,2024,4/7/2017,Standard Class,Office Supplies,Appliances,Belkin 7 Outlet SurgeMaster II,Washington,39.48


In [0]:
df_enrich=spark.sql('''
select distinct c.CustomerName,c.Country,round(o.Profit,2) as Profit, p.Category, p.`Sub-Category` from pei.raw_order o  inner join pei.raw_product p on o.ProductID=p.ProductID
inner join pei.raw_customer c on o.CustomerID=c.CustomerID where c.CustomerName is not null
order by c.CustomerName''')

In [0]:
df_enrich.write.saveAsTable('PEI.CustomerOrderProfit')

In [0]:
df_avg_table=spark.sql('''SELECT Year(OrderDate) as year, round(avg(profit),2) AS AverageProfit , round(min(profit),2) AS MinProfit,round(max(profit),2) as MaxProfit, round(sum(profit),2) as TotalProfit,Category ,`Sub-Category`,customerId 
FROM pei.productordered  
GROUP BY  Year(OrderDate),`Sub-Category`,Category,customerId  ORDER BY Year(OrderDate),Category,`Sub-Category`,customerId; 

''')

In [0]:
df_avg_table.write.saveAsTable('PEI.AggProfit')

In [0]:

%sql
select * from pei.AggProfit limit 5

year,AverageProfit,MinProfit,MaxProfit,TotalProfit,Category,Sub-Category,customerId
2014,7.0,7.0,7.0,7.0,,,AA-10315
2014,121.44,121.44,121.44,121.44,,,AH-10030
2014,-57.3,-120.37,5.77,-114.6,,,BK-11260
2014,3.95,3.95,3.95,3.95,,,BS-11380
2014,4.14,3.0,5.8,12.42,,,CD-12790


In [0]:
%sql
--total profit per year
select year,round(sum(totalprofit),2) as total_pofit from pei.Aggprofit group by year

year,total_pofit
2015,66289.53
2014,41498.55
2016,68565.77
2017,127664.0


In [0]:
%sql
--profit per year+ plus category 
select year,category,sum(totalprofit) as total_pofit from pei.Aggprofit group by year,Category order by year,Category


year,category,total_pofit
2014,,523.12
2014,Furniture,-5174.720000000001
2014,Office Supplies,22663.910000000003
2014,Technology,23486.24
2015,,583.17
2015,Furniture,3392.119999999997
2015,Office Supplies,25490.400000000023
2015,Technology,36823.84
2016,,404.44999999999993
2016,Furniture,7750.209999999992


In [0]:
%sql
--profit per customer
select customerId,round(sum(totalprofit),2) as total_pofit from pei.Aggprofit a group by customerId order by customerId limit 5

customerId,total_pofit
AA-10315,-266.4
AA-10375,277.4
AA-10480,445.96
AA-10645,857.8
AB-10015,129.67


In [0]:
%sql
--profit customer per year
select customerId,year,round(sum(totalprofit),2) as total_pofit from pei.Aggprofit a group by customerId,year order by customerId limit 5

customerId,year,total_pofit
AA-10315,2014,280.68
AA-10315,2016,-747.09
AA-10315,2015,7.0
AA-10315,2017,193.01
AA-10375,2014,16.71
