In [7]:
import pyspark

In [8]:
from pyspark.sql import SparkSession

In [9]:
# Create the Spark session used by every later cell; getOrCreate() returns an
# already-active session in this kernel instead of starting a second one.
spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

In [10]:
# Load the box status history ("Lịch sử box" = "box history"); the preview
# below shows several timestamped status rows per box_id.
# - header:      first line holds the column names
# - multiLine:   allow quoted fields that contain line breaks (presumably the
#                free-text `comment` column — confirm)
# - inferSchema: let Spark guess column types (costs an extra pass over the file)
box_status_df = spark.read.csv(
    'DataDemo/04. Lịch sử box.csv',
    header=True,
    multiLine=True,
    inferSchema=True,
)

In [11]:
# Preview the first 20 rows of the raw history (long cell values truncated).
box_status_df.show(n=20, truncate=True)

+--------------------+------------------+------------+----------+-----------+------------+-----------------+--------------------+--------------------+-----------+--------------------+----------------+-------------------+-------+--------------------+--------------------+
|              box_id|request_box_number|service_code|est_pickup|pickup_code|est_delivery|delivery_tikicode|              status|          sub_status|reason_code|             comment|rescheduled_date|       updated_time|station|               actor|              action|
+--------------------+------------------+------------+----------+-----------+------------+-----------------+--------------------+--------------------+-----------+--------------------+----------------+-------------------+-------+--------------------+--------------------+
|532eea67-942a-4cf...|         582871688|    standard|       hn5|VN034025004|          dn|      VN058002015|  transferred_to_3pl|          in_transit|       null|JNT - Đang luân c...|    

In [12]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [13]:
# Narrow the history to (box, status, time), with `updated_time` parsed into a
# timestamp column named `time`, sorted by box and then chronologically.
box_status_short = (
    box_status_df
    .select(
        col("box_id"),
        col("status"),
        to_timestamp(col("updated_time")).alias("time"),
    )
    .sort("box_id", "time")
)
box_status_short.show()

+--------------------+--------------------+-------------------+
|              box_id|              status|               time|
+--------------------+--------------------+-------------------+
|000de80b-c61a-4b6...|waiting_for_picki...|2022-09-01 08:38:56|
|000de80b-c61a-4b6...|ready_for_picking_up|2022-09-01 09:26:34|
|000de80b-c61a-4b6...|ready_for_picking_up|2022-09-01 09:26:35|
|000de80b-c61a-4b6...|ready_for_picking_up|2022-09-01 09:26:35|
|000de80b-c61a-4b6...|ready_for_picking_up|2022-09-01 09:27:05|
|000de80b-c61a-4b6...|ready_for_picking_up|2022-09-03 00:25:18|
|000de80b-c61a-4b6...|ready_for_picking_up|2022-09-03 02:37:08|
|000de80b-c61a-4b6...|             picking|2022-09-03 04:40:45|
|000de80b-c61a-4b6...|             storing|2022-09-03 04:47:11|
|000de80b-c61a-4b6...|             storing|2022-09-03 04:47:12|
|000de80b-c61a-4b6...|             storing|2022-09-03 04:47:20|
|000de80b-c61a-4b6...| packed_in_masterbox|2022-09-03 05:30:06|
|000de80b-c61a-4b6...|          in_trans

In [14]:
# Collapse consecutive repeats of the same status within each box, keeping only
# the rows where the status changes (e.g. several `ready_for_picking_up`
# updates in a row become one row holding the earliest of them).
#
# The window MUST be ordered by `time`: ordering by the partition key itself
# ("box_id") leaves the row order inside each box undefined, so lag() could
# compare against an arbitrary earlier/later row. The sort applied to
# `box_status_short` does not carry into the window.
# NOTE(review): rows sharing an identical timestamp (several exist in the raw
# data) still have no defined relative order; add a tie-breaker column if one
# becomes available.
windowSpec = Window.partitionBy("box_id").orderBy("time")
df = box_status_short.withColumn("statuslag", lag("status", 1).over(windowSpec))
# The first row of each box has a null lag; `status == null` evaluates to null,
# which falls through to otherwise(1), so that first row is kept.
df2 = df.withColumn("result", when(df.status == df.statuslag, 0).otherwise(1))
box_status_sorted = (
    df2
    .filter(df2.result == 1)
    .orderBy("box_id", "time")
    .drop("statuslag", "result")
)
box_status_sorted.show()

