<a href="https://colab.research.google.com/github/t-hashiguchi1995/100knock_pyspark/blob/main/preprocess_knock_Python_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# データサイエンス100本ノック（構造化データ加工編） - Python
# for Google Colab

# 処理対象のデータをgitで取得

In [1]:
!git clone https://github.com/The-Japan-DataScientist-Society/100knocks-preprocess

fatal: destination path '100knocks-preprocess' already exists and is not an empty directory.


In [2]:
cd /content/100knocks-preprocess/docker/work/data

/content/100knocks-preprocess/docker/work/data


# Sparkの準備
## Spark参考ドキュメント
- https://spark.apache.org/docs/3.2.0/api/python/reference/index.html

In [3]:
!pip install --upgrade pip
!pip install pyspark
!pip install -U pandas numpy scikit-learn imbalanced-learn

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
[0mLooking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
[0mLooking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting imbalanced-learn
  Using cached imbalanced_learn-0.9.1-py3-none-any.whl (199 kB)
[0m

In [4]:
from pyspark.sql import SparkSession 
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("IAB") \
    .getOrCreate()

In [5]:
spark

## はじめに
- 初めに以下のセルを実行してください
- 必要なライブラリのインポートとデータベース（PostgreSQL）からのデータ読み込みを行います
- pandas等、利用が想定されるライブラリは以下セルでインポートしています
- その他利用したいライブラリがあれば適宜インストールしてください（"!pip install ライブラリ名"でインストールも可能）
- 処理は複数回に分けても構いません
- 名前、住所等はダミーデータであり、実在するものではありません


In [6]:
# pipでオリジナルの解答に必要なライブラリーをインポート
import os
import pandas as pd
import numpy as np
from datetime import datetime, date
from dateutil.relativedelta import relativedelta
import math
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from imblearn.under_sampling import RandomUnderSampler

# Spark SQL用
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Spark DataFrame用
import pyspark.pandas as ps



In [7]:
# データを読み込み
# inferSchemaがfalseの場合、すべてのカラムはstringとなる。(datetime不可。timestamp可)
df_customer = spark.read\
              .format("csv")\
              .options(header="true", inferSchema="true")\
              .load('customer.csv')
df_category = spark.read\
              .format("csv")\
              .options(header="true", inferSchema="true")\
              .load('./category.csv')
df_product = spark.read\
              .format("csv")\
              .options(header="true", inferSchema="true")\
              .load('./product.csv')
df_receipt = spark.read\
              .format("csv")\
              .options(header="true", inferSchema="true")\
              .load('./receipt.csv')
df_store = spark.read\
            .format("csv")\
            .options(header="true", inferSchema="true")\
            .load('./store.csv')
df_geocode = spark.read\
            .format("csv")\
            .options(header="true", inferSchema="true")\
            .load('./geocode.csv')

In [8]:
# 業務環境では実行不可
# tmp_test = ps.read_csv('./geocode.csv')
# tmp_test.head()

# 演習問題

---
> P-001: レシート明細のデータフレーム（df_receipt）から全項目の先頭10件を表示し、どのようなデータを保有しているか目視で確認せよ。

In [9]:
df_receipt.show(10, truncate=False)

+---------+-----------+--------+----------+--------------+--------------+----------+--------+------+
|sales_ymd|sales_epoch|store_cd|receipt_no|receipt_sub_no|customer_id   |product_cd|quantity|amount|
+---------+-----------+--------+----------+--------------+--------------+----------+--------+------+
|20181103 |1541203200 |S14006  |112       |1             |CS006214000001|P070305012|1       |158   |
|20181118 |1542499200 |S13008  |1132      |2             |CS008415000097|P070701017|1       |81    |
|20170712 |1499817600 |S14028  |1102      |1             |CS028414000014|P060101005|1       |170   |
|20190205 |1549324800 |S14042  |1132      |1             |ZZ000000000000|P050301001|1       |25    |
|20180821 |1534809600 |S14025  |1102      |2             |CS025415000050|P060102007|1       |90    |
|20190605 |1559692800 |S13003  |1112      |1             |CS003515000195|P050102002|1       |138   |
|20181205 |1543968000 |S14024  |1102      |2             |CS024514000042|P080101005|1      

---
> P-002: レシート明細のデータフレーム（df_receipt）から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上金額（amount）の順に列を指定し、10件表示させよ。

In [10]:
df_receipt.select(["sales_ymd", "customer_id", "product_cd", "amount"]).show(10)

+---------+--------------+----------+------+
|sales_ymd|   customer_id|product_cd|amount|
+---------+--------------+----------+------+
| 20181103|CS006214000001|P070305012|   158|
| 20181118|CS008415000097|P070701017|    81|
| 20170712|CS028414000014|P060101005|   170|
| 20190205|ZZ000000000000|P050301001|    25|
| 20180821|CS025415000050|P060102007|    90|
| 20190605|CS003515000195|P050102002|   138|
| 20181205|CS024514000042|P080101005|    30|
| 20190922|CS040415000178|P070501004|   128|
| 20170504|ZZ000000000000|P071302010|   770|
| 20191010|CS027514000015|P071101003|   680|
+---------+--------------+----------+------+
only showing top 10 rows



---
> P-003: レシート明細のデータフレーム（df_receipt）から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上金額（amount）の順に列を指定し、10件表示させよ。ただし、sales_ymdはsales_dateに項目名を変更しながら抽出すること。

In [11]:
df_receipt.select(["sales_ymd", "customer_id", "product_cd", "amount"])\
          .withColumnRenamed("sales_ymd", "sales_date").show(10)

+----------+--------------+----------+------+
|sales_date|   customer_id|product_cd|amount|
+----------+--------------+----------+------+
|  20181103|CS006214000001|P070305012|   158|
|  20181118|CS008415000097|P070701017|    81|
|  20170712|CS028414000014|P060101005|   170|
|  20190205|ZZ000000000000|P050301001|    25|
|  20180821|CS025415000050|P060102007|    90|
|  20190605|CS003515000195|P050102002|   138|
|  20181205|CS024514000042|P080101005|    30|
|  20190922|CS040415000178|P070501004|   128|
|  20170504|ZZ000000000000|P071302010|   770|
|  20191010|CS027514000015|P071101003|   680|
+----------+--------------+----------+------+
only showing top 10 rows



---
> P-004: レシート明細のデータフレーム（df_receipt）から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上金額（amount）の順に列を指定し、以下の条件を満たすデータを抽出せよ。
> - 顧客ID（customer_id）が"CS018205000001"

In [12]:
df_receipt.select(["sales_ymd", "customer_id", "product_cd", "amount"])\
          .filter(df_receipt["customer_id"]=="CS018205000001").show()

+---------+--------------+----------+------+
|sales_ymd|   customer_id|product_cd|amount|
+---------+--------------+----------+------+
| 20180911|CS018205000001|P071401012|  2200|
| 20180414|CS018205000001|P060104007|   600|
| 20170614|CS018205000001|P050206001|   990|
| 20170614|CS018205000001|P060702015|   108|
| 20190216|CS018205000001|P071005024|   102|
| 20180414|CS018205000001|P071101002|   278|
| 20190226|CS018205000001|P070902035|   168|
| 20190924|CS018205000001|P060805001|   495|
| 20190226|CS018205000001|P071401020|  2200|
| 20180911|CS018205000001|P071401005|  1100|
| 20190216|CS018205000001|P040101002|   218|
| 20190924|CS018205000001|P091503001|   280|
+---------+--------------+----------+------+



---
> P-005: レシート明細のデータフレーム（df_receipt）から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上金額（amount）の順に列を指定し、以下の条件を満たすデータを抽出せよ。
> - 顧客ID（customer_id）が"CS018205000001"
> - 売上金額（amount）が1,000以上

In [13]:
df_receipt.select(["sales_ymd", "customer_id", "product_cd", "amount"])\
          .filter((df_receipt["customer_id"]=="CS018205000001") & (df_receipt["amount"]>=1000)).show()

+---------+--------------+----------+------+
|sales_ymd|   customer_id|product_cd|amount|
+---------+--------------+----------+------+
| 20180911|CS018205000001|P071401012|  2200|
| 20190226|CS018205000001|P071401020|  2200|
| 20180911|CS018205000001|P071401005|  1100|
+---------+--------------+----------+------+



---
> P-006: レシート明細データフレーム「df_receipt」から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上数量（quantity）、売上金額（amount）の順に列を指定し、以下の条件を満たすデータを抽出せよ。
> - 顧客ID（customer_id）が"CS018205000001"
> - 売上金額（amount）が1,000以上または売上数量（quantity）が5以上

In [14]:
df_receipt.select(["sales_ymd", "customer_id", "product_cd", "quantity", "amount"])\
          .filter((df_receipt["customer_id"]=="CS018205000001") & ((df_receipt["amount"]>=1000) | (df_receipt["quantity"]>5))).show()

+---------+--------------+----------+--------+------+
|sales_ymd|   customer_id|product_cd|quantity|amount|
+---------+--------------+----------+--------+------+
| 20180911|CS018205000001|P071401012|       1|  2200|
| 20180414|CS018205000001|P060104007|       6|   600|
| 20190226|CS018205000001|P071401020|       1|  2200|
| 20180911|CS018205000001|P071401005|       1|  1100|
+---------+--------------+----------+--------+------+



---
> P-007: レシート明細のデータフレーム（df_receipt）から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上金額（amount）の順に列を指定し、以下の条件を満たすデータを抽出せよ。
> - 顧客ID（customer_id）が"CS018205000001"
> - 売上金額（amount）が1,000以上2,000以下

In [15]:
df_receipt.select(["sales_ymd", "customer_id", "product_cd", "amount"])\
          .filter((F.col("customer_id")=="CS018205000001") & ((F.col("amount")>=1000) & (F.col("amount")<=2000))).show()

+---------+--------------+----------+------+
|sales_ymd|   customer_id|product_cd|amount|
+---------+--------------+----------+------+
| 20180911|CS018205000001|P071401005|  1100|
+---------+--------------+----------+------+



---
> P-008: レシート明細のデータフレーム（df_receipt）から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上金額（amount）の順に列を指定し、以下の条件を満たすデータを抽出せよ。
> - 顧客ID（customer_id）が"CS018205000001"
> - 商品コード（product_cd）が"P071401019"以外

In [16]:
df_receipt.select(["sales_ymd", "customer_id", "product_cd", "amount"])\
          .filter((F.col("customer_id")=="CS018205000001") & (F.col("product_cd")!="P071401019")).show()

+---------+--------------+----------+------+
|sales_ymd|   customer_id|product_cd|amount|
+---------+--------------+----------+------+
| 20180911|CS018205000001|P071401012|  2200|
| 20180414|CS018205000001|P060104007|   600|
| 20170614|CS018205000001|P050206001|   990|
| 20170614|CS018205000001|P060702015|   108|
| 20190216|CS018205000001|P071005024|   102|
| 20180414|CS018205000001|P071101002|   278|
| 20190226|CS018205000001|P070902035|   168|
| 20190924|CS018205000001|P060805001|   495|
| 20190226|CS018205000001|P071401020|  2200|
| 20180911|CS018205000001|P071401005|  1100|
| 20190216|CS018205000001|P040101002|   218|
| 20190924|CS018205000001|P091503001|   280|
+---------+--------------+----------+------+



In [17]:
# AI Programmerのテスト
# pysparkでレシート明細のデータフレーム（df_receipt）から売上日（sales_ymd）、顧客ID（customer_id）、商品コード（product_cd）、売上金額（amount）の順に列を指定し、以下の条件を満たすデータを抽出せよ。顧客ID（customer_id）が"CS018205000001"商品コード（product_cd）が"P071401019"以外である
# df_receipt.select("sales_ymd","customer_id","product_cd","amount").where(col("customer_id") != "CS018205000001" and col("product_cd") != "P071401019")

df_receipt.select("sales_ymd","customer_id","product_cd","amount").where((F.col("customer_id") != "CS018205000001") & (F.col("product_cd") != "P071401019")).show()

+---------+--------------+----------+------+
|sales_ymd|   customer_id|product_cd|amount|
+---------+--------------+----------+------+
| 20181103|CS006214000001|P070305012|   158|
| 20181118|CS008415000097|P070701017|    81|
| 20170712|CS028414000014|P060101005|   170|
| 20190205|ZZ000000000000|P050301001|    25|
| 20180821|CS025415000050|P060102007|    90|
| 20190605|CS003515000195|P050102002|   138|
| 20181205|CS024514000042|P080101005|    30|
| 20190922|CS040415000178|P070501004|   128|
| 20170504|ZZ000000000000|P071302010|   770|
| 20191010|CS027514000015|P071101003|   680|
| 20190918|CS025415000134|P070401002|   138|
| 20171010|CS021515000126|P040402001|   228|
| 20180506|CS039414000052|P059001019|   428|
| 20190326|CS016215000032|P091401190|   780|
| 20180329|ZZ000000000000|P050104001|   115|
| 20170116|ZZ000000000000|P080803001|   100|
| 20190621|ZZ000000000000|P040102001|   268|
| 20181225|ZZ000000000000|P071401002|  1100|
| 20190603|CS026515000042|P070504016|   188|
| 20190606

---
> P-009: 以下の処理において、出力結果を変えずにORをANDに書き換えよ。

`df_store.query('not(prefecture_cd == "13" | floor_area > 900)')`

In [18]:
df_store.filter((F.col("prefecture_cd")!="13") & (F.col("floor_area")<=900)).show()

+--------+----------+-------------+----------+----------------------------------+--------------------------------------+------------+---------+--------+----------+
|store_cd|store_name|prefecture_cd|prefecture|                           address|                          address_kana|      tel_no|longitude|latitude|floor_area|
+--------+----------+-------------+----------+----------------------------------+--------------------------------------+------------+---------+--------+----------+
|  S14046|  北山田店|           14|  神奈川県|  神奈川県横浜市都筑区北山田一丁目| カナガワケンヨコハマシツヅキクキタ...|045-123-4049| 139.5916|35.56189|     831.0|
|  S14011|日吉本町店|           14|  神奈川県|神奈川県横浜市港北区日吉本町四丁目| カナガワケンヨコハマシコウホククヒ...|045-123-4033| 139.6316|35.54655|     890.0|
|  S12013|  習志野店|           12|    千葉県|          千葉県習志野市芝園一丁目|チバケンナラシノシシバゾノイッチョウメ|047-123-4002|  140.022|35.66122|     808.0|
+--------+----------+-------------+----------+----------------------------------+--------------------------------------+------------+-----

---
> P-010: 店舗データフレーム（df_store）から、店舗コード（store_cd）が"S14"で始まるものだけ全項目抽出し、10件だけ表示せよ。

In [19]:
df_store.filter(F.col("store_cd").rlike('S14.*')).show(10)

+--------+------------+-------------+----------+--------------------------------------+----------------------------------------+------------+---------+--------+----------+
|store_cd|  store_name|prefecture_cd|prefecture|                               address|                            address_kana|      tel_no|longitude|latitude|floor_area|
+--------+------------+-------------+----------+--------------------------------------+----------------------------------------+------------+---------+--------+----------+
|  S14010|      菊名店|           14|  神奈川県|        神奈川県横浜市港北区菊名一丁目|   カナガワケンヨコハマシコウホククキ...|045-123-4032| 139.6326|35.50049|    1732.0|
|  S14033|    阿久和店|           14|  神奈川県|    神奈川県横浜市瀬谷区阿久和西一丁目|   カナガワケンヨコハマシセヤクアクワ...|045-123-4043| 139.4961|35.45918|    1495.0|
|  S14036|相模原中央店|           14|  神奈川県|            神奈川県相模原市中央二丁目|   カナガワケンサガミハラシチュウオウ...|042-123-4045| 139.3716|35.57327|    1679.0|
|  S14040|    長津田店|           14|  神奈川県|神奈川県横浜市緑区長津田みなみ台五丁目|   カナガワケンヨコハマシミドリクナガ...|045-1

---
> P-011: 顧客データフレーム（df_customer）から顧客ID（customer_id）の末尾が1のものだけ全項目抽出し、10件だけ表示せよ。

In [20]:
df_customer.filter(F.col("customer_id").rlike('.*1$')).show(10)

+--------------+-------------+---------+------+-------------------+---+---------+---------------------------------+--------------------+----------------+------------+
|   customer_id|customer_name|gender_cd|gender|          birth_day|age|postal_cd|                          address|application_store_cd|application_date|   status_cd|
+--------------+-------------+---------+------+-------------------+---+---------+---------------------------------+--------------------+----------------+------------+
|CS037613000071|    六角 雅彦|        9|  不明|1952-04-01 00:00:00| 66| 136-0076|       東京都江東区南砂**********|              S13037|        20150414|0-00000000-0|
|CS028811000001|  堀井 かおり|        1|  女性|1933-03-27 00:00:00| 86| 245-0016| 神奈川県横浜市泉区和泉町*****...|              S14028|        20160115|0-00000000-0|
|CS040412000191|    川井 郁恵|        1|  女性|1977-01-05 00:00:00| 42| 226-0021|神奈川県横浜市緑区北八朔町****...|              S14040|        20151101|1-20091025-4|
|CS028314000011|  小菅 あおい|        1|  女性|1983-11-26

---
> P-012: 店舗データフレーム（df_store）から横浜市の店舗だけ全項目表示せよ。

In [21]:
df_store.filter(F.col("address").rlike('.*横浜市.*')).show()

+--------+----------+-------------+----------+--------------------------------------+----------------------------------------+------------+---------+--------+----------+
|store_cd|store_name|prefecture_cd|prefecture|                               address|                            address_kana|      tel_no|longitude|latitude|floor_area|
+--------+----------+-------------+----------+--------------------------------------+----------------------------------------+------------+---------+--------+----------+
|  S14010|    菊名店|           14|  神奈川県|        神奈川県横浜市港北区菊名一丁目|   カナガワケンヨコハマシコウホククキ...|045-123-4032| 139.6326|35.50049|    1732.0|
|  S14033|  阿久和店|           14|  神奈川県|    神奈川県横浜市瀬谷区阿久和西一丁目|   カナガワケンヨコハマシセヤクアクワ...|045-123-4043| 139.4961|35.45918|    1495.0|
|  S14040|  長津田店|           14|  神奈川県|神奈川県横浜市緑区長津田みなみ台五丁目|   カナガワケンヨコハマシミドリクナガ...|045-123-4046| 139.4994|35.52398|    1548.0|
|  S14050|阿久和西店|           14|  神奈川県|    神奈川県横浜市瀬谷区阿久和西一丁目|   カナガワケンヨコハマシセヤクアクワ...|045-123-4053| 139.4961

---
> P-013: 顧客データフレーム（df_customer）から、ステータスコード（status_cd）の先頭がアルファベットのA〜Fで始まるデータを全項目抽出し、10件だけ表示せよ。

In [22]:
df_customer.filter(F.col("status_cd").rlike('^[A-F]')).show(10)

+--------------+-------------+---------+------+-------------------+---+---------+----------------------------------+--------------------+----------------+------------+
|   customer_id|customer_name|gender_cd|gender|          birth_day|age|postal_cd|                           address|application_store_cd|application_date|   status_cd|
+--------------+-------------+---------+------+-------------------+---+---------+----------------------------------+--------------------+----------------+------------+
|CS031415000172|宇多田 貴美子|        1|  女性|1976-10-04 00:00:00| 42| 151-0053|      東京都渋谷区代々木**********|              S13031|        20150529|D-20100325-C|
|CS015414000103|    奥野 陽子|        1|  女性|1977-08-09 00:00:00| 41| 136-0073|        東京都江東区北砂**********|              S13015|        20150722|B-20100609-B|
|CS011215000048|    芦田 沙耶|        1|  女性|1992-02-01 00:00:00| 27| 223-0062|神奈川県横浜市港北区日吉本町***...|              S14011|        20150228|C-20100421-9|
|CS029415000023|    梅田 里穂|        1|  女性|19

---
> P-014: 顧客データフレーム（df_customer）から、ステータスコード（status_cd）の末尾が数字の1〜9で終わるデータを全項目抽出し、10件だけ表示せよ。

In [23]:
df_customer.filter(F.col("status_cd").rlike('[1-9]$')).show(10)

+--------------+-------------+---------+------+-------------------+---+---------+----------------------------------+--------------------+----------------+------------+
|   customer_id|customer_name|gender_cd|gender|          birth_day|age|postal_cd|                           address|application_store_cd|application_date|   status_cd|
+--------------+-------------+---------+------+-------------------+---+---------+----------------------------------+--------------------+----------------+------------+
|CS001215000145|    田崎 美紀|        1|  女性|1995-03-29 00:00:00| 24| 144-0055|      東京都大田区仲六郷**********|              S13001|        20170605|6-20090929-2|
|CS033513000180|      安斎 遥|        1|  女性|1962-07-11 00:00:00| 56| 241-0823|  神奈川県横浜市旭区善部町*****...|              S14033|        20150728|6-20080506-5|
|CS011215000048|    芦田 沙耶|        1|  女性|1992-02-01 00:00:00| 27| 223-0062|神奈川県横浜市港北区日吉本町***...|              S14011|        20150228|C-20100421-9|
|CS040412000191|    川井 郁恵|        1|  女性|197

---
> P-015: 顧客データフレーム（df_customer）から、ステータスコード（status_cd）の先頭がアルファベットのA〜Fで始まり、末尾が数字の1〜9で終わるデータを全項目抽出し、10件だけ表示せよ。

In [24]:
df_customer.filter((F.col("status_cd").rlike('^[A-F].*[1-9]$'))).show(10)

+--------------+-------------+---------+------+-------------------+---+---------+----------------------------------+--------------------+----------------+------------+
|   customer_id|customer_name|gender_cd|gender|          birth_day|age|postal_cd|                           address|application_store_cd|application_date|   status_cd|
+--------------+-------------+---------+------+-------------------+---+---------+----------------------------------+--------------------+----------------+------------+
|CS011215000048|    芦田 沙耶|        1|  女性|1992-02-01 00:00:00| 27| 223-0062|神奈川県横浜市港北区日吉本町***...|              S14011|        20150228|C-20100421-9|
|CS022513000105|  島村 貴美子|        1|  女性|1962-03-12 00:00:00| 57| 249-0002|    神奈川県逗子市山の根**********|              S14022|        20150320|A-20091115-7|
|CS001515000096|    水野 陽子|        9|  不明|1960-11-29 00:00:00| 58| 144-0053|    東京都大田区蒲田本町**********|              S13001|        20150614|A-20100724-7|
|CS013615000053|    西脇 季衣|        1|  女性|1953

---
> P-016: 店舗データフレーム（df_store）から、電話番号（tel_no）が3桁-3桁-4桁のデータを全項目表示せよ。

In [25]:
df_store.filter(F.col("tel_no").rlike('[0-9]{3}-[0-9]{3}-[0-9]{4}')).show()

+--------+------------+-------------+----------+--------------------------------------+----------------------------------------+------------+---------+--------+----------+
|store_cd|  store_name|prefecture_cd|prefecture|                               address|                            address_kana|      tel_no|longitude|latitude|floor_area|
+--------+------------+-------------+----------+--------------------------------------+----------------------------------------+------------+---------+--------+----------+
|  S12014|    千草台店|           12|    千葉県|        千葉県千葉市稲毛区千草台一丁目|   チバケンチバシイナゲクチグサダイイ...|043-123-4003|  140.118|35.63559|    1698.0|
|  S13002|    国分寺店|           13|    東京都|              東京都国分寺市本多二丁目|トウキョウトコクブンジシホンダニチョウメ|042-123-4008| 139.4802|35.70566|    1735.0|
|  S14010|      菊名店|           14|  神奈川県|        神奈川県横浜市港北区菊名一丁目|   カナガワケンヨコハマシコウホククキ...|045-123-4032| 139.6326|35.50049|    1732.0|
|  S14033|    阿久和店|           14|  神奈川県|    神奈川県横浜市瀬谷区阿久和西一丁目|   カナガワケンヨコハマシセヤクアクワ...

---
> P-17: 顧客データフレーム（df_customer）を生年月日（birth_day）で高齢順にソートし、先頭10件を全項目表示せよ。

In [26]:
df_customer.orderBy(F.col("birth_day")).show(10)

+--------------+-------------+---------+------+-------------------+---+---------+--------------------------------+--------------------+----------------+------------+
|   customer_id|customer_name|gender_cd|gender|          birth_day|age|postal_cd|                         address|application_store_cd|application_date|   status_cd|
+--------------+-------------+---------+------+-------------------+---+---------+--------------------------------+--------------------+----------------+------------+
|CS003813000014|  村山 菜々美|        1|  女性|1928-11-26 00:00:00| 90| 182-0007|    東京都調布市菊野台**********|              S13003|        20160214|0-00000000-0|
|CS026813000004|    吉村 朝陽|        1|  女性|1928-12-14 00:00:00| 90| 251-0043| 神奈川県藤沢市辻堂元町******...|              S14026|        20150723|0-00000000-0|
|CS018811000003|    熊沢 美里|        1|  女性|1929-01-07 00:00:00| 90| 204-0004|      東京都清瀬市野塩**********|              S13018|        20150403|0-00000000-0|
|CS027803000004|    内村 拓郎|        0|  男性|1929-01-12

---
> P-18: 顧客データフレーム（df_customer）を生年月日（birth_day）で若い順にソートし、先頭10件を全項目表示せよ。

In [27]:
df_customer.orderBy(F.col("birth_day").desc()).show(10)

+--------------+-------------+---------+------+-------------------+---+---------+---------------------------------+--------------------+----------------+------------+
|   customer_id|customer_name|gender_cd|gender|          birth_day|age|postal_cd|                          address|application_store_cd|application_date|   status_cd|
+--------------+-------------+---------+------+-------------------+---+---------+---------------------------------+--------------------+----------------+------------+
|CS035114000004|    大村 美里|        1|  女性|2007-11-25 00:00:00| 11| 156-0053|       東京都世田谷区桜**********|              S13035|        20150619|6-20091205-6|
|CS022103000002|  福山 はじめ|        9|  不明|2007-10-02 00:00:00| 11| 249-0006|     神奈川県逗子市逗子**********|              S14022|        20160909|0-00000000-0|
|CS002113000009|  柴田 真悠子|        1|  女性|2007-09-17 00:00:00| 11| 184-0014|  東京都小金井市貫井南町******...|              S13002|        20160304|0-00000000-0|
|CS004115000014|    松井 京子|        1|  女性|2007-

---
> P-19: レシート明細データフレーム（df_receipt）に対し、1件あたりの売上金額（amount）が高い順にランクを付与し、先頭10件を抽出せよ。項目は顧客ID（customer_id）、売上金額（amount）、付与したランクを表示させること。なお、売上金額（amount）が等しい場合は同一順位を付与するものとする。

In [28]:
# customer_idごとにランク付けする場合
# df_receipt.withColumn("rank", F.rank().over(Window.partitionBy("customer_id").orderBy(F.col("amount").desc()))).show()
df_receipt.withColumn("rank", F.rank().over(Window.orderBy(F.col("amount").desc())))\
          .select(["customer_id", "amount", "rank"]).show(10)

+--------------+------+----+
|   customer_id|amount|rank|
+--------------+------+----+
|CS011415000006| 10925|   1|
|ZZ000000000000|  6800|   2|
|CS028605000002|  5780|   3|
|CS015515000034|  5480|   4|
|ZZ000000000000|  5480|   4|
|ZZ000000000000|  5480|   4|
|ZZ000000000000|  5440|   7|
|CS021515000089|  5440|   7|
|ZZ000000000000|  5280|   9|
|CS017414000114|  5280|   9|
+--------------+------+----+
only showing top 10 rows



---
> P-020: レシート明細データフレーム（df_receipt）に対し、1件あたりの売上金額（amount）が高い順にランクを付与し、先頭10件を抽出せよ。項目は顧客ID（customer_id）、売上金額（amount）、付与したランクを表示させること。なお、売上金額（amount）が等しい場合でも別順位を付与すること。

In [29]:
df_receipt.withColumn("row_number", F.row_number().over(Window.orderBy(F.col("amount").desc())))\
          .select(["customer_id", "amount", "row_number"]).show(10)

+--------------+------+----------+
|   customer_id|amount|row_number|
+--------------+------+----------+
|CS011415000006| 10925|         1|
|ZZ000000000000|  6800|         2|
|CS028605000002|  5780|         3|
|CS015515000034|  5480|         4|
|ZZ000000000000|  5480|         5|
|ZZ000000000000|  5480|         6|
|ZZ000000000000|  5440|         7|
|CS021515000089|  5440|         8|
|ZZ000000000000|  5280|         9|
|CS017414000114|  5280|        10|
+--------------+------+----------+
only showing top 10 rows



---
> P-021: レシート明細データフレーム（df_receipt）に対し、件数をカウントせよ。

In [30]:
df_receipt.count()

104681

---
> P-022: レシート明細データフレーム（df_receipt）の顧客ID（customer_id）に対し、ユニーク件数をカウントせよ。

In [31]:
df_receipt.select("customer_id").distinct().count()

8307

---
> P-023: レシート明細データフレーム（df_receipt）に対し、店舗コード（store_cd）ごとに売上金額（amount）と売上数量（quantity）を合計せよ。

In [32]:
df_receipt.groupBy("store_cd").sum("amount", "quantity")\
          .withColumnRenamed("sum(amount)", "amount")\
          .withColumnRenamed("sum(quantity)", "quantity")\
          .show()

+--------+------+--------+
|store_cd|amount|quantity|
+--------+------+--------+
|  S14024|736323|    2417|
|  S14045|458484|    1398|
|  S13020|796383|    2383|
|  S13038|708884|    2337|
|  S14047|338329|    1041|
|  S12014|725167|    2358|
|  S13037|693087|    2344|
|  S13051|107452|     354|
|  S12013|787513|    2425|
|  S14046|412646|    1354|
|  S14011|805724|    2434|
|  S13016|793773|    2432|
|  S13035|715869|    2219|
|  S14006|712839|    2284|
|  S14027|714550|    2303|
|  S13008|809288|    2491|
|  S14025|755581|    2394|
|  S13031|705968|    2336|
|  S14036|203694|     635|
|  S14040|701858|    2233|
+--------+------+--------+
only showing top 20 rows



---
> P-024: レシート明細データフレーム（df_receipt）に対し、顧客ID（customer_id）ごとに最も新しい売上日（sales_ymd）を求め、10件表示せよ。

In [33]:
df_receipt.groupBy("customer_id").max("sales_ymd").show(10)

+--------------+--------------+
|   customer_id|max(sales_ymd)|
+--------------+--------------+
|CS024415000195|      20190907|
|CS011515000044|      20181226|
|CS017415000245|      20190615|
|CS011512000113|      20190829|
|CS025414000093|      20191016|
|CS010605000007|      20181009|
|CS011415000097|      20180909|
|CS018515000090|      20190717|
|CS015415000222|      20190928|
|CS038605000003|      20190910|
+--------------+--------------+
only showing top 10 rows



---
> P-025: レシート明細データフレーム（df_receipt）に対し、顧客ID（customer_id）ごとに最も古い売上日（sales_ymd）を求め、10件表示せよ。

In [34]:
df_receipt.groupBy("customer_id").min("sales_ymd").show(10)

+--------------+--------------+
|   customer_id|min(sales_ymd)|
+--------------+--------------+
|CS024415000195|      20170119|
|CS011515000044|      20170505|
|CS017415000245|      20180110|
|CS011512000113|      20181213|
|CS025414000093|      20170806|
|CS010605000007|      20170125|
|CS011415000097|      20170108|
|CS018515000090|      20170131|
|CS015415000222|      20170313|
|CS038605000003|      20170321|
+--------------+--------------+
only showing top 10 rows



---
> P-026: レシート明細データフレーム（df_receipt）に対し、顧客ID（customer_id）ごとに最も新しい売上日（sales_ymd）と古い売上日を求め、両者が異なるデータを10件表示せよ。

In [35]:
df_receipt_min = df_receipt.groupBy("customer_id")\
                            .agg({"sales_ymd":"min"})
df_receipt_max = df_receipt.groupBy("customer_id")\
                            .agg({"sales_ymd":"max"})
df_receipt_merge = df_receipt_min.join(df_receipt_max, df_receipt_min["customer_id"]==df_receipt_max["customer_id"])

In [36]:
df_receipt_merge.filter(F.col("min(sales_ymd)") != F.col("max(sales_ymd)")).show()

+--------------+--------------+--------------+--------------+
|   customer_id|min(sales_ymd)|   customer_id|max(sales_ymd)|
+--------------+--------------+--------------+--------------+
|CS024415000195|      20170119|CS024415000195|      20190907|
|CS011515000044|      20170505|CS011515000044|      20181226|
|CS017415000245|      20180110|CS017415000245|      20190615|
|CS011512000113|      20181213|CS011512000113|      20190829|
|CS025414000093|      20170806|CS025414000093|      20191016|
|CS010605000007|      20170125|CS010605000007|      20181009|
|CS011415000097|      20170108|CS011415000097|      20180909|
|CS018515000090|      20170131|CS018515000090|      20190717|
|CS015415000222|      20170313|CS015415000222|      20190928|
|CS038605000003|      20170321|CS038605000003|      20190910|
|CS028515000242|      20190106|CS028515000242|      20190804|
|CS040215000044|      20170219|CS040215000044|      20190924|
|CS049513000008|      20181220|CS049513000008|      20190927|
|CS00241

---
> P-027: レシート明細データフレーム（df_receipt）に対し、店舗コード（store_cd）ごとに売上金額（amount）の平均を計算し、降順でTOP5を表示せよ。

In [37]:
df_receipt.groupBy("store_cd").avg("amount").orderBy(F.col("avg(amount)").desc()).show(5)

+--------+------------------+
|store_cd|       avg(amount)|
+--------+------------------+
|  S13052|402.86746987951807|
|  S13015|351.11196043165467|
|  S13003| 350.9155188246097|
|  S14010|348.79126213592235|
|  S13001| 348.4703862660944|
+--------+------------------+
only showing top 5 rows



---
> P-028: レシート明細データフレーム（df_receipt）に対し、店舗コード（store_cd）ごとに売上金額（amount）の中央値を計算し、降順でTOP5を表示せよ。

In [38]:
import numpy as np
import scipy as sp


@F.udf(DoubleType())
def udf_median(values_list):
    med = np.median(values_list)
    return float(med)

@F.udf(DoubleType())
def udf_mode(values_list):
    mode, count = sp.stats.mode(values_list)
    return float(count)

In [39]:
df_receipt.groupBy("store_cd").agg(udf_median(F.collect_list('amount')).alias('median_amount')).orderBy(F.col("median_amount").desc()).show(5)

+--------+-------------+
|store_cd|median_amount|
+--------+-------------+
|  S13052|        190.0|
|  S14010|        188.0|
|  S14050|        185.0|
|  S14040|        180.0|
|  S13003|        180.0|
+--------+-------------+
only showing top 5 rows



---
> P-029: レシート明細データフレーム（df_receipt）に対し、店舗コード（store_cd）ごとに商品コード（product_cd）の最頻値を求めよ。

In [40]:
# df_receipt.groupby(["store_cd", "product_cd"]).count().orderBy(F.col("count").desc()).show()
df_receipt.groupBy("store_cd").agg(udf_mode(F.collect_list('product_cd')).alias('mode_product_cd')).orderBy(F.col("mode_product_cd").desc()).show()

+--------+---------------+
|store_cd|mode_product_cd|
+--------+---------------+
|  S14027|          152.0|
|  S14012|          142.0|
|  S14028|          140.0|
|  S13031|          115.0|
|  S12030|          115.0|
|  S12013|          107.0|
|  S14024|           96.0|
|  S13044|           96.0|
|  S12029|           92.0|
|  S13037|           88.0|
|  S13004|           88.0|
|  S13032|           85.0|
|  S14040|           80.0|
|  S13020|           79.0|
|  S13002|           78.0|
|  S13008|           77.0|
|  S12007|           72.0|
|  S14046|           71.0|
|  S14022|           71.0|
|  S14034|           71.0|
+--------+---------------+
only showing top 20 rows



---
> P-030: レシート明細データフレーム（df_receipt）に対し、店舗コード（store_cd）ごとに売上金額（amount）の標本分散を計算し、降順でTOP5を表示せよ。

In [41]:
# df_receipt.select(F.stddev(F.col("amount")).alias("std")).show()
df_receipt.groupBy("store_cd").agg(F.variance(F.col("amount")).alias("variance")).orderBy(F.col("variance").desc()).show(5)

+--------+------------------+
|store_cd|          variance|
+--------+------------------+
|  S13052| 441863.2525262341|
|  S14011|306442.24243156874|
|  S14034| 297068.3927400612|
|  S13001| 295558.8426177121|
|  S13015| 295427.1970858535|
+--------+------------------+
only showing top 5 rows



---
> P-031: レシート明細データフレーム（df_receipt）に対し、店舗コード（store_cd）ごとに売上金額（amount）の標本標準偏差を計算し、降順でTOP5を表示せよ。

In [42]:
# df_receipt.select(F.stddev(F.col("amount")).alias("std")).show()
df_receipt.groupBy("store_cd").agg(F.stddev(F.col("amount")).alias("std")).orderBy(F.col("std").desc()).show(5)

+--------+-----------------+
|store_cd|              std|
+--------+-----------------+
|  S13052|664.7279537722436|
|  S14011|553.5722558361905|
|  S14034| 545.039808399406|
|  S13001|543.6532374756101|
|  S13015|543.5321490821435|
+--------+-----------------+
only showing top 5 rows



---
> P-032: レシート明細データフレーム（df_receipt）の売上金額（amount）について、25％刻みでパーセンタイル値を求めよ。

In [43]:
df_receipt.select("amount").summary().show()

+-------+------------------+
|summary|            amount|
+-------+------------------+
|  count|            104681|
|   mean| 320.5600825364679|
| stddev|477.70274931379606|
|    min|                10|
|    25%|               102|
|    50%|               170|
|    75%|               288|
|    max|             10925|
+-------+------------------+



---
> P-033: レシート明細データフレーム（df_receipt）に対し、店舗コード（store_cd）ごとに売上金額（amount）の平均を計算し、330以上のものを抽出せよ。

In [44]:
df_receipt.groupBy("store_cd").agg(F.mean(F.col("amount")).alias("mean_amount")).filter(F.col("mean_amount")>=330).show()

+--------+------------------+
|store_cd|       mean_amount|
+--------+------------------+
|  S14045| 330.0820734341253|
|  S13020|  337.879932117098|
|  S14047| 330.0770731707317|
|  S12013|330.19412997903567|
|  S14011| 335.7183333333333|
|  S14010|348.79126213592235|
|  S13004|330.94394904458596|
|  S13052|402.86746987951807|
|  S13003| 350.9155188246097|
|  S13019|330.20861587554845|
|  S13001| 348.4703862660944|
|  S13015|351.11196043165467|
|  S14026|332.34058847239015|
+--------+------------------+



---
> P-034: レシート明細データフレーム（df_receipt）に対し、顧客ID（customer_id）ごとに売上金額（amount）を合計して全顧客の平均を求めよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。


In [45]:
df_receipt.filter(F.col("customer_id").rlike('^(?!Z).*'))\
          .groupBy("customer_id").agg(F.sum(F.col("amount")))\
          .agg(F.mean(F.col("sum(amount)"))).show()

+-----------------+
| avg(sum(amount))|
+-----------------+
|2547.742234529256|
+-----------------+



---
> P-035: レシート明細データフレーム（df_receipt）に対し、顧客ID（customer_id）ごとに売上金額（amount）を合計して全顧客の平均を求め、平均以上に買い物をしている顧客を抽出せよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。なお、データは10件だけ表示させれば良い。

In [46]:
df_amount_sum = df_receipt.filter(F.col("customer_id").rlike('^(?!Z).*'))\
                        .groupBy("customer_id").agg(F.sum(F.col("amount")))
amount_mean = df_amount_sum.agg(F.mean(F.col("sum(amount)"))).first()[0]
df_amount_sum.filter(F.col("sum(amount)")>=amount_mean).show(10)

+--------------+-----------+
|   customer_id|sum(amount)|
+--------------+-----------+
|CS024415000195|       4638|
|CS025414000093|       3929|
|CS010605000007|       6420|
|CS011415000097|       3462|
|CS018515000090|       3756|
|CS015415000222|      11472|
|CS038605000003|       2833|
|CS049513000008|       4450|
|CS017514000012|       4699|
|CS020515000105|       5283|
+--------------+-----------+
only showing top 10 rows



---
> P-036: レシート明細データフレーム（df_receipt）と店舗データフレーム（df_store）を内部結合し、レシート明細データフレームの全項目と店舗データフレームの店舗名（store_name）を10件表示させよ。

In [47]:
df_receipt.join(df_store, "store_cd", "inner").select(df_receipt.columns+["store_name"]).show(10)

+---------+-----------+--------+----------+--------------+--------------+----------+--------+------+----------+
|sales_ymd|sales_epoch|store_cd|receipt_no|receipt_sub_no|   customer_id|product_cd|quantity|amount|store_name|
+---------+-----------+--------+----------+--------------+--------------+----------+--------+------+----------+
| 20181103| 1541203200|  S14006|       112|             1|CS006214000001|P070305012|       1|   158|  葛が谷店|
| 20181118| 1542499200|  S13008|      1132|             2|CS008415000097|P070701017|       1|    81|    成城店|
| 20170712| 1499817600|  S14028|      1102|             1|CS028414000014|P060101005|       1|   170|  二ツ橋店|
| 20190205| 1549324800|  S14042|      1132|             1|ZZ000000000000|P050301001|       1|    25|  新山下店|
| 20180821| 1534809600|  S14025|      1102|             2|CS025415000050|P060102007|       1|    90|    大和店|
| 20190605| 1559692800|  S13003|      1112|             1|CS003515000195|P050102002|       1|   138|    狛江店|
| 20181205| 1

---
> P-037: 商品データフレーム（df_product）とカテゴリデータフレーム（df_category）を内部結合し、商品データフレームの全項目とカテゴリデータフレームの小区分名（category_small_name）を10件表示させよ。

In [48]:
df_product.join(df_category, ["category_major_cd", "category_medium_cd", "category_small_cd"], "inner").select(df_product.columns+["category_small_name"]).show(10)

+----------+-----------------+------------------+-----------------+----------+---------+-------------------+
|product_cd|category_major_cd|category_medium_cd|category_small_cd|unit_price|unit_cost|category_small_name|
+----------+-----------------+------------------+-----------------+----------+---------+-------------------+
|P040101001|                4|               401|            40101|       198|      149|             弁当類|
|P040101002|                4|               401|            40101|       218|      164|             弁当類|
|P040101003|                4|               401|            40101|       230|      173|             弁当類|
|P040101004|                4|               401|            40101|       248|      186|             弁当類|
|P040101005|                4|               401|            40101|       268|      201|             弁当類|
|P040101006|                4|               401|            40101|       298|      224|             弁当類|
|P040101007|                4|       

---
> P-038: 顧客データフレーム（df_customer）とレシート明細データフレーム（df_receipt）から、各顧客ごとの売上金額合計を求めよ。ただし、買い物の実績がない顧客については売上金額を0として表示させること。また、顧客は性別コード（gender_cd）が女性（1）であるものを対象とし、非会員（顧客IDが'Z'から始まるもの）は除外すること。なお、結果は10件だけ表示させれば良い。

In [49]:
df_customer.join(df_receipt, "customer_id", "inner")\
            .filter((F.col("gender_cd")==1) & (F.col("customer_id").rlike('^(?!Z).*')))\
            .groupBy("customer_id").agg(F.sum(F.col("amount"))).show()

+--------------+-----------+
|   customer_id|sum(amount)|
+--------------+-----------+
|CS024415000195|       4638|
|CS011515000044|       2533|
|CS017415000245|       1289|
|CS011512000113|        686|
|CS025414000093|       3929|
|CS011415000097|       3462|
|CS018515000090|       3756|
|CS015415000222|      11472|
|CS028515000242|       1525|
|CS040215000044|       1439|
|CS049513000008|       4450|
|CS002411000018|       1054|
|CS017514000103|        348|
|CS010413000069|        278|
|CS038415000161|       2512|
|CS017514000012|       4699|
|CS030412000172|        766|
|CS002513000664|        398|
|CS002514000021|        413|
|CS034515000045|       1105|
+--------------+-----------+
only showing top 20 rows



---
> P-039: レシート明細データフレーム（df_receipt）から売上日数の多い顧客の上位20件と、売上金額合計の多い顧客の上位20件を抽出し、完全外部結合せよ。ただし、非会員（顧客IDが'Z'から始まるもの）は除外すること。

In [50]:
df_receipt_tmp1 = df_receipt.filter(F.col("customer_id").rlike('^(?!Z).*'))\
                            .groupBy("customer_id")\
                            .agg(F.count(F.col("sales_ymd")))\
                            .orderBy(F.col("count(sales_ymd)").desc()).select(["customer_id", "count(sales_ymd)"]).limit(20)
df_receipt_tmp2 = df_receipt.filter(F.col("customer_id").rlike('^(?!Z).*'))\
                            .groupBy("customer_id")\
                            .agg(F.sum(F.col("amount")))\
                            .orderBy(F.col("sum(amount)").desc()).select(["customer_id", "sum(amount)"]).limit(20)
df_receipt_tmp1.join(df_receipt_tmp2, "customer_id", "outer").show()

+--------------+----------------+-----------+
|   customer_id|count(sales_ymd)|sum(amount)|
+--------------+----------------+-----------+
|CS001605000009|            null|      18925|
|CS006515000023|            null|      18372|
|CS007514000094|            null|      15735|
|CS007515000107|              40|       null|
|CS009414000059|            null|      15492|
|CS010214000002|              42|       null|
|CS010214000010|              44|      18585|
|CS011414000106|            null|      18338|
|CS011415000006|            null|      16094|
|CS014214000023|              38|       null|
|CS014415000077|              38|       null|
|CS015415000185|              44|      20153|
|CS015515000034|            null|      15300|
|CS016415000101|            null|      16348|
|CS016415000141|              40|      18372|
|CS017415000097|              40|      23086|
|CS021514000008|              36|       null|
|CS021514000045|              40|       null|
|CS021515000089|            null| 

---
> P-040: 全ての店舗と全ての商品を組み合わせると何件のデータとなるか調査したい。店舗（df_store）と商品（df_product）を直積した件数を計算せよ。

In [51]:
df_store.count() * df_product.count()

531590

---
> P-041: レシート明細データフレーム（df_receipt）の売上金額（amount）を日付（sales_ymd）ごとに集計し、前日からの売上金額増減を計算せよ。なお、計算結果は10件表示すればよい。

In [52]:
df_sales_amount_by_date = df_receipt.select(["sales_ymd", "amount"]).groupby("sales_ymd").agg(F.sum("amount"))
w = Window.orderBy("sales_ymd")
df_sales_amount_by_date = df_sales_amount_by_date.withColumn("lag_ymd", F.lag("sales_ymd").over(w))\
                                                 .withColumn("lag_amount", F.lag("sum(amount)").over(w))\
                                                 .withColumn("diff_amount", F.col("sum(amount)") - F.col("lag_amount"))

df_sales_amount_by_date.orderBy("sales_ymd").show(10)

+---------+-----------+--------+----------+-----------+
|sales_ymd|sum(amount)| lag_ymd|lag_amount|diff_amount|
+---------+-----------+--------+----------+-----------+
| 20170101|      33723|    null|      null|       null|
| 20170102|      24165|20170101|     33723|      -9558|
| 20170103|      27503|20170102|     24165|       3338|
| 20170104|      36165|20170103|     27503|       8662|
| 20170105|      37830|20170104|     36165|       1665|
| 20170106|      32387|20170105|     37830|      -5443|
| 20170107|      23415|20170106|     32387|      -8972|
| 20170108|      24737|20170107|     23415|       1322|
| 20170109|      26718|20170108|     24737|       1981|
| 20170110|      20143|20170109|     26718|      -6575|
+---------+-----------+--------+----------+-----------+
only showing top 10 rows



---
> P-042: レシート明細データフレーム（df_receipt）の売上金額（amount）を日付（sales_ymd）ごとに集計し、各日付のデータに対し、１日前、２日前、３日前のデータを結合せよ。結果は10件表示すればよい。

In [53]:
df_sales_amount_by_date = df_receipt.select(["sales_ymd", "amount"]).groupby("sales_ymd").agg(F.sum("amount"))
w = Window.orderBy("sales_ymd")
df_sales_amount_by_date = df_sales_amount_by_date.withColumn("lag_ymd_d1", F.lag("sales_ymd").over(w))\
                                                 .withColumn("lag_amount_d1", F.lag("sum(amount)").over(w))\
                                                 .withColumn("lag_ymd_d2", F.lag("sales_ymd", offset=2).over(w))\
                                                 .withColumn("lag_amount_d2", F.lag("sum(amount)", offset=2).over(w))\
                                                 .withColumn("lag_ymd_d3", F.lag("sales_ymd", offset=3).over(w))\
                                                 .withColumn("lag_amount_d3", F.lag("sum(amount)", offset=3).over(w))
                                                 
df_sales_amount_by_date.dropna().orderBy("sales_ymd").show(10)                              

+---------+-----------+----------+-------------+----------+-------------+----------+-------------+
|sales_ymd|sum(amount)|lag_ymd_d1|lag_amount_d1|lag_ymd_d2|lag_amount_d2|lag_ymd_d3|lag_amount_d3|
+---------+-----------+----------+-------------+----------+-------------+----------+-------------+
| 20170104|      36165|  20170103|        27503|  20170102|        24165|  20170101|        33723|
| 20170105|      37830|  20170104|        36165|  20170103|        27503|  20170102|        24165|
| 20170106|      32387|  20170105|        37830|  20170104|        36165|  20170103|        27503|
| 20170107|      23415|  20170106|        32387|  20170105|        37830|  20170104|        36165|
| 20170108|      24737|  20170107|        23415|  20170106|        32387|  20170105|        37830|
| 20170109|      26718|  20170108|        24737|  20170107|        23415|  20170106|        32387|
| 20170110|      20143|  20170109|        26718|  20170108|        24737|  20170107|        23415|
| 20170111

---
> P-043： レシート明細データフレーム（df_receipt）と顧客データフレーム（df_customer）を結合し、性別（gender）と年代（ageから計算）ごとに売上金額（amount）を合計した売上サマリデータフレーム（df_sales_summary）を作成せよ。性別は0が男性、1が女性、9が不明を表すものとする。
>
> ただし、項目構成は年代、女性の売上金額、男性の売上金額、性別不明の売上金額の4項目とすること（縦に年代、横に性別のクロス集計）。また、年代は10歳ごとの階級とすること。

In [54]:
df_customer_tmp = df_customer.withColumn("age_group",
                                         F.when(F.col("age") < 10, 1)\
                                          .when((F.col("age") >= 10) & (F.col("age") < 20), 10)\
                                          .when((F.col("age") >= 20) & (F.col("age") < 30), 20)\
                                          .when((F.col("age") >= 30) & (F.col("age") < 40), 30)\
                                          .when((F.col("age") >= 40) & (F.col("age") < 50), 40)\
                                          .when((F.col("age") >= 50) & (F.col("age") < 60), 50)\
                                          .when((F.col("age") >= 60) & (F.col("age") < 70), 60)\
                                          .when((F.col("age") >= 70) & (F.col("age") < 80), 70)\
                                          .otherwise(80)
                                         )
df_receipt_tmp = df_receipt.join(df_customer_tmp, "customer_id", "inner")
# df_sales_summary = df_receipt_tmp.groupby("age_group").pivot("gender").agg(F.sum("amount").alias("gender_age_amount"))
# ↓パフォーマンスよい
df_sales_summary = df_receipt_tmp.groupby("age_group", "gender").sum("amount").groupby("age_group").pivot("gender").agg(F.sum("sum(amount)").alias("gender_age_amount"))

---
> P-044： 前設問で作成した売上サマリデータフレーム（df_sales_summary）は性別の売上を横持ちさせたものであった。このデータフレームから性別を縦持ちさせ、年代、性別コード、売上金額の3項目に変換せよ。ただし、性別コードは男性を'00'、女性を'01'、不明を'99'とする。

In [55]:
df_sales_summary_pivot = df_sales_summary.withColumnRenamed("男性", "00").withColumnRenamed("女性", "01").withColumnRenamed("不明", "99")

In [56]:
df_sales_summary_tmp = df_sales_summary_pivot.toPandas()

df_sales_summary_tmp.melt(id_vars=["age_group"])

Unnamed: 0,age_group,variable,value
0,40,99,483512
1,20,99,44328
2,10,99,4317
3,50,99,342923
4,80,99,5111
5,70,99,2427
6,60,99,71418
7,30,99,50441
8,40,1,9320791
9,20,1,1363724


---
> P-045: 顧客データフレーム（df_customer）の生年月日（birth_day）は日付型（Date）でデータを保有している。これをYYYYMMDD形式の文字列に変換し、顧客ID（customer_id）とともに抽出せよ。データは10件を抽出すれば良い。

In [57]:
df_customer.withColumn("birth_day", F.to_date("birth_day")).select(["customer_id", "birth_day"]).show(10)

+--------------+----------+
|   customer_id| birth_day|
+--------------+----------+
|CS021313000114|1981-04-29|
|CS037613000071|1952-04-01|
|CS031415000172|1976-10-04|
|CS028811000001|1933-03-27|
|CS001215000145|1995-03-29|
|CS020401000016|1974-09-15|
|CS015414000103|1977-08-09|
|CS029403000008|1973-08-17|
|CS015804000004|1931-05-02|
|CS033513000180|1962-07-11|
+--------------+----------+
only showing top 10 rows



---
> P-046: 顧客データフレーム（df_customer）の申し込み日（application_date）はYYYYMMD形式の文字列型でデータを保有している。これを日付型（dateやdatetime）に変換し、顧客ID（customer_id）とともに抽出せよ。データは10件を抽出すれば良い。

In [58]:
df_customer.withColumn("application_date", F.col('application_date').cast(StringType()))\
           .withColumn("application_date", F.to_date(F.unix_timestamp(F.col("application_date"), "yyyyMMdd").cast("timestamp")))\
           .select(["customer_id", "application_date"]).show(10)

+--------------+----------------+
|   customer_id|application_date|
+--------------+----------------+
|CS021313000114|      2015-09-05|
|CS037613000071|      2015-04-14|
|CS031415000172|      2015-05-29|
|CS028811000001|      2016-01-15|
|CS001215000145|      2017-06-05|
|CS020401000016|      2015-02-25|
|CS015414000103|      2015-07-22|
|CS029403000008|      2015-05-15|
|CS015804000004|      2015-06-07|
|CS033513000180|      2015-07-28|
+--------------+----------------+
only showing top 10 rows



---
> P-047: レシート明細データフレーム（df_receipt）の売上日（sales_ymd）はYYYYMMDD形式の数値型でデータを保有している。これを日付型（dateやdatetime）に変換し、レシート番号(receipt_no)、レシートサブ番号（receipt_sub_no）とともに抽出せよ。データは10件を抽出すれば良い。

In [59]:
df_receipt.withColumn("sales_ymd", F.col('sales_ymd').cast(StringType()))\
          .withColumn("sales_ymd", F.to_date(F.unix_timestamp(F.col("sales_ymd"), "yyyyMMdd").cast("timestamp")))\
          .select(["receipt_no", "receipt_sub_no", "sales_ymd"]).show(10)

+----------+--------------+----------+
|receipt_no|receipt_sub_no| sales_ymd|
+----------+--------------+----------+
|       112|             1|2018-11-03|
|      1132|             2|2018-11-18|
|      1102|             1|2017-07-12|
|      1132|             1|2019-02-05|
|      1102|             2|2018-08-21|
|      1112|             1|2019-06-05|
|      1102|             2|2018-12-05|
|      1102|             1|2019-09-22|
|      1112|             2|2017-05-04|
|      1102|             1|2019-10-10|
+----------+--------------+----------+
only showing top 10 rows



---
> P-048: レシート明細データフレーム（df_receipt）の売上エポック秒（sales_epoch）は数値型のUNIX秒でデータを保有している。これを日付型（dateやdatetime）に変換し、レシート番号(receipt_no)、レシートサブ番号（receipt_sub_no）とともに抽出せよ。データは10件を抽出すれば良い。

In [60]:
df_receipt.withColumn("sales_epoch", F.col('sales_epoch').cast("timestamp"))\
          .withColumn("sales_epoch", F.to_date("sales_epoch"))\
          .select(["receipt_no", "receipt_sub_no", "sales_epoch"]).show(10)

+----------+--------------+-----------+
|receipt_no|receipt_sub_no|sales_epoch|
+----------+--------------+-----------+
|       112|             1| 2018-11-03|
|      1132|             2| 2018-11-18|
|      1102|             1| 2017-07-12|
|      1132|             1| 2019-02-05|
|      1102|             2| 2018-08-21|
|      1112|             1| 2019-06-05|
|      1102|             2| 2018-12-05|
|      1102|             1| 2019-09-22|
|      1112|             2| 2017-05-04|
|      1102|             1| 2019-10-10|
+----------+--------------+-----------+
only showing top 10 rows



---
> P-049: レシート明細データフレーム（df_receipt）の売上エポック秒（sales_epoch）を日付型（timestamp型）に変換し、"年"だけ取り出してレシート番号(receipt_no)、レシートサブ番号（receipt_sub_no）とともに抽出せよ。データは10件を抽出すれば良い。

In [61]:
df_receipt.withColumn("sales_epoch", F.col('sales_epoch').cast("timestamp"))\
          .withColumn("sales_epoch", F.year("sales_epoch"))\
          .select(["receipt_no", "receipt_sub_no", "sales_epoch"]).show(10)

+----------+--------------+-----------+
|receipt_no|receipt_sub_no|sales_epoch|
+----------+--------------+-----------+
|       112|             1|       2018|
|      1132|             2|       2018|
|      1102|             1|       2017|
|      1132|             1|       2019|
|      1102|             2|       2018|
|      1112|             1|       2019|
|      1102|             2|       2018|
|      1102|             1|       2019|
|      1112|             2|       2017|
|      1102|             1|       2019|
+----------+--------------+-----------+
only showing top 10 rows



---
> P-050: レシート明細データフレーム（df_receipt）の売上エポック秒（sales_epoch）を日付型（timestamp型）に変換し、"月"だけ取り出してレシート番号(receipt_no)、レシートサブ番号（receipt_sub_no）とともに抽出せよ。なお、"月"は0埋め2桁で取り出すこと。データは10件を抽出すれば良い。

In [62]:
df_receipt.withColumn("sales_epoch", F.col('sales_epoch').cast("timestamp"))\
          .withColumn("sales_epoch", F.date_format("sales_epoch", "MM"))\
          .select(["receipt_no", "receipt_sub_no", "sales_epoch"]).show(10)

+----------+--------------+-----------+
|receipt_no|receipt_sub_no|sales_epoch|
+----------+--------------+-----------+
|       112|             1|         11|
|      1132|             2|         11|
|      1102|             1|         07|
|      1132|             1|         02|
|      1102|             2|         08|
|      1112|             1|         06|
|      1102|             2|         12|
|      1102|             1|         09|
|      1112|             2|         05|
|      1102|             1|         10|
+----------+--------------+-----------+
only showing top 10 rows



---
> P-051: レシート明細データフレーム（df_receipt）の売上エポック秒（sales_epoch）を日付型（timestamp型）に変換し、"日"だけ取り出してレシート番号(receipt_no)、レシートサブ番号（receipt_sub_no）とともに抽出せよ。なお、"日"は0埋め2桁で取り出すこと。データは10件を抽出すれば良い。

In [63]:
df_receipt.withColumn("sales_epoch", F.col('sales_epoch').cast("timestamp"))\
          .withColumn("sales_epoch", F.date_format("sales_epoch", "dd"))\
          .select(["receipt_no", "receipt_sub_no", "sales_epoch"]).show(10)

+----------+--------------+-----------+
|receipt_no|receipt_sub_no|sales_epoch|
+----------+--------------+-----------+
|       112|             1|         03|
|      1132|             2|         18|
|      1102|             1|         12|
|      1132|             1|         05|
|      1102|             2|         21|
|      1112|             1|         05|
|      1102|             2|         05|
|      1102|             1|         22|
|      1112|             2|         04|
|      1102|             1|         10|
+----------+--------------+-----------+
only showing top 10 rows



---
> P-052: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客ID（customer_id）ごとに合計の上、売上金額合計に対して2000円以下を0、2000円超を1に2値化し、顧客ID、売上金額合計とともに10件表示せよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。

In [64]:
df_receipt.filter(F.col("customer_id").rlike('^(?!Z).*'))\
          .groupBy("customer_id").agg(F.sum(F.col("amount")).alias("sum_amount"))\
          .withColumn("2000over_flg", F.when(F.col("sum_amount") > 2000, 1).otherwise(0))\
          .select(["customer_id", "sum_amount", "2000over_flg"]).show(10)

+--------------+----------+------------+
|   customer_id|sum_amount|2000over_flg|
+--------------+----------+------------+
|CS024415000195|      4638|           1|
|CS011515000044|      2533|           1|
|CS017415000245|      1289|           0|
|CS011512000113|       686|           0|
|CS025414000093|      3929|           1|
|CS010605000007|      6420|           1|
|CS011415000097|      3462|           1|
|CS018515000090|      3756|           1|
|CS015415000222|     11472|           1|
|CS038605000003|      2833|           1|
+--------------+----------+------------+
only showing top 10 rows



---
> P-053: 顧客データフレーム（df_customer）の郵便番号（postal_cd）に対し、東京（先頭3桁が100〜209のもの）を1、それ以外のものを0に２値化せよ。さらにレシート明細データフレーム（df_receipt）と結合し、全期間において買い物実績のある顧客数を、作成した2値ごとにカウントせよ。

In [65]:
df_customer_tmp = df_customer.withColumn("tokyo_flg", F.when((100 <= F.substring("postal_cd", 1, 3).cast(IntegerType())) & \
                                                             (F.substring("postal_cd", 1, 3).cast(IntegerType()) <= 209), 1).otherwise(0))
df_receipt.join(df_customer_tmp, "customer_id", "inner").groupby("tokyo_flg").agg(F.countDistinct("customer_id")).show()

+---------+------------------+
|tokyo_flg|count(customer_id)|
+---------+------------------+
|        1|              4400|
|        0|              3906|
+---------+------------------+



---
> P-054: 顧客データデータフレーム（df_customer）の住所（address）は、埼玉県、千葉県、東京都、神奈川県のいずれかとなっている。都道府県毎にコード値を作成し、顧客ID、住所とともに抽出せよ。値は埼玉県を11、千葉県を12、東京都を13、神奈川県を14とすること。結果は10件表示させれば良い。

In [66]:
df_customer.withColumn("prefecture_cd", F.when(F.substring("address", 1, 3)=="埼玉県", "11")\
                                         .when(F.substring("address", 1, 3)=="千葉県", "12")\
                                         .when(F.substring("address", 1, 3)=="東京都", "13")\
                                         .when(F.substring("address", 1, 3)=="神奈川", "14")).show(10)

+--------------+-------------+---------+------+-------------------+---+---------+--------------------------------+--------------------+----------------+------------+-------------+
|   customer_id|customer_name|gender_cd|gender|          birth_day|age|postal_cd|                         address|application_store_cd|application_date|   status_cd|prefecture_cd|
+--------------+-------------+---------+------+-------------------+---+---------+--------------------------------+--------------------+----------------+------------+-------------+
|CS021313000114|  大野 あや子|        1|  女性|1981-04-29 00:00:00| 37| 259-1113|  神奈川県伊勢原市粟窪**********|              S14021|        20150905|0-00000000-0|           14|
|CS037613000071|    六角 雅彦|        9|  不明|1952-04-01 00:00:00| 66| 136-0076|      東京都江東区南砂**********|              S13037|        20150414|0-00000000-0|           13|
|CS031415000172|宇多田 貴美子|        1|  女性|1976-10-04 00:00:00| 42| 151-0053|    東京都渋谷区代々木**********|              S13031|        20150

---
> P-055: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客ID（customer_id）ごとに合計し、その合計金額の四分位点を求めよ。その上で、顧客ごとの売上金額合計に対して以下の基準でカテゴリ値を作成し、顧客ID、売上金額と合計ともに表示せよ。カテゴリ値は上から順に1〜4とする。結果は10件表示させれば良い。
>
> - 最小値以上第一四分位未満
> - 第一四分位以上第二四分位未満
> - 第二四分位以上第三四分位未満
> - 第三四分位以上

In [67]:
df_receipt_tmp = df_receipt.groupby("customer_id").agg(F.sum("amount").alias("sum_amount"))

In [68]:
perc25 = df_receipt_tmp.select(F.percentile_approx("sum_amount", 0.25)).collect()[0][0]
perc50 = df_receipt_tmp.select(F.percentile_approx("sum_amount", 0.50)).collect()[0][0]
perc75 = df_receipt_tmp.select(F.percentile_approx("sum_amount", 0.75)).collect()[0][0]

In [69]:
df_receipt_tmp.withColumn("pct_group", \
                          F.when(F.col("sum_amount") >= perc75, 4)\
                           .when((F.col("sum_amount") < perc75) & (F.col("sum_amount") >= perc50), 3)\
                           .when((F.col("sum_amount") < perc50) & (F.col("sum_amount") >= perc25), 2)\
                           .when(F.col("sum_amount") < perc25, 1))\
                          .show()

+--------------+----------+---------+
|   customer_id|sum_amount|pct_group|
+--------------+----------+---------+
|CS024415000195|      4638|        4|
|CS011515000044|      2533|        3|
|CS017415000245|      1289|        2|
|CS011512000113|       686|        2|
|CS025414000093|      3929|        4|
|CS010605000007|      6420|        4|
|CS011415000097|      3462|        3|
|CS018515000090|      3756|        4|
|CS015415000222|     11472|        4|
|CS038605000003|      2833|        3|
|CS028515000242|      1525|        3|
|CS040215000044|      1439|        2|
|CS049513000008|      4450|        4|
|CS002411000018|      1054|        2|
|CS017514000103|       348|        1|
|CS010413000069|       278|        1|
|CS038415000161|      2512|        3|
|CS017514000012|      4699|        4|
|CS030412000172|       766|        2|
|CS002513000664|       398|        1|
+--------------+----------+---------+
only showing top 20 rows



---
> P-056: 顧客データフレーム（df_customer）の年齢（age）をもとに10歳刻みで年代を算出し、顧客ID（customer_id）、生年月日（birth_day）とともに抽出せよ。ただし、60歳以上は全て60歳代とすること。年代を表すカテゴリ名は任意とする。先頭10件を表示させればよい。

In [70]:
df_customer_tmp = df_customer.withColumn("age_group",
                                         F.when(F.col("age") < 10, 1)\
                                          .when((F.col("age") >= 10) & (F.col("age") < 20), 10)\
                                          .when((F.col("age") >= 20) & (F.col("age") < 30), 20)\
                                          .when((F.col("age") >= 30) & (F.col("age") < 40), 30)\
                                          .when((F.col("age") >= 40) & (F.col("age") < 50), 40)\
                                          .when((F.col("age") >= 50) & (F.col("age") < 60), 50)\
                                          .otherwise(60)
                                         )
df_customer_tmp.select(["customer_id", "birth_day", "age_group"]).show(10)

+--------------+-------------------+---------+
|   customer_id|          birth_day|age_group|
+--------------+-------------------+---------+
|CS021313000114|1981-04-29 00:00:00|       30|
|CS037613000071|1952-04-01 00:00:00|       60|
|CS031415000172|1976-10-04 00:00:00|       40|
|CS028811000001|1933-03-27 00:00:00|       60|
|CS001215000145|1995-03-29 00:00:00|       20|
|CS020401000016|1974-09-15 00:00:00|       40|
|CS015414000103|1977-08-09 00:00:00|       40|
|CS029403000008|1973-08-17 00:00:00|       40|
|CS015804000004|1931-05-02 00:00:00|       60|
|CS033513000180|1962-07-11 00:00:00|       50|
+--------------+-------------------+---------+
only showing top 10 rows



---
> P-057: 前問題の抽出結果と性別（gender）を組み合わせ、新たに性別×年代の組み合わせを表すカテゴリデータを作成せよ。組み合わせを表すカテゴリの値は任意とする。先頭10件を表示させればよい。

In [71]:
df_customer_tmp.withColumn("gemder_age_agegroup", F.concat(F.col("gender"), F.col("age_group").cast(StringType()))).select(["customer_id", "birth_day", "age_group", "gemder_age_agegroup"]).show(10)

+--------------+-------------------+---------+-------------------+
|   customer_id|          birth_day|age_group|gemder_age_agegroup|
+--------------+-------------------+---------+-------------------+
|CS021313000114|1981-04-29 00:00:00|       30|             女性30|
|CS037613000071|1952-04-01 00:00:00|       60|             不明60|
|CS031415000172|1976-10-04 00:00:00|       40|             女性40|
|CS028811000001|1933-03-27 00:00:00|       60|             女性60|
|CS001215000145|1995-03-29 00:00:00|       20|             女性20|
|CS020401000016|1974-09-15 00:00:00|       40|             男性40|
|CS015414000103|1977-08-09 00:00:00|       40|             女性40|
|CS029403000008|1973-08-17 00:00:00|       40|             男性40|
|CS015804000004|1931-05-02 00:00:00|       60|             男性60|
|CS033513000180|1962-07-11 00:00:00|       50|             女性50|
+--------------+-------------------+---------+-------------------+
only showing top 10 rows



---
> P-058: 顧客データフレーム（df_customer）の性別コード（gender_cd）をダミー変数化し、顧客ID（customer_id）とともに抽出せよ。結果は10件表示させれば良い。

In [72]:
df_customer.withColumn("gender_code_1", F.when(F.col("gender_cd") == 1, 1).otherwise(0))\
           .withColumn("gender_code_2", F.when(F.col("gender_cd") == 2, 1).otherwise(0))\
           .withColumn("gender_code_9", F.when(F.col("gender_cd") == 9, 1).otherwise(0)).select(["customer_id", "gender_code_1", "gender_code_2", "gender_code_9"]).show(10)

+--------------+-------------+-------------+-------------+
|   customer_id|gender_code_1|gender_code_2|gender_code_9|
+--------------+-------------+-------------+-------------+
|CS021313000114|            1|            0|            0|
|CS037613000071|            0|            0|            1|
|CS031415000172|            1|            0|            0|
|CS028811000001|            1|            0|            0|
|CS001215000145|            1|            0|            0|
|CS020401000016|            0|            0|            0|
|CS015414000103|            1|            0|            0|
|CS029403000008|            0|            0|            0|
|CS015804000004|            0|            0|            0|
|CS033513000180|            1|            0|            0|
+--------------+-------------+-------------+-------------+
only showing top 10 rows



---
> P-059: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客ID（customer_id）ごとに合計し、合計した売上金額を平均0、標準偏差1に標準化して顧客ID、売上金額合計とともに表示せよ。標準化に使用する標準偏差は、不偏標準偏差と標本標準偏差のどちらでも良いものとする。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。結果は10件表示させれば良い。

In [73]:
def z_score_w(col, w):
    avg_ = F.avg(col).over(w)
    avg_sq = F.avg(col * col).over(w)
    sd_ = F.sqrt(avg_sq - avg_ * avg_)
    return (col - avg_) / sd_


df_receipt_tmp1 = df_receipt.filter(F.col("customer_id").rlike('^(?!Z).*'))\
                            .groupBy("customer_id")\
                            .agg(F.sum(F.col("amount")).alias("sum_amount"))
w = Window().partitionBy()
df_receipt_tmp1.withColumn("zscore", z_score_w(df_receipt_tmp1.sum_amount, w)).show(10)

+--------------+----------+--------------------+
|   customer_id|sum_amount|              zscore|
+--------------+----------+--------------------+
|CS024415000195|      4638|  0.7683329849668896|
|CS011515000044|      2533|-0.00541892260756...|
|CS017415000245|      1289|-0.46268608318838106|
|CS011512000113|       686| -0.6843356795149347|
|CS025414000093|      3929|  0.5077201096840928|
|CS010605000007|      6420|  1.4233571651558095|
|CS011415000097|      3462| 0.33606113541460586|
|CS018515000090|      3756| 0.44412909780267684|
|CS015415000222|     11472|  3.2803617433344976|
|CS038605000003|      2833| 0.10485450840067179|
+--------------+----------+--------------------+
only showing top 10 rows



---
> P-060: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客ID（customer_id）ごとに合計し、合計した売上金額を最小値0、最大値1に正規化して顧客ID、売上金額合計とともに表示せよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。結果は10件表示させれば良い。

In [74]:
def min_max_w(col, w):
    min_ = F.min(col).over(w)
    max_ = F.max(col).over(w)
    return (col - min_) / (max_ - min_)

df_receipt_tmp1 = df_receipt.filter(F.col("customer_id").rlike('^(?!Z).*'))\
                            .groupBy("customer_id")\
                            .agg(F.sum(F.col("amount")).alias("sum_amount"))
w = Window().partitionBy()
df_receipt_tmp1.withColumn("min_max_scaler", min_max_w(df_receipt_tmp1.sum_amount, w)).show(10)

+--------------+----------+--------------------+
|   customer_id|sum_amount|      min_max_scaler|
+--------------+----------+--------------------+
|CS024415000195|      4638| 0.19847062912756344|
|CS011515000044|      2533| 0.10701251303441084|
|CS017415000245|      1289|0.052963156065345844|
|CS011512000113|       686|  0.0267639902676399|
|CS025414000093|      3929|  0.1676659714980883|
|CS010605000007|      6420|  0.2758950295446646|
|CS011415000097|      3462| 0.14737573861661452|
|CS018515000090|      3756| 0.16014946124435175|
|CS015415000222|     11472|  0.4953945081682308|
|CS038605000003|      2833| 0.12004692387904067|
+--------------+----------+--------------------+
only showing top 10 rows



---
> P-061: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客ID（customer_id）ごとに合計し、合計した売上金額を常用対数化（底=10）して顧客ID、売上金額合計とともに表示せよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。結果は10件表示させれば良い。

In [75]:
df_receipt_tmp1 = df_receipt.filter(F.col("customer_id").rlike('^(?!Z).*'))\
                            .groupBy("customer_id")\
                            .agg(F.sum(F.col("amount")).alias("sum_amount"))
df_receipt_tmp1.withColumn("log_amount", F.log10("sum_amount")).show(10)

+--------------+----------+------------------+
|   customer_id|sum_amount|        log_amount|
+--------------+----------+------------------+
|CS024415000195|      4638|3.6663307443019684|
|CS011515000044|      2533| 3.403635189790548|
|CS017415000245|      1289| 3.110252917353403|
|CS011512000113|       686|2.8363241157067516|
|CS025414000093|      3929| 3.594282028811806|
|CS010605000007|      6420| 3.807535028068853|
|CS011415000097|      3462| 3.539327063539375|
|CS018515000090|      3756|3.5747255835940734|
|CS015415000222|     11472| 4.059639138323725|
|CS038605000003|      2833| 3.452246574520437|
+--------------+----------+------------------+
only showing top 10 rows



---
> P-062: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客ID（customer_id）ごとに合計し、合計した売上金額を自然対数化(底=e）して顧客ID、売上金額合計とともに表示せよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。結果は10件表示させれば良い。

In [76]:
df_receipt_tmp1 = df_receipt.filter(F.col("customer_id").rlike('^(?!Z).*'))\
                            .groupBy("customer_id")\
                            .agg(F.sum(F.col("amount")).alias("sum_amount"))
df_receipt_tmp1.withColumn("log_amount", F.log("sum_amount")).show(10)

+--------------+----------+-----------------+
|   customer_id|sum_amount|       log_amount|
+--------------+----------+-----------------+
|CS024415000195|      4638|8.442038517815478|
|CS011515000044|      2533|7.837159650001675|
|CS017415000245|      1289|7.161622002939187|
|CS011512000113|       686|6.530877627725885|
|CS025414000093|      3929| 8.27614021955846|
|CS010605000007|      6420|8.767173396684006|
|CS011415000097|      3462|8.149601735736155|
|CS018515000090|      3756|8.231109840328154|
|CS015415000222|     11472|9.347664562839402|
|CS038605000003|      2833|7.949091499830517|
+--------------+----------+-----------------+
only showing top 10 rows



---
> P-063: 商品データフレーム（df_product）の単価（unit_price）と原価（unit_cost）から、各商品の利益額を算出せよ。結果は10件表示させれば良い。

In [77]:
df_product.withColumn("unit_profit", F.col("unit_price") - F.col("unit_cost")).show(10)

+----------+-----------------+------------------+-----------------+----------+---------+-----------+
|product_cd|category_major_cd|category_medium_cd|category_small_cd|unit_price|unit_cost|unit_profit|
+----------+-----------------+------------------+-----------------+----------+---------+-----------+
|P040101001|                4|               401|            40101|       198|      149|         49|
|P040101002|                4|               401|            40101|       218|      164|         54|
|P040101003|                4|               401|            40101|       230|      173|         57|
|P040101004|                4|               401|            40101|       248|      186|         62|
|P040101005|                4|               401|            40101|       268|      201|         67|
|P040101006|                4|               401|            40101|       298|      224|         74|
|P040101007|                4|               401|            40101|       338|      254|   

---
> P-064: 商品データフレーム（df_product）の単価（unit_price）と原価（unit_cost）から、各商品の利益率の全体平均を算出せよ。
ただし、単価と原価にはNULLが存在することに注意せよ。

In [78]:
df_product.withColumn("unit_profit_rate", (F.col("unit_price") - F.col("unit_cost")) / F.col("unit_price"))\
          .agg(F.mean(F.col("unit_profit_rate"))).show()

+---------------------+
|avg(unit_profit_rate)|
+---------------------+
|  0.24911389885176904|
+---------------------+



---
> P-065: 商品データフレーム（df_product）の各商品について、利益率が30%となる新たな単価を求めよ。ただし、1円未満は切り捨てること。そして結果を10件表示させ、利益率がおよそ30％付近であることを確認せよ。ただし、単価（unit_price）と原価（unit_cost）にはNULLが存在することに注意せよ。

In [79]:
df_product.withColumn("new_price", F.floor(F.col("unit_cost") / 0.7))\
          .withColumn("new_profit_rate", (F.col("new_price") - F.col("unit_cost")) / F.col("new_price")).show(10)

+----------+-----------------+------------------+-----------------+----------+---------+---------+-------------------+
|product_cd|category_major_cd|category_medium_cd|category_small_cd|unit_price|unit_cost|new_price|    new_profit_rate|
+----------+-----------------+------------------+-----------------+----------+---------+---------+-------------------+
|P040101001|                4|               401|            40101|       198|      149|      212| 0.2971698113207547|
|P040101002|                4|               401|            40101|       218|      164|      234|0.29914529914529914|
|P040101003|                4|               401|            40101|       230|      173|      247|0.29959514170040485|
|P040101004|                4|               401|            40101|       248|      186|      265| 0.2981132075471698|
|P040101005|                4|               401|            40101|       268|      201|      287|0.29965156794425085|
|P040101006|                4|               401

---
> P-066: 商品データフレーム（df_product）の各商品について、利益率が30%となる新たな単価を求めよ。今回は、1円未満を四捨五入すること（0.5については偶数方向の丸めで良い）。そして結果を10件表示させ、利益率がおよそ30％付近であることを確認せよ。ただし、単価（unit_price）と原価（unit_cost）にはNULLが存在することに注意せよ。

In [80]:
df_product.withColumn("new_price", F.round(F.col("unit_cost") / 0.7))\
          .withColumn("new_profit_rate", (F.col("new_price") - F.col("unit_cost")) / F.col("new_price")).show(10)

+----------+-----------------+------------------+-----------------+----------+---------+---------+-------------------+
|product_cd|category_major_cd|category_medium_cd|category_small_cd|unit_price|unit_cost|new_price|    new_profit_rate|
+----------+-----------------+------------------+-----------------+----------+---------+---------+-------------------+
|P040101001|                4|               401|            40101|       198|      149|    213.0| 0.3004694835680751|
|P040101002|                4|               401|            40101|       218|      164|    234.0|0.29914529914529914|
|P040101003|                4|               401|            40101|       230|      173|    247.0|0.29959514170040485|
|P040101004|                4|               401|            40101|       248|      186|    266.0| 0.3007518796992481|
|P040101005|                4|               401|            40101|       268|      201|    287.0|0.29965156794425085|
|P040101006|                4|               401

---
> P-067: 商品データフレーム（df_product）の各商品について、利益率が30%となる新たな単価を求めよ。今回は、1円未満を切り上げること。そして結果を10件表示させ、利益率がおよそ30％付近であることを確認せよ。ただし、単価（unit_price）と原価（unit_cost）にはNULLが存在することに注意せよ。

In [81]:
df_product.withColumn("new_price", F.ceil(F.col("unit_cost") / 0.7))\
          .withColumn("new_profit_rate", (F.col("new_price") - F.col("unit_cost")) / F.col("new_price")).show(10)

+----------+-----------------+------------------+-----------------+----------+---------+---------+-------------------+
|product_cd|category_major_cd|category_medium_cd|category_small_cd|unit_price|unit_cost|new_price|    new_profit_rate|
+----------+-----------------+------------------+-----------------+----------+---------+---------+-------------------+
|P040101001|                4|               401|            40101|       198|      149|      213| 0.3004694835680751|
|P040101002|                4|               401|            40101|       218|      164|      235| 0.3021276595744681|
|P040101003|                4|               401|            40101|       230|      173|      248| 0.3024193548387097|
|P040101004|                4|               401|            40101|       248|      186|      266| 0.3007518796992481|
|P040101005|                4|               401|            40101|       268|      201|      288| 0.3020833333333333|
|P040101006|                4|               401

---
> P-068: 商品データフレーム（df_product）の各商品について、消費税率10%の税込み金額を求めよ。 1円未満の端数は切り捨てとし、結果は10件表示すれば良い。ただし、単価（unit_price）にはNULLが存在することに注意せよ。

In [82]:
df_product.withColumn("tax_price", F.floor(F.col("unit_price") * 1.1)).show(10)

+----------+-----------------+------------------+-----------------+----------+---------+---------+
|product_cd|category_major_cd|category_medium_cd|category_small_cd|unit_price|unit_cost|tax_price|
+----------+-----------------+------------------+-----------------+----------+---------+---------+
|P040101001|                4|               401|            40101|       198|      149|      217|
|P040101002|                4|               401|            40101|       218|      164|      239|
|P040101003|                4|               401|            40101|       230|      173|      253|
|P040101004|                4|               401|            40101|       248|      186|      272|
|P040101005|                4|               401|            40101|       268|      201|      294|
|P040101006|                4|               401|            40101|       298|      224|      327|
|P040101007|                4|               401|            40101|       338|      254|      371|
|P04010100

---
> P-069: レシート明細データフレーム（df_receipt）と商品データフレーム（df_product）を結合し、顧客毎に全商品の売上金額合計と、カテゴリ大区分（category_major_cd）が"07"（瓶詰缶詰）の売上金額合計を計算の上、両者の比率を求めよ。抽出対象はカテゴリ大区分"07"（瓶詰缶詰）の購入実績がある顧客のみとし、結果は10件表示させればよい。

In [83]:
df_tmp1 = df_receipt.groupby("customer_id").agg(F.sum("amount").alias("sum_amount"))
df_tmp2 = df_receipt.join(df_product, "product_cd", "inner").filter(F.col("category_major_cd")=="07").groupby("customer_id").agg(F.sum("amount").alias("sum_amount_07"))
df_tmp3 = df_tmp1.join(df_tmp2, "customer_id", "inner")
df_tmp3.withColumn("rate", F.col("sum_amount_07") / F.col("sum_amount")).show(10)

+--------------+----------+-------------+-------------------+
|   customer_id|sum_amount|sum_amount_07|               rate|
+--------------+----------+-------------+-------------------+
|CS017415000245|      1289|          607| 0.4709076803723817|
|CS025414000093|      3929|         1010| 0.2570628658691779|
|CS024415000195|      4638|         1431|0.30853816300129366|
|CS018515000090|      3756|         1594|0.42438764643237487|
|CS038605000003|      2833|         1673| 0.5905400635368867|
|CS040215000044|      1439|         1271| 0.8832522585128562|
|CS049513000008|      4450|         3305| 0.7426966292134831|
|CS017514000103|       348|           50|0.14367816091954022|
|CS028515000242|      1525|          438| 0.2872131147540984|
|CS038415000161|      2512|          680|0.27070063694267515|
+--------------+----------+-------------+-------------------+
only showing top 10 rows



---
> P-070: レシート明細データフレーム（df_receipt）の売上日（sales_ymd）に対し、顧客データフレーム（df_customer）の会員申込日（application_date）からの経過日数を計算し、顧客ID（customer_id）、売上日、会員申込日とともに表示せよ。結果は10件表示させれば良い（なお、sales_ymdは数値、application_dateは文字列でデータを保持している点に注意）。

In [84]:
df_tmp = df_receipt.select(["customer_id", "sales_ymd"]).drop_duplicates()

df_tmp.join(df_customer, "customer_id", "inner")\
      .withColumn("sales_ymd", F.col('sales_ymd').cast(StringType()))\
      .withColumn("sales_ymd", F.to_date(F.unix_timestamp(F.col("sales_ymd"), "yyyyMMdd").cast("timestamp")))\
      .withColumn("application_date", F.col('application_date').cast(StringType()))\
      .withColumn("application_date", F.to_date(F.unix_timestamp(F.col("application_date"), "yyyyMMdd").cast("timestamp")))\
      .withColumn("elapsed_days", F.datediff(F.col("sales_ymd"), F.col("application_date")))\
      .select(["customer_id", "sales_ymd", "application_date", "elapsed_days"]).show(10)

+--------------+----------+----------------+------------+
|   customer_id| sales_ymd|application_date|elapsed_days|
+--------------+----------+----------------+------------+
|CS023415000033|2017-06-19|      2015-10-07|         621|
|CS007515000170|2018-09-04|      2015-10-20|        1050|
|CS006415000221|2019-04-13|      2015-04-21|        1453|
|CS011415000107|2017-02-27|      2015-08-30|         547|
|CS025415000019|2017-10-09|      2015-06-22|         840|
|CS021515000069|2018-03-27|      2015-05-01|        1061|
|CS033411000030|2017-07-10|      2015-08-25|         685|
|CS016515000156|2018-08-08|      2015-02-15|        1270|
|CS032515000080|2017-08-03|      2015-09-26|         677|
|CS009415000112|2018-03-21|      2015-08-06|         958|
+--------------+----------+----------------+------------+
only showing top 10 rows



---
> P-071: レシート明細データフレーム（df_receipt）の売上日（sales_ymd）に対し、顧客データフレーム（df_customer）の会員申込日（application_date）からの経過月数を計算し、顧客ID（customer_id）、売上日、会員申込日とともに表示せよ。結果は10件表示させれば良い（なお、sales_ymdは数値、application_dateは文字列でデータを保持している点に注意）。1ヶ月未満は切り捨てること。

In [85]:
df_tmp = df_receipt.select(["customer_id", "sales_ymd"]).drop_duplicates()

df_tmp.join(df_customer, "customer_id", "inner")\
      .withColumn("sales_ymd", F.col('sales_ymd').cast(StringType()))\
      .withColumn("sales_ymd", F.to_date(F.unix_timestamp(F.col("sales_ymd"), "yyyyMMdd").cast("timestamp")))\
      .withColumn("application_date", F.col('application_date').cast(StringType()))\
      .withColumn("application_date", F.to_date(F.unix_timestamp(F.col("application_date"), "yyyyMMdd").cast("timestamp")))\
      .withColumn("elapsed_months", F.floor(F.months_between(F.col("sales_ymd"), F.col("application_date"))))\
      .select(["customer_id", "sales_ymd", "application_date", "elapsed_months"]).show(10)

+--------------+----------+----------------+--------------+
|   customer_id| sales_ymd|application_date|elapsed_months|
+--------------+----------+----------------+--------------+
|CS023415000033|2017-06-19|      2015-10-07|            20|
|CS007515000170|2018-09-04|      2015-10-20|            34|
|CS006415000221|2019-04-13|      2015-04-21|            47|
|CS011415000107|2017-02-27|      2015-08-30|            17|
|CS025415000019|2017-10-09|      2015-06-22|            27|
|CS021515000069|2018-03-27|      2015-05-01|            34|
|CS033411000030|2017-07-10|      2015-08-25|            22|
|CS016515000156|2018-08-08|      2015-02-15|            41|
|CS032515000080|2017-08-03|      2015-09-26|            22|
|CS009415000112|2018-03-21|      2015-08-06|            31|
+--------------+----------+----------------+--------------+
only showing top 10 rows



---
> P-072: レシート明細データフレーム（df_receipt）の売上日（sales_ymd）に対し、顧客データフレーム（df_customer）の会員申込日（application_date）からの経過年数を計算し、顧客ID（customer_id）、売上日、会員申込日とともに表示せよ。結果は10件表示させれば良い。（なお、sales_ymdは数値、application_dateは文字列でデータを保持している点に注意）。1年未満は切り捨てること。

In [86]:
df_tmp = df_receipt.select(["customer_id", "sales_ymd"]).drop_duplicates()

df_tmp.join(df_customer, "customer_id", "inner")\
      .withColumn("sales_ymd", F.col('sales_ymd').cast(StringType()))\
      .withColumn("sales_ymd", F.to_date(F.unix_timestamp(F.col("sales_ymd"), "yyyyMMdd").cast("timestamp")))\
      .withColumn("application_date", F.col('application_date').cast(StringType()))\
      .withColumn("application_date", F.to_date(F.unix_timestamp(F.col("application_date"), "yyyyMMdd").cast("timestamp")))\
      .withColumn("elapsed_years", F.floor(F.datediff(F.col("sales_ymd"), F.col("application_date")) / 365))\
      .select(["customer_id", "sales_ymd", "application_date", "elapsed_years"]).show(10)

+--------------+----------+----------------+-------------+
|   customer_id| sales_ymd|application_date|elapsed_years|
+--------------+----------+----------------+-------------+
|CS023415000033|2017-06-19|      2015-10-07|            1|
|CS007515000170|2018-09-04|      2015-10-20|            2|
|CS006415000221|2019-04-13|      2015-04-21|            3|
|CS011415000107|2017-02-27|      2015-08-30|            1|
|CS025415000019|2017-10-09|      2015-06-22|            2|
|CS021515000069|2018-03-27|      2015-05-01|            2|
|CS033411000030|2017-07-10|      2015-08-25|            1|
|CS016515000156|2018-08-08|      2015-02-15|            3|
|CS032515000080|2017-08-03|      2015-09-26|            1|
|CS009415000112|2018-03-21|      2015-08-06|            2|
+--------------+----------+----------------+-------------+
only showing top 10 rows



---
> P-073: レシート明細データフレーム（df_receipt）の売上日（sales_ymd）に対し、顧客データフレーム（df_customer）の会員申込日（application_date）からのエポック秒による経過時間を計算し、顧客ID（customer_id）、売上日、会員申込日とともに表示せよ。結果は10件表示させれば良い（なお、sales_ymdは数値、application_dateは文字列でデータを保持している点に注意）。なお、時間情報は保有していないため各日付は0時0分0秒を表すものとする。

In [87]:
df_tmp = df_receipt.select(["customer_id", "sales_ymd"]).drop_duplicates()

df_tmp.join(df_customer, "customer_id", "inner")\
      .withColumn("sales_ymd", F.col('sales_ymd').cast(StringType()))\
      .withColumn("sales_ymd", F.unix_timestamp(F.col("sales_ymd"), "yyyyMMdd"))\
      .withColumn("application_date", F.col('application_date').cast(StringType()))\
      .withColumn("application_date", F.unix_timestamp(F.col("application_date"), "yyyyMMdd"))\
      .withColumn("elapsed_times", F.col("sales_ymd") - F.col("application_date"))\
      .select(["customer_id", "sales_ymd", "application_date", "elapsed_times"]).show(10)

+--------------+----------+----------------+-------------+
|   customer_id| sales_ymd|application_date|elapsed_times|
+--------------+----------+----------------+-------------+
|CS023415000033|1497830400|      1444176000|     53654400|
|CS007515000170|1536019200|      1445299200|     90720000|
|CS006415000221|1555113600|      1429574400|    125539200|
|CS011415000107|1488153600|      1440892800|     47260800|
|CS025415000019|1507507200|      1434931200|     72576000|
|CS021515000069|1522108800|      1430438400|     91670400|
|CS033411000030|1499644800|      1440460800|     59184000|
|CS016515000156|1533686400|      1423958400|    109728000|
|CS032515000080|1501718400|      1443225600|     58492800|
|CS009415000112|1521590400|      1438819200|     82771200|
+--------------+----------+----------------+-------------+
only showing top 10 rows



---
> P-074: レシート明細データフレーム（df_receipt）の売上日（sales_ymd）に対し、当該週の月曜日からの経過日数を計算し、売上日、当該週の月曜日付とともに表示せよ。結果は10件表示させれば良い（なお、sales_ymdは数値でデータを保持している点に注意）。

In [88]:
df_tmp = df_receipt.select(["customer_id", "sales_ymd"]).drop_duplicates()

df_tmp.join(df_customer, "customer_id", "inner")\
      .withColumn("sales_ymd", F.col('sales_ymd').cast(StringType()))\
      .withColumn("sales_ymd", F.to_date(F.unix_timestamp(F.col("sales_ymd"), "yyyyMMdd").cast("timestamp")))\
      .withColumn("weekday", F.date_format("sales_ymd", "EEEE"))\
      .withColumn("weekday_number", F.expr('weekday(sales_ymd)'))\
      .select(["customer_id", "sales_ymd", "weekday", "weekday_number"]).show(10)

+--------------+----------+---------+--------------+
|   customer_id| sales_ymd|  weekday|weekday_number|
+--------------+----------+---------+--------------+
|CS023415000033|2017-06-19|   Monday|             0|
|CS007515000170|2018-09-04|  Tuesday|             1|
|CS006415000221|2019-04-13| Saturday|             5|
|CS011415000107|2017-02-27|   Monday|             0|
|CS025415000019|2017-10-09|   Monday|             0|
|CS021515000069|2018-03-27|  Tuesday|             1|
|CS033411000030|2017-07-10|   Monday|             0|
|CS016515000156|2018-08-08|Wednesday|             2|
|CS032515000080|2017-08-03| Thursday|             3|
|CS009415000112|2018-03-21|Wednesday|             2|
+--------------+----------+---------+--------------+
only showing top 10 rows



---
> P-075: 顧客データフレーム（df_customer）からランダムに1%のデータを抽出し、先頭から10件データを抽出せよ。

In [89]:
df_customer.sample(0.01).head(10)

[Row(customer_id='CS001715000206', customer_name='細谷 たまき', gender_cd=1, gender='女性', birth_day=datetime.datetime(1944, 11, 14, 0, 0), age=74, postal_cd='144-0046', address='東京都大田区東六郷**********', application_store_cd='S13001', application_date=20170822, status_cd='0-00000000-0'),
 Row(customer_id='CS026515000132', customer_name='江崎 くるみ', gender_cd=1, gender='女性', birth_day=datetime.datetime(1964, 10, 14, 0, 0), age=54, postal_cd='251-0045', address='神奈川県藤沢市辻堂東海岸**********', application_store_cd='S14026', application_date=20150527, status_cd='D-20100306-D'),
 Row(customer_id='CS002503000087', customer_name='松居 ケンイチ', gender_cd=0, gender='男性', birth_day=datetime.datetime(1960, 5, 9, 0, 0), age=58, postal_cd='187-0021', address='東京都小平市上水南町**********', application_store_cd='S13002', application_date=20160613, status_cd='0-00000000-0'),
 Row(customer_id='CS011415000005', customer_name='米谷 璃奈子', gender_cd=1, gender='女性', birth_day=datetime.datetime(1969, 10, 18, 0, 0), age=49, postal_cd='223-

---
> P-076: 顧客データフレーム（df_customer）から性別（gender_cd）の割合に基づきランダムに10%のデータを層化抽出データし、性別ごとに件数を集計せよ。

In [90]:
df_customer.groupBy("gender_cd").count().show()

+---------+-----+
|gender_cd|count|
+---------+-----+
|        1|17918|
|        9| 1072|
|        0| 2981|
+---------+-----+



In [91]:
df_customer.sampleBy("gender_cd", fractions={0: 0.1, 1: 0.1, 9:0.1}).groupBy("gender_cd").count().show()

+---------+-----+
|gender_cd|count|
+---------+-----+
|        1| 1811|
|        9|  105|
|        0|  275|
+---------+-----+



---
> P-077: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客単位に合計し、合計した売上金額の外れ値を抽出せよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。なお、ここでは外れ値を平均から3σ以上離れたものとする。結果は10件表示させれば良い。

In [92]:
df_receipt_tmp1 = df_receipt.filter(F.col("customer_id").rlike('^(?!Z).*'))\
                            .groupBy("customer_id")\
                            .agg(F.sum(F.col("amount")).alias("sum_amount"))
stddev_pop_sum_amount = df_receipt_tmp1.select(F.stddev_pop('sum_amount')).first()[0]
mean_sum_amount = df_receipt_tmp1.select(F.mean('sum_amount')).first()[0]
df_receipt_tmp1.where(F.abs(F.col("sum_amount") - mean_sum_amount) > (stddev_pop_sum_amount * 3)).show(10)

+--------------+----------+
|   customer_id|sum_amount|
+--------------+----------+
|CS015415000222|     11472|
|CS038415000071|     12696|
|CS026515000201|     11144|
|CS006515000023|     18372|
|CS013415000176|     12383|
|CS010415000121|     11761|
|CS015515000034|     15300|
|CS032414000072|     16563|
|CS008414000053|     12276|
|CS009415000214|     13742|
+--------------+----------+
only showing top 10 rows



---
> P-078: レシート明細データフレーム（df_receipt）の売上金額（amount）を顧客単位に合計し、合計した売上金額の外れ値を抽出せよ。ただし、顧客IDが"Z"から始まるのものは非会員を表すため、除外して計算すること。なお、ここでは外れ値を第一四分位と第三四分位の差であるIQRを用いて、「第一四分位数-1.5×IQR」よりも下回るもの、または「第三四分位数+1.5×IQR」を超えるものとする。結果は10件表示させれば良い。

In [93]:
df_receipt_tmp1 = df_receipt.filter(F.col("customer_id").rlike('^(?!Z).*'))\
                            .groupBy("customer_id")\
                            .agg(F.sum(F.col("amount")).alias("sum_amount"))

In [94]:
perc25 = df_receipt_tmp.select(F.percentile_approx("sum_amount", 0.25)).collect()[0][0]
perc75 = df_receipt_tmp.select(F.percentile_approx("sum_amount", 0.75)).collect()[0][0]

iqr = perc75 - perc25
amount_low = perc25 - (iqr * 1.5)
amount_hight = perc75 + (iqr * 1.5)

In [95]:
df_receipt_tmp1.filter((F.col("sum_amount") < amount_low) | (F.col("sum_amount") > amount_hight)).show(10)

+--------------+----------+
|   customer_id|sum_amount|
+--------------+----------+
|CS015415000222|     11472|
|CS038415000071|     12696|
|CS021514000045|      9741|
|CS026515000201|     11144|
|CS006515000023|     18372|
|CS013214000003|      8589|
|CS034415000196|      8656|
|CS023414000057|     10075|
|CS031414000048|     10538|
|CS018205000001|      8739|
+--------------+----------+
only showing top 10 rows



---
> P-079: 商品データフレーム（df_product）の各項目に対し、欠損数を確認せよ。

In [96]:
df_product.select([F.count(F.when(F.isnull(F.col(c)), c)).alias(c) for c in df_product.columns]).show()

+----------+-----------------+------------------+-----------------+----------+---------+
|product_cd|category_major_cd|category_medium_cd|category_small_cd|unit_price|unit_cost|
+----------+-----------------+------------------+-----------------+----------+---------+
|         0|                0|                 0|                0|         7|        7|
+----------+-----------------+------------------+-----------------+----------+---------+



---
> P-080: 商品データフレーム（df_product）のいずれかの項目に欠損が発生しているレコードを全て削除した新たなdf_product_1を作成せよ。なお、削除前後の件数を表示させ、前設問で確認した件数だけ減少していることも確認すること。

In [97]:
df_product_1 = df_product.dropna()
print('削除前:', df_product.count())
print('削除後:', df_product_1.count())

削除前: 10030
削除後: 10023


---
> P-081: 単価（unit_price）と原価（unit_cost）の欠損値について、それぞれの平均値で補完した新たなdf_product_2を作成せよ。なお、平均値について1円未満は四捨五入とし、0.5については偶数寄せでかまわない。補完実施後、各項目について欠損が生じていないことも確認すること。

In [98]:
mean_unit_price = df_product.select(F.round(F.mean("unit_price"))).first()[0]
mean_unit_cost = df_product.select(F.round(F.mean("unit_cost"))).first()[0]
fill_dict = {"unit_price":mean_unit_price, "unit_cost":mean_unit_cost}
df_product_2 = df_product.fillna(fill_dict)
df_product_2.select([F.count(F.when(F.isnull(F.col(c)), c)).alias(c) for c in df_product_2.columns]).show()

+----------+-----------------+------------------+-----------------+----------+---------+
|product_cd|category_major_cd|category_medium_cd|category_small_cd|unit_price|unit_cost|
+----------+-----------------+------------------+-----------------+----------+---------+
|         0|                0|                 0|                0|         0|        0|
+----------+-----------------+------------------+-----------------+----------+---------+



---
> P-082: 単価（unit_price）と原価（unit_cost）の欠損値について、それぞれの中央値で補完した新たなdf_product_3を作成せよ。なお、中央値について1円未満は四捨五入とし、0.5については偶数寄せでかまわない。補完実施後、各項目について欠損が生じていないことも確認すること。

In [99]:
import numpy as np

@F.udf(DoubleType())
def udf_median(values_list):
    med = np.median(values_list)
    return float(med)

In [100]:
median_unit_price = df_product.select(F.round(udf_median(F.collect_list('unit_price'))).alias('median_unit_price')).first()[0]
median_unit_cost = df_product.select(F.round(udf_median(F.collect_list('unit_cost'))).alias('median_unit_cost')).first()[0]
fill_dict = {"unit_price":median_unit_price, "unit_cost":median_unit_cost}
df_product_2 = df_product.fillna(fill_dict)
df_product_2.select([F.count(F.when(F.isnull(F.col(c)), c)).alias(c) for c in df_product_2.columns]).show()

+----------+-----------------+------------------+-----------------+----------+---------+
|product_cd|category_major_cd|category_medium_cd|category_small_cd|unit_price|unit_cost|
+----------+-----------------+------------------+-----------------+----------+---------+
|         0|                0|                 0|                0|         0|        0|
+----------+-----------------+------------------+-----------------+----------+---------+



---
> P-083: 単価（unit_price）と原価（unit_cost）の欠損値について、各商品の小区分（category_small_cd）ごとに算出した中央値で補完した新たなdf_product_4を作成せよ。なお、中央値について1円未満は四捨五入とし、0.5については偶数寄せでかまわない。補完実施後、各項目について欠損が生じていないことも確認すること。

In [101]:
df_tmp = df_product.groupBy("category_small_cd").agg(F.round(udf_median(F.collect_list("unit_price"))).alias("median_unit_price"),\
                                                     F.round(udf_median(F.collect_list("unit_cost"))).alias("median_unit_cost"))
df_product_4 = df_product.join(df_tmp, "category_small_cd", "inner")
df_product_4 = df_product_4.withColumn("unit_price", F.when(F.isnull("unit_price"), F.col("median_unit_price")).otherwise(F.col("unit_price")))\
                           .withColumn("unit_cost", F.when(F.isnull("unit_cost"), F.col("median_unit_cost")).otherwise(F.col("unit_cost")))
df_product_4.select([F.count(F.when(F.isnull(F.col(c)), c)).alias(c) for c in df_product_4.columns]).show()

+-----------------+----------+-----------------+------------------+----------+---------+-----------------+----------------+
|category_small_cd|product_cd|category_major_cd|category_medium_cd|unit_price|unit_cost|median_unit_price|median_unit_cost|
+-----------------+----------+-----------------+------------------+----------+---------+-----------------+----------------+
|                0|         0|                0|                 0|         0|        0|                0|               0|
+-----------------+----------+-----------------+------------------+----------+---------+-----------------+----------------+



---
> P-084: 顧客データフレーム（df_customer）の全顧客に対し、全期間の売上金額に占める2019年売上金額の割合を計算せよ。ただし、販売実績のない場合は0として扱うこと。そして計算した割合が0超のものを抽出せよ。 結果は10件表示させれば良い。また、作成したデータにNAやNANが存在しないことを確認せよ。

In [102]:
df_customer.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- gender_cd: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- birth_day: timestamp (nullable = true)
 |-- age: integer (nullable = true)
 |-- postal_cd: string (nullable = true)
 |-- address: string (nullable = true)
 |-- application_store_cd: string (nullable = true)
 |-- application_date: integer (nullable = true)
 |-- status_cd: string (nullable = true)



In [103]:
df_receipt_2019 = df_receipt.filter(("20190101" <= F.col("sales_ymd")) & ("20191231" > F.col("sales_ymd")))\
                            .groupBy("customer_id").agg(F.sum("amount").alias("sum_amount_2019"))
df_receipt_all = df_receipt.groupBy("customer_id").agg(F.sum("amount").alias("sum_amount_all"))
df_sales_rate = df_customer.join(df_receipt_all, "customer_id", "left").join(df_receipt_2019, "customer_id", "left").fillna(0)
df_sales_rate = df_sales_rate.withColumn("sales_rate", F.col("sum_amount_2019") / F.col("sum_amount_all")).fillna(0).filter(F.col("sales_rate") > 0)
df_sales_rate.show(10)

+--------------+-------------+---------+------+-------------------+---+---------+----------------------------------+--------------------+----------------+------------+--------------+---------------+--------------------+
|   customer_id|customer_name|gender_cd|gender|          birth_day|age|postal_cd|                           address|application_store_cd|application_date|   status_cd|sum_amount_all|sum_amount_2019|          sales_rate|
+--------------+-------------+---------+------+-------------------+---+---------+----------------------------------+--------------------+----------------+------------+--------------+---------------+--------------------+
|CS031415000172|宇多田 貴美子|        1|  女性|1976-10-04 00:00:00| 42| 151-0053|      東京都渋谷区代々木**********|              S13031|        20150529|D-20100325-C|          5088|           2971|  0.5839229559748428|
|CS015414000103|    奥野 陽子|        1|  女性|1977-08-09 00:00:00| 41| 136-0073|        東京都江東区北砂**********|              S13015|        201507

In [104]:
df_sales_rate.select([F.count(F.when(F.isnull(F.col(c)), c)).alias(c) for c in df_sales_rate.columns]).show()

+-----------+-------------+---------+------+---------+---+---------+-------+--------------------+----------------+---------+--------------+---------------+----------+
|customer_id|customer_name|gender_cd|gender|birth_day|age|postal_cd|address|application_store_cd|application_date|status_cd|sum_amount_all|sum_amount_2019|sales_rate|
+-----------+-------------+---------+------+---------+---+---------+-------+--------------------+----------------+---------+--------------+---------------+----------+
|          0|            0|        0|     0|        0|  0|        0|      0|                   0|               0|        0|             0|              0|         0|
+-----------+-------------+---------+------+---------+---+---------+-------+--------------------+----------------+---------+--------------+---------------+----------+



---
> P-085: 顧客データフレーム（df_customer）の全顧客に対し、郵便番号（postal_cd）を用いて経度緯度変換用データフレーム（df_geocode）を紐付け、新たなdf_customer_1を作成せよ。ただし、複数紐づく場合は経度（longitude）、緯度（latitude）それぞれ平均を算出すること。


In [105]:
df_geocode_tmp1 = df_geocode.groupby("postal_cd")\
                            .agg(F.mean(F.col("longitude")).alias("mean_longitude"), 
                                 F.mean(F.col("latitude")).alias("mean_latitude"))
df_customer_1 = df_customer.join(df_geocode_tmp1, "postal_cd", "inner")
df_customer_1.show(10)

+---------+--------------+-------------+---------+------+-------------------+---+----------------------------------+--------------------+----------------+------------+--------------+-------------+
|postal_cd|   customer_id|customer_name|gender_cd|gender|          birth_day|age|                           address|application_store_cd|application_date|   status_cd|mean_longitude|mean_latitude|
+---------+--------------+-------------+---------+------+-------------------+---+----------------------------------+--------------------+----------------+------------+--------------+-------------+
| 246-0006|CS040411000041|  永野 メイサ|        1|  女性|1973-03-27 00:00:00| 46|神奈川県横浜市瀬谷区上瀬谷町***...|              S14040|        20150522|1-20081219-4|     139.47385|     35.49305|
| 246-0006|CS040611000018|  ほしの 佳乃|        1|  女性|1950-04-16 00:00:00| 68|神奈川県横浜市瀬谷区上瀬谷町***...|              S14040|        20150403|0-00000000-0|     139.47385|     35.49305|
| 246-0006|CS028302000032|    永島 達士|        0|  男性|1987-0

---
> P-086: 前設問で作成した緯度経度つき顧客データフレーム（df_customer_1）に対し、申込み店舗コード（application_store_cd）をキーに店舗データフレーム（df_store）と結合せよ。そして申込み店舗の緯度（latitude）・経度情報（longitude)と顧客の緯度・経度を用いて距離（km）を求め、顧客ID（customer_id）、顧客住所（address）、店舗住所（address）とともに表示せよ。計算式は簡易式で良いものとするが、その他精度の高い方式を利用したライブラリを利用してもかまわない。結果は10件表示すれば良い。

$$
緯度（ラジアン）：\phi \\
経度（ラジアン）：\lambda \\
距離L = 6371 * arccos(sin \phi_1 * sin \phi_2
+ cos \phi_1 * cos \phi_2 * cos(\lambda_1 − \lambda_2))
$$

In [106]:
import numpy as np

@F.udf(DoubleType())
def udf_calc_distance(x1, y1, x2, y2):
    x1_r = np.radians(x1)
    x2_r = np.radians(x2)
    y1_r = np.radians(y1)
    y2_r = np.radians(y2)
    return float(6371 * np.arccos(np.sin(x1_r) * np.sin(x2_r) 
                            + np.cos(x1_r) * np.cos(x2_r) 
                            * np.cos(y1_r - y2_r)))

df_customer_1 = df_customer_1.withColumnRenamed("address", "customer_address")
df_store = df_store.withColumnRenamed("address", "store_address")

df_tmp = df_customer_1.join(df_store, df_customer_1.application_store_cd==df_store.store_cd, "inner")
df_tmp.withColumn("distance", udf_calc_distance("mean_latitude", "mean_longitude", "latitude", "longitude"))\
      .select(["customer_id", "customer_address", "store_address", "distance"])\
      .show(10)

+--------------+----------------------------------+--------------------------------------+------------------+
|   customer_id|                  customer_address|                         store_address|          distance|
+--------------+----------------------------------+--------------------------------------+------------------+
|CS040411000041|神奈川県横浜市瀬谷区上瀬谷町***...|神奈川県横浜市緑区長津田みなみ台五丁目| 4.144514368949817|
|CS040611000018|神奈川県横浜市瀬谷区上瀬谷町***...|神奈川県横浜市緑区長津田みなみ台五丁目| 4.144514368949817|
|CS028302000032|神奈川県横浜市瀬谷区上瀬谷町***...|          神奈川県横浜市瀬谷区二ツ橋町| 3.907402208950033|
|CS028712000003|神奈川県横浜市瀬谷区上瀬谷町***...|          神奈川県横浜市瀬谷区二ツ橋町| 3.907402208950033|
|CS032311000014|      東京都大田区昭和島**********|              東京都大田区仲六郷三丁目|  4.16276122876881|
|CS044212000002|      東京都大田区昭和島**********|              東京都大田区南六郷二丁目|3.6929367899442016|
|CS001112000023|      東京都大田区昭和島**********|              東京都大田区仲六郷二丁目| 3.783014514861316|
|CS001112000019|      東京都大田区昭和島**********|              東京都大田区仲六郷二丁目| 3.7830145148613

---
> P-087:  顧客データフレーム（df_customer）では、異なる店舗での申込みなどにより同一顧客が複数登録されている。名前（customer_name）と郵便番号（postal_cd）が同じ顧客は同一顧客とみなし、1顧客1レコードとなるように名寄せした名寄顧客データフレーム（df_customer_u）を作成せよ。ただし、同一顧客に対しては売上金額合計が最も高いものを残すものとし、売上金額合計が同一もしくは売上実績の無い顧客については顧客ID（customer_id）の番号が小さいものを残すこととする。

In [107]:
df_receipt_tmp = df_receipt.groupBy("customer_id").agg(F.sum("amount").alias("sum_amount"))
df_customer_u = df_customer.join(df_receipt_tmp, "customer_id", "left").fillna({"sum_amount":0})

df_customer_u = df_customer_u.orderBy("sum_amount", ascending = False)\
                             .coalesce(1).dropDuplicates(subset = ["customer_name", "postal_cd"])\
                             .select(["customer_name", "customer_id", "postal_cd", "sum_amount"])\
                             .withColumnRenamed("customer_id", "integration_id")

print('df_customer_cnt:', df_customer.count(),
      'df_customer_u_cnt:', df_customer_u.count(),
      'diff:', df_customer.count() - df_customer_u.count())

df_customer_cnt: 21971 df_customer_u_cnt: 21941 diff: 30


---
> P-088: 前設問で作成したデータを元に、顧客データフレームに統合名寄IDを付与したデータフレーム（df_customer_n）を作成せよ。ただし、統合名寄IDは以下の仕様で付与するものとする。
>
> - 重複していない顧客：顧客ID（customer_id）を設定
> - 重複している顧客：前設問で抽出したレコードの顧客IDを設定

In [108]:
df_customer_n = df_customer.join(df_customer_u, ["customer_name", "postal_cd"], "inner")
df_customer_n.select("customer_id").distinct().count() - df_customer_n.select("integration_id").distinct().count()

30

---
> P-閑話: df_customer_1, df_customer_nは使わないので削除する。

In [109]:
import gc
del df_customer_1, df_customer_n
gc.collect()

200

---
> P-089: 売上実績のある顧客に対し、予測モデル構築のため学習用データとテスト用データに分割したい。それぞれ8:2の割合でランダムにデータを分割せよ。

In [110]:
df_receipt_tmp = df_receipt.groupBy("customer_id").agg(F.sum("amount").alias("sum_amount")).filter(F.col("sum_amount") > 0)
df_sales = df_receipt_tmp.join(df_customer, "customer_id", "inner")
df_train, df_test = df_sales.randomSplit([8.0, 2.0])

print(df_train.count() / df_sales.count())
print(df_test.count() / df_sales.count())


0.7995424993980256
0.20045750060197448


---
> P-090: レシート明細データフレーム（df_receipt）は2017年1月1日〜2019年10月31日までのデータを有している。売上金額（amount）を月次で集計し、学習用に12ヶ月、テスト用に6ヶ月のモデル構築用データを3セット作成せよ。

In [111]:
df_receipt_tmp = df_receipt.withColumn("sales_ymd", F.col('sales_ymd').cast(StringType()))\
                           .withColumn("sales_ymd", F.to_date(F.unix_timestamp(F.col("sales_ymd"), "yyyyMMdd").cast("timestamp")))\
                           .withColumn("sales_ym", F.date_format("sales_ymd", "yyyyMM"))\
                           .groupBy("sales_ym").agg(F.sum("amount").alias("sum_amount"))\
                           .orderBy("sales_ym").coalesce(1)

In [112]:
w = Window().orderBy(F.col("sales_ym"))
df_receipt_tmp = df_receipt_tmp.withColumn("row_num", F.row_number().over(w))
point = 1
df_train_1 = df_receipt_tmp.filter((F.col("row_num") <= 12*point) & (F.col("row_num") > 12*(point-1)))
df_test_1 = df_receipt_tmp.filter((F.col("row_num") > 12*point) & (F.col("row_num") <= 12*point+6))
point += 1
df_train_2 = df_receipt_tmp.filter((F.col("row_num") <= 12*point) & (F.col("row_num") > 12*(point-1)))
df_test_2 = df_receipt_tmp.filter((F.col("row_num") > 12*point) & (F.col("row_num") <= 12*point+6))
point += 1
df_train_3 = df_receipt_tmp.filter((F.col("row_num") <= 12*point) & (F.col("row_num") > 12*(point-1)))
df_test_3 = df_receipt_tmp.filter((F.col("row_num") > 12*point) & (F.col("row_num") <= 12*point+6))

In [113]:
df_train_1.show()
df_test_1.show()
df_train_2.show()
df_test_2.show()

+--------+----------+-------+
|sales_ym|sum_amount|row_num|
+--------+----------+-------+
|  201701|    902056|      1|
|  201702|    764413|      2|
|  201703|    962945|      3|
|  201704|    847566|      4|
|  201705|    884010|      5|
|  201706|    894242|      6|
|  201707|    959205|      7|
|  201708|    954836|      8|
|  201709|    902037|      9|
|  201710|    905739|     10|
|  201711|    932157|     11|
|  201712|    939654|     12|
+--------+----------+-------+

+--------+----------+-------+
|sales_ym|sum_amount|row_num|
+--------+----------+-------+
|  201801|    944509|     13|
|  201802|    864128|     14|
|  201803|    946588|     15|
|  201804|    937099|     16|
|  201805|   1004438|     17|
|  201806|   1012329|     18|
+--------+----------+-------+

+--------+----------+-------+
|sales_ym|sum_amount|row_num|
+--------+----------+-------+
|  201801|    944509|     13|
|  201802|    864128|     14|
|  201803|    946588|     15|
|  201804|    937099|     16|
|  20180

---
> P-091: 顧客データフレーム（df_customer）の各顧客に対し、売上実績のある顧客数と売上実績のない顧客数が1:1となるようにアンダーサンプリングで抽出せよ。

In [114]:
df_receipt_tmp = df_receipt.groupBy("customer_id").agg(F.sum("amount").alias("sum_amount"))
df_tmp = df_customer.join(df_receipt_tmp, "customer_id", "left")\
                    .withColumn("buy_flg", F.when(F.col("sum_amount").isNull(), 0).otherwise(1))
df_tmp.groupBy("buy_flg").count().show()

+-------+-----+
|buy_flg|count|
+-------+-----+
|      1| 8306|
|      0|13665|
+-------+-----+



In [115]:
minor = df_tmp.filter(F.col("buy_flg")==1)
major = df_tmp.filter(F.col("buy_flg")==0)
ratio = major.count() / minor.count()
sampled_major = major.sample(False, 1/ratio)
sampling_df = sampled_major.unionAll(minor)
sampling_df.groupBy("buy_flg").count().show()

+-------+-----+
|buy_flg|count|
+-------+-----+
|      0| 8376|
|      1| 8306|
+-------+-----+



---
> P-092: 顧客データフレーム（df_customer）では、性別に関する情報が非正規化の状態で保持されている。これを第三正規化せよ。

In [116]:
df_gender_tmp = df_customer.select(["gender_cd", "gender"]).drop_duplicates()

df_customer_tmp = df_customer.drop(F.col("gender"))

In [117]:
df_gender_tmp.show(3)

+---------+------+
|gender_cd|gender|
+---------+------+
|        9|  不明|
|        0|  男性|
|        1|  女性|
+---------+------+



In [118]:
df_customer_tmp.show(3)

+--------------+-------------+---------+-------------------+---+---------+------------------------------+--------------------+----------------+------------+
|   customer_id|customer_name|gender_cd|          birth_day|age|postal_cd|                       address|application_store_cd|application_date|   status_cd|
+--------------+-------------+---------+-------------------+---+---------+------------------------------+--------------------+----------------+------------+
|CS021313000114|  大野 あや子|        1|1981-04-29 00:00:00| 37| 259-1113|神奈川県伊勢原市粟窪**********|              S14021|        20150905|0-00000000-0|
|CS037613000071|    六角 雅彦|        9|1952-04-01 00:00:00| 66| 136-0076|    東京都江東区南砂**********|              S13037|        20150414|0-00000000-0|
|CS031415000172|宇多田 貴美子|        1|1976-10-04 00:00:00| 42| 151-0053|  東京都渋谷区代々木**********|              S13031|        20150529|D-20100325-C|
+--------------+-------------+---------+-------------------+---+---------+--------------------------

---
> P-093: 商品データフレーム（df_product）では各カテゴリのコード値だけを保有し、カテゴリ名は保有していない。カテゴリデータフレーム（df_category）と組み合わせて非正規化し、カテゴリ名を保有した新たな商品データフレームを作成せよ。

In [119]:
df_product.columns

['product_cd',
 'category_major_cd',
 'category_medium_cd',
 'category_small_cd',
 'unit_price',
 'unit_cost']

In [120]:
df_category.columns

['category_major_cd',
 'category_major_name',
 'category_medium_cd',
 'category_medium_name',
 'category_small_cd',
 'category_small_name']

In [121]:
df_product_tmp = df_product.join(df_category, ["category_major_cd", "category_medium_cd", "category_small_cd"], "inner")
df_product_tmp.show()

+-----------------+------------------+-----------------+----------+----------+---------+-------------------+--------------------+-------------------+
|category_major_cd|category_medium_cd|category_small_cd|product_cd|unit_price|unit_cost|category_major_name|category_medium_name|category_small_name|
+-----------------+------------------+-----------------+----------+----------+---------+-------------------+--------------------+-------------------+
|                4|               401|            40101|P040101001|       198|      149|               惣菜|              御飯類|             弁当類|
|                4|               401|            40101|P040101002|       218|      164|               惣菜|              御飯類|             弁当類|
|                4|               401|            40101|P040101003|       230|      173|               惣菜|              御飯類|             弁当類|
|                4|               401|            40101|P040101004|       248|      186|               惣菜|              御飯類|

---
> P-094: 先に作成したカテゴリ名付き商品データを以下の仕様でファイル出力せよ。なお、出力先のパスはdata配下とする。
>
> - ファイル形式はCSV（カンマ区切り）
> - ヘッダ有り
> - 文字コードはUTF-8

In [122]:
outputpath = 'df_product_tmp_header_true_utf8.csv'
pdf = df_product_tmp.toPandas()
pdf.to_csv(outputpath, header=True, index=False, encoding="utf8")

---
> P-095: 先に作成したカテゴリ名付き商品データを以下の仕様でファイル出力せよ。なお、出力先のパスはdata配下とする。
>
> - ファイル形式はCSV（カンマ区切り）
> - ヘッダ有り
> - 文字コードはCP932

In [123]:
# 書き込み時のファイル数を1つに再パーティション
outputpath = 'df_product_tmp_header_true_cp932.csv'
df_product_tmp.coalesce(1).write.format("csv").mode("overwrite").option("delimiter", ",").option('charset', 'cp932').option("header", "true").save(outputpath)

---
> P-096: 先に作成したカテゴリ名付き商品データを以下の仕様でファイル出力せよ。なお、出力先のパスはdata配下とする。
>
> - ファイル形式はCSV（カンマ区切り）
> - ヘッダ無し
> - 文字コードはUTF-8

In [124]:
outputpath = 'df_product_tmp_header_false_utf8.csv'
df_product_tmp.coalesce(1).write.format("csv").mode("overwrite").option("delimiter", ",").option('charset', 'utf8').save(outputpath)

---
> P-097: 先に作成した以下形式のファイルを読み込み、データフレームを作成せよ。また、先頭10件を表示させ、正しくとりまれていることを確認せよ。
>
> - ファイル形式はCSV（カンマ区切り）
> - ヘッダ有り
> - 文字コードはUTF-8

In [125]:
df_product_tmp_load = spark.read.option("delimiter", ",").option('charset', 'utf8').option("header", "true").csv("df_product_tmp_header_true_utf8.csv")
df_product_tmp_load.show(10)

+-----------------+------------------+-----------------+----------+----------+---------+-------------------+--------------------+-------------------+
|category_major_cd|category_medium_cd|category_small_cd|product_cd|unit_price|unit_cost|category_major_name|category_medium_name|category_small_name|
+-----------------+------------------+-----------------+----------+----------+---------+-------------------+--------------------+-------------------+
|                4|               401|            40101|P040101001|     198.0|    149.0|               惣菜|              御飯類|             弁当類|
|                4|               401|            40101|P040101002|     218.0|    164.0|               惣菜|              御飯類|             弁当類|
|                4|               401|            40101|P040101003|     230.0|    173.0|               惣菜|              御飯類|             弁当類|
|                4|               401|            40101|P040101004|     248.0|    186.0|               惣菜|              御飯類|

---
> P-098: 先に作成した以下形式のファイルを読み込み、データフレームを作成せよ。また、先頭10件を表示させ、正しくとりまれていることを確認せよ。
>
> - ファイル形式はCSV（カンマ区切り）
> - ヘッダ無し
> - 文字コードはUTF-8

In [126]:
df_product_tmp_load = spark.read.option("delimiter", ",").option('charset', 'utf8').csv("df_product_tmp_header_false_utf8.csv")
df_product_tmp_load.show(10)

+---+---+-----+----------+---+---+----+------+------+
|_c0|_c1|  _c2|       _c3|_c4|_c5| _c6|   _c7|   _c8|
+---+---+-----+----------+---+---+----+------+------+
|  4|401|40101|P040101001|198|149|惣菜|御飯類|弁当類|
|  4|401|40101|P040101002|218|164|惣菜|御飯類|弁当類|
|  4|401|40101|P040101003|230|173|惣菜|御飯類|弁当類|
|  4|401|40101|P040101004|248|186|惣菜|御飯類|弁当類|
|  4|401|40101|P040101005|268|201|惣菜|御飯類|弁当類|
|  4|401|40101|P040101006|298|224|惣菜|御飯類|弁当類|
|  4|401|40101|P040101007|338|254|惣菜|御飯類|弁当類|
|  4|401|40101|P040101008|420|315|惣菜|御飯類|弁当類|
|  4|401|40101|P040101009|498|374|惣菜|御飯類|弁当類|
|  4|401|40101|P040101010|580|435|惣菜|御飯類|弁当類|
+---+---+-----+----------+---+---+----+------+------+
only showing top 10 rows



---
> P-099: 先に作成したカテゴリ名付き商品データを以下の仕様でファイル出力せよ。なお、出力先のパスはdata配下とする。
>
> - ファイル形式はTSV（タブ区切り）
> - ヘッダ有り
> - 文字コードはUTF-8

In [127]:
outputpath = 'df_product_tmp_header_true_utf8.tsv'
df_product_tmp.coalesce(1).write.format("csv").mode("overwrite").option("delimiter", "\t").option('charset', 'utf8').option("header", "true").save(outputpath)

---
> P-100: 先に作成した以下形式のファイルを読み込み、データフレームを作成せよ。また、先頭10件を表示させ、正しくとりまれていることを確認せよ。
>
> - ファイル形式はTSV（タブ区切り）
> - ヘッダ有り
> - 文字コードはUTF-8

In [128]:
df_product_tmp_load = spark.read.option("delimiter", "\t").option('charset', 'utf8').option("header", "true").csv("df_product_tmp_header_true_utf8.tsv")
df_product_tmp_load.show(10)

+-----------------+------------------+-----------------+----------+----------+---------+-------------------+--------------------+-------------------+
|category_major_cd|category_medium_cd|category_small_cd|product_cd|unit_price|unit_cost|category_major_name|category_medium_name|category_small_name|
+-----------------+------------------+-----------------+----------+----------+---------+-------------------+--------------------+-------------------+
|                4|               401|            40101|P040101001|       198|      149|               惣菜|              御飯類|             弁当類|
|                4|               401|            40101|P040101002|       218|      164|               惣菜|              御飯類|             弁当類|
|                4|               401|            40101|P040101003|       230|      173|               惣菜|              御飯類|             弁当類|
|                4|               401|            40101|P040101004|       248|      186|               惣菜|              御飯類|

# これで１００本終わりです。おつかれさまでした！