# Query Operations on the Data Lake

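- [Data registration](#data-registration)
  - Convert the CSV file to a query-friendly format (Parquet)
  - Upload the Parquet file to the data lake
  - Define a table using the Parquet file as the data source
  - Create a new table from query results
- [Data exploration](#data-exploration)
  - Join tables
- [Data output](#data-output)
  - Save query results in CSV format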

In [4]:
#!pip3 install ipython-sql==0.3.9
!pip3 install PyHive==0.6.1
!pip3 install SQLAlchemy==1.3.13
!pip3 install thrift==0.13.0
!pip3 install sasl==0.2.1
!pip3 install thrift_sasl==0.3.0

!pip3 install pyarrow

!pip3 install impyla

You should consider upgrading via the 'pip install --upgrade pip' command.
Collecting pyarrow
  Downloading https://files.pythonhosted.org/packages/00/d2/695bab1e1e7a4554b6dbd287d55cca096214bd441037058a432afd724bb1/pyarrow-0.16.0-cp36-cp36m-manylinux2014_x86_64.whl (63.1MB)
Installing collected packages: pyarrow
Successfully installed pyarrow-0.16.0


In [1]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

import impala

## Data registration

### Convert the CSV file to a query-friendly format (Parquet)

Load the CSV file that was uploaded to the project into a DataFrame.

In [5]:
!mkdir ./input

In [2]:
!unzip ./data/ecommerce-data.zip

Archive:  ./data/ecommerce-data.zip
replace data.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: ^C


In [3]:
filename = './data.csv'
df = pd.read_csv(filename, encoding= 'unicode_escape')

In [4]:
print(df.shape)
df[:5]

(541909, 8)


,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850.0,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850.0,United Kingdom
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850.0,United Kingdom


Write the DataFrame out in Parquet format.

In [5]:
parquet_filename = './data.parquet'
table = pa.Table.from_pandas(df)
pq.write_table(table, parquet_filename)

In [6]:
print(table.shape)
table[:5]

(541909, 8)


pyarrow.Table
InvoiceNo: string
StockCode: string
Description: string
Quantity: int64
InvoiceDate: string
UnitPrice: double
CustomerID: double
Country: string
metadata
--------
OrderedDict([(b'pandas',
              b'{"index_columns": [{"kind": "range", "name": null, "start": '
              b'0, "stop": 541909, "step": 1}], "column_indexes": [{"name": '
              b'null, "field_name": null, "pandas_type": "unicode", "numpy_t'
              b'ype": "object", "metadata": {"encoding": "UTF-8"}}], "column'
              b's": [{"name": "InvoiceNo", "field_name": "InvoiceNo", "panda'
              b's_type": "unicode", "numpy_type": "object", "metadata": null'
              b'}, {"name": "StockCode", "field_name": "StockCode", "pandas_'
              b'type": "unicode", "numpy_type": "object", "metadata": null},'
              b' {"name": "Description", "field_name": "Description", "panda'
              b's_type": "unicode", "numpy_type": "object", "metadata": null'
              b'},

### Upload the Parquet file to the data lake

In [7]:
!hdfs dfs -mkdir /tmp/ext_db/
!hdfs dfs -mkdir /tmp/ext_db/data

In [8]:
!hdfs dfs -put data.parquet /tmp/ext_db/data/.

In [9]:
!hdfs dfs -ls /tmp/ext_db/data/.

Found 1 items
-rw-r--r--   3 admin supergroup    3632393 2020-03-02 05:16 /tmp/ext_db/data/data.parquet
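
The upload steps above can also be scripted. The following is a minimal sketch, not part of the original notebook: the helper name `hdfs_upload_commands` is hypothetical, and it only builds and prints the `hdfs dfs` command lines. It adds `-p` to `-mkdir` and `-f` to `-put` so that re-running the steps does not fail when the directory or file already exists.

```python
import shlex


def hdfs_upload_commands(local_path, hdfs_dir):
    """Return the `hdfs dfs` invocations that upload one file, as argv lists.

    Hypothetical helper for illustration; it does not run anything itself.
    """
    return [
        # -p creates missing parent directories and tolerates an existing one
        ["hdfs", "dfs", "-mkdir", "-p", hdfs_dir],
        # -f overwrites a file of the same name at the destination
        ["hdfs", "dfs", "-put", "-f", local_path, hdfs_dir],
        # list the directory to confirm the upload
        ["hdfs", "dfs", "-ls", hdfs_dir],
    ]


commands = hdfs_upload_commands("data.parquet", "/tmp/ext_db/data")
for argv in commands:
    # Print only; to execute, pass each list to subprocess.run(argv, check=True).
    print(" ".join(shlex.quote(arg) for arg in argv))
```

Keeping the commands as argument lists avoids shell quoting problems if a path ever contains spaces.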


### Define a table using the Parquet file as the data source

Connect to the data lake.

In [10]:
%load_ext sql

In [11]:
%%sql
impala://ip-10-0-0-10.ap-northeast-1.compute.internal:21050/default

'Connected: None@default'

In [12]:
%%sql
show tables

Done.


name
country
e_commerce_data
order_items


In [13]:
%%sql
DROP TABLE IF EXISTS e_commerce_data;
DROP TABLE IF EXISTS order_items;
DROP TABLE IF EXISTS country;

Done.
Done.
Done.


[]

#### Create the DATA table

In [14]:
%%sql
CREATE EXTERNAL TABLE IF NOT EXISTS e_commerce_data
  LIKE PARQUET '/tmp/ext_db/data/data.parquet'
STORED AS PARQUET
LOCATION '/tmp/ext_db/data'

Done.


[]
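
The same `CREATE EXTERNAL TABLE ... LIKE PARQUET` pattern is used for each Parquet file in this notebook, so the statement can be generated from the table name and the file path. This is a sketch under assumptions: `external_parquet_ddl` is a hypothetical helper, and it assumes that the table location is simply the directory containing the Parquet file, as in the cell above.

```python
import posixpath
import re

# Conservative pattern for table names; anything else is rejected.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def external_parquet_ddl(table_name, parquet_path):
    """Build a CREATE EXTERNAL TABLE ... LIKE PARQUET statement for one file."""
    if not _IDENTIFIER.match(table_name):
        raise ValueError(f"unexpected table name: {table_name!r}")
    if "'" in parquet_path:
        raise ValueError("path must not contain a single quote")
    # Assumption: the table location is the directory that holds the file.
    location = posixpath.dirname(parquet_path)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table_name}\n"
        f"  LIKE PARQUET '{parquet_path}'\n"
        "STORED AS PARQUET\n"
        f"LOCATION '{location}'"
    )


ddl = external_parquet_ddl("e_commerce_data", "/tmp/ext_db/data/data.parquet")
print(ddl)
```

The resulting string could then be passed to `execute()` on a cursor obtained from `impala.dbapi.connect`.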

In [15]:
%%sql
select * from default.e_commerce_data limit 10

Done.


invoiceno,stockcode,description,quantity,invoicedate,unitprice,customerid,country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,12/1/2010 8:26,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,12/1/2010 8:26,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,12/1/2010 8:28,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,12/1/2010 8:28,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/2010 8:34,1.69,13047.0,United Kingdom


**Reference:** Running the same query from a program

In [16]:
from impala.dbapi import connect
from impala.util import as_pandas

# Connect to the Impala daemon on port 21050.
conn = connect(host='ip-10-0-0-10.ap-northeast-1.compute.internal', port=21050)
#conn = connect(host='10.0.0.', port=21050)

cur = conn.cursor()
cur.execute('select * from default.e_commerce_data limit 2')

# Convert the result set into a pandas DataFrame.
df = as_pandas(cur)

df.head()

,invoiceno,stockcode,description,quantity,invoicedate,unitprice,customerid,country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850.0,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850.0,United Kingdom


#### Create the COUNTRY table

In [17]:
filename = './country.csv'
df = pd.read_csv(filename, encoding= 'unicode_escape')
parquet_filename = './country.parquet'
table = pa.Table.from_pandas(df)
pq.write_table(table, parquet_filename)

In [18]:
print(table.shape)
table[:5]

(38, 2)


pyarrow.Table
ID: int64
NAME: string
metadata
--------
OrderedDict([(b'pandas',
              b'{"index_columns": [{"kind": "range", "name": null, "start": '
              b'0, "stop": 38, "step": 1}], "column_indexes": [{"name": null'
              b', "field_name": null, "pandas_type": "unicode", "numpy_type"'
              b': "object", "metadata": {"encoding": "UTF-8"}}], "columns": '
              b'[{"name": "ID", "field_name": "ID", "pandas_type": "int64", '
              b'"numpy_type": "int64", "metadata": null}, {"name": "NAME", "'
              b'field_name": "NAME", "pandas_type": "unicode", "numpy_type":'
              b' "object", "metadata": null}], "creator": {"library": "pyarr'
              b'ow", "version": "0.16.0"}, "pandas_version": "0.25.1"}')])

In [19]:
!hdfs dfs -mkdir /tmp/ext_db/
!hdfs dfs -mkdir /tmp/ext_db/country

mkdir: `/tmp/ext_db': File exists


In [20]:
!hdfs dfs -put -f country.parquet /tmp/ext_db/country/.

In [21]:
!hdfs dfs -ls /tmp/ext_db/country/

Found 1 items
-rw-r--r--   3 admin supergroup       2786 2020-03-02 05:17 /tmp/ext_db/country/country.parquet


In [22]:
%%sql
DROP TABLE IF EXISTS country

Done.


[]

In [23]:
%%sql
CREATE EXTERNAL TABLE IF NOT EXISTS country
  LIKE PARQUET '/tmp/ext_db/country/country.parquet'
STORED AS PARQUET
LOCATION '/tmp/ext_db/country'

Done.


[]

In [24]:
%%sql
SELECT * FROM country LIMIT 5

Done.


id,name
1,Australia
2,Austria
3,Bahrain
4,Belgium
5,Brazil


### Create a new table from query results

#### Create the ORDER_ITEMS table
Create the `order_items` table from `e_commerce_data` (denormalized data), with the `country` column normalized into a country code.

In [25]:
!hdfs dfs -mkdir /tmp/ext_db/order_items
!hdfs dfs -chmod 777 /tmp/ext_db/order_items

In [26]:
%%sql
SELECT e.InvoiceNo,e.StockCode,e.Description,e.Quantity,e.InvoiceDate,e.UnitPrice,e.CustomerID,c.ID AS CountryCode 
from default.e_commerce_data e, default.country c where e.Country = c.Name LIMIT 5

Done.


invoiceno,stockcode,description,quantity,invoicedate,unitprice,customerid,countrycode
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850.0,37
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850.0,37
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850.0,37
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850.0,37
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850.0,37


In [27]:
%%sql
DROP TABLE IF EXISTS order_items

Done.


[]

In [28]:
%%sql
CREATE EXTERNAL TABLE IF NOT EXISTS order_items
STORED AS PARQUET
LOCATION '/tmp/ext_db/order_items'
AS SELECT e.InvoiceNo,e.StockCode,e.Description,e.Quantity,e.InvoiceDate,e.UnitPrice,e.CustomerID,c.ID AS CountryCode 
from default.e_commerce_data e, default.country c WHERE e.Country = c.Name

Done.


summary
Inserted 541909 row(s)
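
The inserted row count (541909) equals the row count of the source data, which indicates that every `Country` value found a match in `country`. Because the statement uses an inner join, any row without a match would have been dropped silently. The sketch below shows one way to check for such rows beforehand. It uses Python's built-in `sqlite3` as a stand-in for Impala so that it runs anywhere, and the sample rows (including the unmatched "Atlantis" row and the ID 13) are invented for illustration.

```python
import sqlite3

# sqlite3 stands in for Impala here; the sample rows are made up.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE e_commerce_data (invoiceno TEXT, country TEXT);
    CREATE TABLE country (id INTEGER, name TEXT);
    INSERT INTO e_commerce_data VALUES
        ('536365', 'United Kingdom'),
        ('536370', 'France'),
        ('999999', 'Atlantis');
    INSERT INTO country VALUES (13, 'France'), (37, 'United Kingdom');
""")

# Rows whose country has no counterpart in the lookup table.
unmatched = conn.execute("""
    SELECT e.invoiceno, e.country
    FROM e_commerce_data e
    LEFT JOIN country c ON e.country = c.name
    WHERE c.id IS NULL
""").fetchall()

# The normalizing CREATE TABLE ... AS SELECT, reduced to two columns.
conn.execute("""
    CREATE TABLE order_items AS
    SELECT e.invoiceno, c.id AS countrycode
    FROM e_commerce_data e, country c
    WHERE e.country = c.name
""")
source_rows = conn.execute("SELECT COUNT(*) FROM e_commerce_data").fetchone()[0]
kept_rows = conn.execute("SELECT COUNT(*) FROM order_items").fetchone()[0]

print("unmatched:", unmatched)
print("source rows:", source_rows, "rows kept:", kept_rows)
conn.close()
```

If `unmatched` is empty, the source and target row counts agree, as they do in the notebook run above.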


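Because `order_items` already exists at this point, `IF NOT EXISTS` turns the following statement into a no-op: it reports 0 inserted rows, and the table keeps the columns defined by the previous cell.

In [29]: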
%%sql
CREATE EXTERNAL TABLE IF NOT EXISTS order_items
STORED AS PARQUET
LOCATION '/tmp/ext_db/order_items'
AS select invoiceno,stockcode,quantity,invoicedate,unitprice,customerid,country from default.e_commerce_data

Done.


summary
Inserted 0 row(s)
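
The 0-row result above comes from `IF NOT EXISTS`: when the table already exists, the statement is skipped and the existing columns are left untouched. The sketch below reproduces that behaviour with Python's built-in `sqlite3`, used here only as a stand-in for Impala. The sample row and the helper `column_names` are assumptions for illustration.

```python
import sqlite3

# sqlite3 stands in for Impala; one sample row is enough to show the effect.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE e_commerce_data (invoiceno TEXT, description TEXT, country TEXT);
    INSERT INTO e_commerce_data
        VALUES ('536365', 'WHITE METAL LANTERN', 'United Kingdom');
    CREATE TABLE order_items AS
        SELECT invoiceno, description, 37 AS countrycode FROM e_commerce_data;
""")


def column_names(connection, table):
    # PRAGMA table_info returns one row per column; index 1 is the column name.
    return [row[1] for row in connection.execute(f"PRAGMA table_info({table})")]


before = column_names(conn, "order_items")

# A second CTAS with a different column list: skipped because the table exists.
conn.execute("""
    CREATE TABLE IF NOT EXISTS order_items AS
    SELECT invoiceno, country FROM e_commerce_data
""")

after = column_names(conn, "order_items")
row_count = conn.execute("SELECT COUNT(*) FROM order_items").fetchone()[0]
print("columns unchanged:", before == after, "| rows:", row_count)
conn.close()
```

To actually replace the table definition, the table has to be dropped first, as the `DROP TABLE IF EXISTS order_items` cell does earlier.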


In [30]:
%%sql
select * from default.order_items o limit 30

Done.


invoiceno,stockcode,description,quantity,invoicedate,unitprice,customerid,countrycode
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850.0,37
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850.0,37
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850.0,37
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850.0,37
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850.0,37
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,12/1/2010 8:26,7.65,17850.0,37
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,12/1/2010 8:26,4.25,17850.0,37
536366,22633,HAND WARMER UNION JACK,6,12/1/2010 8:28,1.85,17850.0,37
536366,22632,HAND WARMER RED POLKA DOT,6,12/1/2010 8:28,1.85,17850.0,37
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/2010 8:34,1.69,13047.0,37


## Data exploration
### Join tables

In [31]:
%%sql
select o.invoiceno,o.stockcode,o.description, o.quantity,o.invoicedate,o.unitprice,o.customerid,c.name as country 
from default.order_items o 
INNER JOIN country c ON o.countrycode = c.id
limit 30

Done.


invoiceno,stockcode,description,quantity,invoicedate,unitprice,customerid,country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,12/1/2010 8:26,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,12/1/2010 8:26,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,12/1/2010 8:28,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,12/1/2010 8:28,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/2010 8:34,1.69,13047.0,United Kingdom
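
This notebook writes joins in two styles: a comma-separated `FROM` list with a `WHERE` condition when building `order_items`, and an explicit `INNER JOIN ... ON` here. Both express the same inner join. The following sketch checks this on a few rows taken from the output above, again using the built-in `sqlite3` as a stand-in for Impala (an assumption made so the example is self-contained).

```python
import sqlite3

# sqlite3 stands in for Impala; rows are copied from the notebook output.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE order_items (invoiceno TEXT, stockcode TEXT, countrycode INTEGER);
    CREATE TABLE country (id INTEGER, name TEXT);
    INSERT INTO order_items VALUES
        ('536365', '85123A', 37),
        ('536365', '71053', 37),
        ('536367', '84879', 37);
    INSERT INTO country VALUES (1, 'Australia'), (37, 'United Kingdom');
""")

# Explicit join syntax, as used in the data exploration cell.
explicit_join = conn.execute("""
    SELECT o.invoiceno, o.stockcode, c.name AS country
    FROM order_items o
    INNER JOIN country c ON o.countrycode = c.id
    ORDER BY o.invoiceno, o.stockcode
""").fetchall()

# Comma-separated FROM list with the join condition in WHERE.
implicit_join = conn.execute("""
    SELECT o.invoiceno, o.stockcode, c.name AS country
    FROM order_items o, country c
    WHERE o.countrycode = c.id
    ORDER BY o.invoiceno, o.stockcode
""").fetchall()

for row in explicit_join:
    print(row)
print("same result:", explicit_join == implicit_join)
conn.close()
```

The explicit form keeps the join condition next to the joined table, which tends to be easier to read once more tables are involved.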


### Output product information

## Data output

Define a function that writes query results to a file.

In [32]:
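from impala.dbapi import connect
import csv

def write_to_file(query, filename):
    """Run `query` on Impala and write the result rows to `filename` as CSV."""
    conn = connect(host='ip-10-0-0-10.ap-northeast-1.compute.internal', port=21050)
    try:
        cur = conn.cursor()
        cur.execute(query)
        rows = cur.fetchall()  # fetch every row of the result set
    finally:
        conn.close()
    # newline='' lets the csv module control line endings itself
    with open(filename, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerows(rows)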
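
The function above writes only the data rows, so the resulting CSV has no header line. A possible variant, sketched below, takes the column names from the cursor's DB-API `description` attribute and writes them first. The helper name `write_cursor_to_csv` is hypothetical, and the example runs against an in-memory `sqlite3` database as a stand-in for Impala. It assumes that the Impala cursor exposes `description` in the same DB-API 2.0 form.

```python
import csv
import io
import sqlite3


def write_cursor_to_csv(cursor, fileobj):
    """Write the column names, then every remaining row of an executed cursor."""
    writer = csv.writer(fileobj)
    # DB-API 2.0: description holds one sequence per column; item 0 is the name.
    writer.writerow([column[0] for column in cursor.description])
    writer.writerows(cursor.fetchall())


# Stand-in data; the third row duplicates the second so DISTINCT has an effect.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE order_items (stockcode TEXT, description TEXT);
    INSERT INTO order_items VALUES
        ('10002', 'INFLATABLE POLITICAL GLOBE'),
        ('10120', 'DOGGY RUBBER'),
        ('10120', 'DOGGY RUBBER');
""")
cur = conn.cursor()
cur.execute(
    "SELECT DISTINCT stockcode, description FROM order_items ORDER BY stockcode"
)

buffer = io.StringIO()  # an open file object works the same way
write_cursor_to_csv(cur, buffer)
csv_text = buffer.getvalue()
print(csv_text)
conn.close()
```

Passing a file object rather than a filename keeps the helper easy to test and leaves connection handling to the caller.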

Preview the data to be exported.

In [33]:
%%sql
select distinct o.stockcode, o.description from order_items o order by o.stockcode

Done.


stockcode,description
10002,
10002,INFLATABLE POLITICAL GLOBE
10080,
10080,GROOVY CACTUS INFLATABLE
10080,check
10120,DOGGY RUBBER
10123C,
10123C,HEARTS WRAPPING TAPE
10123G,
10124A,SPOTS ON RED BOOKCOVER TAPE


Run the file export.

In [34]:
query = "select distinct o.stockcode, o.description from order_items o order by o.stockcode"
filename = "./products.csv"
write_to_file(query,filename)

Check the exported file.

In [35]:
!ls -l

total 80424
drwxr-xr-x 2 cdsw cdsw     4096 Feb 27 02:53 bin
-rw-r--r-- 1 cdsw cdsw      501 Mar  2 02:59 country.csv
-rw-r--r-- 1 cdsw cdsw     2786 Mar  2 05:16 country.parquet
-rw-r--r-- 1 cdsw cdsw  5227042 Mar  2 05:14 customer-segmentation.ipynb
drwxr-xr-x 2 cdsw cdsw     4096 Feb 27 02:48 data
-rw-r--r-- 1 cdsw cdsw 45580638 Sep 20 22:35 data.csv
-rw-r--r-- 1 cdsw cdsw  3632393 Mar  2 05:15 data.parquet
-rw-r--r-- 1 cdsw cdsw 26822113 Mar  2 04:07 df_cleaned.p
-rw-r--r-- 1 cdsw cdsw      278 Feb 27 02:48 git_env.sh
-rw-r--r-- 1 cdsw cdsw   866216 Mar  2 05:18 impala.ipynb
drwxr-xr-x 8 cdsw cdsw     4096 Feb 27 02:53 ipython-sql
drwxr-xr-x 3 cdsw cdsw     4096 Feb 27 02:52 lib
drwxr-xr-x 4 cdsw cdsw     4096 Mar  2 04:11 nltk_data
-rw-r--r-- 1 cdsw cdsw   166870 Mar  2 05:19 products.csv
-rw-r--r-- 1 cdsw cdsw     6226 Feb 27 09:05 README.md
-rw-r--r-- 1 cdsw cdsw     1459 Mar  2 04:44 requirements.txt


In [36]:
!head products.csv

10002,
10002,INFLATABLE POLITICAL GLOBE 
10080,
10080,GROOVY CACTUS INFLATABLE
10080,check
10120,DOGGY RUBBER
10123C,
10123C,HEARTS WRAPPING TAPE 
10123G,
10124A,SPOTS ON RED BOOKCOVER TAPE
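
The listing shows that one stock code can appear on several rows, for example once with an empty description and once with a real one, so `products.csv` is not keyed by `stockcode`. A minimal sketch for spotting such codes with the standard `csv` module follows. It works on the ten rows shown above, embedded as a string, rather than on the full file.

```python
import csv
import io
from collections import Counter

# The ten rows printed by `head products.csv`, embedded for a self-contained run.
sample = """\
10002,
10002,INFLATABLE POLITICAL GLOBE
10080,
10080,GROOVY CACTUS INFLATABLE
10080,check
10120,DOGGY RUBBER
10123C,
10123C,HEARTS WRAPPING TAPE
10123G,
10124A,SPOTS ON RED BOOKCOVER TAPE
"""

# Count how many rows each stock code occupies.
rows_per_code = Counter(
    stockcode for stockcode, _description in csv.reader(io.StringIO(sample))
)

# Keep only the codes that occur on more than one row.
repeated = {code: n for code, n in rows_per_code.items() if n > 1}
print(repeated)
```

To run the same check on the real export, replace `io.StringIO(sample)` with an open handle on `products.csv`.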