# simplify box status: one row per status change

+--------------------+--------------------+-------------------+
|              box_id|              status|               time|
+--------------------+--------------------+-------------------+
|000de80b-c61a-4b6...|waiting_for_picki...|2022-09-01 08:38:56|
|000de80b-c61a-4b6...|ready_for_picking_up|2022-09-01 09:26:34|
|000de80b-c61a-4b6...|             picking|2022-09-03 04:40:45|
|000de80b-c61a-4b6...|             storing|2022-09-03 04:47:11|
|000de80b-c61a-4b6...| packed_in_masterbox|2022-09-03 05:30:06|
|000de80b-c61a-4b6...|          in_transit|2022-09-03 05:32:06|
|000de80b-c61a-4b6...|arrived_at_destin...|2022-09-03 06:23:40|
|000de80b-c61a-4b6...|confirmed_in_mast...|2022-09-03 06:25:52|
|000de80b-c61a-4b6...| packed_in_masterbox|2022-09-03 08:51:29|
|000de80b-c61a-4b6...|          in_transit|2022-09-03 09:00:22|
|000de80b-c61a-4b6...|arrived_at_destin...|2022-09-04 18:47:04|
|000de80b-c61a-4b6...|confirmed_in_mast...|2022-09-04 19:08:34|
|000de80b-c61a-4b6...| packed_in_masterb

In [15]:
# One row per distinct (box, pickup ward code, delivery ward code) combination.
# NOTE(review): if a box's codes ever change during its history it appears more
# than once here, and the region joins below will duplicate it — confirm.
box_codes = ["box_id", "pickup_code", "delivery_tikicode"]
box_tmp = box_status_df.select(*box_codes).dropDuplicates()
box_tmp.show()

# get pickup and delivery codes per box

+--------------------+-----------+-----------------+
|              box_id|pickup_code|delivery_tikicode|
+--------------------+-----------+-----------------+
|c938ca85-bb39-4e2...|VN039009004|      VN012007002|
|6e6907dc-eb25-457...|VN034022009|      VN034022009|
|b038cb2d-f73c-441...|VN039023010|      VN017006013|
|6aa0e4f7-33e6-4f9...|VN034022005|      VN034022001|
|3be77f79-cd1f-472...|VN034021010|      VN059010013|
|767fcc3d-8e1c-4fa...|VN034022005|      VN034022002|
|ee23dcde-f5d9-464...|VN039023004|      VN054012009|
|79cbe234-f082-464...|VN034029004|      VN039010006|
|a4c3e5ad-6198-447...|VN039009005|      VN039020011|
|298d078d-c46a-498...|VN034022003|      VN055009007|
|875a6e16-25c2-4a5...|VN039009005|      VN039010003|
|4d831e1c-a649-446...|VN039020001|      VN034024007|
|5c7ea77e-b8d6-444...|VN034016002|      VN034016027|
|9999999a-02df-418...|VN039022015|      VN040011006|
|38313f9c-4d82-441...|VN039018004|      VN039010001|
|789316de-904b-47a...|VN039020012|      VN0120

In [16]:
# Load the TikiCode lookup table, which maps ward codes (`tiki_code_ward`)
# to region codes (`kv_sale`) among other columns.
tiki_code_df = spark.read.csv(
    'DataDemo/17. TikiCode.csv',
    header=True,
    multiLine=True,
    inferSchema=True,
)

In [17]:
# Keep only the two lookup columns needed: ward code -> region code.
tk_df = tiki_code_df.select(col("tiki_code_ward"), col("kv_sale"))

In [18]:
# Pickup region per box: map each box's pickup ward code to its region code.
# Inner join — boxes whose pickup code is absent from the lookup are dropped.
pickup_regions = tk_df.withColumnRenamed("kv_sale", "kv_pickup")
df_tmp = (
    box_tmp
    .join(
        pickup_regions,
        box_tmp.pickup_code == pickup_regions.tiki_code_ward,
        "inner",
    )
    .select("box_id", "kv_pickup")
)
df_tmp.show()

# get pickup region

