In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,StringType

In [2]:
spark = (SparkSession.builder.appName("Challenge 2").getOrCreate())

### tại sao khi load ta lại để hết StringType 
#### Vì rất có thể dữ liệu đang bị sai mà load vào sẽ không nhận và bị lỗi trong khi load
#### trong khi đó stringtype nhận mọi loại dữ liệu, ta có thể load vào và xử lý từ từ

In [3]:
sale_df = StructType([
    StructField("Order ID",StringType(),True),
    StructField("Product",StringType(),True),
    StructField("Quantity",StringType(),True),
    StructField("Price",StringType(),True),
    StructField("Order Date",StringType(),True),
    StructField("Address",StringType(),True)
])

In [4]:
sale_path = "salesdata"
sale_df = (spark.read.format("csv")
          .option("header",True)
          .schema(sale_df)
          .load(sale_path))
sale_df.show(10)

+--------+--------------------+--------+------+--------------+--------------------+
|Order ID|             Product|Quantity| Price|    Order Date|             Address|
+--------+--------------------+--------+------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|       1|  1700|12/30/19 00:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|       1| 600.0|12/29/19 07:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|       1| 11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|       1|149.99|12/22/19 15:13|410 6th St, San F...|
|  295669|USB-C Charging Cable|       1| 11.95|12/18/19 12:38|43 Hill St, Atlan...|
|  295670|AA Batteries (4-p...|       1|  3.84|12/31/19 22:58|200 Jefferson St,...|
|  295671|USB-C Charging Cable|       1| 11.95|12/16/19 15:10|928 12th St, Port...|
|  295672|USB-C Charging Cable|       2| 11.95|12/13/19 09:29|813 Hickory St, D...|
|  295673|Bose SoundSport H...|       1| 99.99|12/15/19 23:26|718 Wilson St,

In [5]:
from pyspark.sql.functions import col

In [6]:
sale_df.filter(col("Order ID") == "Order ID").show(10)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|        Quantity|     Price|Order Date|         Address|
+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+-------

### Xoá các hàng bị null và sai
#### bây giờ ta sẽ thực hiện xoá các cột bị null 
#### tiếp theo sẽ xoá các cột có giá trị không mong muốn như sai type, sai kiểu dữ liệu

In [7]:
sale_df.filter(col("Order ID").isNull()==True).show(10)

+--------+-------+--------+-----+----------+-------+
|Order ID|Product|Quantity|Price|Order Date|Address|
+--------+-------+--------+-----+----------+-------+
|    null|   null|    null| null|      null|   null|
|    null|   null|    null| null|      null|   null|
|    null|   null|    null| null|      null|   null|
|    null|   null|    null| null|      null|   null|
|    null|   null|    null| null|      null|   null|
|    null|   null|    null| null|      null|   null|
|    null|   null|    null| null|      null|   null|
|    null|   null|    null| null|      null|   null|
|    null|   null|    null| null|      null|   null|
|    null|   null|    null| null|      null|   null|
+--------+-------+--------+-----+----------+-------+
only showing top 10 rows



In [8]:
sale_df = sale_df.na.drop("any")

In [9]:
sale_df.filter(col("Order ID").isNull()==True).show(10)

+--------+-------+--------+-----+----------+-------+
|Order ID|Product|Quantity|Price|Order Date|Address|
+--------+-------+--------+-----+----------+-------+
+--------+-------+--------+-----+----------+-------+



In [10]:
sale_df.describe("Order ID","Product","Quantity","Price","Order Date","Address").show()

+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|summary|          Order ID|     Product|          Quantity|             Price|    Order Date|             Address|
+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|  count|            186305|      186305|            186305|            186305|        186305|              186305|
|   mean| 230417.5693788653|        null|1.1243828986286637|184.39973476747707|          null|                null|
| stddev|51512.737109995265|        null|0.4427926240286704| 332.7313298843439|          null|                null|
|    min|            141234|20in Monitor|                 1|            109.99|01/01/19 03:07|1 11th St, Atlant...|
|    max|          Order ID|      iPhone|  Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+------------------+------------

### ta thấy rằng có một vài vấn đề không đúng ở đây, ví dụ
#### cột id đáng ra chỉ có  số nhưng lại có những giá trị như "Order ID"
#### và các cột còn lại có rất nhiều giá trị bất hợp lý ở hàng max
#### đầu tiên ở dưới ta thực hiện xoá những bản ghi trùng lặp

In [11]:
distinct_df = sale_df.distinct()

In [12]:
distinct_df.filter(col("Order ID")== "Order ID").show(10)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|        Quantity|     Price|Order Date|         Address|
+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+



In [13]:
clean_df = distinct_df.filter(col("Order ID") != "Order ID")

In [14]:
clean_df.filter(col("Order ID") == "Order ID").show(10)

+--------+-------+--------+-----+----------+-------+
|Order ID|Product|Quantity|Price|Order Date|Address|
+--------+-------+--------+-----+----------+-------+
+--------+-------+--------+-----+----------+-------+



In [15]:
clean_df.describe("Order ID","Product","Quantity","Price","Order Date","Address").show()

+-------+------------------+------------+-------------------+------------------+--------------+--------------------+
|summary|          Order ID|     Product|           Quantity|             Price|    Order Date|             Address|
+-------+------------------+------------+-------------------+------------------+--------------+--------------------+
|  count|            185686|      185686|             185686|            185686|        185686|              185686|
|   mean|230411.37622653297|        null| 1.1245435843305365|184.51925546352425|          null|                null|
| stddev| 51511.71718332087|        null|0.44306873838328736|332.84383839005227|          null|                null|
|    min|            141234|20in Monitor|                  1|            109.99|01/01/19 03:07|1 11th St, Atlant...|
|    max|            319670|      iPhone|                  9|            999.99|12/31/19 23:53|999 Wilson St, Sa...|
+-------+------------------+------------+-------------------+---

#### bây giờ ở các cột ta đã thấy rằng giá trị đúng cần tìm đúng là số, chữ hay địa chỉ chứ không phải sai thể loại nữa

### Trích xuất những dữ liệu nhỏ hơn
trong ý tưởng ta có thể nghĩ ra rằng chúng ta sẽ rút chuỗi dựa trên dấu phẩy sau đó trích xuất tương ứng

In [16]:
clean_df.show(10, truncate = False)

+--------+------------------------+--------+-----+--------------+-------------------------------------+
|Order ID|Product                 |Quantity|Price|Order Date    |Address                              |
+--------+------------------------+--------+-----+--------------+-------------------------------------+
|295726  |Lightning Charging Cable|1       |14.95|12/25/19 14:49|203 Lakeview St, Boston, MA 02215    |
|295936  |AAA Batteries (4-pack)  |1       |2.99 |12/23/19 10:08|283 Highland St, Seattle, WA 98101   |
|295991  |Lightning Charging Cable|1       |14.95|12/15/19 20:16|857 Center St, Boston, MA 02215      |
|296052  |Apple Airpods Headphones|1       |150  |12/08/19 18:59|349 Maple St, San Francisco, CA 94016|
|296154  |Lightning Charging Cable|1       |14.95|12/08/19 20:30|150 Ridge St, Austin, TX 73301       |
|296387  |USB-C Charging Cable    |2       |11.95|12/27/19 21:07|40 North St, Portland, OR 97035      |
|296566  |Lightning Charging Cable|1       |14.95|12/07/19 15:14

In [17]:
from pyspark.sql.functions import split

In [18]:
clean_df.select("Address").show(10,False)

+-------------------------------------+
|Address                              |
+-------------------------------------+
|203 Lakeview St, Boston, MA 02215    |
|283 Highland St, Seattle, WA 98101   |
|857 Center St, Boston, MA 02215      |
|349 Maple St, San Francisco, CA 94016|
|150 Ridge St, Austin, TX 73301       |
|40 North St, Portland, OR 97035      |
|343 Park St, New York City, NY 10001 |
|709 Spruce St, Los Angeles, CA 90001 |
|793 10th St, Los Angeles, CA 90001   |
|25 Lincoln St, Portland, OR 97035    |
+-------------------------------------+
only showing top 10 rows



In [19]:
clean_df.select("Address",split(col("Address"),",")).show(10,False)

+-------------------------------------+-----------------------------------------+
|Address                              |split(Address, ,, -1)                    |
+-------------------------------------+-----------------------------------------+
|203 Lakeview St, Boston, MA 02215    |[203 Lakeview St,  Boston,  MA 02215]    |
|283 Highland St, Seattle, WA 98101   |[283 Highland St,  Seattle,  WA 98101]   |
|857 Center St, Boston, MA 02215      |[857 Center St,  Boston,  MA 02215]      |
|349 Maple St, San Francisco, CA 94016|[349 Maple St,  San Francisco,  CA 94016]|
|150 Ridge St, Austin, TX 73301       |[150 Ridge St,  Austin,  TX 73301]       |
|40 North St, Portland, OR 97035      |[40 North St,  Portland,  OR 97035]      |
|343 Park St, New York City, NY 10001 |[343 Park St,  New York City,  NY 10001] |
|709 Spruce St, Los Angeles, CA 90001 |[709 Spruce St,  Los Angeles,  CA 90001] |
|793 10th St, Los Angeles, CA 90001   |[793 10th St,  Los Angeles,  CA 90001]   |
|25 Lincoln St, 

Bây giờ ta có thẻ hiểu rằng cột thứ 2 như là một list, ta có thể thoải mái lấy thông tin dựa trên index

In [20]:
clean_df.select("Address",split(col("Address"),",").getItem(1)).show(10,False)

+-------------------------------------+------------------------+
|Address                              |split(Address, ,, -1)[1]|
+-------------------------------------+------------------------+
|203 Lakeview St, Boston, MA 02215    | Boston                 |
|283 Highland St, Seattle, WA 98101   | Seattle                |
|857 Center St, Boston, MA 02215      | Boston                 |
|349 Maple St, San Francisco, CA 94016| San Francisco          |
|150 Ridge St, Austin, TX 73301       | Austin                 |
|40 North St, Portland, OR 97035      | Portland               |
|343 Park St, New York City, NY 10001 | New York City          |
|709 Spruce St, Los Angeles, CA 90001 | Los Angeles            |
|793 10th St, Los Angeles, CA 90001   | Los Angeles            |
|25 Lincoln St, Portland, OR 97035    | Portland               |
+-------------------------------------+------------------------+
only showing top 10 rows



In [21]:
clean_df.select("Address",split(col("Address"),",").getItem(2)).show(10,False)

+-------------------------------------+------------------------+
|Address                              |split(Address, ,, -1)[2]|
+-------------------------------------+------------------------+
|203 Lakeview St, Boston, MA 02215    | MA 02215               |
|283 Highland St, Seattle, WA 98101   | WA 98101               |
|857 Center St, Boston, MA 02215      | MA 02215               |
|349 Maple St, San Francisco, CA 94016| CA 94016               |
|150 Ridge St, Austin, TX 73301       | TX 73301               |
|40 North St, Portland, OR 97035      | OR 97035               |
|343 Park St, New York City, NY 10001 | NY 10001               |
|709 Spruce St, Los Angeles, CA 90001 | CA 90001               |
|793 10th St, Los Angeles, CA 90001   | CA 90001               |
|25 Lincoln St, Portland, OR 97035    | OR 97035               |
+-------------------------------------+------------------------+
only showing top 10 rows



In [22]:
clean_df.select("Address",split(split(col("Address"),",").getItem(2)," ")).show(10,False)

+-------------------------------------+--------------------------------------+
|Address                              |split(split(Address, ,, -1)[2],  , -1)|
+-------------------------------------+--------------------------------------+
|203 Lakeview St, Boston, MA 02215    |[, MA, 02215]                         |
|283 Highland St, Seattle, WA 98101   |[, WA, 98101]                         |
|857 Center St, Boston, MA 02215      |[, MA, 02215]                         |
|349 Maple St, San Francisco, CA 94016|[, CA, 94016]                         |
|150 Ridge St, Austin, TX 73301       |[, TX, 73301]                         |
|40 North St, Portland, OR 97035      |[, OR, 97035]                         |
|343 Park St, New York City, NY 10001 |[, NY, 10001]                         |
|709 Spruce St, Los Angeles, CA 90001 |[, CA, 90001]                         |
|793 10th St, Los Angeles, CA 90001   |[, CA, 90001]                         |
|25 Lincoln St, Portland, OR 97035    |[, OR, 97035]

In [23]:
new_col = (sale_df.withColumn("City",split(col("Address"),",").getItem(1))
                  .withColumn("State",split(split(col("Address"),",").getItem(2)," ").getItem(1)))

In [24]:
new_col.show(10)

+--------+--------------------+--------+------+--------------+--------------------+--------------+-----+
|Order ID|             Product|Quantity| Price|    Order Date|             Address|          City|State|
+--------+--------------------+--------+------+--------------+--------------------+--------------+-----+
|  295665|  Macbook Pro Laptop|       1|  1700|12/30/19 00:01|136 Church St, Ne...| New York City|   NY|
|  295666|  LG Washing Machine|       1| 600.0|12/29/19 07:03|562 2nd St, New Y...| New York City|   NY|
|  295667|USB-C Charging Cable|       1| 11.95|12/12/19 18:21|277 Main St, New ...| New York City|   NY|
|  295668|    27in FHD Monitor|       1|149.99|12/22/19 15:13|410 6th St, San F...| San Francisco|   CA|
|  295669|USB-C Charging Cable|       1| 11.95|12/18/19 12:38|43 Hill St, Atlan...|       Atlanta|   GA|
|  295670|AA Batteries (4-p...|       1|  3.84|12/31/19 22:58|200 Jefferson St,...| New York City|   NY|
|  295671|USB-C Charging Cable|       1| 11.95|12/16/19

### Rename and change DataType
bây giờ ta đã thấy dữ liệu đã sạch đẹp, bước tiếp theo là trả về đúng kiểu dữ liệu

In [25]:
from pyspark.sql.functions import to_timestamp, year, month
from pyspark.sql.types import IntegerType,FloatType

In [26]:
new_df = (new_col.withColumn("Order_ID",col("Order ID").cast(IntegerType()))
                 .withColumn("Quantity_pro",col("Quantity").cast(IntegerType()))
                 .withColumn("Price_pro",col("Price").cast(IntegerType()))
                 .withColumn("Order_Date",to_timestamp(col("Order Date"),"MM/dd/yy HH:mm"))
                 .withColumnRenamed("Address","StoreAddress")
                 .drop("Order ID")
                 .drop("Quantity")
                 .drop("Price")
                 .drop("Order Date"))
         

In [27]:
new_df.show(10)

+--------------------+--------------------+--------------+-----+--------+------------+---------+-------------------+
|             Product|        StoreAddress|          City|State|Order_ID|Quantity_pro|Price_pro|         Order_Date|
+--------------------+--------------------+--------------+-----+--------+------------+---------+-------------------+
|  Macbook Pro Laptop|136 Church St, Ne...| New York City|   NY|  295665|           1|     1700|2019-12-30 00:01:00|
|  LG Washing Machine|562 2nd St, New Y...| New York City|   NY|  295666|           1|      600|2019-12-29 07:03:00|
|USB-C Charging Cable|277 Main St, New ...| New York City|   NY|  295667|           1|       11|2019-12-12 18:21:00|
|    27in FHD Monitor|410 6th St, San F...| San Francisco|   CA|  295668|           1|      149|2019-12-22 15:13:00|
|USB-C Charging Cable|43 Hill St, Atlan...|       Atlanta|   GA|  295669|           1|       11|2019-12-18 12:38:00|
|AA Batteries (4-p...|200 Jefferson St,...| New York City|   NY|

In [28]:
new_df.printSchema()

root
 |-- Product: string (nullable = true)
 |-- StoreAddress: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Order_ID: integer (nullable = true)
 |-- Quantity_pro: integer (nullable = true)
 |-- Price_pro: integer (nullable = true)
 |-- Order_Date: timestamp (nullable = true)



In [29]:
new_df = (new_df.withColumn("Year",year(col("Order_Date")))
                 .withColumn("Month",month(col("Order_Date"))))

In [30]:
new_df.show(10)

+--------------------+--------------------+--------------+-----+--------+------------+---------+-------------------+----+-----+
|             Product|        StoreAddress|          City|State|Order_ID|Quantity_pro|Price_pro|         Order_Date|Year|Month|
+--------------------+--------------------+--------------+-----+--------+------------+---------+-------------------+----+-----+
|  Macbook Pro Laptop|136 Church St, Ne...| New York City|   NY|  295665|           1|     1700|2019-12-30 00:01:00|2019|   12|
|  LG Washing Machine|562 2nd St, New Y...| New York City|   NY|  295666|           1|      600|2019-12-29 07:03:00|2019|   12|
|USB-C Charging Cable|277 Main St, New ...| New York City|   NY|  295667|           1|       11|2019-12-12 18:21:00|2019|   12|
|    27in FHD Monitor|410 6th St, San F...| San Francisco|   CA|  295668|           1|      149|2019-12-22 15:13:00|2019|   12|
|USB-C Charging Cable|43 Hill St, Atlan...|       Atlanta|   GA|  295669|           1|       11|2019-12-

### lưu vào file parquet
Tại sao ta không lưu vào một định dạng dễ nhìn như csv hay pgadmin mà phải lưu vào parquet<br>
ta có thể hiểu rằng file parquet phân vùng dữ liệu cực kì tốt và có thể chia nhỏ những dữ liệu theo mong muốn của mình<br>
việc này giúp khi lưu dữ liệu xuống nó được sắp xếp ngăn nắp trong các folder riêng việt<br>
khi muốn lấy dữ liệu nào ta chỉ cần lấy đúng vị trí và địa chỉ đó không phải load một khối lượng data to đùng<br>

In [31]:
output_path = "challenge2"
new_df.write.mode("overwrite").partitionBy("Year","Month").parquet(output_path)

vậy là ta sẽ thấy được dữ liệu lưu xuống theo 2 năm là 2019 và 2020<br>
trong folder 2019 và 2020 lại được chia ra folder nhỏ với 12 tháng<br>