In [1]:
from pyspark.sql.types import *

In [2]:
# ユーザデータ読み込み
user_file = r'E:\datafiles\project\pyspark-study\PySparkトレーニング\input\user.csv'
schema = StructType([
    StructField('user_id', StringType(), False),
    StructField('family_name', StringType(), False),
    StructField('personal_name', StringType(), False),
    StructField('age', IntegerType(), False),
    StructField('sex', StringType(), False),
    StructField('birth_date', StringType(), False),
    StructField('pref_cd', StringType(), False),
    StructField('active_flg', IntegerType(), False)
])
df_user = spark.read.csv(user_file, header=False, encoding='utf-8', mode='DROPMALFORMED', schema=schema)
df_user.show(5)

+-------+-----------+-------------+---+---+----------+-------+----------+
|user_id|family_name|personal_name|age|sex|birth_date|pref_cd|active_flg|
+-------+-----------+-------------+---+---+----------+-------+----------+
|   3844|       舟橋|         志歩| 35| 女| 1984/4/21|     17|         1|
|   8945|       茂木|         佳乃| 58| 女| 1961/9/16|     39|         1|
|   5359|       山中|         研一| 38| 男|  1981/5/7|     23|         0|
|   9465|       清家|           鈴| 53| 女|  1966/8/1|     31|         0|
|   9556|       宮城|         敦彦| 56| 男| 1963/5/10|     23|         1|
+-------+-----------+-------------+---+---+----------+-------+----------+
only showing top 5 rows



In [3]:
# 住所データ読み込み
address_file = r'E:\datafiles\project\pyspark-study\PySparkトレーニング\input\zenkoku.csv'
addr = spark.read.csv(address_file, header=True, encoding='shift-jis', mode='DROPMALFORMED')

In [4]:
# 重複削除
addr = addr.select(addr[1], addr[7]).drop_duplicates()

In [5]:
# カラム名変更
#addr = addr.withColumnRenamed('都道府県CD','pref_cd') \
#           .withColumnRenamed('都道府県', 'pref_nm')
# 一括変更
addr = addr.toDF(*['pref_cd', 'pref_nm'])

In [6]:
# ソート
addr = addr.sort(addr.pref_cd)
addr = addr.sort(addr.pref_cd.desc())
addr.show(5)
# 複数キー
df_user.sort(df_user.user_id, df_user.age).show()

+-------+-------+
|pref_cd|pref_nm|
+-------+-------+
|      9| 栃木県|
|      8| 茨城県|
|      7| 福島県|
|      6| 山形県|
|      5| 秋田県|
+-------+-------+
only showing top 5 rows

+-------+-----------+-------------+---+---+----------+-------+----------+
|user_id|family_name|personal_name|age|sex|birth_date|pref_cd|active_flg|
+-------+-----------+-------------+---+---+----------+-------+----------+
|    100|       石垣|         陳雄| 36| 男|  1983/6/9|     23|         1|
|   1005|       助川|         昌之| 35| 男|  1984/6/7|     37|         1|
|   1026|       辻野|         心春| 40| 女| 1979/9/21|     46|         0|
|   1035|       神戸|         信雄| 27| 男| 1992/10/4|     20|         1|
|   1036|         畑|         知世| 48| 女| 1972/1/20|     34|         0|
|   1039|       山岡|         明憲| 44| 男| 1975/6/12|     03|         0|
|   1043|       花井|         雄二| 61| 男| 1958/9/17|     20|         0|
|   1064|       上川|         篤彦| 29| 男|  1991/3/3|     12|         0|
|   1064|       安武|         繁雄| 35| 男| 1984/7/22|    

In [7]:
# 元のDataFrameの型定義を元に、0埋めしたカラムを追加した型定義を作成する
addr = addr.withColumnRenamed('pref_cd', 'pref_cd_org')
new_schema_fields = addr.schema.fields + [StructField('pref_cd', StringType())]
new_schema = StructType(new_schema_fields)

In [8]:
# 0埋め
addr = addr.rdd \
           .map(lambda r: list(r) + [r.pref_cd_org.zfill(2)]) \
           .toDF(new_schema) \
           .select('pref_cd', 'pref_nm')
#addr.rdd.take(5)
#['9', '栃木県'] + ['09'] = ['9', '栃木県', '09']

In [9]:
# sort
addr.sort('pref_cd').show()

