In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder \
    .appName("Baitapthuhanh2") \
    .config("spark.sql.warehouse.dir" , "C:\data\Thuc_hanh") \
    .getOrCreate()

In [2]:
orders_schema_struct = StructType([
    StructField("Row ID", LongType()),
    StructField("Order ID", StringType()),
    StructField("Order Date", DateType()),
    StructField("Ship Date", DateType()),
    StructField("Ship Mode", StringType()),
    StructField("Customer ID", StringType()),
    StructField("Customer Name", StringType()),
    StructField("Segment", StringType()),
    StructField("Country", StringType()),
    StructField("City", StringType()),
    StructField("State", StringType()),
    StructField("Postal Code", LongType()),
    StructField("Region", StringType()),
    StructField("Product ID", StringType()),
    StructField("Category", StringType()),
    StructField("Sub-Category", StringType()),
    StructField("Product Name", StringType()),
    StructField("Sales", FloatType()),
    StructField("Quantity", IntegerType()),
    StructField("Discount", FloatType()),
    StructField("Profit", FloatType()),
])

In [3]:
orders_df = spark.read \
    .format("csv") \
    .option("delimiter", ";") \
    .option("header", "true") \
    .schema(orders_schema_struct) \
    .load("c:/data/Sample_Superstore_0_converted.csv")

In [4]:
orders_df.show(12)

+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|Row ID|Order ID|Order Date|Ship Date|Ship Mode|Customer ID|Customer Name|Segment|Country|City|State|Postal Code|Region|Product ID|Category|Sub-Category|Product Name|Sales|Quantity|Discount|Profit|
+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|  NULL|    NULL|      NULL|     NULL|     NULL|       NULL|         NULL|   NULL|   NULL|NULL| NULL|       NULL|  NULL|      NULL|    NULL|        NULL|        NULL| NULL|    NULL|    NULL|  NULL|
|  NULL|    NULL|      NULL|     NULL|     NULL|       NULL|         NULL|   NULL|   NULL|NULL| NULL|       NULL|  NULL|      NULL|    NULL|        NULL|        NULL| NULL|    NULL|    NULL|  NULL|
|  NULL|  

In [5]:
orders_df.createOrReplaceTempView("orders_tmp")

In [6]:
spark.sql("select * from orders_tmp").show(5)

+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|Row ID|Order ID|Order Date|Ship Date|Ship Mode|Customer ID|Customer Name|Segment|Country|City|State|Postal Code|Region|Product ID|Category|Sub-Category|Product Name|Sales|Quantity|Discount|Profit|
+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|  NULL|    NULL|      NULL|     NULL|     NULL|       NULL|         NULL|   NULL|   NULL|NULL| NULL|       NULL|  NULL|      NULL|    NULL|        NULL|        NULL| NULL|    NULL|    NULL|  NULL|
|  NULL|    NULL|      NULL|     NULL|     NULL|       NULL|         NULL|   NULL|   NULL|NULL| NULL|       NULL|  NULL|      NULL|    NULL|        NULL|        NULL| NULL|    NULL|    NULL|  NULL|
|  NULL|  

In [7]:
orders_df.groupBy("Segment").count().show()

+-------+-----+
|Segment|count|
+-------+-----+
|   NULL| 9994|
+-------+-----+



In [8]:
spark.sql("select Segment, count(*) from orders_tmp group by Segment").show()

+-------+--------+
|Segment|count(1)|
+-------+--------+
|   NULL|    9994|
+-------+--------+



In [9]:
orders_df.groupBy("Customer Name").count().sort("count", ascending = False).show(10)

+-------------+-----+
|Customer Name|count|
+-------------+-----+
|         NULL| 9994|
+-------------+-----+



In [10]:
spark.sql("select `Customer Name`, count(*) as count from orders_tmp group by `Customer Name` order by count desc limit 10").show()


+-------------+-----+
|Customer Name|count|
+-------------+-----+
|         NULL| 9994|
+-------------+-----+



In [11]:
orders_df.select("Customer Name").distinct().count()

1

In [12]:
spark.sql("select count(distinct `Customer Name`) as numCustomer from orders_tmp").show()

+-----------+
|numCustomer|
+-----------+
|          0|
+-----------+



In [13]:
top1 = orders_df.where("Category = 'Technology'").groupBy("Customer Name").count().sort("count", ascending = False).limit(1)
top1.show()

+-------------+-----+
|Customer Name|count|
+-------------+-----+
+-------------+-----+



In [14]:
spark.sql("select `Customer Name`, count(*) as numOrders from orders_tmp where Category = 'Technology' group by `Customer Name` order by numOrders desc limit 1").show()


+-------------+---------+
|Customer Name|numOrders|
+-------------+---------+
+-------------+---------+



In [15]:
#thuchanhdataframe2

In [16]:
orders_df.createOrReplaceTempView("sales_order")
spark.sql("select * from sales_order").show(5)

+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|Row ID|Order ID|Order Date|Ship Date|Ship Mode|Customer ID|Customer Name|Segment|Country|City|State|Postal Code|Region|Product ID|Category|Sub-Category|Product Name|Sales|Quantity|Discount|Profit|
+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|  NULL|    NULL|      NULL|     NULL|     NULL|       NULL|         NULL|   NULL|   NULL|NULL| NULL|       NULL|  NULL|      NULL|    NULL|        NULL|        NULL| NULL|    NULL|    NULL|  NULL|
|  NULL|    NULL|      NULL|     NULL|     NULL|       NULL|         NULL|   NULL|   NULL|NULL| NULL|       NULL|  NULL|      NULL|    NULL|        NULL|        NULL| NULL|    NULL|    NULL|  NULL|
|  NULL|  

In [17]:
total = spark.sql("""
SELECT 
    `Order ID`,
    Sales,
    Quantity,
    Discount,
    Sales * Quantity AS Profit1,
    Sales * Quantity * (1 - COALESCE(Discount, 0)) AS Total
FROM sales_order
""")

total.show(10)

+--------+-----+--------+--------+-------+-----+
|Order ID|Sales|Quantity|Discount|Profit1|Total|
+--------+-----+--------+--------+-------+-----+
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
|    NULL| NULL|    NULL|    NULL|   NULL| NULL|
+--------+-----+--------+--------+-------+-----+
only showing top 10 rows

