In [1]:
from pyspark.sql.functions import *
import pyspark

spark = pyspark.sql.SparkSession.builder.appName("Product_Price_Tracking") \
    .config("spark.jars.packages", "io.delta:delta-core_2.11:0.6.1,org.apache.hadoop:hadoop-aws:2.7.3") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

from delta.tables import *

In [2]:
import configparser
import os
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
aws_section = 'default'

_AWS_KEY_ID = config.get(aws_section, "aws_access_key_id")
_AWS_SECRET_KEY = config.get(aws_section, "aws_secret_access_key")

spark.sparkContext._conf.setAll([('spark.delta.logStore.class','org.apache.spark.sql.delta.storage.S3SingleDriverLogStore')])
spark._jsc.hadoopConfiguration().set("fs.s3a.awsAccessKeyId", _AWS_KEY_ID)
spark._jsc.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey", _AWS_SECRET_KEY)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

In [3]:
s3_raw_path = "s3a://odc-raw-data-ut-bucket/Arthur/delta_lake/raw"
s3_products_path = "s3a://odc-raw-data-ut-bucket/Arthur/delta_lake/products"

In [4]:
deltaTable = DeltaTable.forPath(spark, s3_products_path)

In [5]:
df_history = deltaTable.history()
df_history.show(20, False)

+-------+-------------------+------+--------+---------+----------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|timestamp          |userId|userName|operation|operationParameters                                             |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                                                                                                                                          |
+-------+-------------------+------+--------+---------+----------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-------------------------------------

In [6]:
df_lastOperation = deltaTable.history(1)
df_lastOperation.show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|      9|2020-12-18 10:40:58|  null|    null|   UPDATE|[predicate -> (Pr...|null|    null|     null|          8|          null|        false|[numRemovedFiles ...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+



In [7]:
df_productsaug22 = spark.read.csv(f'{s3_raw_path}/products_aug22.csv', header=True, inferSchema=True)
df_productsaug22.show()     

+---------+-------------------+-----+--------+
|ProductID|               Date|Price|Quantity|
+---------+-------------------+-----+--------+
|      200|2020-08-22 00:00:00| 25.5|       2|
|      210|2020-08-22 00:00:00| 46.0|       5|
|      220|2020-08-22 00:00:00|34.56|       6|
|      230|2020-08-22 00:00:00|23.67|      11|
+---------+-------------------+-----+--------+



In [8]:
df_productsaug22.write.format("delta").mode("append").save(s3_products_path)

AnalysisException: 'A schema mismatch detected when writing to the Delta table (Table ID: a9599bc9-797d-474e-8987-e60dc07f4abc).\nTo enable schema migration, please set:\n\'.option("mergeSchema", "true")\'.\n\nTable schema:\nroot\n-- ProductID: integer (nullable = true)\n-- Date: timestamp (nullable = true)\n-- Price: double (nullable = true)\n\n\nData schema:\nroot\n-- ProductID: integer (nullable = true)\n-- Date: timestamp (nullable = true)\n-- Price: double (nullable = true)\n-- Quantity: integer (nullable = true)\n\n         \nIf Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE\ncommand for changing the schema.\n        ;'

In [9]:
df_productsaug22.write.format("delta").mode("append").option("mergeSchema", "true").save(s3_products_path)

In [10]:
df = spark.read.format("delta").load(s3_products_path)
df.show()

+---------+-------------------+------+--------+
|ProductID|               Date| Price|Quantity|
+---------+-------------------+------+--------+
|      200|2020-08-22 00:00:00|  25.5|       2|
|      210|2020-08-22 00:00:00|  46.0|       5|
|      220|2020-08-22 00:00:00| 34.56|       6|
|      230|2020-08-22 00:00:00| 23.67|      11|
|      220|2020-08-20 00:00:00| 44.56|    null|
|      240|2020-08-20 00:00:00|100.82|    null|
|      250|2020-08-21 00:00:00| 99.76|    null|
|      230|2020-08-20 00:00:00| 33.67|    null|
|      210|2020-08-21 00:00:00|  56.0|    null|
|      200|2020-08-20 00:00:00|  35.5|    null|
+---------+-------------------+------+--------+



In [11]:
df_productsaug_partition = spark.read.csv(f'{s3_raw_path}/*.csv', header=True, inferSchema=True)

In [12]:
df_productsaug_partition.write.format("delta").partitionBy("Date").option("path", "s3a://odc-raw-data-ut-bucket/Arthur/delta_lake/products_p").saveAsTable("products_p")

In [13]:
# 七天後執行 VACUUM 命令永久刪除滿足以下條件的所有資料檔案：
# 不再是活動表的一部分，並且
# 超過保留閾值（預設為七天）。
deltaTable.vacuum() 

DataFrame[]