# 自动化特征工程 - 特征生成

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
此文档是根据IBM 实验室在自动化特征工程领域的研究"One button machine for automating feature engineering in relational databases"及相关文献的思路对开放数据集“Berka dataset”进行特征工程的初步实践。
</p>
<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
特征生成部分是根据实体间关系，通过属性相应规则进行特征转换，最后扁平化为一个初始建模数据集。
</p>

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import os


In [2]:
#--conf spark.sql.catalogImplementation=in-memory
spark = SparkSession \
    .builder \
    .appName("Feature Engineering Prepare example") \
    .config("spark.sql.execution.arrow.enabled", "false") \
    .getOrCreate()

In [3]:
path = os.environ['DSX_PROJECT_DIR']+'/datasets'

In [4]:
path_data = os.environ['DSX_PROJECT_DIR']+'/../demo/datasets'

## 1. 读入数据文件形成数据关系表

### 1.1 读入客户数据文件

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
读入经过预处理的客户文件 - client.csv，并构建Dataframe。
<ul>
    <li>client_id:&nbsp;&nbsp;客户id，主键</li>
    <li>gender:&nbsp;&nbsp;性别 1-男&nbsp;&nbsp;2-女</li>
    <li>birth_dt:&nbsp;&nbsp;生日，日期类型</li>
    <li>district_id:&nbsp;&nbsp;分支机构id，分支区域外键</li>
</ul>
</p>

In [5]:
sch_client = StructType([
    StructField("client_id", IntegerType(), True),
    StructField("gender", IntegerType(), True),
    StructField("birth_dt", DateType(), True),
    StructField("district_id", IntegerType(), True)
    ])

df_client = spark.read.csv(path_data+"/client.csv", header=True, schema=sch_client)

### 1.2 读入账户数据文件

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
读入经过预处理的账户文件 - account.csv，并构建Dataframe。
<ul>
    <li>account_id:&nbsp;&nbsp;账户id，主键</li>
    <li>district_id:&nbsp;&nbsp;账户所在地区，地区表外键</li>
    <li>frequency:&nbsp;&nbsp;账单频率，1-每月&nbsp;&nbsp;2-每周&nbsp;&nbsp;3-交易发生时</li>
    <li>open_id:&nbsp;&nbsp;开户日期</li>
</ul>
</p>

In [6]:
sch_account = StructType([
    StructField("account_id", IntegerType(), True),
    StructField("district_id", IntegerType(), True),
    StructField("frequency", IntegerType(), True),
    StructField("open_dt", DateType(), True)
    ])

df_account = spark.read.csv(path_data+"/account.csv", header=True, schema=sch_account)

In [7]:
df_account.show(2)

+----------+-----------+---------+----------+
|account_id|district_id|frequency|   open_dt|
+----------+-----------+---------+----------+
|       576|         55|        1|1993-01-01|
|      3818|         74|        1|1993-01-01|
+----------+-----------+---------+----------+
only showing top 2 rows



### 1.3 读入交易数据文件

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
读入经过预处理的交易文件 - transaction.csv，并构建Dataframe。
<ul>
    <li>trans_id:&nbsp;&nbsp;交易id，主键</li>
    <li>account_id:&nbsp;&nbsp;账户id，账户表外键</li>
    <li>trans_dt:&nbsp;&nbsp;交易日期，日期类型</li>
    <li>type:&nbsp;&nbsp;会计方式&nbsp;&nbsp;1-贷记(存入)&nbsp;&nbsp;2-借记(支取)</li>
    <li>operation:&nbsp;&nbsp;操作类型&nbsp;&nbsp;1-信用卡取现&nbsp;&nbsp;2-现金存入&nbsp;&nbsp;3-从他行收款&nbsp;&nbsp;4-现金支取&nbsp;&nbsp;5-汇出他行</li>
    <li>amount:&nbsp;&nbsp;交易金额</li>
    <li>balance:&nbsp;&nbsp;账户余额</li>
    <li>k_symbol:&nbsp;&nbsp;交易特点&nbsp;&nbsp;1-保险支付&nbsp;&nbsp;2-账单支付&nbsp;&nbsp;3-利息存入&nbsp;&nbsp;4-罚息&nbsp;&nbsp;5-家庭支出&nbsp;&nbsp;6-养老金支付&nbsp;&nbsp;7-贷款支付&nbsp;&nbsp;8-其它</li>
    <li>bank:&nbsp;&nbsp;他行代码</li>
    <li>account:&nbsp;&nbsp;他行账号</li>
