#About Dataset
- The growth of supermarkets in most populated cities are increasing and market competitions are also high. The dataset is one of the historical sales of supermarket company which has recorded in 3 different branches for 3 months data. Predictive data analytics methods are easy to apply with this dataset.

#Attribute information
  - Invoice id: Computer generated sales slip invoice identification number
  - Branch: Branch of supercenter (3 branches are available identified by A, B and C).
  - City: Location of supercenters
  - Customer type: Type of customers, recorded by Members for customers using member card and Normal for without member card.
  - Gender: Gender type of customer
  - Product line: General item categorization groups - Electronic accessories, Fashion accessories, Food and beverages, Health and beauty, Home and lifestyle, Sports and travel
  - Unit price: Price of each product in $
  - Quantity: Number of products purchased by customer
  - Tax: 5% tax fee for customer buying
  - Total: Total price including tax
  - Date: Date of purchase (Record available from January 2019 to March 2019)
  - Time: Purchase time (10am to 9pm)
  - Payment: Payment used by customer for purchase (3 methods are available – Cash, Credit card and Ewallet)
  - COGS: Cost of goods sold
  - Gross margin percentage: Gross margin percentage
  - Gross income: Gross income
  - Rating: Customer stratification rating on their overall shopping experience (On a scale of 1 to 10)

#Question:
  * Câu 1: Số lượng sản phẩm bán trong tháng của mỗi cửa hàng
  * Câu 2: Số lượng sản phẩm bán trong ngày 5/1/2019 là bao nhiêu và những sản phẩm nào được bán của mỗi cửa hàng
  * Câu 3: Sản phẩm nào được bán nhiều nhất trong 3 tháng
  * Câu 4: Tổng số lượng mỗi cách thức thanh toán 
  * Câu 5: Tổng tiền bán được mỗi cửa hàng

In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# from pyspark.sql import substring

In [0]:
spark = SparkSession \
        .builder \
        .master("spark://spark-master:7077") \
        .appName("spark_sql_supermarket") \
        .getOrCreate()

In [0]:
df = spark.read.format("csv") \
           .option("header", "true") \
           .option("inferSchema", 'True') \
           .load("dbfs:/FileStore/shared_uploads/19447201.iuh@gmail.com/supermarket_sales___Sheet1-1.csv")
df.printSchema()

root
 |-- Invoice ID: string (nullable = true)
 |-- Branch: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Customer type: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Product line: string (nullable = true)
 |-- Unit price: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Tax 5: double (nullable = true)
 |-- Total: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Payment: string (nullable = true)
 |-- cogs: double (nullable = true)
 |-- gross margin percentage: double (nullable = true)
 |-- gross income: double (nullable = true)
 |-- Rating: double (nullable = true)



In [0]:
df_rename = df.select([F.col(col).alias(col.replace(' ', '_')) for col in df.columns])

In [0]:
df_rename.show()

+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-------------------+-----------+------+-----------------------+------------+------+
| Invoice_ID|Branch|     City|Customer_type|Gender|        Product_line|Unit_price|Quantity|  Tax_5|   Total|      Date|               Time|    Payment|  cogs|gross_margin_percentage|gross_income|Rating|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-------------------+-----------+------+-----------------------+------------+------+
|750-67-8428|     A|   Yangon|       Member|Female|   Health and beauty|     74.69|       7|26.1415|548.9715|2019-01-05|2023-02-26 13:08:00|    Ewallet|522.83|            4.761904762|     26.1415|   9.1|
|226-31-3081|     C|Naypyitaw|       Normal|Female|Electronic access...|     15.28|       5|   3.82|   80.22|2019-03-08|2023-02-26 10:29:00|       Cash|  76.4|            4.761904762| 

In [0]:
df_rename.createOrReplaceTempView("supermarket")

#CÂU 1: Số lượng sản phẩm bán trong tháng của mỗi cửa hàng

##Spark sử dụng SQL query

