# Demo: Gaining New Insights by Exploring Source Data

# Motivation: gain new insights from the raw data at the upstream source of the data behind this report
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_0.png"/>

# 1. Catalog search (the Purview integration lets you search data assets and view their details directly in Synapse Studio)
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_1.png"/>
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_2.png"/>

# 2. View asset details
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_3.png"/>

# 3. Check the lineage
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_4.png"/>
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_5.png"/>

# 4. Inspect the raw data
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_6.png"/>
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_7.png"/>

# 5. Inspect the raw data files
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_8.png"/>

# 6. Explore the raw data files
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_9.png"/>
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_10.png"/>

# 7. Exploring the raw data with Spark

In [2]:
%%pyspark
# Read every file matching the wildcard into a single DataFrame
df_taxidata = spark.read.load(
    'abfss://sample@o9o9adlsgen2.dfs.core.windows.net/ADPE2E/raw/nyctaxidata-raw/yellow_tripdata_*-*.csv',
    format='csv',
    header=True,  # the first line of each file is a header row
)
display(df_taxidata.limit(10))
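The path passed to `spark.read.load` ends in a wildcard, so every file in that folder whose name matches `yellow_tripdata_*-*.csv` is read into one DataFrame. The sketch below approximates that matching locally with Python's `fnmatch.fnmatchcase`; the file names are hypothetical, and Hadoop's glob syntax is only assumed to behave the same way for a simple `*` pattern like this one.

```python
from fnmatch import fnmatchcase

# Same file-name pattern as in the load path above
PATTERN = "yellow_tripdata_*-*.csv"

# Hypothetical contents of the nyctaxidata-raw folder
files = [
    "yellow_tripdata_2019-01.csv",
    "yellow_tripdata_2019-02.csv",
    "green_tripdata_2019-01.csv",       # different prefix: not matched
    "yellow_tripdata_2019-01.parquet",  # different extension: not matched
]

# Keep only the names the wildcard would pick up
matched = [name for name in files if fnmatchcase(name, PATTERN)]
print(matched)  # ['yellow_tripdata_2019-01.csv', 'yellow_tripdata_2019-02.csv']
```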

## 7-1. Transforming data with the Spark DataFrame API

In [3]:
%%pyspark

# Use DataFrame API operations to filter the data
display(
    df_taxidata
    .select("tpep_pickup_datetime", "passenger_count", "total_amount")
    .filter("passenger_count > 6 and total_amount > 50.0")
)
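Because the CSV files are read without schema inference, these columns arrive as strings, and Spark converts them implicitly for the numeric comparisons in the filter expression. A plain-Python sketch of the same select-then-filter over a few hypothetical rows makes those conversions explicit; it illustrates the logic only and is not how Spark executes the query.

```python
# Hypothetical raw rows: every value is a string, as read from CSV without a schema
rows = [
    {"VendorID": "1", "tpep_pickup_datetime": "2019-01-07 08:15:00", "passenger_count": "7", "total_amount": "82.30"},
    {"VendorID": "2", "tpep_pickup_datetime": "2019-01-07 09:40:00", "passenger_count": "2", "total_amount": "95.00"},
    {"VendorID": "1", "tpep_pickup_datetime": "2019-01-08 22:05:00", "passenger_count": "8", "total_amount": "31.75"},
]

COLUMNS = ("tpep_pickup_datetime", "passenger_count", "total_amount")

def select_and_filter(rows):
    """Project three columns, then keep rows with passenger_count > 6 and total_amount > 50.0."""
    projected = [{c: r[c] for c in COLUMNS} for r in rows]
    return [
        r for r in projected
        if int(r["passenger_count"]) > 6 and float(r["total_amount"]) > 50.0
    ]

result = select_and_filter(rows)
print(result)  # only the 7-passenger, 82.30 trip remains
```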

## 7-2. Transforming data with SQL

### 7-2-1. Register a Spark temporary view

In [4]:
%%pyspark

# Create a local temporary view
df_taxidata.limit(10000).createOrReplaceTempView('NYCTaxiDataTable')  # limit the data volume for the demo
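`createOrReplaceTempView` registers the row-limited DataFrame under a name that SQL cells can query, replacing any view already registered under that name. As a local analogy only, assuming `sqlite3` as a stand-in for Spark SQL and using hypothetical data, the sketch below emulates that with a SQLite temporary view. SQLite has no `CREATE OR REPLACE VIEW`, so the hypothetical helper drops the view first and then recreates it; calling it twice shows the replace behavior.

```python
import sqlite3

def create_or_replace_temp_view(conn: sqlite3.Connection, name: str, select_sql: str) -> None:
    """Hypothetical helper: emulate createOrReplaceTempView with a SQLite temp view."""
    conn.execute(f"DROP VIEW IF EXISTS {name}")  # no error if the view does not exist yet
    conn.execute(f"CREATE TEMP VIEW {name} AS {select_sql}")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_trips (passenger_count INTEGER, total_amount REAL)")
conn.executemany(
    "INSERT INTO raw_trips VALUES (?, ?)",
    [(1, 12.5), (2, 30.0), (7, 82.3), (3, 18.9), (1, 9.0)],
)

# Register a row-limited view, like limit(10000) in the demo (here: 3 rows)
create_or_replace_temp_view(conn, "NYCTaxiDataTable", "SELECT * FROM raw_trips LIMIT 3")
# Registering again under the same name replaces the view instead of failing
create_or_replace_temp_view(conn, "NYCTaxiDataTable", "SELECT * FROM raw_trips LIMIT 3")

count = conn.execute("SELECT count(*) FROM NYCTaxiDataTable").fetchone()[0]
print(count)  # 3
```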



### 7-2-2. Analysis with familiar SQL (the magic command at the top of a cell switches its runtime context and language)

In [5]:
%%sql

--Use SQL to count NYC Taxi Data records
select count(*) from NYCTaxiDataTable


In [6]:
%%sql
-- Use SQL to filter NYC Taxi Data records

select cast(tpep_pickup_datetime as date) as pickup_date
  , tpep_dropoff_datetime
  , passenger_count
  , total_amount
from NYCTaxiDataTable
where cast(tpep_pickup_datetime as date) = '2019-01-07'
  and passenger_count > 2
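The `where` clause casts the pickup timestamp to a date before comparing it with `'2019-01-07'`, so a ride picked up at any time on that day qualifies. The same predicate in plain Python, assuming the raw timestamps use the `yyyy-MM-dd HH:mm:ss` layout (the sample rides are hypothetical):

```python
from datetime import date, datetime

def picked_up_on(ts: str, day: date) -> bool:
    """Equivalent of cast(tpep_pickup_datetime as date) = day."""
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").date() == day

# Hypothetical (tpep_pickup_datetime, passenger_count) pairs, as strings
rides = [
    ("2019-01-07 00:00:05", "3"),
    ("2019-01-07 23:59:59", "4"),
    ("2019-01-08 00:00:00", "5"),  # next day: excluded
    ("2019-01-07 12:30:00", "1"),  # too few passengers: excluded
]

target = date(2019, 1, 7)
selected = [(ts, pc) for ts, pc in rides if picked_up_on(ts, target) and int(pc) > 2]
print(selected)
```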

In [7]:
%%sql
-- Use SQL to aggregate NYC Taxi Data records and visualize data
select case payment_type
            when 1 then 'Credit card'
            when 2 then 'Cash'
            when 3 then 'No charge'
            when 4 then 'Dispute'
            when 5 then 'Unknown'
            when 6 then 'Voided trip'
        end as PaymentType
  , count(*) as TotalRideCount
from NYCTaxiDataTable
group by payment_type
order by TotalRideCount desc
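The `case` expression decodes the numeric `payment_type` codes into labels, while the grouping is still done on the raw code. A plain-Python sketch of the same group, count, and decode, using the code-to-label mapping from the query and a hypothetical list of codes; a code outside 1 to 6 decodes to `None`, just as a `case` without an `else` yields NULL.

```python
from collections import Counter

# Code-to-label mapping taken from the CASE expression above
PAYMENT_TYPES = {
    1: "Credit card",
    2: "Cash",
    3: "No charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip",
}

def ride_counts_by_payment_type(codes):
    """Group by payment_type, order by ride count descending, then decode the label."""
    counts = Counter(int(code) for code in codes)  # group by payment_type
    return [(PAYMENT_TYPES.get(code), n) for code, n in counts.most_common()]

# Hypothetical payment_type values (strings, as read from CSV)
codes = ["1", "1", "2", "1", "2", "3", "9"]
print(ride_counts_by_payment_type(codes))
```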



## 7-3. Loading relational database data into a Spark DataFrame and joining it
### 7-3-1. Read a SQL DB table into a Spark DataFrame

In [8]:
%%spark
// Read the TaxiLocationLookup table from the dedicated SQL pool (<database>.<schema>.<table>)
val df_location = spark.read.sqlanalytics("sqlpool.adpe2e.TaxiLocationLookup")

### 7-3-2. Register the DataFrame as a Spark temporary view

In [None]:
%%spark
df_location.createOrReplaceTempView("NYCTaxiLocation")
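With both views registered, the analytical query can look up each ride's pickup and drop-off zones by joining the ride data to `NYCTaxiLocation` twice, once per location ID. Both are inner joins, so a ride whose `PULocationID` or `DOLocationID` has no row in the lookup is dropped from the result. A plain-Python sketch of that double lookup, assuming `LocationID` is unique in the lookup and using hypothetical lookup rows and rides:

```python
# Hypothetical rows standing in for the TaxiLocationLookup table
locations = [
    {"LocationID": 132, "Zone": "JFK Airport", "Borough": "Queens"},
    {"LocationID": 161, "Zone": "Midtown Center", "Borough": "Manhattan"},
]
# Hypothetical rides; IDs are strings because the CSV was read without a schema
rides = [
    {"PULocationID": "132", "DOLocationID": "161"},
    {"PULocationID": "161", "DOLocationID": "999"},  # 999 not in lookup: dropped
]

def join_locations(rides, locations):
    """Inner-join each ride to the lookup twice: pickup (pu) and drop-off (do)."""
    by_id = {loc["LocationID"]: loc for loc in locations}
    joined = []
    for ride in rides:
        pu = by_id.get(int(ride["PULocationID"]))
        do = by_id.get(int(ride["DOLocationID"]))
        if pu is None or do is None:  # inner join: no match, no output row
            continue
        joined.append({
            "PickUpLocationZone": pu["Zone"],
            "PickUpLocationBorough": pu["Borough"],
            "DropOffLocationZone": do["Zone"],
            "DropOffLocationBorough": do["Borough"],
        })
    return joined

joined = join_locations(rides, locations)
print(joined)
```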

### 7-3-3. Run a complex analytical query that joins multiple Spark tables

In [9]:
%%sql
select 
    VendorID
    , cast(tpep_pickup_datetime as date) as PickUpDate
    , concat(year(tpep_pickup_datetime), '-', format_string('%02d', month(tpep_pickup_datetime))) as PickUpYearMonth --Partition Key
    , cast(tpep_pickup_datetime as timestamp) as PickUpDateTime
    , cast(tpep_dropoff_datetime as date) as DropOffDate
    , cast(tpep_dropoff_datetime as timestamp) as DropOffDateTime
    , passenger_count as PassengerCount
    , trip_distance as TripDistance
    , cast(PULocationID as int) as PickUpLocationID
    , pu.Zone as PickUpLocationZone
    , pu.Borough as PickUpLocationBorough
    , cast(DOLocationID as int) as DropOffLocationID
    , do.Zone as DropOffLocationZone
    , do.Borough as DropOffLocationBorough
    , cast(payment_type as int) as PaymentTypeID
    , case payment_type
            when 1 then 'Credit card'
            when 2 then 'Cash'
            when 3 then 'No charge'
            when 4 then 'Dispute'
            when 5 then 'Unknown'
            when 6 then 'Voided trip'
        end as PaymentTypeDescription
    , cast(case when fare_amount < 0 then 0.00 else fare_amount end as decimal(8,2)) as FareAmount --Cleanse invalid data
    , cast(case when extra < 0 then 0.00 else extra end as decimal(8,2)) as ExtraAmount --Cleanse invalid data
    , cast(case when mta_tax < 0 then 0.00 else mta_tax end as decimal(8,2)) as MTATaxAmount --Cleanse invalid data
    , cast(case when tip_amount < 0 then 0.00 else tip_amount end as decimal(8,2)) as TipAmount --Cleanse invalid data
    , cast(case when tolls_amount < 0 then 0.00 else tolls_amount end as decimal(8,2)) as TollsAmount --Cleanse invalid data
    , cast(case when improvement_surcharge < 0 then 0.00 else improvement_surcharge end as decimal(8,2)) as ImprovementSurchargeAmount --Cleanse invalid data
    , cast(case when total_amount < 0 then 0.00 else total_amount end as decimal(8,2)) as TotalRideAmount --Cleanse invalid data
from NYCTaxiDataTable as rides
  join NYCTaxiLocation as pu
    on rides.PULocationID = pu.LocationID
  join NYCTaxiLocation as do
    on rides.DOLocationID = do.LocationID
where passenger_count > 0 --Data Cleanup Rules
  and year(tpep_pickup_datetime) = 2019
limit 10
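Two of the derived columns follow simple per-row rules: `PickUpYearMonth` (the partition key) is the pickup year and zero-padded month joined by a hyphen, and each amount column replaces negative values with 0.00 and is cast to `decimal(8,2)`. A sketch of both rules for a single value, using `decimal.Decimal`; the half-up rounding is an assumption about how the decimal cast rounds, and overflow beyond 8 digits is not handled.

```python
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP

def pickup_year_month(ts: str) -> str:
    """concat(year(ts), '-', format_string('%02d', month(ts)))"""
    dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
    return f"{dt.year}-{dt.month:02d}"

def cleanse_amount(raw: str) -> Decimal:
    """case when x < 0 then 0.00 else x end, kept to two decimal places."""
    value = Decimal(raw)
    if value < 0:
        value = Decimal("0.00")
    # Assumed half-up rounding to 2 places; the 8-digit precision is not checked here
    return value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

print(pickup_year_month("2019-03-05 14:20:00"))  # 2019-03
print(cleanse_amount("-52.00"))  # 0.00
print(cleanse_amount("12.345"))  # 12.35
```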

## 7-4. Persist the analytical query results as a Spark table (persisting them to a dedicated SQL pool or the data lake instead is just as easy)

In [10]:
%%pyspark

df_preped = spark.sql(" \
    select \
        VendorID \
        , cast(tpep_pickup_datetime as date) as PickUpDate \
        , concat(year(tpep_pickup_datetime), '-', format_string('%02d', month(tpep_pickup_datetime))) as PickUpYearMonth \
        , cast(tpep_pickup_datetime as timestamp) as PickUpDateTime \
        , cast(tpep_dropoff_datetime as date) as DropOffDate \
        , cast(tpep_dropoff_datetime as timestamp) as DropOffDateTime \
        , passenger_count as PassengerCount \
        , trip_distance as TripDistance \
        , cast(PULocationID as int) as PickUpLocationID \
        , pu.Zone as PickUpLocationZone \
        , pu.Borough as PickUpLocationBorough \
        , cast(DOLocationID as int) as DropOffLocationID \
        , do.Zone as DropOffLocationZone \
        , do.Borough as DropOffLocationBorough \
        , cast(payment_type as int) as PaymentTypeID \
        , case payment_type \
                when 1 then 'Credit card' \
                when 2 then 'Cash' \
                when 3 then 'No charge' \
                when 4 then 'Dispute' \
                when 5 then 'Unknown' \
                when 6 then 'Voided trip' \
            end as PaymentTypeDescription \
        , cast(case when fare_amount < 0 then 0.00 else fare_amount end as decimal(8,2)) as FareAmount \
        , cast(case when extra < 0 then 0.00 else extra end as decimal(8,2)) as ExtraAmount \
        , cast(case when mta_tax < 0 then 0.00 else mta_tax end as decimal(8,2)) as MTATaxAmount  \
        , cast(case when tip_amount < 0 then 0.00 else tip_amount end as decimal(8,2)) as TipAmount  \
        , cast(case when tolls_amount < 0 then 0.00 else tolls_amount end as decimal(8,2)) as TollsAmount  \
        , cast(case when improvement_surcharge < 0 then 0.00 else improvement_surcharge end as decimal(8,2)) as ImprovementSurchargeAmount  \
        , cast(case when total_amount < 0 then 0.00 else total_amount end as decimal(8,2)) as TotalRideAmount  \
    from NYCTaxiDataTable as rides \
    join NYCTaxiLocation as pu \
        on rides.PULocationID = pu.LocationID \
    join NYCTaxiLocation as do \
        on rides.DOLocationID = do.LocationID \
    where passenger_count > 0 /* Data Cleanup Rules */ \
    and year(tpep_pickup_datetime) = 2019 \
")
df_preped.write.mode("overwrite").saveAsTable("sparkdb01.adpe2e_NYCTaxiData_preped")
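SQL comments need care inside this kind of string. A backslash at the end of a line inside an ordinary Python string literal joins the next line onto it, so the whole query reaches Spark as a single line, and a `--` comment then hides every clause after it, including predicates written on later lines. The sketch below reproduces the effect in plain Python; `strip_line_comments` is a hypothetical, simplified stand-in for how a SQL parser treats `--` (it ignores `--` inside string literals). A triple-quoted string keeps the line breaks, and a `/* ... */` comment ends where it closes, so either avoids the problem.

```python
# A normal string with backslash-newline continuations, as in the cell above
sql_continued = "select * from t \
    where passenger_count > 0 --Data Cleanup Rules \
    and year(tpep_pickup_datetime) = 2019 \
"

# A triple-quoted string keeps the line breaks, so "--" only comments out its own line
sql_triple = """select * from t
    where passenger_count > 0 --Data Cleanup Rules
    and year(tpep_pickup_datetime) = 2019
"""

def strip_line_comments(sql: str) -> str:
    """Hypothetical, simplified parser behavior: drop everything from '--' to end of line."""
    return "\n".join(line.split("--", 1)[0] for line in sql.splitlines())

print("\n" in sql_continued)                          # False: the query is one line
print("year(" in strip_line_comments(sql_continued))  # False: the year filter is commented out
print("year(" in strip_line_comments(sql_triple))     # True: the year filter survives
```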


# 8. Exploring with the serverless SQL pool query engine (an ad hoc query engine)

## 8-1. Save the DataFrame holding the raw data as Parquet files

In [11]:
df_taxidata.limit(10000).write.parquet("abfss://sample@o9o9adlsgen2.dfs.core.windows.net/ADPE2E/raw/nyctaxidata-raw-parquet/")  # limit the data volume for the demo

## 8-2. File exploration 1 (specifying the file path directly)

### 8-2-1. In the Synapse Studio explorer, navigate to the directory the Parquet files were written to, then right-click -> Select TOP 100 rows
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_11.png"/>

### 8-2-2. Choose Parquet as the file type
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_12.png"/>

### 8-2-3. Run the generated sample script
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_13.png"/>

### 8-2-4. Edit the sample script and run an analytical query
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_14.png"/>

## 8-3. File exploration 2 (hiding the file path behind an external table)

### 8-3-1. In the Synapse Studio explorer, navigate to the directory the Parquet files were written to, then right-click -> Create external table
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_15.png"/>
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_16.png"/>

### 8-3-2. Specify the database in which to create the external table, and the table name
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_17.png"/>

### 8-3-3. Run the generated script that creates the external table
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_18.png"/>

### 8-3-4. Explore the data with standard SQL-compatible queries
<img src="https://raw.githubusercontent.com/gho9o9/share/main/media/image/Demo_Lineage_19.png"/>

In [12]:
%%pyspark

# Delete the data source files (the Parquet output from 8-1); True makes the delete recursive
#   Ref https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/microsoft-spark-utilities?pivots=programming-language-python
mssparkutils.fs.rm('abfss://sample@o9o9adlsgen2.dfs.core.windows.net/ADPE2E/raw/nyctaxidata-raw-parquet/', True)
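The second argument to `mssparkutils.fs.rm`, `True`, requests a recursive delete, so the target directory is removed together with the Parquet part files inside it. The local-filesystem analogy below uses `os.rmdir` and `shutil.rmtree` rather than `mssparkutils` itself, and the folder and file names are hypothetical: a non-recursive delete refuses a non-empty directory, while a recursive one removes it along with its contents.

```python
import os
import shutil
import tempfile

# Build a throwaway directory that mimics a Parquet output folder
base = tempfile.mkdtemp()
out_dir = os.path.join(base, "nyctaxidata-raw-parquet")
os.makedirs(out_dir)
with open(os.path.join(out_dir, "part-00000.parquet"), "wb") as f:
    f.write(b"placeholder")  # not real Parquet; only the file's presence matters

# Non-recursive delete: fails because the directory is not empty
try:
    os.rmdir(out_dir)
    non_recursive_ok = True
except OSError:
    non_recursive_ok = False

# Recursive delete: removes the directory and everything under it
shutil.rmtree(out_dir)

print(non_recursive_ok, os.path.exists(out_dir))  # False False
os.rmdir(base)  # clean up the now-empty temporary base directory
```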