</ul>
</p>

In [8]:
sch_transaction = StructType([
    StructField("trans_id", IntegerType(), True),
    StructField("account_id", IntegerType(), True),
    StructField("trans_dt", DateType(), True),
    StructField("type", IntegerType(), True),
    StructField("operation", IntegerType(), True),
    StructField("amount", DecimalType(11,2), True),
    StructField("balance", DecimalType(11,2), True),
    StructField("k_symbol", IntegerType(), True),
    StructField("bank", StringType(), True),
    StructField("account", StringType(), True),
    ])

df_transaction = spark.read.csv(path_data+"/transaction.csv", header=True, schema=sch_transaction)

### 1.4 读入关系数据文件

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
读入经过预处理的关系文件 - disposition.csv，并构建Dataframe。关系文件主要用来描述客户、账户实体之间的多对多关系。
<ul>
    <li>disp_id:&nbsp;&nbsp;关系id，主键</li>
    <li>client_id:&nbsp;&nbsp;客户id，客户表外键</li>
    <li>account_id:&nbsp;&nbsp;账户id，账户表外键</li>
    <li>type:&nbsp;&nbsp;关系类型&nbsp;&nbsp;1-拥有者&nbsp;&nbsp;2-使用者</li>
</ul>
</p>

In [9]:
sch_disposition = StructType([
    StructField("disp_id", IntegerType(), True),
    StructField("client_id", IntegerType(), True),
    StructField("account_id", IntegerType(), True),
    StructField("type", IntegerType(), True),
    ])

df_disposition = spark.read.csv(path_data+"/disposition.csv", header=True, schema=sch_disposition)

In [10]:
df_disposition.show(2)

+-------+---------+----------+----+
|disp_id|client_id|account_id|type|
+-------+---------+----------+----+
|      1|        1|         1|   1|
|      2|        2|         2|   1|
+-------+---------+----------+----+
only showing top 2 rows



### 1.5 读入信用卡数据文件

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
读入经过预处理的信用卡数据文件 - card.csv，并构建Dataframe。信用卡是账户的一种服务。
<ul>
    <li>card_id:&nbsp;&nbsp;卡id，主键</li>
    <li>disp_id:&nbsp;&nbsp;关系id，关系表外键</li>
    <li>type:&nbsp;&nbsp;类型&nbsp;&nbsp;1-学生卡&nbsp;&nbsp;2-普通卡&nbsp;&nbsp;3-金卡</li>
    <li>issued_dt:&nbsp;&nbsp;发卡日期，日期类型</li>
</ul>
</p>

In [11]:
sch_card = StructType([
    StructField("card_id", IntegerType(), True),
    StructField("disp_id", IntegerType(), True),
    StructField("type", IntegerType(), True),
    StructField("issued_dt", DateType(), True)
    ])

df_card = spark.read.csv(path_data+"/card.csv", header=True, schema=sch_card)

In [12]:
df_card.show(5)

+-------+-------+----+----------+
|card_id|disp_id|type| issued_dt|
+-------+-------+----+----------+
|   1005|   9285|   2|1993-11-07|
|    104|    588|   2|1994-01-19|
|    747|   4915|   2|1994-02-05|
|     70|    439|   2|1994-02-08|
|    577|   3687|   2|1994-02-15|
+-------+-------+----+----------+
only showing top 5 rows



### 1.6 读入贷款数据文件

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
读入经过预处理的贷款数据文件 - loan.csv，并构建Dataframe。贷款是账户的一种服务。一个账户理论可以有多笔贷款。在这个实验中，贷款表是主表。
<ul>
    <li>loan_id:&nbsp;&nbsp;贷款id，主键</li>
    <li>account_id:&nbsp;&nbsp;账户id，账户表外键</li>
    <li>granted_dt:&nbsp;&nbsp;审批日期，日期类型</li>
    <li>amount:&nbsp;&nbsp;贷款金额</li>
    <li>duration:&nbsp;&nbsp;贷款期限(月)</li>
    <li>psyments:&nbsp;&nbsp;还款额</li>
    <li>label:&nbsp;&nbsp;违约状态&nbsp;&nbsp;0-正常&nbsp;&nbsp;2-违约&nbsp;&nbsp;这个是目标变量</li>
