# 例で使うサンプルデータを作成する

In [1]:
import os
import boto3
from boto3.session import Session

起動時に設定しておいた環境変数`S3_TEST_URL`からベースとなるURLを取得する。
この例ではMinioを利用しており、あらかじめテスト用バケット`test`を作成しておいた。
Minio準備については、 https://dobachi.github.io/memo-blog/2021/06/13/Minio-getting-started/ を参照。

In [2]:
s3_base = os.environ['S3_TEST_URL']

In [3]:
s3_base

's3a://test'

念のため、SparkSessionを確かめる。

In [4]:
spark

サンプルデータを作成し、s3に書き込む。
ここでは、`s3://test/delta_table_01`と`s3://test/delta_table_02`を作成する。

予め設定しておいたAWSプロフィールを指す名を読み込む。

In [6]:
AWS_PROFILE = os.environ['AWS_PROFILE']
AWS_PROFILE

'delta_sharing'

上記の通り、`delta_sharing`という名称で、AWSプロフィールを作成してあることが前提である。

~/.aws/credentialsの例：

```
[delta_sharing]
aws_access_key_id = hoge
aws_secret_access_key = fuga
```

セッション生成

In [7]:
session = Session(profile_name=AWS_PROFILE)

ここではMinioを利用しているため、エンドポイントを指定する。

In [8]:
endpoint_url = os.environ.get('S3_ENDPOINT', None)
endpoint_url

'http://127.0.0.1:9000'

取得したエンドポイントを指定して、セッションを生成

In [10]:
s3 = session.resource('s3', endpoint_url=endpoint_url)

In [11]:
bucket = s3.Bucket('test')

ここから、対象データがあるかどうかを確認しながら、Minio上にS3プロトコルでDeltaテーブルのサンプルデータを作成する。

まず1個目。

In [12]:
key = '/delta_table_01'
print(key)
objects = bucket.objects.filter(Prefix=key)

/delta_table_01


In [13]:
if not any(True for _ in objects):
    spark.range(0, 10).write.format("delta").save(s3_base + '/delta_table_01')
else:
    print('delta_table_01 exists')

delta_table_01 exists


もしテーブルが存在しなければSparkを使ってデータが書き込まれる。
もし存在すればその旨メッセージを出力する。

2個目。

In [96]:
key = '/delta_table_02'
print(key)
objects = bucket.objects.filter(Prefix=key)

/delta_table_02


In [99]:
if not any(True for _ in objects):
    spark.range(0, 10).write.format("delta").save(s3_base + '/delta_table_02')
else:
    print('delta_table_02 exists')

delta_table_02 exists