In [0]:
#Cách 1
spark.sql(" \
            SELECT Branch, Month, SUM(Quantity)\
            FROM \
                  ( \
                    SELECT Branch, Quantity, SUBSTRING(CAST(Date AS VARCHAR(11)), -5, 2) AS Month \
                    FROM supermarket \
                  ) a \
            GROUP BY Branch, Month \
            ORDER BY Branch, Month\
          ").show()

+------+-----+-------------+
|Branch|Month|sum(Quantity)|
+------+-----+-------------+
|     A|   01|          685|
|     A|   02|          493|
|     A|   03|          681|
|     B|   01|          600|
|     B|   02|          624|
|     B|   03|          596|
|     C|   01|          680|
|     C|   02|          537|
|     C|   03|          614|
+------+-----+-------------+



In [0]:
#Cách 2

spark.sql(" \
            SELECT Branch, \
                   SUM(case when Month = 01 then Quantity else null end) as Jan, \
                   SUM(case when Month = 02 then Quantity else null end) as Feb, \
                   SUM(case when Month = 03 then Quantity else null end) as Mar \
            FROM \
                  ( \
                    SELECT Branch, Quantity, SUBSTRING(CAST(Date AS VARCHAR(11)), -5, 2) AS Month \
                    FROM supermarket \
                  ) \
            GROUP BY Branch \
            ORDER BY Branch\
          ").show()

# spark.sql(" \
#             SELECT *\
#             FROM \
#                   ( \
#                     SELECT Branch, Quantity, SUBSTRING(CAST(Date AS VARCHAR(11)), -5, 2) AS Month \
#                     FROM supermarket \
#                   ) AS a \
#             PIVOT ( \
#                       SUM(Quantity) \
#                       FOR Month in ((01),(02),(03)) \
#                   ) \
#           ").show()

+------+---+---+---+
|Branch|Jan|Feb|Mar|
+------+---+---+---+
|     A|685|493|681|
|     B|600|624|596|
|     C|680|537|614|
+------+---+---+---+



## Spark SQL (không dùng SQL query)

In [0]:
df_1 = df_rename.select(["Branch", "Quantity", "Date"]) \
                .withColumn("Date", F.substring("Date",-5,2)) \
                .withColumnRenamed("Date","Month")
df_final_1 = df_1.groupBy(["Branch", "Month"]).sum("Quantity").sort("Branch","Month")
df_final_1.show()

+------+-----+-------------+
|Branch|Month|sum(Quantity)|
+------+-----+-------------+
|     A|   01|          685|
|     A|   02|          493|
|     A|   03|          681|
|     B|   01|          600|
|     B|   02|          624|
|     B|   03|          596|
|     C|   01|          680|
|     C|   02|          537|
|     C|   03|          614|
+------+-----+-------------+



## Câu 2: Số lượng sản phẩm bán trong ngày 5/1/2019 là bao nhiêu và những sản phẩm nào được bán của mỗi cửa hàng

##Spark SQL query

In [0]:
spark.sql(" \
            SELECT Branch, DATE, SUM(Quantity) AS Total, COLLECT_SET(Product_line) AS Product_sell \
            FROM supermarket \
            WHERE DATE = '2019-01-05' \
            GROUP BY Branch, DATE \
            ORDER BY Branch \
          ").show(truncate=False)

+------+----------+-----+---------------------------------------------------------------+
|Branch|DATE      |Total|Product_sell                                                   |
+------+----------+-----+---------------------------------------------------------------+
|A     |2019-01-05|27   |[Electronic accessories, Health and beauty, Home and lifestyle]|
|B     |2019-01-05|11   |[Fashion accessories, Home and lifestyle, Food and beverages]  |
|C     |2019-01-05|17   |[Fashion accessories, Health and beauty]                       |
+------+----------+-----+---------------------------------------------------------------+



## Spark SQL (không dùng SQL query)

In [0]:
df_2 = df_rename.select(['Branch', 'Date', 'Quantity', 'Product_line']) \
                .filter("Date = '2019-01-05' ") \
                .groupBy("Branch","Date") \
                .agg(F.sum('Quantity').alias('Total'), F.collect_set('Product_line').alias('Product_sell')) \
                .sort(F.col('Branch').asc())