+-------+--------+
|pref_cd| pref_nm|
+-------+--------+
|     01|  北海道|
|     02|  青森県|
|     03|  岩手県|
|     04|  宮城県|
|     05|  秋田県|
|     06|  山形県|
|     07|  福島県|
|     08|  茨城県|
|     09|  栃木県|
|     10|  群馬県|
|     11|  埼玉県|
|     12|  千葉県|
|     13|  東京都|
|     14|神奈川県|
|     15|  新潟県|
|     16|  富山県|
|     17|  石川県|
|     18|  福井県|
|     19|  山梨県|
|     20|  長野県|
+-------+--------+
only showing top 20 rows



In [10]:
# join 
addr_user = addr.join(df_user, on='pref_cd', how='left_outer')
addr_user.show()

+-------+-------+-------+-----------+-------------+---+---+----------+----------+
|pref_cd|pref_nm|user_id|family_name|personal_name|age|sex|birth_date|active_flg|
+-------+-------+-------+-----------+-------------+---+---+----------+----------+
|     09| 栃木県|   9035|       鶴岡|           譲| 39| 男| 1981/3/11|         1|
|     09| 栃木県|    845|       手嶋|         恒雄| 48| 男| 1972/1/14|         1|
|     09| 栃木県|   7967|     仲宗根|         紅葉| 43| 女| 1976/6/12|         1|
|     09| 栃木県|   8237|       廣瀬|           豊| 40| 男|1979/12/24|         0|
|     09| 栃木県|   3290|       田山|         伊織| 32| 女| 1987/4/20|         0|
|     09| 栃木県|   1222|       赤川|         好克| 46| 男|1973/10/17|         1|
|     09| 栃木県|   9753|       野田|         奈菜| 46| 女| 1974/3/28|         1|
|     09| 栃木県|   9863|       川田|         寛子| 45| 女| 1974/8/22|         1|
|     09| 栃木県|   2231|       古家|         美月| 19| 女|  2001/2/9|         1|
|     09| 栃木県|   6617|       伊勢|           徹| 59| 男|1960/10/23|         1|
|     09| 栃木

In [11]:
# groupby （グループ化） / count
cnt_pref_all = addr_user.groupby(['pref_cd', 'pref_nm']) \
                         .count() \
                         .withColumnRenamed('count', 'user_cnt') \
                         .sort('pref_cd')
cnt_pref_all.show(50)

+-------+--------+--------+
|pref_cd| pref_nm|user_cnt|
+-------+--------+--------+
|     01|  北海道|      45|
|     02|  青森県|      47|
|     03|  岩手県|      36|
|     04|  宮城県|      43|
|     05|  秋田県|      28|
|     06|  山形県|      53|
|     07|  福島県|      41|
|     08|  茨城県|      40|
|     09|  栃木県|      41|
|     10|  群馬県|      45|
|     11|  埼玉県|      44|
|     12|  千葉県|      37|
|     13|  東京都|      29|
|     14|神奈川県|      46|
|     15|  新潟県|      44|
|     16|  富山県|      53|
|     17|  石川県|      42|
|     18|  福井県|      33|
|     19|  山梨県|      50|
|     20|  長野県|      37|
|     21|  岐阜県|      64|
|     22|  静岡県|      35|
|     23|  愛知県|      49|
|     24|  三重県|      39|
|     25|  滋賀県|      45|
|     26|  京都府|      41|
|     27|  大阪府|      53|
|     28|  兵庫県|      42|
|     29|  奈良県|      46|
|     30|和歌山県|      53|
|     31|  鳥取県|      53|
|     32|  島根県|      35|
|     33|  岡山県|      43|
|     34|  広島県|      29|
|     35|  山口県|      47|
|     36|  徳島県|      38|
|     37|  香川県|   

In [12]:
# sum （合計）
cnt_pref_active = addr_user.groupby(['pref_cd', 'pref_nm']) \
                           .sum('active_flg') \
                           .withColumnRenamed('sum(active_flg)', 'valid_user_cnt') \
                           .sort('pref_cd')
cnt_pref_active.show(50)

