In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
spark = SparkSession.builder \
    .appName("Baitapthuhanh2") \
    .config("spark.sql.warehouse.dir" , "D:/New-folder-1") \
    .getOrCreate()

In [6]:
orders_schema_struct = StructType([
    StructField("LYLTY_CARD_NBR", LongType()),
    StructField("DATE", DateType()),
    StructField("STORE_NBR", LongType()),
    StructField("TXN_ID", LongType()),
    StructField("PROD_NBR", LongType()),
    StructField("PROD_NAME", StringType()),
    StructField("PROD_QTY", LongType()),
    StructField("TOT_SALES", FloatType()),
    StructField("PACK_SIZE", LongType()),
    StructField("BRAND", StringType()),
    StructField("LIFESTAGE", StringType()),
    StructField("PREMIUM_CUSTOMER", StringType()),
])

In [14]:
df = spark.read \
    .format("csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .schema(orders_schema_struct) \
    .load("D:/New-folder-1/QVI_data.csv")

In [15]:
df.show(12)

+--------------+----------+---------+------+--------+--------------------+--------+---------+---------+----------+--------------------+----------------+
|LYLTY_CARD_NBR|      DATE|STORE_NBR|TXN_ID|PROD_NBR|           PROD_NAME|PROD_QTY|TOT_SALES|PACK_SIZE|     BRAND|           LIFESTAGE|PREMIUM_CUSTOMER|
+--------------+----------+---------+------+--------+--------------------+--------+---------+---------+----------+--------------------+----------------+
|          1000|2018-10-17|        1|     1|       5|Natural Chip     ...|       2|      6.0|      175|   NATURAL|YOUNG SINGLES/COU...|         Premium|
|          1002|2018-09-16|        1|     2|      58|Red Rock Deli Chi...|       1|      2.7|      150|       RRD|YOUNG SINGLES/COU...|      Mainstream|
|          1003|2019-03-07|        1|     3|      52|Grain Waves Sour ...|       1|      3.6|      210|   GRNWVES|      YOUNG FAMILIES|          Budget|
|          1003|2019-03-08|        1|     4|     106|Natural ChipCo   ...|       1

In [16]:
df.createOrReplaceTempView("orders_tmp")

In [17]:
spark.sql("select * from orders_tmp").show(5)

+--------------+----------+---------+------+--------+--------------------+--------+---------+---------+----------+--------------------+----------------+
|LYLTY_CARD_NBR|      DATE|STORE_NBR|TXN_ID|PROD_NBR|           PROD_NAME|PROD_QTY|TOT_SALES|PACK_SIZE|     BRAND|           LIFESTAGE|PREMIUM_CUSTOMER|
+--------------+----------+---------+------+--------+--------------------+--------+---------+---------+----------+--------------------+----------------+
|          1000|2018-10-17|        1|     1|       5|Natural Chip     ...|       2|      6.0|      175|   NATURAL|YOUNG SINGLES/COU...|         Premium|
|          1002|2018-09-16|        1|     2|      58|Red Rock Deli Chi...|       1|      2.7|      150|       RRD|YOUNG SINGLES/COU...|      Mainstream|
|          1003|2019-03-07|        1|     3|      52|Grain Waves Sour ...|       1|      3.6|      210|   GRNWVES|      YOUNG FAMILIES|          Budget|
|          1003|2019-03-08|        1|     4|     106|Natural ChipCo   ...|       1

In [18]:
df.groupBy("PREMIUM_CUSTOMER").count().show()

+----------------+------+
|PREMIUM_CUSTOMER| count|
+----------------+------+
|         Premium| 69689|
|          Budget| 93157|
|      Mainstream|101988|
+----------------+------+



In [19]:
spark.sql("select PREMIUM_CUSTOMER, count(*) from orders_tmp group by PREMIUM_CUSTOMER").show()

+----------------+--------+
|PREMIUM_CUSTOMER|count(1)|
+----------------+--------+
|         Premium|   69689|
|          Budget|   93157|
|      Mainstream|  101988|
+----------------+--------+



In [24]:
df.groupBy("PROD_NAME") \
  .agg(F.sum("PROD_QTY").alias("total_quantity")) \
  .sort("total_quantity", ascending=False) \
  .show(10)

+--------------------+--------------+
|           PROD_NAME|total_quantity|
+--------------------+--------------+
|Kettle Mozzarella...|          6381|
|Kettle Tortilla C...|          6309|
|Cobs Popd Sea Sal...|          6277|
|Cobs Popd Swt/Chl...|          6256|
|Tostitos Splash O...|          6234|
|Tyrrells Crisps  ...|          6227|
|Kettle 135g Swt P...|          6212|
|Infuzions Thai Sw...|          6206|
|Thins Potato Chip...|          6185|
|Doritos Corn Chip...|          6180|
+--------------------+--------------+
only showing top 10 rows



In [26]:
spark.sql("select `PROD_NAME`, SUM(PROD_QTY) as total_quantity from orders_tmp group by `PROD_NAME` order by total_quantity desc limit 10").show()


+--------------------+--------------+
|           PROD_NAME|total_quantity|
+--------------------+--------------+
|Kettle Mozzarella...|          6381|
|Kettle Tortilla C...|          6309|
|Cobs Popd Sea Sal...|          6277|
|Cobs Popd Swt/Chl...|          6256|
|Tostitos Splash O...|          6234|
|Tyrrells Crisps  ...|          6227|
|Kettle 135g Swt P...|          6212|
|Infuzions Thai Sw...|          6206|
|Thins Potato Chip...|          6185|
|Doritos Corn Chip...|          6180|
+--------------------+--------------+



In [27]:
df.select("PROD_NAME").distinct().count()

114

In [28]:
spark.sql("select count(distinct `PROD_NAME`) as numPROD_NAME from orders_tmp").show()

+------------+
|numPROD_NAME|
+------------+
|         114|
+------------+



In [13]:
top1 = df.where("Category = 'Technology'").groupBy("Customer Name").count().sort("count", ascending = False).limit(1)
top1.show()

+-------------+-----+
|Customer Name|count|
+-------------+-----+
+-------------+-----+



In [14]:
spark.sql("select `Customer Name`, count(*) as numOrders from orders_tmp where Category = 'Technology' group by `Customer Name` order by numOrders desc limit 1").show()


+-------------+---------+
|Customer Name|numOrders|
+-------------+---------+
+-------------+---------+



In [15]:
#thuchanhdataframe2

In [16]:
orders_df.createOrReplaceTempView("sales_order")
spark.sql("select * from sales_order").show(5)

+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|Row ID|Order ID|Order Date|Ship Date|Ship Mode|Customer ID|Customer Name|Segment|Country|City|State|Postal Code|Region|Product ID|Category|Sub-Category|Product Name|Sales|Quantity|Discount|Profit|
+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|  NULL|    NULL|      NULL|     NULL|     NULL|       NULL|         NULL|   NULL|   NULL|NULL| NULL|       NULL|  NULL|      NULL|    NULL|        NULL|        NULL| NULL|    NULL|    NULL|  NULL|
|  NULL|    NULL|      NULL|     NULL|     NULL|       NULL|         NULL|   NULL|   NULL|NULL| NULL|       NULL|  NULL|      NULL|    NULL|        NULL|        NULL| NULL|    NULL|    NULL|  NULL|
|  NULL|  

In [17]:
total = spark.sql("""
SELECT 
    `Order ID`,
    Sales,
    Quantity,
    Discount,
    Sales * Quantity AS Profit1,
    Sales * Quantity * (1 - COALESCE(Discount, 0)) AS Total
FROM sales_order
""")

total.show(10)

+--------+-----+--------+--------+-------+-----+
|Order ID|Sales|Quantity|Discount|Profit1|Total|
+--------+-----+--------+--------+-------+-----+
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
+--------+-----+--------+--------+-------+-----+
only showing top 10 rows