+--------------------+---------+
|              box_id|kv_pickup|
+--------------------+---------+
|c938ca85-bb39-4e2...|        1|
|6e6907dc-eb25-457...|        3|
|b038cb2d-f73c-441...|        1|
|6aa0e4f7-33e6-4f9...|        3|
|3be77f79-cd1f-472...|        3|
|767fcc3d-8e1c-4fa...|        3|
|ee23dcde-f5d9-464...|        1|
|79cbe234-f082-464...|        3|
|a4c3e5ad-6198-447...|        1|
|298d078d-c46a-498...|        3|
|875a6e16-25c2-4a5...|        1|
|4d831e1c-a649-446...|        1|
|5c7ea77e-b8d6-444...|        3|
|9999999a-02df-418...|        1|
|38313f9c-4d82-441...|        1|
|789316de-904b-47a...|        1|
|56f27d7f-271c-482...|        1|
|62446dfa-d07e-4b1...|        1|
|4b617c9d-b77a-4f8...|        1|
|100829c4-b400-4fa...|        1|
+--------------------+---------+
only showing top 20 rows



In [19]:
# Delivery region per box: map each box's delivery ward code to its region code.
# Inner join — boxes whose delivery code is absent from the lookup are dropped.
delivery_regions = tk_df.withColumnRenamed("kv_sale", "kv_delivery")
df_tmp2 = (
    box_tmp
    .join(
        delivery_regions,
        box_tmp.delivery_tikicode == delivery_regions.tiki_code_ward,
        "inner",
    )
    .select("box_id", "kv_delivery")
)
df_tmp2.show()

# get delivery region

+--------------------+-----------+
|              box_id|kv_delivery|
+--------------------+-----------+
|c938ca85-bb39-4e2...|          1|
|6e6907dc-eb25-457...|          3|
|b038cb2d-f73c-441...|          1|
|6aa0e4f7-33e6-4f9...|          3|
|3be77f79-cd1f-472...|          3|
|767fcc3d-8e1c-4fa...|          3|
|ee23dcde-f5d9-464...|          3|
|79cbe234-f082-464...|          1|
|a4c3e5ad-6198-447...|          1|
|298d078d-c46a-498...|          1|
|875a6e16-25c2-4a5...|          1|
|4d831e1c-a649-446...|          3|
|5c7ea77e-b8d6-444...|          3|
|9999999a-02df-418...|          3|
|38313f9c-4d82-441...|          1|
|789316de-904b-47a...|          1|
|56f27d7f-271c-482...|          1|
|62446dfa-d07e-4b1...|          1|
|4b617c9d-b77a-4f8...|          1|
|100829c4-b400-4fa...|          1|
+--------------------+-----------+
only showing top 20 rows



In [26]:
# Per box: first and last status time, number of status rows (after collapsing
# consecutive repeats), total elapsed seconds, and the mean gap in seconds
# between consecutive status changes.
# NOTE: `min`/`max`/`count` here are pyspark.sql.functions (brought in by the
# wildcard import), not the Python builtins.
df = box_status_sorted.groupBy("box_id").agg(
    # Pin the column names explicitly so the expressions below do not depend
    # on Spark's auto-generated aggregate names.
    min("time").alias("min(time)"),
    max("time").alias("max(time)"),
    count("*").alias("count(1)"),
)
df_tmp3 = (
    df
    # Casting a timestamp to long yields epoch seconds.
    .withColumn(
        "DiffInSeconds",
        col("max(time)").cast("long") - col("min(time)").cast("long"),
    )
    # N status rows span N - 1 gaps. A box with a single status has no gap, so
    # its average is left null (Spark's avg() skips nulls) instead of computing
    # 0 / 0, which raises DIVIDE_BY_ZERO when ANSI mode is enabled.
    .withColumn(
        "AverageTimeB2S",
        when(col("count(1)") > 1, col("DiffInSeconds") / (col("count(1)") - 1)),
    )
    .orderBy("box_id")
)
df_tmp3.show()

# calculate average time between 2 consecutive box statuses, per box

+--------------------+-------------------+-------------------+--------+-------------+------------------+
|              box_id|          min(time)|          max(time)|count(1)|DiffInSeconds|    AverageTimeB2S|
+--------------------+-------------------+-------------------+--------+-------------+------------------+
|000de80b-c61a-4b6...|2022-09-01 08:38:56|2022-09-05 06:37:46|      19|       338330| 18796.11111111111|
|0013383b-7873-4a5...|2022-08-04 03:14:28|2022-08-07 08:53:22|      12|       279534| 25412.18181818182|
|001f674e-b89c-41a...|2022-09-04 21:05:39|2022-09-07 04:32:28|      17|       199609|        12475.5625|
|0039e926-3ff9-426...|2022-07-16 07:55:00|2022-07-20 09:14:46|      16|       350386|23359.066666666666|
|003cb878-42d0-40a...|2022-07-31 06:23:34|2022-08-02 05:48:00|      18|       170666|10039.176470588236|
|00424424-1f55-454...|2022-09-02 13:20:09|2022-09-06 09:58:15|      13|       333486|           27790.5|
|0043ee67-44d7-4e2...|2022-08-20 06:40:21|2022-08-20 15

