In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf().set("spark.driver.bindAddress", "127.0.0.1")

spark = SparkSession.builder \
    .config(conf=conf) \
    .appName("my_project") \
    .master("local[*]") \
    .getOrCreate()


In [None]:

df = spark.read.option("header", True).option("multiLine", True).option("inferSchema", True).csv("Data_Raw.csv")

In [6]:
from pyspark.sql.functions import regexp_replace, col, trim

In [7]:
df_cleaned = df.withColumn("Ghi Chú",regexp_replace(col("Ghi Chú"), r"\s/\s", " "))

In [8]:
df_cleaned1 = df_cleaned.withColumn("Ghi Chú",trim(regexp_replace(col("Ghi Chú"), r"[\r\n]+", " ")))

In [9]:
df_cleaned1.show()

+---+------------+------------+-----------+---------+----------+--------------+-------------+---------+----------+-------------+--------------------+--------------------+-----------+----------+-------+--------------------+-----------+-------------------+-------------+--------------------+----------+------+--------+------------+---------+---------+-------------+------------+-----------------+-----------+-------+
|STT|Bưu Cục Nhận|Bưu Cục Phát|Mã Bưu Điện|Người Hủy|  Ngày Hủy|Người Xác Nhận|Ngày Xác Nhận|Người Tạo|  Ngày Tạo|Mã Khách Hàng|      Tên Khách Hàng|           Địa Chỉ|Mã Đơn Hàng|Khối Lượng|NGAY_KG|             Ghi Chú|Ghi Chú Hủy|Trạng Thái Đơn Hàng|Số Điện Thoại|        Tên Sản Phẩm|Tỉnh Thành|   Giá|Phí Ship|   Loại Ship|Tổng Cước|Tổng Tiền|Tổng Sản Phẩm|Người Chuyển|Ghi Chú Giao Hàng|Ngày Chuyển|Trị Giá|
+---+------------+------------+-----------+---------+----------+--------------+-------------+---------+----------+-------------+--------------------+--------------------+

In [10]:
df_write= df_cleaned1.write.option("header", True).csv('Data.csv', mode='overwrite')

In [11]:
df_where = df_cleaned1.where(col("STT") == '22')

In [12]:
df_where.show(truncate= False)

+---+------------+------------+-----------+---------+--------+--------------+-------------+---------+----------+-------------+--------------+-------------------------------------------------------------------------+-----------+----------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------------+-------------+-------------------------------+----------+------+--------+------------+---------+---------+-------------+------------+-----------------+-----------+-------+
|STT|Bưu Cục Nhận|Bưu Cục Phát|Mã Bưu Điện|Người Hủy|Ngày Hủy|Người Xác Nhận|Ngày Xác Nhận|Người Tạo|Ngày Tạo  |Mã Khách Hàng|Tên Khách Hàng|Địa Chỉ                                                                |Mã Đơn Hàng|Khối Lượng|NG

In [13]:
#nhân viên bán hàng nào bị trả hàng nhiều nhất 
df_nv = df_cleaned1.groupBy("Người Xác Nhận") \
    .count() \
    .withColumnRenamed("count", "Total") \
    .orderBy("Total", ascending=False)


In [14]:
df_na = df_cleaned1.na.drop(subset=["Tên Sản Phẩm"])
df_na.show(truncate = False)

+---+------------+------------+-----------+---------+----------+--------------+-------------+---------+----------+-------------+---------------------------+------------------------------------------------------------------------------------------------------------------------------+-----------+----------+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------------+-------------+---------------------------------------+----------+------+--------+------------+---------+---------+-------------+------------+-----------------+-----------+-------+
|

In [15]:
#Tìm xem sản phẩm nào trả về nhiều nhất
df_sanpham = df_na.groupBy('Tên Sản Phẩm').count() \
  .orderBy("count", ascending=False)

In [16]:
df_sanpham.show(20,truncate = False )

