## テーブルの型を分析しやすいように変換

In [0]:
%run ./00_config

### 型を変換した新テーブルを作成

In [0]:
# silverテーブルを作成
spark.sql(
    f"""
    CREATE TABLE IF NOT EXISTS {silver_table_path}
    (
        `Timestamp` TIMESTAMP,
        `Temperature` FLOAT,
        `Humidity` FLOAT,
        _datasource STRING,
        _ingest_timestamp timestamp
    )
    USING DELTA
    """
)
          

### 古いテーブルのデータを抽出

1. `bronze`テーブルから主キー（`Timestamp`）ごとに`_ingest_timestamp`列の最大日を抽出したサブセットを作成
2. 主キー＋`_ingest_timestamp`列の条件で、1のサブセットと`bronze`テーブルを結合
3. `bronze`テーブルのデータ型をシルバーテーブルと同一のデータ型に変換

*つまり、古いテーブルのレコードを重複が無いように抽出し、データの型を変換しています*

In [0]:

bronze_to_silver_sql = f'''
with latest_bronze_records (
    SELECT
        Timestamp,
        MAX(_ingest_timestamp) AS max_ingest_timestamp
        FROM
            {bronze_table_path}
        GROUP BY
            Timestamp
)
SELECT
    bronze.`Timestamp`::timestamp,
    bronze.`Temperature`::FLOAT,
    bronze.`Humidity`::FLOAT,
    bronze._datasource,
    bronze._ingest_timestamp::timestamp
    
    FROM
        {bronze_table_path} AS bronze
    INNER JOIN 
        latest_bronze_records AS latest_bronze
        ON 
            bronze.Timestamp =  latest_bronze.Timestamp
            AND bronze._ingest_timestamp =  latest_bronze.max_ingest_timestamp
'''
df = spark.sql(bronze_to_silver_sql)

# dropDuplicates関数にて、主キーの一意性を保証。連携日ごとの一意性が保証されないことがあるため。
df = df.drop_duplicates(['Timestamp'])

In [0]:
# 処理後の結果を確認
df.display()

## 抽出した古いテーブルのデータを、新しいテーブルに挿入
1. もし2個目のテーブルに既に同一時間のデータが有り、かつデータが更新されていたら、テーブルのレコードを更新する
2. もし2個目のテーブルにないデータなら、レコードを挿入する

In [0]:
# 一時ビューから`product2__silver`に対して、MERGE文によりアップサート処理を実施。
## 一時ビューを作成
df_view_name = f'_tmp_silver'
df.createOrReplaceTempView(df_view_name)

## Merge処理を実行
returned_df = spark.sql(f'''
MERGE INTO {silver_table_path} AS silver
  USING {df_view_name} AS src
  
  ON silver.Timestamp = src.Timestamp

  WHEN MATCHED
  AND silver._ingest_timestamp < src._ingest_timestamp
    THEN UPDATE SET
      silver.Temperature = src.Temperature,
      silver.Humidity = src.Humidity,
      silver._ingest_timestamp = src._ingest_timestamp
  WHEN NOT MATCHED
    THEN INSERT (
      Timestamp,
      Temperature,
      Humidity,
      _datasource,
      _ingest_timestamp
    )
    VALUES (
      src.Timestamp,
      src.Temperature,
      src.Humidity,
      src._datasource,
      src._ingest_timestamp)
''')
returned_df.display()