df_2.show(truncate=False)

+------+----------+-----+---------------------------------------------------------------+
|Branch|Date      |Total|Product_sell                                                   |
+------+----------+-----+---------------------------------------------------------------+
|A     |2019-01-05|27   |[Electronic accessories, Health and beauty, Home and lifestyle]|
|B     |2019-01-05|11   |[Fashion accessories, Home and lifestyle, Food and beverages]  |
|C     |2019-01-05|17   |[Fashion accessories, Health and beauty]                       |
+------+----------+-----+---------------------------------------------------------------+



## Câu 3: Sản phẩm nào được bán nhiều nhất trong 3 tháng

## SPARK SQL QUERY

In [0]:
#MONTH(DATE) lấy ra tháng trong cột Date
spark.sql(" \
              SELECT Product_line, SUM(Quantity) AS Total\
              FROM supermarket \
              GROUP BY Product_line \
              HAVING Total >= MAX(Total)\
          ").show(truncate=False)

+----------------------+-----+
|Product_line          |Total|
+----------------------+-----+
|Electronic accessories|971  |
+----------------------+-----+



## Spark SQL (không dùng SQL query)

In [0]:
df_3 = df_rename.select(['Product_line','Quantity']) \
                .groupBy('Product_line')\
                .agg(F.sum('Quantity').alias('Total'))

df_3.show()

+--------------------+-----+
|        Product_line|Total|
+--------------------+-----+
|  Home and lifestyle|  911|
| Fashion accessories|  902|
|   Health and beauty|  854|
|Electronic access...|  971|
|  Food and beverages|  952|
|   Sports and travel|  920|
+--------------------+-----+



In [0]:
#max_total
max_total = df_3.agg(F.max('Total')).collect()[0][0]
max_total

Out[122]: 971

In [0]:
df_final_3 = df_3.filter(F.col('Total') >= max_total)
                     
df_final_3.show(truncate=False)

+----------------------+-----+
|Product_line          |Total|
+----------------------+-----+
|Electronic accessories|971  |
+----------------------+-----+



## Câu 4: Tổng số lượng mỗi cách thức thanh toán

## SPARK SQL QUERY

In [0]:
spark.sql(" \
            SELECT Payment, COUNT(Payment)\
            FROM supermarket \
            GROUP BY Payment \
          ").show()

+-----------+--------------+
|    Payment|count(Payment)|
+-----------+--------------+
|    Ewallet|           345|
|       Cash|           344|
|Credit card|           311|
+-----------+--------------+



## Spark SQL (không dùng SQL query)

In [0]:
df_4 = df_rename.select('Payment').groupBy('Payment').agg(F.count('Payment'))
df_4.show()

+-----------+--------------+
|    Payment|count(Payment)|
+-----------+--------------+
|    Ewallet|           345|
|       Cash|           344|
|Credit card|           311|
+-----------+--------------+



## Câu 5: Tổng tiền bán được mỗi cửa hàng

## SPARK SQL QUERY

In [0]:
spark.sql(" \
            SELECT Branch, ROUND(SUM(Total - Tax_5),0) as Total_sell\
            FROM supermarket \
            GROUP BY Branch \
          ").show()

+------+----------+
|Branch|Total_sell|
+------+----------+
|     B|  101141.0|
|     C|  105304.0|
|     A|  101143.0|
+------+----------+



## Spark SQL (không dùng SQL query)

In [0]:
df_5 = df_rename.select(['Branch', 'Total', 'Tax_5']) \
                .groupBy('Branch') \
                 .agg(F.round(F.sum(F.col('Total') - F.col('Tax_5')), 0).alias("Total_sell"))
df_5.show()

+------+----------+
|Branch|Total_sell|
+------+----------+
|     B|  101141.0|
|     C|  105304.0|
|     A|  101143.0|
+------+----------+