+----------------------------------------------------+-----+
|Tên Sản Phẩm                                        |count|
+----------------------------------------------------+-----+
|1 EM7276 Đầm Xoè Tay Lỡ L                           |228  |
|1 EM7145 Đầm Xoè Gấm Vân L                          |165  |
|1 EM7276 Đầm Xoè Tay Lỡ XL                          |162  |
|1 EM7517 Đầm Xoè Đỏ Gấm Taffta L                    |131  |
|1 EM7096 Đầm Xoè Xếp Ply Đô Trước L                 |128  |
|1 EM7276 Đầm Xoè Tay Lỡ M                           |120  |
|1 EM7517 Đầm Xoè Đỏ Gấm Taffta XL                   |119  |
|1 EM7145 Đầm Xoè Gấm Vân M                          |118  |
|1 LIXI1 Combo 5 Bao Lì Xì                           |118  |
|1 EM7178 Đầm Suống A Đen Phối Ren XL                |109  |
|1 EM7267 Đầm Xoè Vai Choàng L                       |107  |
|1 EM7178 Đầm Suống A Đen Phối Ren L                 |102  |
|1 EM7267 Đầm Xoè Vai Choàng XL                      |100  |
|1 EM7145 Đầm Xoè Gấm Vâ

In [17]:
#tỉnh thành nào trả về nhiều nhất 
df_tinh = df_cleaned1.groupBy('Tỉnh Thành') \
    .count() \
    .withColumnRenamed("count", "Total") \
    .orderBy("Total", ascending=False)

In [18]:
df_tinh.show(50,truncate = False )

+---------------+-----+
|Tỉnh Thành     |Total|
+---------------+-----+
|Hồ Chí Minh    |1419 |
|Hà Nội         |1009 |
|Đồng Nai       |686  |
|Bình Dương     |407  |
|Hải Phòng      |345  |
|Thanh Hóa      |344  |
|Lâm Đồng       |334  |
|Nghệ An        |333  |
|Khánh Hòa      |314  |
|Đắk Lắk        |301  |
|Đà Nẵng        |297  |
|Gia Lai        |297  |
|Bà Rịa Vũng Tàu|296  |
|Quảng Nam      |271  |
|Quảng Ninh     |271  |
|Quảng Ngãi     |266  |
|Bình Thuận     |252  |
|Bình Phước     |250  |
|Quảng Bình     |239  |
|Kiên Giang     |219  |
|Bình Định      |206  |
|Cần Thơ        |205  |
|Hải Dương      |203  |
|Cà Mau         |177  |
|Phú Yên        |176  |
|Nam Định       |172  |
|Phú Thọ        |158  |
|Tây Ninh       |156  |
|Hà Tĩnh        |153  |
|Thừa Thiên Huế |148  |
|Hưng Yên       |147  |
|Bắc Ninh       |144  |
|Tiền Giang     |141  |
|Thái Bình      |140  |
|Đắk Nông       |138  |
|Long An        |133  |
|Bắc Giang      |128  |
|Bến Tre        |125  |
|Quảng Trị      

In [19]:
#nhân viên bán hàng nào bị trả hàng nhiều nhất 
df_nv = df_cleaned1.groupBy('Người Xác Nhận') \
    .count() \
    .withColumnRenamed("count", "Total") \
    .orderBy("Total", ascending=False)

In [20]:
df_nv.show(10,truncate = False )

+--------------+-----+
|Người Xác Nhận|Total|
+--------------+-----+
|THANHNHAN     |2769 |
|HONGVY        |1623 |
|TUANTAI       |1586 |
|TRUCMAI       |1448 |
|system        |1300 |
|NGOCLINH      |814  |
|ANHKIM        |767  |
|GIANGHI       |551  |
|NGOCHUYEN     |507  |
|NHATHAN       |341  |
+--------------+-----+
only showing top 10 rows



In [21]:
df_cleaned = df.withColumn("Ghi Chú",regexp_replace(col("Ghi Chú"), r"\s/\s", " "))

In [22]:
df_write= df_sanpham.write.option("header", True).csv('Total_Quantity_Of_Each_ Product', mode='overwrite')

In [23]:
df_write= df_tinh.write.option("header", True).csv('Total_Product_Per_Province..csv', mode='overwrite')

In [24]:
df_write= df_nv.write.option("header", True).csv('Total_Product_Per_ Employee.csv', mode='overwrite')

In [25]:
spark.stop()