</ul>
</p>

In [13]:
sch_loan = StructType([
    StructField("loan_id", IntegerType(), True),
    StructField("account_id", IntegerType(), True),
    StructField("granted_dt", DateType(), True),
    StructField("amount", DecimalType(11,2), True),
    StructField("duration", IntegerType(), True),
    StructField("payments", DecimalType(11,2), True),
    StructField("label", IntegerType(), True)
    ])

df_loan = spark.read.csv(path_data+"/loan.csv", header=True, schema=sch_loan)

In [14]:
df_loan.show(2)

+-------+----------+----------+---------+--------+--------+-----+
|loan_id|account_id|granted_dt|   amount|duration|payments|label|
+-------+----------+----------+---------+--------+--------+-----+
|   5314|      1787|1993-07-05| 96396.00|      12| 8033.00|    1|
|   5316|      1801|1993-07-11|165960.00|      36| 4610.00|    0|
+-------+----------+----------+---------+--------+--------+-----+
only showing top 2 rows



### 1.7 读入他行支付数据文件

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
读入经过预处理的他行支付数据文件 - payorder.csv，并构建Dataframe。他行支付是该账户对他行的汇出指令。
<ul>
    <li>order_id:&nbsp;&nbsp;指令id，主键</li>
    <li>account_id:&nbsp;&nbsp;账户id，账户表外键</li>
    <li>bank_to:&nbsp;&nbsp;他行代码</li>
    <li>account_to:&nbsp;&nbsp;他行账号</li>
    <li>amount:&nbsp;&nbsp;支付金额</li>
    <li>k_symbol:&nbsp;&nbsp;支付类型&nbsp;&nbsp;1-保险金支付&nbsp;&nbsp;2-家庭支付&nbsp;&nbsp;3-租借支付&nbsp;&nbsp;4-贷款支付&nbsp;&nbsp;6-其它</li>
</ul>
</p>

In [15]:
sch_payorder = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("account_id", IntegerType(), True),
    StructField("bank_to", StringType(), True),
    StructField("account_to", StringType(), True),
    StructField("amount", DecimalType(11,2), True),
    StructField("k_symbol", IntegerType(), True)
    ])

df_payorder = spark.read.csv(path_data+"/payorder.csv", header=True, schema=sch_payorder)

In [16]:
df_payorder.show(2)

+--------+----------+-------+----------+-------+--------+
|order_id|account_id|bank_to|account_to| amount|k_symbol|
+--------+----------+-------+----------+-------+--------+
|   29401|         1|     YZ|  87144583|2452.00|       2|
|   29402|         2|     ST|  89597016|3372.70|       4|
+--------+----------+-------+----------+-------+--------+
only showing top 2 rows



### 1.8 读入账户地区数据文件

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
读入经过预处理的账户地区数据文件 - district.csv，并构建Dataframe。
<ul>
    <li>district_id:&nbsp;&nbsp;地区id，主键</li>
    <li>name:&nbsp;&nbsp;账户所在分支机构地区名称</li>
    <li>region:&nbsp;&nbsp;地域</li>
    <li>Inhabitants:&nbsp;&nbsp;地区人口</li>
    <li>Municipalities499:&nbsp;&nbsp;该地区小于499居民的行政区数量</li>
    <li>Municipalities1999:&nbsp;&nbsp;该地区在500-1999之间居民的行政区数量</li>
    <li>Municipalities9999:&nbsp;&nbsp;该地区在2000-9999之间居民的行政区数量</li>
    <li>Municipalities10000:&nbsp;&nbsp;该地区大于10000居民的行政区数量</li>
    <li>cities:&nbsp;&nbsp;城市数</li>
    <li>urbanratio:&nbsp;&nbsp;城市数</li>
    <li>avgsalary:&nbsp;&nbsp;该地区平均工资</li>
    <li>unemploy95:&nbsp;&nbsp;该地区1995年失业率</li>
    <li>unemploy96:&nbsp;&nbsp;该地区1996年失业率</li>
    <li>Enterpreneurs:&nbsp;&nbsp;该地区1000居民中企业家数量</li>
    <li>crimes95:&nbsp;&nbsp;该地区1995年犯罪率</li>
    <li>crimes96:&nbsp;&nbsp;该地区1996年犯罪率</li>