+-------+--------+--------------+
|pref_cd| pref_nm|valid_user_cnt|
+-------+--------+--------------+
|     01|  北海道|            30|
|     02|  青森県|            32|
|     03|  岩手県|            15|
|     04|  宮城県|            20|
|     05|  秋田県|            16|
|     06|  山形県|            27|
|     07|  福島県|            22|
|     08|  茨城県|            21|
|     09|  栃木県|            23|
|     10|  群馬県|            28|
|     11|  埼玉県|            30|
|     12|  千葉県|            19|
|     13|  東京都|            19|
|     14|神奈川県|            28|
|     15|  新潟県|            20|
|     16|  富山県|            31|
|     17|  石川県|            19|
|     18|  福井県|            20|
|     19|  山梨県|            21|
|     20|  長野県|            16|
|     21|  岐阜県|            31|
|     22|  静岡県|            17|
|     23|  愛知県|            26|
|     24|  三重県|            20|
|     25|  滋賀県|            22|
|     26|  京都府|            23|
|     27|  大阪府|            26|
|     28|  兵庫県|            17|
|     29|  奈良県|            17|


In [13]:
# mean（平均）
avg_age_active = addr_user.where(addr_user.active_flg == 1) \
                          .groupby(['pref_cd', 'pref_nm']) \
                          .mean('age') \
                          .withColumnRenamed('avg(age)', 'avg_active_age') \
                          .sort('pref_cd')
avg_age_active.show(50)

+-------+--------+------------------+
|pref_cd| pref_nm|    avg_active_age|
+-------+--------+------------------+
|     01|  北海道| 35.36666666666667|
|     02|  青森県|            33.125|
|     03|  岩手県|              36.8|
|     04|  宮城県|             36.85|
|     05|  秋田県|              36.5|
|     06|  山形県|37.074074074074076|
|     07|  福島県|29.181818181818183|
|     08|  茨城県|34.333333333333336|
|     09|  栃木県| 41.82608695652174|
|     10|  群馬県|36.607142857142854|
|     11|  埼玉県| 38.86666666666667|
|     12|  千葉県|35.526315789473685|
|     13|  東京都|33.473684210526315|
|     14|神奈川県| 35.57142857142857|
|     15|  新潟県|              30.0|
|     16|  富山県| 34.61290322580645|
|     17|  石川県|36.473684210526315|
|     18|  福井県|             29.85|
|     19|  山梨県| 36.76190476190476|
|     20|  長野県|           33.3125|
|     21|  岐阜県|33.096774193548384|
|     22|  静岡県|30.529411764705884|
|     23|  愛知県| 35.07692307692308|
|     24|  三重県|              39.5|
|     25|  滋賀県| 32.77272727272727|
|     26|  京

In [14]:
# pandas DataFrameに変換
cnt_pref_all.toPandas()

Unnamed: 0,pref_cd,pref_nm,user_cnt
0,1,北海道,45
1,2,青森県,47
2,3,岩手県,36
3,4,宮城県,43
4,5,秋田県,28
5,6,山形県,53
6,7,福島県,41
7,8,茨城県,40
8,9,栃木県,41
9,10,群馬県,45


In [15]:
from pyspark.sql.functions import *

def to_2digits(cd):
    return cd.zfill(2)

udf_to_2digits = udf(lambda x: x.zfill(2), StringType())

addr.withColumn('pref_cd_2', udf_to_2digits('pref_cd')) \
    .sort('pref_cd') \
    .show()

+-------+--------+---------+
|pref_cd| pref_nm|pref_cd_2|
+-------+--------+---------+
|     01|  北海道|       01|
|     02|  青森県|       02|
|     03|  岩手県|       03|
|     04|  宮城県|       04|
|     05|  秋田県|       05|
|     06|  山形県|       06|
|     07|  福島県|       07|
|     08|  茨城県|       08|
|     09|  栃木県|       09|
|     10|  群馬県|       10|
|     11|  埼玉県|       11|
|     12|  千葉県|       12|
|     13|  東京都|       13|
|     14|神奈川県|       14|
|     15|  新潟県|       15|
|     16|  富山県|       16|
|     17|  石川県|       17|
|     18|  福井県|       18|
|     19|  山梨県|       19|
|     20|  長野県|       20|
+-------+--------+---------+
only showing top 20 rows



In [16]:
# 2列の値を元に新しい列を追加
def to_cd_nm(cd, nm):
    return f'{cd}_{nm}'

udf_to_cd_nm = udf(to_cd_nm, StringType())

addr.withColumn('pref_cd_nm', udf_to_cd_nm('pref_cd','pref_nm')) \
    .sort('pref_cd') \
    .show()

