In [1]:
!pip install pyspark==3.2.2
!pip install delta-spark

Collecting pyspark==3.2.2
  Downloading pyspark-3.2.2.tar.gz (281.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.5/281.5 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5 (from pyspark==3.2.2)
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m10.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.2-py2.py3-none-any.whl size=281969432 sha256=3982946552b1101e2eb7b2e8534d16c604c7bfa333ecbe9864087d9dd9bfcec5
  Stored in directory: /root/.cache/pip/wheels/99/2c/e7/e06690607c5342affbc132b79732a3a321c83eec58d5617c38
Successfully built pyspark
Installing collected packages: py4j, pyspark
 

CARRIERRATES PATH

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta import *

builder = SparkSession.builder.appName("Supply chain") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

path = '/content/raw_data/CarrierRates.csv'

carrierrates_df = spark.read.csv(path,header = True,inferSchema = True)
carrierrates_df.show()

+-------+----------+---------------+-----------------+-----------------+-----------+------------------+------------------+---------------+-----------------+-----------+
|Carrier|OriginPort|DestinationPort|MinWeightQuantity|MaxWeightQuantity|ServiceCode|       MinimumCost|              Rate|ModeDescription|TransportDayCount|CarrierType|
+-------+----------+---------------+-----------------+-----------------+-----------+------------------+------------------+---------------+-----------------+-----------+
| V444_6|    PORT08|         PORT09|            250.0|           499.99|        DTD|43.227199999999996|            0.7132|         AIR   |                2|V88888888_0|
| V444_6|    PORT08|         PORT09|             65.0|            69.99|        DTD|43.227199999999996|0.7512000000000001|         AIR   |                2|V88888888_0|
| V444_6|    PORT08|         PORT09|             60.0|            64.99|        DTD|43.227199999999996|            0.7892|         AIR   |                2

In [27]:
# data cleaning
#remove unnecessary whitespaces

from pyspark.sql.functions import trim

# Apply trim to string columns
carrierrates_df = carrierrates_df.withColumn("Carrier", trim(carrierrates_df.Carrier)) \
                     .withColumn("OriginPort", trim(carrierrates_df.OriginPort)) \
                     .withColumn("DestinationPort", trim(carrierrates_df.DestinationPort)) \
                     .withColumn("ServiceCode", trim(carrierrates_df.ServiceCode)) \
                     .withColumn("ModeDescription", trim(carrierrates_df.ModeDescription)) \
                     .withColumn("CarrierType", trim(carrierrates_df.CarrierType))

carrierrates_df.show()

+-------+----------+---------------+-----------------+-----------------+-----------+------------------+------------------+---------------+-----------------+-----------+
|Carrier|OriginPort|DestinationPort|MinWeightQuantity|MaxWeightQuantity|ServiceCode|       MinimumCost|              Rate|ModeDescription|TransportDayCount|CarrierType|
+-------+----------+---------------+-----------------+-----------------+-----------+------------------+------------------+---------------+-----------------+-----------+
| V444_6|    PORT08|         PORT09|            250.0|           499.99|        DTD|43.227199999999996|            0.7132|            AIR|                2|V88888888_0|
| V444_6|    PORT08|         PORT09|             65.0|            69.99|        DTD|43.227199999999996|0.7512000000000001|            AIR|                2|V88888888_0|
| V444_6|    PORT08|         PORT09|             60.0|            64.99|        DTD|43.227199999999996|            0.7892|            AIR|                2

In [28]:
from pyspark.sql.functions import round

# Round the MaxWeightQuantity column to 2 decimal places
carrierrates_df = carrierrates_df.withColumn("MaxWeightQuantity", round(carrierrates_df["MaxWeightQuantity"], 0)).withColumn("Rate", round(carrierrates_df["Rate"], 2)).withColumn("MinWeightQuantity", round(carrierrates_df["MinWeightQuantity"], 0)).withColumn("MinimumCost", round(carrierrates_df["MinimumCost"], 0))

# Show the updated DataFrame
carrierrates_df.show()



+-------+----------+---------------+-----------------+-----------------+-----------+-----------+----+---------------+-----------------+-----------+
|Carrier|OriginPort|DestinationPort|MinWeightQuantity|MaxWeightQuantity|ServiceCode|MinimumCost|Rate|ModeDescription|TransportDayCount|CarrierType|
+-------+----------+---------------+-----------------+-----------------+-----------+-----------+----+---------------+-----------------+-----------+
| V444_6|    PORT08|         PORT09|            250.0|            500.0|        DTD|       43.0|0.71|            AIR|                2|V88888888_0|
| V444_6|    PORT08|         PORT09|             65.0|             70.0|        DTD|       43.0|0.75|            AIR|                2|V88888888_0|
| V444_6|    PORT08|         PORT09|             60.0|             65.0|        DTD|       43.0|0.79|            AIR|                2|V88888888_0|
| V444_6|    PORT08|         PORT09|             50.0|             55.0|        DTD|       43.0|0.83|           

In [29]:


orderlist_path = '/content/raw_data/OrderList.csv'

OrderList_df = spark.read.csv(orderlist_path, header=True, inferSchema=True)

OrderList_df.show()

+--------------+----------+----------+-------+---+------------+-----------------+----------------+---------+---------+---------+---------------+------------+-----------------+
|       OrderID| OrderDate|OriginPort|Carrier|TPT|ServiceLevel|ShipAheadDayCount|ShipLateDayCount| Customer|ProductID|PlantCode|DestinationPort|UnitQuantity|           Weight|
+--------------+----------+----------+-------+---+------------+-----------------+----------------+---------+---------+---------+---------------+------------+-----------------+
|1.4472964467E9|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  PLANT16|         PORT09|         808|             14.3|
|1.4471580147E9|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  PLANT16|         PORT09|        3188|            87.94|
|1.4471388987E9|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  

In [30]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, LongType, IntegerType, FloatType, StringType, DateType

schema = StructType([
    StructField("OrderID", StringType(), True),
    StructField("OrderDate", DateType(), True),
    StructField("OriginPort", StringType(), True),
    StructField("Carrier", StringType(), True),
    StructField("TPT", StringType(), True),
    StructField("ServiceLevel", StringType(), True),
    StructField("ShipAheadDayCount", IntegerType(), True),
    StructField("ShipLateDayCount", IntegerType(), True),
    StructField("Customer", StringType(), True),
    StructField("ProductID", IntegerType(), True),
    StructField("PlantCode", StringType(), True),
    StructField("DestinationPort", StringType(), True),
    StructField("UnitQuantity", IntegerType(), True),
    StructField("Weight", FloatType(), True)
])

OrderList_df = spark.read.csv(orderlist_path, header=True, schema=schema)

OrderList_df = OrderList_df.withColumn("OrderID", col("OrderID").cast(LongType()))

OrderList_df.show()

OrderList_df.printSchema()

+----------+----------+----------+-------+---+------------+-----------------+----------------+---------+---------+---------+---------------+------------+------+
|   OrderID| OrderDate|OriginPort|Carrier|TPT|ServiceLevel|ShipAheadDayCount|ShipLateDayCount| Customer|ProductID|PlantCode|DestinationPort|UnitQuantity|Weight|
+----------+----------+----------+-------+---+------------+-----------------+----------------+---------+---------+---------+---------------+------------+------+
|1447296446|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  PLANT16|         PORT09|         808|  14.3|
|1447158014|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  PLANT16|         PORT09|        3188| 87.94|
|1447138898|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  PLANT16|         PORT09|        2331|  61.2|
|1447363527|2013-05-26|    PORT09|

In [31]:

path = '/content/raw_data/PlantCapacities.csv'
plantcapacities_df = spark.read.csv(path,header = True,inferSchema = True)
plantcapacities_df.show()

plantcapacities_df.printSchema()

+---------+-------------+
|PlantCode|DailyCapacity|
+---------+-------------+
|  PLANT15|           11|
|  PLANT17|            8|
|  PLANT18|          111|
|  PLANT05|          385|
|  PLANT02|          138|
|  PLANT01|         1070|
|  PLANT06|           49|
|  PLANT10|          118|
|  PLANT07|          265|
|  PLANT14|          549|
|  PLANT16|          457|
|  PLANT12|          209|
|  PLANT11|          332|
|  PLANT09|           11|
|  PLANT03|         1013|
|  PLANT13|          490|
|  PLANT19|            7|
|  PLANT08|           14|
|  PLANT04|          554|
+---------+-------------+

root
 |-- PlantCode: string (nullable = true)
 |-- DailyCapacity: integer (nullable = true)



In [32]:

path = '/content/raw_data/PlantCosts.csv'
plantcost_df = spark.read.csv(path,header = True,inferSchema = True)
plantcost_df = plantcost_df.withColumn("CostPerUnit", round(plantcost_df["CostPerUnit"], 4))
plantcost_df.show()

plantcost_df.printSchema()

+---------+-----------+
|PlantCode|CostPerUnit|
+---------+-----------+
|  PLANT15|     1.4151|
|  PLANT17|     0.4289|
|  PLANT18|     2.0363|
|  PLANT05|     0.4881|
|  PLANT02|     0.4775|
|  PLANT01|      0.567|
|  PLANT06|     0.5541|
|  PLANT10|     0.4936|
|  PLANT07|     0.3714|
|  PLANT14|     0.6343|
|  PLANT16|     1.9198|
|  PLANT12|     0.7731|
|  PLANT11|     0.5552|
|  PLANT09|     0.4651|
|  PLANT03|     0.5175|
|  PLANT13|     0.4697|
|  PLANT19|     0.6398|
|  PLANT08|     0.5229|
|  PLANT04|     0.4285|
+---------+-----------+

root
 |-- PlantCode: string (nullable = true)
 |-- CostPerUnit: double (nullable = true)



In [33]:

path = '/content/raw_data/PlantCustomers.csv'
plantcustomers_df = spark.read.csv(path,header = True,inferSchema = True)
plantcustomers_df.show()

plantcustomers_df.printSchema()

+---------+--------------------+
|PlantCode|            Customer|
+---------+--------------------+
|  PLANT02|   V5555555555555_16|
|  PLANT02| V555555555555555_29|
|  PLANT02|        V555555555_3|
|  PLANT02|   V55555555555555_8|
|  PLANT02|         V55555555_9|
|  PLANT02|           V55555_10|
|  PLANT02|         V55555555_5|
|  PLANT06| V555555555555555_18|
|  PLANT06|           V55555_10|
|  PLANT10| V555555555555555_29|
|  PLANT10|          V555555_34|
|  PLANT10|V5555555555555555...|
|  PLANT10|           V55555_10|
|  PLANT11|V5555555555555555...|
+---------+--------------------+

root
 |-- PlantCode: string (nullable = true)
 |-- Customer: string (nullable = true)



In [34]:

path = '/content/raw_data/PlantPorts.csv'
plantports_df = spark.read.csv(path,header = True,inferSchema = True)
plantports_df.show()

plantports_df.printSchema()

+---------+------+
|PlantCode|  Port|
+---------+------+
|  PLANT01|PORT01|
|  PLANT01|PORT02|
|  PLANT02|PORT03|
|  PLANT03|PORT04|
|  PLANT04|PORT05|
|  PLANT05|PORT06|
|  PLANT06|PORT06|
|  PLANT07|PORT01|
|  PLANT07|PORT02|
|  PLANT08|PORT04|
|  PLANT09|PORT04|
|  PLANT10|PORT01|
|  PLANT10|PORT02|
|  PLANT11|PORT04|
|  PLANT12|PORT04|
|  PLANT13|PORT04|
|  PLANT14|PORT07|
|  PLANT15|PORT08|
|  PLANT16|PORT09|
|  PLANT17|PORT10|
+---------+------+
only showing top 20 rows

root
 |-- PlantCode: string (nullable = true)
 |-- Port: string (nullable = true)



In [35]:

path = '/content/raw_data/PlantProducts.csv'
plantproducts_df = spark.read.csv(path,header = True,inferSchema = True)
plantproducts_df.show()

plantproducts_df.printSchema()

+---------+---------+
|PlantCode|ProductID|
+---------+---------+
|  PLANT15|  1698815|
|  PLANT17|  1664419|
|  PLANT17|  1664426|
|  PLANT17|  1672826|
|  PLANT17|  1674916|
|  PLANT17|  1674918|
|  PLANT17|  1675507|
|  PLANT17|  1676151|
|  PLANT17|  1676152|
|  PLANT17|  1677864|
|  PLANT17|  1677865|
|  PLANT17|  1679124|
|  PLANT17|  1685369|
|  PLANT17|  1685370|
|  PLANT17|  1685378|
|  PLANT17|  1685979|
|  PLANT17|  1691969|
|  PLANT17|  1694139|
|  PLANT17|  1694217|
|  PLANT17|  1696107|
+---------+---------+
only showing top 20 rows

root
 |-- PlantCode: string (nullable = true)
 |-- ProductID: integer (nullable = true)



In [36]:
OrderList_df.write.mode("overwrite").parquet("/content/transformed_data/orderList.parquet")
carrierrates_df.write.mode("overwrite").parquet("/content/transformed_data/carrierrates.parquet")
plantports_df.write.mode("overwrite").parquet("/content/transformed_data/plantports.parquet")
plantproducts_df.write.mode("overwrite").parquet("/content/transformed_data/plantproducts.parquet")
plantcustomers_df.write.mode("overwrite").parquet("/content/transformed_data/plantcustomers.parquet")
plantcapacities_df.write.mode("overwrite").parquet("/content/transformed_data/plantcapacities.parquet")
plantcost_df.write.mode("overwrite").parquet("/content/transformed_data/plantcost.parquet")


In [37]:
orderList_parquet = spark.read.parquet("/content/transformed_data/orderList.parquet").show()
carrierrates_parquet = spark.read.parquet("/content/transformed_data/carrierrates.parquet").show()
plantports_parquet = spark.read.parquet("/content/transformed_data/plantports.parquet").show()
plantproducts_parquet = spark.read.parquet("/content/transformed_data/plantproducts.parquet").show()
plantcustomers_parquet = spark.read.parquet("/content/transformed_data/plantcustomers.parquet").show()
plantcapacities_parquet = spark.read.parquet("/content/transformed_data/plantcapacities.parquet").show()
plantcost_parquet = spark.read.parquet("/content/transformed_data/plantcost.parquet").show()

+----------+----------+----------+-------+---+------------+-----------------+----------------+---------+---------+---------+---------------+------------+------+
|   OrderID| OrderDate|OriginPort|Carrier|TPT|ServiceLevel|ShipAheadDayCount|ShipLateDayCount| Customer|ProductID|PlantCode|DestinationPort|UnitQuantity|Weight|
+----------+----------+----------+-------+---+------------+-----------------+----------------+---------+---------+---------+---------------+------------+------+
|1447296446|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  PLANT16|         PORT09|         808|  14.3|
|1447158014|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  PLANT16|         PORT09|        3188| 87.94|
|1447138898|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  PLANT16|         PORT09|        2331|  61.2|
|1447363527|2013-05-26|    PORT09|

In [38]:
# delta format

delta_path = '/content/transformed_data/carrierdata.delta'
carrierrates_df.write.format("delta").save(delta_path)

delta_df = spark.read.format("delta").load(delta_path)
delta_df.show()


+-------+----------+---------------+-----------------+-----------------+-----------+-----------+----+---------------+-----------------+-----------+
|Carrier|OriginPort|DestinationPort|MinWeightQuantity|MaxWeightQuantity|ServiceCode|MinimumCost|Rate|ModeDescription|TransportDayCount|CarrierType|
+-------+----------+---------------+-----------------+-----------------+-----------+-----------+----+---------------+-----------------+-----------+
| V444_6|    PORT08|         PORT09|            250.0|            500.0|        DTD|       43.0|0.71|            AIR|                2|V88888888_0|
| V444_6|    PORT08|         PORT09|             65.0|             70.0|        DTD|       43.0|0.75|            AIR|                2|V88888888_0|
| V444_6|    PORT08|         PORT09|             60.0|             65.0|        DTD|       43.0|0.79|            AIR|                2|V88888888_0|
| V444_6|    PORT08|         PORT09|             50.0|             55.0|        DTD|       43.0|0.83|           

In [39]:
# delta format

delta_path = '/content/transformed_data/orderlist.delta'
OrderList_df.write.format("delta").save(delta_path)

delta_df = spark.read.format("delta").load(delta_path)
delta_df.show()


+----------+----------+----------+-------+---+------------+-----------------+----------------+---------+---------+---------+---------------+------------+------+
|   OrderID| OrderDate|OriginPort|Carrier|TPT|ServiceLevel|ShipAheadDayCount|ShipLateDayCount| Customer|ProductID|PlantCode|DestinationPort|UnitQuantity|Weight|
+----------+----------+----------+-------+---+------------+-----------------+----------------+---------+---------+---------+---------------+------------+------+
|1447296446|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  PLANT16|         PORT09|         808|  14.3|
|1447158014|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  PLANT16|         PORT09|        3188| 87.94|
|1447138898|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  PLANT16|         PORT09|        2331|  61.2|
|1447363527|2013-05-26|    PORT09|

In [40]:
# delta format

delta_path = '/content/transformed_data/plantports.delta'
plantports_df.write.format("delta").save(delta_path)

delta_df = spark.read.format("delta").load(delta_path)
delta_df.show()


+---------+------+
|PlantCode|  Port|
+---------+------+
|  PLANT01|PORT01|
|  PLANT01|PORT02|
|  PLANT02|PORT03|
|  PLANT03|PORT04|
|  PLANT04|PORT05|
|  PLANT05|PORT06|
|  PLANT06|PORT06|
|  PLANT07|PORT01|
|  PLANT07|PORT02|
|  PLANT08|PORT04|
|  PLANT09|PORT04|
|  PLANT10|PORT01|
|  PLANT10|PORT02|
|  PLANT11|PORT04|
|  PLANT12|PORT04|
|  PLANT13|PORT04|
|  PLANT14|PORT07|
|  PLANT15|PORT08|
|  PLANT16|PORT09|
|  PLANT17|PORT10|
+---------+------+
only showing top 20 rows



In [42]:
# delta format

delta_path = '/content/transformed_data/plantproducts.delta'
plantproducts_df.write.format("delta").save(delta_path)

delta_df = spark.read.format("delta").load(delta_path)
delta_df.show()


+---------+---------+
|PlantCode|ProductID|
+---------+---------+
|  PLANT15|  1698815|
|  PLANT17|  1664419|
|  PLANT17|  1664426|
|  PLANT17|  1672826|
|  PLANT17|  1674916|
|  PLANT17|  1674918|
|  PLANT17|  1675507|
|  PLANT17|  1676151|
|  PLANT17|  1676152|
|  PLANT17|  1677864|
|  PLANT17|  1677865|
|  PLANT17|  1679124|
|  PLANT17|  1685369|
|  PLANT17|  1685370|
|  PLANT17|  1685378|
|  PLANT17|  1685979|
|  PLANT17|  1691969|
|  PLANT17|  1694139|
|  PLANT17|  1694217|
|  PLANT17|  1696107|
+---------+---------+
only showing top 20 rows



In [43]:
# delta format

delta_path = '/content/transformed_data/plantcustomers.delta'
plantcustomers_df.write.format("delta").save(delta_path)

delta_df = spark.read.format("delta").load(delta_path)
delta_df.show()


+---------+--------------------+
|PlantCode|            Customer|
+---------+--------------------+
|  PLANT02|   V5555555555555_16|
|  PLANT02| V555555555555555_29|
|  PLANT02|        V555555555_3|
|  PLANT02|   V55555555555555_8|
|  PLANT02|         V55555555_9|
|  PLANT02|           V55555_10|
|  PLANT02|         V55555555_5|
|  PLANT06| V555555555555555_18|
|  PLANT06|           V55555_10|
|  PLANT10| V555555555555555_29|
|  PLANT10|          V555555_34|
|  PLANT10|V5555555555555555...|
|  PLANT10|           V55555_10|
|  PLANT11|V5555555555555555...|
+---------+--------------------+



In [44]:
# delta format

delta_path = '/content/transformed_data/plantcapacities.delta'
plantcapacities_df.write.format("delta").save(delta_path)

delta_df = spark.read.format("delta").load(delta_path)
delta_df.show()


+---------+-------------+
|PlantCode|DailyCapacity|
+---------+-------------+
|  PLANT15|           11|
|  PLANT17|            8|
|  PLANT18|          111|
|  PLANT05|          385|
|  PLANT02|          138|
|  PLANT01|         1070|
|  PLANT06|           49|
|  PLANT10|          118|
|  PLANT07|          265|
|  PLANT14|          549|
|  PLANT16|          457|
|  PLANT12|          209|
|  PLANT11|          332|
|  PLANT09|           11|
|  PLANT03|         1013|
|  PLANT13|          490|
|  PLANT19|            7|
|  PLANT08|           14|
|  PLANT04|          554|
+---------+-------------+



In [45]:
# delta format

delta_path = '/content/transformed_data/plantcost.delta'
plantcost_df.write.format("delta").save(delta_path)

delta_df = spark.read.format("delta").load(delta_path)
delta_df.show()


+---------+-----------+
|PlantCode|CostPerUnit|
+---------+-----------+
|  PLANT15|     1.4151|
|  PLANT17|     0.4289|
|  PLANT18|     2.0363|
|  PLANT05|     0.4881|
|  PLANT02|     0.4775|
|  PLANT01|      0.567|
|  PLANT06|     0.5541|
|  PLANT10|     0.4936|
|  PLANT07|     0.3714|
|  PLANT14|     0.6343|
|  PLANT16|     1.9198|
|  PLANT12|     0.7731|
|  PLANT11|     0.5552|
|  PLANT09|     0.4651|
|  PLANT03|     0.5175|
|  PLANT13|     0.4697|
|  PLANT19|     0.6398|
|  PLANT08|     0.5229|
|  PLANT04|     0.4285|
+---------+-----------+



In [46]:
# Check the history of the Delta table

delta_table_history = spark.sql(f"DESCRIBE HISTORY '/content/transformed_data/plantcapacities.delta'")
delta_table_history.show(truncate=False)
print('History displayed')

spark.sql("VACUUM '/content/transformed_data/plantcapacities.delta' RETAIN 168 HOURS")
print('Did vacuum operation')

+-------+-----------------------+------+--------+---------+------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                       |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|0      |2024-09-27 10:17:54.971|NULL  |NULL    |WRITE    |{mode -> ErrorIfExists, partitionBy -> []}|NULL|NULL    |NULL     |NULL       |Serializable  |true         |{n

In [49]:
spark.sql("OPTIMIZE delta.`/content/transformed_data/orderlist.delta` ZORDER BY (ProductID)")
print('DATA OPTIMIZED')

DATA OPTIMIZED


In [50]:

time_travel = spark.read.format("delta").option("versionAsOf", 0).load('/content/transformed_data/orderlist.delta')
time_travel.show()


+----------+----------+----------+-------+---+------------+-----------------+----------------+---------+---------+---------+---------------+------------+------+
|   OrderID| OrderDate|OriginPort|Carrier|TPT|ServiceLevel|ShipAheadDayCount|ShipLateDayCount| Customer|ProductID|PlantCode|DestinationPort|UnitQuantity|Weight|
+----------+----------+----------+-------+---+------------+-----------------+----------------+---------+---------+---------+---------------+------------+------+
|1447296446|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  PLANT16|         PORT09|         808|  14.3|
|1447158014|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  PLANT16|         PORT09|        3188| 87.94|
|1447138898|2013-05-26|    PORT09|  V44_3|  1|         CRF|                3|               0|V55555_53|  1700106|  PLANT16|         PORT09|        2331|  61.2|
|1447363527|2013-05-26|    PORT09|

In [48]:
# to download completely in a zip file
import shutil
from google.colab import files

folder_path = '/content/new_data'
zip_file_path = '/content/new_data.zip'

# Create a ZIP archive of the folder
shutil.make_archive(zip_file_path.replace('.zip', ''), 'zip', folder_path)
files.download(zip_file_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>