# Analyzing transactional data in real time (HTAP)


## 1. Simple data exploration
In this section, we load the analytical containers of an Azure Cosmos DB account with Synapse Link enabled into DataFrames and do some simple exploration.

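### 1-1. Loading the data
Specifying cosmos.olap as the format means reading from the analytical store. Specifying cosmos.oltp instead lets you read from the transactional store, which is the source of the analytical store's data.  
For the spark.synapse.linkedService option, specify the name of the linked service you created.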
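
The two loads below differ only in the container name, so the format string and options can be assembled once. The following is a minimal sketch under that assumption; `cosmos_source` is a hypothetical helper name, and it only builds the format string and the two option keys already used in this notebook. In a Synapse notebook you would pass its result to `spark.read.format(fmt).options(**opts).load()`.

```python
def cosmos_source(linked_service: str, container: str, store: str = "olap") -> tuple[str, dict[str, str]]:
    """Return (format, options) for reading a Cosmos DB container from Spark.

    Hypothetical helper: store="olap" targets the analytical store,
    store="oltp" targets the transactional store.
    """
    if store not in ("olap", "oltp"):
        raise ValueError(f"store must be 'olap' or 'oltp', got {store!r}")
    options = {
        "spark.synapse.linkedService": linked_service,
        "spark.cosmos.container": container,
    }
    return f"cosmos.{store}", options


if __name__ == "__main__":
    fmt, opts = cosmos_source("<Azure Cosmos DB linked service name>", "Customer")
    print(fmt, opts)
    # In a Synapse notebook (not executed here):
    # dfCustomer = spark.read.format(fmt).options(**opts).load()
```

Passing the options as a dictionary keeps the linked service name in one place, which helps when the same notebook reads several containers.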

In [1]:
%%pyspark

dfCustomer = spark.read\
    .format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<Azure Cosmos DB linked service name>")\
    .option("spark.cosmos.container", "Customer")\
    .load()

dfSalesOrder = spark.read\
    .format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<Azure Cosmos DB linked service name>")\
    .option("spark.cosmos.container", "SalesOrder")\
    .load()

### 1-2. Cleaning up the DataFrames
Drop the Cosmos DB system properties (_rid, _ts, _etag, and so on) that appear in the analytical store data.
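
The same filtering is applied to both DataFrames, so it can be factored into a small helper. This is a plain-Python sketch; `SYSTEM_DOCUMENT_PROPERTIES` and `user_columns` are hypothetical names, and the property list is the one this notebook uses. A list comprehension is used rather than a set difference so that the original column order is kept.

```python
# Cosmos DB system properties dropped in this notebook.
SYSTEM_DOCUMENT_PROPERTIES = frozenset({"_attachments", "_etag", "_rid", "_self", "_ts"})


def user_columns(columns: list[str]) -> list[str]:
    """Return the columns that are not system properties, in their original order."""
    return [c for c in columns if c not in SYSTEM_DOCUMENT_PROPERTIES]


if __name__ == "__main__":
    # Top-level column names as printed by dfCustomer.printSchema().
    cols = ["_rid", "_ts", "id", "title", "firstName", "lastName", "emailAddress",
            "phoneNumber", "creationDate", "addresses", "password", "_etag"]
    print(user_columns(cols))
```

In the notebook this would be used as `dfCustomer.select(user_columns(dfCustomer.columns))`. Alternatively, `DataFrame.drop(*names)` removes the listed columns and ignores names that are not present.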

In [2]:
%%pyspark

dfCustomer.printSchema()

root
 |-- _rid: string (nullable = true)
 |-- _ts: long (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- emailAddress: string (nullable = true)
 |-- phoneNumber: string (nullable = true)
 |-- creationDate: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- addressLine1: string (nullable = true)
 |    |    |-- addressLine2: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- zipCode: string (nullable = true)
 |-- password: struct (nullable = true)
 |    |-- hash: string (nullable = true)
 |    |-- salt: string (nullable = true)
 |-- _etag: string (nullable = true)

In [3]:
%%pyspark

display(dfCustomer.limit(10))

In [4]:
%%pyspark

system_document_properties = {'_attachments','_etag','_rid','_self','_ts'}
# Keep the original column order (a set difference would not preserve it)
customer_columns = [c for c in dfCustomer.columns if c not in system_document_properties]
dfCustomer = dfCustomer.select(customer_columns)

display(dfCustomer.limit(10))

In [5]:
%%pyspark

dfSalesOrder.printSchema()

In [6]:
%%pyspark

display(dfSalesOrder.limit(10))

In [5]:
%%pyspark

system_document_properties = {'_attachments','_etag','_rid','_self','_ts'}
# Keep the original column order (a set difference would not preserve it)
salesorder_columns = [c for c in dfSalesOrder.columns if c not in system_document_properties]
dfSalesOrder = dfSalesOrder.select(salesorder_columns)

display(dfSalesOrder.limit(10))

### 1-3. Simple aggregation
Run some simple aggregations with the DataFrame API and Spark SQL.
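
In the schema above, addresses is an array of structs, so addresses.country and addresses.city evaluate to arrays. The grouping key is therefore a customer's whole array of countries (and cities), not one address at a time, which is why later queries index the results with Country[0]. The plain-Python sketch below mimics `groupBy(...).count().orderBy("count", ascending=False).limit(n)` to make that behavior visible; the function name and the sample documents in the test are hypothetical.

```python
from collections import Counter


def count_by_country_city(customers: list[dict], n: int = 10) -> list[tuple[tuple, tuple, int]]:
    """Mimic groupBy("addresses.country", "addresses.city").count(), ordered by count desc.

    Selecting a field through an array of structs yields an array, so each key is
    the tuple of ALL countries / cities of one customer.
    """
    counts = Counter(
        (
            tuple(a["country"] for a in c["addresses"]),
            tuple(a["city"] for a in c["addresses"]),
        )
        for c in customers
    )
    # most_common sorts by count descending; ties keep first-seen order.
    return [(country, city, cnt) for (country, city), cnt in counts.most_common(n)]
```

A customer with two addresses forms its own group (for example `(("GB", "AU"), ("London", "Bendigo"))`). To count per address instead, explode addresses first, for example with `explode("addresses")` from `pyspark.sql.functions`.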

In [6]:
%%pyspark

print(dfCustomer.count())

19119

In [7]:
%%pyspark

# Number of customers per country and city
display(dfCustomer.groupBy("addresses.country", "addresses.city").count().orderBy("count", ascending=False).limit(10))

In [8]:
%%pyspark

dfCustomer.createOrReplaceTempView("CustomerTempView")

In [9]:
%%sql

-- Number of customers per country and city
SELECT addresses.country, addresses.city, count(*) AS count
FROM CustomerTempView
GROUP BY addresses.country, addresses.city
ORDER BY count DESC
LIMIT 10

<Spark SQL result set with 10 rows and 3 fields>

## 2. Analysis using cross-container joins
In this section, we analyze the data with SQL (Spark SQL).

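### 2-1. Creating the database and external tables
To prepare for analysis with Spark SQL, define external tables that reference the Cosmos DB analytical store.
Specifying cosmos.olap in USING means the table reads from the analytical store. Note that a table that specifies cosmos.oltp can be created, but it does not work.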
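
The two CREATE TABLE cells below differ only in the table and container names. If you prefer to create them from a %%pyspark cell, a string builder such as the hypothetical `cosmos_table_ddl` below can generate the statement, which you would then pass to `spark.sql(...)`. This is a sketch that only formats text; it rejects single quotes in option values rather than attempting to escape them.

```python
def cosmos_table_ddl(database: str, table: str, linked_service: str, container: str) -> str:
    """Build a CREATE TABLE ... USING cosmos.olap statement (hypothetical helper)."""
    for value in (linked_service, container):
        if "'" in value:
            raise ValueError(f"single quotes are not supported in option values: {value!r}")
    return (
        f"CREATE TABLE {database}.{table} USING cosmos.olap OPTIONS (\n"
        f"    spark.synapse.linkedService '{linked_service}',\n"
        f"    spark.cosmos.container '{container}'\n"
        f")"
    )


if __name__ == "__main__":
    for table, container in [("Customers", "Customer"), ("SalesOrders", "SalesOrder")]:
        print(cosmos_table_ddl("AdventureWorks", table, "<Azure Cosmos DB linked service name>", container))
        # In a Synapse notebook: spark.sql(cosmos_table_ddl(...))
```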

In [10]:
%%sql

CREATE DATABASE AdventureWorks

<Spark SQL result set with 0 rows and 0 fields>

In [16]:
%%sql

CREATE TABLE AdventureWorks.Customers USING cosmos.olap OPTIONS (
    spark.synapse.linkedService '<Azure Cosmos DB linked service name>',
    spark.cosmos.container 'Customer'
)

<Spark SQL result set with 0 rows and 0 fields>

In [17]:
%%sql

CREATE TABLE AdventureWorks.SalesOrders USING cosmos.olap OPTIONS (
    spark.synapse.linkedService '<Azure Cosmos DB linked service name>',
    spark.cosmos.container 'SalesOrder'
)

<Spark SQL result set with 0 rows and 0 fields>

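### 2-2. Running analytical queries that use cross-container joins
Cosmos DB itself is optimized for OLTP and does not support joins across containers. The analytical store, on the other hand, is automatically synced into a column-oriented format optimized for analytics, and Spark can join data across containers there.  
Here we create a view that performs a cross-container join.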
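
Conceptually, the view matches each sales order to its customer on c.id = s.CustomerId. The plain-Python sketch below illustrates the result of that INNER JOIN as a hash join (index one side by key, then probe with the other side). The function name and the flattened sample rows in the test are hypothetical, and Spark picks its own physical join strategy; only the result semantics are shown.

```python
def inner_join_orders(customers: list[dict], orders: list[dict]) -> list[dict]:
    """Mimic Customers c INNER JOIN SalesOrders s ON c.id = s.CustomerId."""
    # Build phase: index customers by id.
    customers_by_id: dict = {}
    for c in customers:
        customers_by_id.setdefault(c["id"], []).append(c)

    # Probe phase: look up each order's CustomerId.
    joined = []
    for s in orders:
        key = s.get("CustomerId")
        if key is None:
            continue  # NULL never matches in SQL equality joins
        # Orders without a matching customer are dropped, as with INNER JOIN.
        for c in customers_by_id.get(key, []):
            joined.append({
                "SalesOrderId": s["id"],
                "CustomerId": c["id"],
                "Country": c["country"],
                "City": c["city"],
            })
    return joined
```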

In [18]:
%%sql

CREATE OR REPLACE VIEW AdventureWorks.SalesOrderView
AS
SELECT s.id as SalesOrderId, 
        c.id AS CustomerId, c.addresses.country AS Country, c.addresses.city AS City, 
        to_date(s.orderdate) AS OrderDate, to_date(s.shipdate) AS ShipDate
    FROM AdventureWorks.Customers c 
    INNER JOIN AdventureWorks.SalesOrders s
        ON c.id = s.CustomerId

<Spark SQL result set with 0 rows and 0 fields>

In [19]:
%%sql

-- Number of sales orders per country and city
SELECT Country, City, Count(*) AS Total_Orders
    FROM AdventureWorks.SalesOrderView
    GROUP BY Country, City 
ORDER BY Total_Orders DESC
LIMIT 10

<Spark SQL result set with 10 rows and 3 fields>

In [20]:
%%sql

-- Check that the details attribute of the SalesOrders container holds an array
SELECT id as SalesOrderId, details
    FROM AdventureWorks.SalesOrders
    LIMIT 10

<Spark SQL result set with 10 rows and 2 fields>

In [21]:
%%sql

-- Split the array elements into rows
SELECT id as SalesOrderId, explode(details)
    FROM AdventureWorks.SalesOrders
    LIMIT 10

<Spark SQL result set with 10 rows and 2 fields>

In [22]:
%%sql

-- Split the array elements into rows and add a column with each element's position
SELECT id as SalesOrderId, posexplode(details) 
    FROM AdventureWorks.SalesOrders
    LIMIT 10

<Spark SQL result set with 10 rows and 3 fields>
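
explode turns each array element into its own row, and posexplode additionally returns the element's 0-based position in a column named pos, next to the element in a column named col. That 0-based position is why the next view uses pos+1 as SalesOrderLine. Rows whose array is NULL or empty produce no output rows (posexplode_outer keeps them with NULLs). The plain-Python sketch below mirrors that expansion and the view's projection; the function names and sample order in the test are hypothetical.

```python
def posexplode_details(orders: list[dict]) -> list[dict]:
    """Mimic: SELECT id AS SalesOrderId, posexplode(details) FROM SalesOrders."""
    rows = []
    for order in orders:
        # Like posexplode, a missing or empty array yields no rows.
        for pos, col in enumerate(order.get("details") or []):
            rows.append({"SalesOrderId": order["id"], "pos": pos, "col": col})
    return rows


def to_order_lines(exploded: list[dict]) -> list[dict]:
    """Mimic the SalesOrderDetailsView projection (pos + 1 gives a 1-based line number)."""
    return [
        {
            "SalesOrderId": r["SalesOrderId"],
            "SalesOrderLine": r["pos"] + 1,
            "SKUCode": r["col"]["sku"],
            "Name": r["col"]["name"],
            "Price": r["col"]["price"],
            "Quantity": r["col"]["quantity"],
        }
        for r in exploded
    ]
```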

In [23]:
%%sql
CREATE OR REPLACE VIEW AdventureWorks.SalesOrderDetailsView
AS
    SELECT Ax.SalesOrderId,
        pos+1 as SalesOrderLine,
        col.sku AS SKUCode,
        col.name AS Name,
        col.price AS Price, 
        col.quantity AS Quantity
    FROM 
        (
            SELECT id as SalesOrderId, posexplode(details) FROM AdventureWorks.SalesOrders 
        ) Ax

<Spark SQL result set with 0 rows and 0 fields>

In [24]:
%%sql
SELECT * 
FROM AdventureWorks.SalesOrderDetailsView
LIMIT 10

<Spark SQL result set with 10 rows and 6 fields>

In [25]:
%%sql
CREATE OR REPLACE VIEW AdventureWorks.SalesOrderStatsView
AS
SELECT o.Country, o.City,
    COUNT(DISTINCT o.CustomerId) Total_Customers,
    COUNT(DISTINCT d.SalesOrderId) Total_Orders,
    COUNT(d.SalesOrderId) Total_OrderLines,
    SUM(d.Quantity*d.Price) AS Total_Revenue,
    dense_rank() OVER (ORDER BY SUM(d.Quantity*d.Price) DESC) as Rank_Revenue,
    dense_rank() OVER (ORDER BY COUNT(DISTINCT d.SalesOrderId) DESC) as Rank_Orders,
    dense_rank() OVER (ORDER BY COUNT(d.SalesOrderId) DESC) as Rank_OrderLines,
    dense_rank() OVER (PARTITION BY o.Country ORDER BY SUM(d.Quantity*d.Price) DESC) as Rank_Revenue_Country
FROM AdventureWorks.SalesOrderView o
INNER JOIN AdventureWorks.SalesOrderDetailsView d
    ON o.SalesOrderId = d.SalesOrderId
WHERE Country IS NOT NULL OR City IS NOT NULL
GROUP BY o.Country, o.City
ORDER BY Total_Revenue DESC

<Spark SQL result set with 0 rows and 0 fields>
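
The view ranks cities with dense_rank(), which gives tied values the same rank and leaves no gaps afterwards, unlike rank(), which skips ranks after a tie. With PARTITION BY o.Country, the numbering restarts for each country. The plain-Python sketch below reproduces both orderings for comparison; the function names and figures in the test are hypothetical, and it assumes no NULL values (Spark places NULLs last in a DESC ordering by default).

```python
def dense_rank_desc(values: list) -> list[int]:
    """Like dense_rank() OVER (ORDER BY v DESC): ties share a rank, no gaps."""
    distinct = sorted(set(values), reverse=True)
    position = {v: i + 1 for i, v in enumerate(distinct)}
    return [position[v] for v in values]


def rank_desc(values: list) -> list[int]:
    """Like rank() OVER (ORDER BY v DESC): ties share a rank, gaps follow ties."""
    return [1 + sum(1 for w in values if w > v) for v in values]


def dense_rank_desc_by(rows: list[dict], partition_key: str, value_key: str) -> list[int]:
    """Like dense_rank() OVER (PARTITION BY p ORDER BY v DESC)."""
    groups: dict = {}
    for row in rows:
        groups.setdefault(row[partition_key], []).append(row[value_key])
    positions = {
        key: {v: i + 1 for i, v in enumerate(sorted(set(vals), reverse=True))}
        for key, vals in groups.items()
    }
    return [positions[row[partition_key]][row[value_key]] for row in rows]
```

For revenues `[300, 500, 500, 100]`, `dense_rank_desc` gives `[2, 1, 1, 3]` while `rank_desc` gives `[3, 1, 1, 4]`.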

In [26]:
%%sql
SELECT concat(Country[0],'-',replace(City[0],' ','')) AS id, 
    'SalesOrderStatistic' AS type, *
FROM AdventureWorks.SalesOrderStatsView
WHERE Country[0] IS NOT NULL
LIMIT 10

<Spark SQL result set with 10 rows and 12 fields>

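## 3. Writing the analysis results back to the transactional store
Generate an id for each row to be written (every Cosmos DB item requires one). The write behaves as an upsert.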

https://docs.microsoft.com/ja-jp/learn/modules/query-azure-cosmos-db-with-apache-spark-for-azure-synapse-analytics/7-write-data-back-to-transactional-store
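
The id built in the next cell is concat(Country[0], '-', replace(City[0], ' ', '')): the first country in the array, a hyphen, then the first city with all spaces removed. In Spark SQL, concat returns NULL if any argument is NULL, which is why the query filters on Country[0] IS NOT NULL (a NULL City[0] would still produce a NULL id). A plain-Python sketch of the same rule follows; `make_stat_id` is a hypothetical name.

```python
from typing import Optional


def make_stat_id(countries: Optional[list], cities: Optional[list]) -> Optional[str]:
    """Mimic concat(Country[0], '-', replace(City[0], ' ', ''))."""
    country = countries[0] if countries else None
    city = cities[0] if cities else None
    # concat returns NULL if any input is NULL.
    if country is None or city is None:
        return None
    return f"{country}-{city.replace(' ', '')}"
```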

In [27]:
%%pyspark

dfSalesOrderStatistic = spark.sql("""
    SELECT concat(Country[0], '-', replace(City[0], ' ', '')) AS id, 'SalesOrderStatistic' AS type, *
    FROM AdventureWorks.SalesOrderStatsView
    WHERE Country[0] IS NOT NULL
""")


In [30]:
%%pyspark

# Concatenating the country code and city name gives each document a unique id, so the latest statistics for a city are easy to look up.
dfSalesOrderStatistic.write\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<Azure Cosmos DB リンクサービス名>")\
    .option("spark.cosmos.container", "Sales")\
    .mode('append')\
    .save()


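As noted at the start of this section, the write behaves as an upsert: a row whose id already exists in the Sales container replaces that item, and a new id creates an item, so rerunning the cell refreshes each city's statistics instead of duplicating them. The exact behavior is governed by the connector's write settings, so it is worth confirming for your connector version. The dictionary-based sketch below shows only the keyed replace-or-insert semantics; `upsert` is a hypothetical name, and it keys by id alone for simplicity, whereas a Cosmos DB item is identified by its id together with its partition key value.

```python
def upsert(store: dict[str, dict], docs: list[dict]) -> dict[str, dict]:
    """Insert new documents and fully replace existing ones, keyed by id."""
    for doc in docs:
        store[doc["id"]] = dict(doc)  # replace the whole item, not a field-level merge
    return store


if __name__ == "__main__":
    container: dict[str, dict] = {}
    upsert(container, [{"id": "GB-London", "type": "SalesOrderStatistic", "Total_Orders": 10}])
    upsert(container, [{"id": "GB-London", "type": "SalesOrderStatistic", "Total_Orders": 12},
                       {"id": "AU-Bendigo", "type": "SalesOrderStatistic", "Total_Orders": 3}])
    print(container)
```
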
## 4. Cleanup

In [15]:
%%sql
DROP VIEW AdventureWorks.SalesOrderStatsView;
DROP VIEW AdventureWorks.SalesOrderDetailsView;
DROP VIEW AdventureWorks.SalesOrderView;
DROP TABLE AdventureWorks.Customers;
DROP TABLE AdventureWorks.SalesOrders;
DROP DATABASE AdventureWorks CASCADE;

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

# ↓ Debug

In [93]:
%%sql
SELECT concat(Country[0],'-',replace(City[0],' ','')) AS id, 
    'SalesOrderStatistic' AS type, '1234' as test_col, *
FROM AdventureWorks.SalesOrderStatsView
WHERE (Country[0] == 'GB' AND City[0] == 'London') OR (Country[0] == 'AU' AND City[0] == 'Bendigo')

In [96]:
demo_df = spark.sql("SELECT concat(Country[0],'-',replace(City[0],' ','')) AS id,  \
    'SalesOrderStatistic' AS type, '1234' as test_col, * \
FROM AdventureWorks.SalesOrderStatsView \
WHERE (Country[0] == 'GB' AND City[0] == 'London') OR (Country[0] == 'AU' AND City[0] == 'Bendigo')")

In [97]:
demo_df.write\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "demo_synapselink_for_cosmosdb_adventure")\
    .option("spark.cosmos.container", "Test")\
    .mode('append')\
    .save()