+-------+--------+-----------+
|pref_cd| pref_nm| pref_cd_nm|
+-------+--------+-----------+
|     01|  北海道|  01_北海道|
|     02|  青森県|  02_青森県|
|     03|  岩手県|  03_岩手県|
|     04|  宮城県|  04_宮城県|
|     05|  秋田県|  05_秋田県|
|     06|  山形県|  06_山形県|
|     07|  福島県|  07_福島県|
|     08|  茨城県|  08_茨城県|
|     09|  栃木県|  09_栃木県|
|     10|  群馬県|  10_群馬県|
|     11|  埼玉県|  11_埼玉県|
|     12|  千葉県|  12_千葉県|
|     13|  東京都|  13_東京都|
|     14|神奈川県|14_神奈川県|
|     15|  新潟県|  15_新潟県|
|     16|  富山県|  16_富山県|
|     17|  石川県|  17_石川県|
|     18|  福井県|  18_福井県|
|     19|  山梨県|  19_山梨県|
|     20|  長野県|  20_長野県|
+-------+--------+-----------+
only showing top 20 rows



In [17]:
cnt_pref_active.coalesce(1) \
               .write \
               .csv('pref_users'
                    , sep=','
                    , mode='overwrite'
                    , header=True
                    , encoding='shift-jis')

In [18]:
# 条件に合致する場合のみ編集値設定
udf_slv = udf(lambda age: f'🥈({age})' if age >= 65 else age, StringType())
df_usr_with_slv = df_user.withColumn('age_silver', udf_slv('age'))
df_usr_with_slv.where(df_usr_with_slv.age >= 64).sort('age').show()

+-------+-----------+-------------+---+---+----------+-------+----------+----------+
|user_id|family_name|personal_name|age|sex|birth_date|pref_cd|active_flg|age_silver|
+-------+-----------+-------------+---+---+----------+-------+----------+----------+
|   2469|       西島|         徹子| 64| 女| 1955/9/13|     47|         0|        64|
|   6912|       熊田|         花梨| 64| 女|1955/12/26|     40|         0|        64|
|   7060|       川元|         清隆| 64| 男| 1956/2/24|     11|         0|        64|
|     99|       大迫|         尚生| 64| 男|1955/11/14|     01|         0|        64|
|   8332|       新保|         次男| 64| 男| 1956/4/10|     21|         1|        64|
|   3742|       砂川|         宗男| 64| 男|1955/10/16|     41|         0|        64|
|   2496|       田原|           椿| 64| 女| 1956/3/24|     41|         0|        64|
|   3922|       藤田|         彩加| 64| 女|1955/10/16|     23|         1|        64|
|   5917|       中出|         夕菜| 64| 女|  1956/1/1|     44|         1|        64|
|   3588|       小橋|     

In [20]:
# データ準備（65歳以上のユーザーにslver_flg=1を設定。それ以外はnull）
udf_slv_flg = udf(lambda age: 1 if age >= 65 else None, IntegerType())
df_usr_slv = df_user.withColumn('silver_flg', udf_slv_flg('age'))
df_usr_slv.where(df_usr_slv.age >= 64).sort('age').show()

# 欠損値補完
# subset引数で指定
df_usr_slv2 = df_usr_slv.fillna(0, subset='silver_flg')
df_usr_slv2.where(df_usr_slv2.age >= 64).sort('age').show()

# 辞書で指定
df_usr_slv3 = df_usr_slv.fillna({'silver_flg': 0, 'active_flg': 0})
df_usr_slv3.where(df_usr_slv3.age >= 64).sort('age').show()

+-------+-----------+-------------+---+---+----------+-------+----------+----------+
|user_id|family_name|personal_name|age|sex|birth_date|pref_cd|active_flg|silver_flg|
+-------+-----------+-------------+---+---+----------+-------+----------+----------+
|   2469|       西島|         徹子| 64| 女| 1955/9/13|     47|         0|      null|
|   6912|       熊田|         花梨| 64| 女|1955/12/26|     40|         0|      null|
|   7060|       川元|         清隆| 64| 男| 1956/2/24|     11|         0|      null|
|     99|       大迫|         尚生| 64| 男|1955/11/14|     01|         0|      null|
|   8332|       新保|         次男| 64| 男| 1956/4/10|     21|         1|      null|
|   3742|       砂川|         宗男| 64| 男|1955/10/16|     41|         0|      null|
|   2496|       田原|           椿| 64| 女| 1956/3/24|     41|         0|      null|
|   3922|       藤田|         彩加| 64| 女|1955/10/16|     23|         1|      null|
|   5917|       中出|         夕菜| 64| 女|  1956/1/1|     44|         1|      null|
|   3588|       小橋|     