In [21]:
# Attach the pickup region to each box's timing stats; the inner join drops
# boxes that have no pickup region.
df_a = df_tmp3.join(df_tmp, on="box_id", how="inner")
df_a.show()

+--------------------+------------------+---------+
|              box_id|    AverageTimeB2S|kv_pickup|
+--------------------+------------------+---------+
|c938ca85-bb39-4e2...|          15972.25|        1|
|6e6907dc-eb25-457...|         12546.125|        3|
|b038cb2d-f73c-441...| 27882.88888888889|        1|
|6aa0e4f7-33e6-4f9...|3506.1428571428573|        3|
|3be77f79-cd1f-472...|31061.263157894737|        3|
|767fcc3d-8e1c-4fa...|          11374.25|        3|
|ee23dcde-f5d9-464...|           26691.0|        1|
|79cbe234-f082-464...|           25669.4|        3|
|a4c3e5ad-6198-447...|17414.285714285714|        1|
|298d078d-c46a-498...| 80699.78571428571|        3|
|875a6e16-25c2-4a5...|23714.384615384617|        1|
|4d831e1c-a649-446...|           17344.8|        1|
|5c7ea77e-b8d6-444...|           25290.0|        3|
|9999999a-02df-418...|           59953.1|        1|
|38313f9c-4d82-441...|           49952.0|        1|
|789316de-904b-47a...| 10230.90909090909|        1|
|56f27d7f-27

In [22]:
# Attach the delivery region as well: one row per box with its average
# inter-status time plus both pickup and delivery regions.
df_b = df_a.join(df_tmp2, on="box_id", how="inner")
df_b.sort("box_id").show()

# average time between box statuses together with the box's regions
# region codes (kv_*): 1 = South (mien nam), 2 = Central (mien trung), 3 = North (mien bac)

+--------------------+------------------+---------+-----------+
|              box_id|    AverageTimeB2S|kv_pickup|kv_delivery|
+--------------------+------------------+---------+-----------+
|000de80b-c61a-4b6...| 18796.11111111111|        3|          1|
|0013383b-7873-4a5...| 25412.18181818182|        1|          1|
|001f674e-b89c-41a...|        12475.5625|        1|          1|
|0039e926-3ff9-426...|23359.066666666666|        3|          1|
|003cb878-42d0-40a...|10039.176470588236|        1|          1|
|00424424-1f55-454...|           27790.5|        1|          1|
|0043ee67-44d7-4e2...|          3760.625|        3|          3|
|004cb51e-fa84-450...|20147.333333333332|        1|          1|
|00683648-d4f6-4ea...|           15242.7|        3|          3|
|00684fe0-de00-46f...|10679.363636363636|        1|          1|
|006947c1-19b2-404...|3147.4444444444443|        3|          3|
|007b39b4-c5e5-4be...|18719.842105263157|        1|          1|
|008206dc-4c27-4b8...| 6287.571428571428

In [23]:
# For each (pickup region, delivery region) route, average the per-box mean
# inter-status time (seconds) and also express it in hours.
# NOTE(review): this is an unweighted mean of per-box means — each box counts
# equally regardless of how many status changes it had.
SECONDS_PER_HOUR = 3600
df_c = (
    df_b
    .groupBy("kv_pickup", "kv_delivery")
    .agg(avg("AverageTimeB2S"))
    .withColumn("inHours", col("avg(AverageTimeB2S)") / SECONDS_PER_HOUR)
)

# get average time for each delivery route (pickup region -> delivery region)

In [25]:
# Export the per-route summary. The aggregate has one row per region pair, so
# collecting it to pandas is cheap. Sort for a reproducible row order (a Spark
# groupBy returns rows in no guaranteed order), and skip the pandas RangeIndex,
# which would otherwise be written as an unnamed, meaningless first column.
df_c.orderBy("kv_pickup", "kv_delivery").toPandas().to_csv('AvgTimeBoxStatus.csv', index=False)