</ul>
</p>

In [17]:
sch_district = StructType([
    StructField("district_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("region", StringType(), True),
    StructField("Inhabitants", IntegerType(), True),
    StructField("Municipalities499", IntegerType(), True),
    StructField("Municipalities1999", IntegerType(), True),
    StructField("Municipalities9999", IntegerType(), True),
    StructField("Municipalities10000", IntegerType(), True),
    StructField("cities", IntegerType(), True),
    StructField("urbanratio", DecimalType(5,2), True),
    StructField("avgsalary", DecimalType(11,2), True),
    StructField("unemploy95", DecimalType(5,2), True),
    StructField("unemploy96", DecimalType(5,2), True),
    StructField("Enterpreneurs", IntegerType(), True),
    StructField("crimes95", IntegerType(), True),
    StructField("crimes96", IntegerType(), True)
    ])

df_district = spark.read.csv(path_data+"/district.csv", header=True, schema=sch_district)

In [18]:
df_district.show(2)

+-----------+-----------+---------------+-----------+-----------------+------------------+------------------+-------------------+------+----------+---------+----------+----------+-------------+--------+--------+
|district_id|       name|         region|Inhabitants|Municipalities499|Municipalities1999|Municipalities9999|Municipalities10000|cities|urbanratio|avgsalary|unemploy95|unemploy96|Enterpreneurs|crimes95|crimes96|
+-----------+-----------+---------------+-----------+-----------------+------------------+------------------+-------------------+------+----------+---------+----------+----------+-------------+--------+--------+
|          1|Hl.m. Praha|         Prague|    1204953|                0|                 0|                 0|                  1|     1|    100.00| 12541.00|      0.29|      0.43|          167|   85677|   99107|
|          2|    Benesov|central Bohemia|      88884|               80|                26|                 6|                  2|     5|     46.70|  850

## 2. 特征生成

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
根据梳理的路径，可以在技术上分为3层，从0层主表贷款表出发，到1层的账户表，再到2层的客户、地区、支付指令、交易、信用卡，最后是属性变量。具体计算上，采用从下向上的汇集方式。
</p>

In [19]:
df_client.createOrReplaceTempView("client")
df_account.createOrReplaceTempView("account")
df_transaction.createOrReplaceTempView("transaction")
df_card.createOrReplaceTempView("card")
df_district.createOrReplaceTempView("district")
df_disposition.createOrReplaceTempView("disposition")
df_loan.createOrReplaceTempView("loan")
df_payorder.createOrReplaceTempView("payorder")

### 2.1 生成客户到账户的特征

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
客户表与账户表为多对多关系，可以从技术上拆分为一对多和多对一两个，首先处理账户到客户多对一的关系。
</p>
<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
每个账户只有一个拥有着，因此，加上type=1条件可以形成多对一关系。而多对一关系在oneBM中成为一对一方式，转换时直接使用转换器即可，无需处理合计、平均等汇聚函数。
</p>

In [20]:
df_client_ex = spark.sql("select d.account_id, d.client_id, c.gender, \
                datediff('1999-01-01', c.birth_dt) as birth_days from \
                client c, disposition d where c.client_id=d.client_id \
                and d.type=1 \
        ")

In [21]:
df_client_ex.show(2)

+----------+---------+------+----------+
|account_id|client_id|gender|birth_days|
+----------+---------+------+----------+
|         1|        1|     2|     10246|
|         2|        2|     1|     19689|
+----------+---------+------+----------+
only showing top 2 rows



### 2.2 生成地区到账户的特征

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
同样，地区表与账户表为一对多关系，在oneBM中成为一对一方式，转换时直接使用转换器即可。
</p>

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
地区表中region字段实际是枚举类型，可以转换为数字类型，在oneBM中应该按照出现频率排序，此略。
</p>

In [22]:
spark.sql("select distinct region from \
                district d, account a where a.district_id=d.district_id \
        ").show()

+---------------+
|         region|
+---------------+
|         Prague|
|  north Bohemia|
|   east Bohemia|
|  south Moravia|
|  north Moravia|
|central Bohemia|
|   west Bohemia|
|  south Bohemia|
+---------------+



In [23]:
df_district_ex = spark.sql("select a.account_id, case region when 'Prague' then 1 \
            when 'north Bohemia' then 2 \
            when 'east Bohemia' then 3 \
            when 'south Moravia' then 4 \
            when 'north Moravia' then 5 \
            when 'central Bohemia' then 6 \
            when 'west Bohemia' then 7 \
            when 'south Bohemia' then 8 end as region_tp, \
            Inhabitants, Municipalities499, Municipalities1999, \
            Municipalities9999, Municipalities10000, cities, urbanratio, \
            avgsalary, unemploy95, unemploy96, Enterpreneurs, \
            crimes95, crimes96 \
            from district d, account a where a.district_id=d.district_id \
        ")

### 2.3 生成客户表的多对一方式特征

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
客户表与账户表还可以表达为多对一关系，即一个账户会有多个客户。因此，需要进行分组统计相应计算，由于客户表属性很少，统计计算只有一个计数。
</p>

In [24]:
df_client_ex1 = spark.sql("select d.account_id, count(1) as client_cnt \
                from client c, disposition d where c.client_id=d.client_id \
                group by d.account_id \
        ")

### 2.4 生成支付指令表特征

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
支付指令表与账户表为多对一关系，即一个账户会有多个支付指令。因此，需要进行分组统计相应计算。
</p>
<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
需要注意的是，由于每个账户的支付指令数量不一，从0到多，因此在分组计算中可能会出现null值，需要加以处理。
</p>

In [25]:
df_payorder_ex = spark.sql("select a.account_id, \
            COALESCE(p.cnt, 0) as cnt, \
            COALESCE(p.amt_s, 0) as amt_s, \
            COALESCE(p.amt_a, 0) as amt_a, \
            COALESCE(p.amt_t, 0) as amt_t, \
            COALESCE(p.amt_x, 0) as amt_x, \
            COALESCE(p.symbol_1, 0) as symbol_1, \
            COALESCE(p.symbol_2, 0) as symbol_2, \
            COALESCE(p.symbol_3, 0) as symbol_3, \
            COALESCE(p.symbol_4, 0) as symbol_4, \
            COALESCE(p.symbol_6, 0) as symbol_6 \
            from account a left outer join \
            (select account_id, count(1) as cnt, \
            sum(amount) as amt_s, avg(amount) as amt_a, \
            case count(1) when 1 then 0 else std(amount) end as amt_t, max(amount) as amt_x, \
            sum(case k_symbol when 1 then 1 else 0 end) as symbol_1, \
            sum(case k_symbol when 2 then 1 else 0 end) as symbol_2, \
            sum(case k_symbol when 3 then 1 else 0 end) as symbol_3, \
            sum(case k_symbol when 4 then 1 else 0 end) as symbol_4, \
            sum(case k_symbol when 6 then 1 else 0 end) as symbol_6 \
            from payorder \
            group by account_id) p on a.account_id=p.account_id \
        ")

### 2.5 信用卡特征生成

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
信用卡表与账户表是多对一关系，由于计算最高卡类别方法与其它函数处理不同，因此分为两步。
</p>


<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
以下是信用卡处理一般统计数量函数。需要注意的是并非所有账户都有信用卡，因此新卡数量特征需要相应赋值为0。
</p>

In [26]:
df_card_ex1 = spark.sql("select a.account_id, COALESCE(c.cnt, 0) as card_cnt \
            from account a left outer join \
            (select d.account_id, count(1) as cnt \
            from card c, disposition d where c.disp_id=d.disp_id \
            group by d.account_id) c on a.account_id=c.account_id \
        ")

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
获取账户最高级别的信用卡相应属性。
</p>

In [27]:
df_card_ex2 = spark.sql("select a.account_id, COALESCE(c.type, 0) as card_tp, \
            COALESCE(c.issued_days, -1) as card_issued_days \
            from account a left outer join \
            (select account_id, type, datediff('1999-01-01', issued_dt) as issued_days \
            from (select d.account_id, c.issued_dt, c.type, rank() \
                    over(partition by d.account_id, c.type order by c.issued_dt) as rk \
                    from (select dd.account_id, max(cc.type) as ty \
                            from card cc, disposition dd where cc.disp_id=dd.disp_id \
                            group by dd.account_id) a, card c, disposition d \
                    where c.disp_id=d.disp_id and a.account_id=d.account_id and c.type = a.ty) b \
            where rk=1) c on a.account_id=c.account_id ")

### 2.6 交易特征生成

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
交易表与账户表是多对一关系，需要进行分组处理。考虑到交易中还存在着操作类型等枚举字段，进一步可以进行基于枚举的分组统计。
</p>

In [28]:
df_trans_ex = spark.sql("select account_id, count(1) as trans_cnt, \
                sum(amount) as amt_s, avg(amount) as amt_a, std(amount) as amt_d, \
                max(amount) as amt_x, min(amount) as amt_n, \
                sum(balance) as balance_s, avg(balance) as balance_a, std(balance) as balance_d, \
                max(balance) as balance_x, min(balance) as balance_n, \
                datediff('1999-01-01', max(trans_dt)) as trans_dt_r, \
                sum(case operation when 1 then 1 else 0 end) as symbol_1, \
                sum(case operation when 2 then 1 else 0 end) as symbol_2, \
                sum(case operation when 3 then 1 else 0 end) as symbol_3, \
                sum(case operation when 4 then 1 else 0 end) as symbol_4, \
                sum(case operation when 5 then 1 else 0 end) as symbol_5, \
                sum(case operation when 6 then 1 else 0 end) as symbol_6, \
                sum(case operation when 1 then amount else 0 end) as symbol_amt_1_s, \
                sum(case operation when 2 then amount else 0 end) as symbol_amt_2_s, \
                sum(case operation when 3 then amount else 0 end) as symbol_amt_3_s, \
                sum(case operation when 4 then amount else 0 end) as symbol_amt_4_s, \
                sum(case operation when 5 then amount else 0 end) as symbol_amt_5_s, \
                sum(case operation when 6 then amount else 0 end) as symbol_amt_6_s, \
                avg(case operation when 1 then amount else 0 end) as symbol_amt_1_a, \
                avg(case operation when 2 then amount else 0 end) as symbol_amt_2_a, \
                avg(case operation when 3 then amount else 0 end) as symbol_amt_3_a, \
                avg(case operation when 4 then amount else 0 end) as symbol_amt_4_a, \
                avg(case operation when 5 then amount else 0 end) as symbol_amt_5_a, \
                avg(case operation when 6 then amount else 0 end) as symbol_amt_6_a, \
                max(case operation when 1 then amount else 0 end) as symbol_amt_1_x, \
                max(case operation when 2 then amount else 0 end) as symbol_amt_2_x, \
                max(case operation when 3 then amount else 0 end) as symbol_amt_3_x, \
                max(case operation when 4 then amount else 0 end) as symbol_amt_4_x, \
                max(case operation when 5 then amount else 0 end) as symbol_amt_5_x, \
                max(case operation when 6 then amount else 0 end) as symbol_amt_6_x, \
                min(case operation when 1 then amount else 0 end) as symbol_amt_1_n, \
                min(case operation when 2 then amount else 0 end) as symbol_amt_2_n, \
                min(case operation when 3 then amount else 0 end) as symbol_amt_3_n, \
                min(case operation when 4 then amount else 0 end) as symbol_amt_4_n, \
                min(case operation when 5 then amount else 0 end) as symbol_amt_5_n, \
                min(case operation when 6 then amount else 0 end) as symbol_amt_6_n \
          from transaction group by account_id \
        ")

### 2.7 账户到贷款的特征生成

In [29]:
df_client_ex.createOrReplaceTempView("client_ex")
df_client_ex1.createOrReplaceTempView("client_ex1")
#df_account.createOrReplaceTempView("account")
df_trans_ex.createOrReplaceTempView("transaction_ex")
df_card_ex1.createOrReplaceTempView("card_ex1")
df_card_ex2.createOrReplaceTempView("card_ex2")
df_district_ex.createOrReplaceTempView("district_ex")
#df_loan_ex.createOrReplaceTempView("loan_ex")
df_payorder_ex.createOrReplaceTempView("payorder_ex")

<p style="text-indent: 2em; font-family: 微软雅黑; FONT-SIZE: 100%; COLOR: #191919; line-height: 1.5em;">
账户与贷款是一对多关系，因此在oneBM中属于一对一，可以直接使用特征。因此，只是简单连接多表。
</p>


In [30]:
df_account_ex = spark.sql("select a.account_id, a.frequency, datediff('1999-01-01', a.open_dt) as acc_open_days, \
            b.gender, b.birth_days, \
            c.client_cnt, \
            d.trans_cnt, d.amt_s, d.amt_a, d.amt_d, d.amt_x, d.amt_n, d.balance_s, d.balance_a, d.balance_d, \
            d.balance_x, d.balance_n, d.trans_dt_r, d.symbol_1, d.symbol_2, d.symbol_3, d.symbol_4, d.symbol_5, d.symbol_6, \
            d.symbol_amt_1_s, d.symbol_amt_2_s, d.symbol_amt_3_s, d.symbol_amt_4_s, d.symbol_amt_5_s, d.symbol_amt_6_s, \
            d.symbol_amt_1_a, d.symbol_amt_2_a, d.symbol_amt_3_a, d.symbol_amt_4_a, d.symbol_amt_5_a, d.symbol_amt_6_a, \
            d.symbol_amt_1_x, d.symbol_amt_2_x, d.symbol_amt_3_x, d.symbol_amt_4_x, d.symbol_amt_5_x, d.symbol_amt_6_x, \
            d.symbol_amt_1_n, d.symbol_amt_2_n, d.symbol_amt_3_n, d.symbol_amt_4_n, d.symbol_amt_5_n, d.symbol_amt_6_n, \
            e.card_cnt, \
            f.card_tp, f.card_issued_days, \
            g.region_tp, g.Inhabitants, g.Municipalities499, g.Municipalities1999, g.Municipalities9999, \
            g.Municipalities10000, g.cities, g.urbanratio, g.avgsalary, g.unemploy95, g.unemploy96, \
            g.Enterpreneurs, g.crimes95, g.crimes96, \
            h.cnt as payorder_cnt, h.amt_s as payorder_amt_s,  h.amt_a as payorder_amt_a, \
            h.amt_t as payorder_amt_t, h.amt_x as payorder_amt_x, h.symbol_1 as payorder_symbol_1, \
            h.symbol_2 as payorder_symbol_2, h.symbol_3 as payorder_symbol_3, h.symbol_4 as payorder_symbol_4, \
            h.symbol_6 as payorder_symbol_6 \
            from account a, client_ex b, client_ex1 c, \
            transaction_ex d, card_ex1 e, card_ex2 f, \
            district_ex g, payorder_ex h \
            where a.account_id=b.account_id \
            and a.account_id=c.account_id \
            and a.account_id=d.account_id \
            and a.account_id=e.account_id \
            and a.account_id=f.account_id \
            and a.account_id=g.account_id \
            and a.account_id=h.account_id \
            ")

In [31]:
df_account_ex.createOrReplaceTempView("account_ex")

### 2.8 处理贷款表本身属性，并输出为特征宽表

In [32]:
spark.sql("describe table loan").show()

+----------+-------------+-------+
|  col_name|    data_type|comment|
+----------+-------------+-------+
|   loan_id|          int|   null|
|account_id|          int|   null|
|granted_dt|         date|   null|
|    amount|decimal(11,2)|   null|
|  duration|          int|   null|
|  payments|decimal(11,2)|   null|
|     label|          int|   null|
+----------+-------------+-------+



In [33]:
df_loan_ex = spark.sql("select a.*, \
            datediff('1999-01-01', l.granted_dt) as granted_days, \
            l.amount, l.duration, l.payments, l.label \
            from account_ex a, loan l where a.account_id=l.account_id")

In [34]:
df_loan_ex_pd = df_loan_ex.toPandas()

In [35]:
df_loan_ex_pd.to_csv(path+"/loan_ex2.csv", header=True, index=False)

In [36]:
spark.stop()

## 作者

**李英伟 liyingw@cn.ibm.com ** 