# 特征工程

## 1 配置环境
实验平台：华为云ModelArts CPU 8核32GiB

软件环境：python3.6

In [1]:
!pip install joblib

Collecting joblib
  Downloading http://repo.myhuaweicloud.com/repository/pypi/packages/b8/a6/d1a816b89aa1e9e96bcb298eb1ee1854f21662ebc6d55ffa3d7b3b50122b/joblib-0.15.1-py3-none-any.whl (298kB)
[K    100% |████████████████████████████████| 307kB 33.2MB/s ta 0:00:01
[?25hInstalling collected packages: joblib
Successfully installed joblib-0.15.1
[33mYou are using pip version 9.0.1, however version 20.1.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [2]:
!pip install tqdm

Collecting tqdm
  Downloading http://repo.myhuaweicloud.com/repository/pypi/packages/f3/76/4697ce203a3d42b2ead61127b35e5fcc26bba9a35c03b32a2bd342a4c869/tqdm-4.46.1-py2.py3-none-any.whl (63kB)
[K    100% |████████████████████████████████| 71kB 64.9MB/s ta 0:00:01
[?25hInstalling collected packages: tqdm
Successfully installed tqdm-4.46.1
[33mYou are using pip version 9.0.1, however version 20.1.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [3]:
import numpy as np
import pandas as pd
import os
import time
from joblib import Parallel, delayed
from tqdm.notebook import tqdm
import moxing as mox
from modelarts.session import Session

INFO:root:Using MoXing-v1.15.1-99273b13
INFO:root:Using OBS-Python-SDK-3.1.2


In [4]:
# display设置
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
np.set_printoptions(suppress=True)
pd.set_option('display.float_format', lambda x: '%.3f' % x)

import warnings
warnings.filterwarnings('ignore')

In [5]:
source_file_path = os.environ['HOME'] + '/work/'
source_file_path

'/home/ma-user/work/'

In [7]:
# 从OBS下载数据
session = Session()
session.download_data(bucket_path="etasll2020/port.csv", path=source_file_path+"data/port.csv")
session.download_data(bucket_path="etasll2020/loadingOrderEvent.csv", path=source_file_path+"data/loadingOrderEvent.csv")
session.download_data(bucket_path="etasll2020/A_testData0531.csv", path=source_file_path+"data/A_testData0531.csv")
session.download_data(bucket_path="etasll2020/train0523.tar.gz", path=source_file_path+"data/train0523.tar.gz")

INFO:obs:Successfully download file etasll2020/port.csv from OBS to local /home/ma-user/work/data/port.csv
INFO:obs:Successfully download file etasll2020/loadingOrderEvent.csv from OBS to local /home/ma-user/work/data/loadingOrderEvent.csv
INFO:obs:Successfully download file etasll2020/A_testData0531.csv from OBS to local /home/ma-user/work/data/A_testData0531.csv


Successfully download file etasll2020/port.csv from OBS to local /home/ma-user/work/data/port.csv
Successfully download file etasll2020/loadingOrderEvent.csv from OBS to local /home/ma-user/work/data/loadingOrderEvent.csv
Successfully download file etasll2020/A_testData0531.csv from OBS to local /home/ma-user/work/data/A_testData0531.csv


INFO:obs:Successfully download file etasll2020/train0523.tar.gz from OBS to local /home/ma-user/work/data/train0523.tar.gz


Successfully download file etasll2020/train0523.tar.gz from OBS to local /home/ma-user/work/data/train0523.tar.gz


In [8]:
# 解压
!tar -zxvf data/train0523.tar.gz -C data/

._train0523.csv
train0523.csv


In [9]:
NJOBS = 8 

## 2 数据准备

In [22]:
port_data = pd.read_csv("data/port.csv")
order_data = pd.read_csv("data/loadingOrderEvent.csv")
test_data = pd.read_csv("data/A_testData0531.csv")
reader = pd.read_csv("data/train0523.csv", header=None, chunksize=100000,iterator=True)

## 3 数据清洗

训练集：
1. 对每一行加入了下一个时刻的时间戳、经度、纬度特征
2. 求出连续两个时刻的距离、滞留时间
3. 对方向、速度的缺失值进行计算处理，求出当前方向与航线方向对偏角
3. nextPort, nextETA, status的缺失值进行了bfill处理

**注：由于训练集数据过大，所以采用分批处理和多线程处理的方式，以加速处理；测试集数据量较小，采用直接处理的方式**


测试集：
1. 对每一行加入了下一个时刻的时间戳、经度、纬度特征
2. 求出连续两个时刻的距离、滞留时间
3. 对方向、速度的缺失值进行计算处理，求出当前方向与航线方向对偏角

其他数据：
1. 将port_data中港口的名称转换成大写

### 3.1 特征处理api

In [11]:
# 经纬度处理函数
# reference: https://github.com/yongruihuang/Courier-Competition-Round1/blob/9ceb73055d81eaa351b094c98fda43772fb0efeb/code/tools/pos_encoder.py

def bearing_array(lat1, lng1, lat2, lng2):
    """
    两个经纬度之间的方位
    """
    AVG_EARTH_RADIUS = 6371 # in km
    lng_delta_rad = np.radians(lng2 - lng1)
    lat1, lng1, lat2, lng2 = map(np.radians, (lat1, lng1, lat2, lng2))
    y = np.sin(lng_delta_rad) * np.cos(lat2)
    x = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(lng_delta_rad)
    degree = np.degrees(np.arctan2(y, x))
    return (degree + 360) % 360


def haversine_array(lat1, lng1, lat2, lng2):
    """
    Haversine距离
    """
    lat1, lng1, lat2, lng2 = map(np.radians, (lat1, lng1, lat2, lng2))
    AVG_EARTH_RADIUS = 6371 # in km
    lat = lat2 - lat1
    lng = lng2 - lng1
    d = np.sin(lat * 0.5) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(lng * 0.5) ** 2
    h = 2 * AVG_EARTH_RADIUS * np.arcsin(np.sqrt(d))
    return h


def dummy_manhattan_distance(lat1, lng1, lat2, lng2):
    """
    曼哈顿距离
    """
    a = haversine_array(lat1, lng1, lat1, lng2)
    b = haversine_array(lat1, lng1, lat2, lng1)
    return a + b

In [12]:
def get_timeStamp(timeString, form=0):
    if form == 0:
        timeArray = time.strptime(timeString, '%Y-%m-%dT%H:%M:%S.%fZ')
    if form == 1:
        timeArray = time.strptime(timeString, '%Y/%m/%d %H:%M')
    timeStamp = int(time.mktime(timeArray))
    return timeStamp

def dir_diff(temp_dir,total_dir):
    """
    求每条sample的方向和总方向之间的差异
    """
    raw_diff = np.abs(temp_dir-total_dir)
    if raw_diff > 180:
        diff = 360-raw_diff
    else:
        diff = raw_diff
    return diff

### 3.2 训练集预处理函数

In [13]:
def preprocess_grouped(subset): 
    # next position
    subset['nextLongitude'] = subset['longitude'].shift(-1)
    subset['nextLatitude'] = subset['latitude'].shift(-1)
    
    # direction
    subset['direction'] = subset['direction'] / 100
    empty = subset[subset['direction']==-0.01]
    direct = bearing_array(empty['latitude'], empty['longitude'], empty['nextLatitude'], empty['nextLongitude'])
    subset['direction'].replace(-0.01, direct, inplace=True)
    
    # timeStamp
    subset['timestamp'] = subset['timestamp'].apply(get_timeStamp)
    subset['nextTimestamp'] = subset['timestamp'].shift(-1)
    
    # distance
    subset['distance'] = haversine_array(subset['latitude'], subset['longitude'], subset['nextLatitude'], subset['nextLongitude'])
    # subset['distance'] = subset.fillna(0,inplace = True)

    # speed
    empty = subset[subset['speed']==0]
    speed = empty['distance']/(empty['nextTimestamp']-empty['timestamp'])*3600 
    subset['speed'].replace(0, speed, inplace=True)   

    # vessel
    subset['vesselNextport'] = subset['vesselNextport'].bfill()
    subset['vesselNextportETA'] = subset['vesselNextportETA'].bfill()
    subset['vesselStatus'] = subset['vesselStatus'].bfill()
    subset['vesselStatus'] = subset['vesselStatus'].fillna(0)

    # single anchor time
    subset['anchor']=subset.apply(lambda x: 1 if x['vesselStatus'] == 'moored' or x['vesselStatus'] == 'at anchor'
                        else 0, axis=1)
    subset['singleAnchortime']=(subset['nextTimestamp']-subset['timestamp']) * subset['anchor']

    # direction difference
    final = subset.tail(1)
    lastLongitude = final['longitude'].tolist()
    lastLatitude = final['latitude'].tolist()
    subset['total_dir'] = bearing_array(subset['latitude'], subset['longitude'], lastLatitude, lastLongitude)
    subset['dirDiff'] = subset.apply(lambda x: dir_diff(x['direction'], x['total_dir']), axis=1)
    
    return subset


def preprocess(data, show_progress=False):
    """
    训练集预处理
    """
    
    data_grouped = data.groupby('loadingOrder') 
    if show_progress:
        results = Parallel(n_jobs=NJOBS)(delayed(preprocess_grouped)(group) for name, group in tqdm(data_grouped))
    else:
        results = Parallel(n_jobs=NJOBS)(delayed(preprocess_grouped)(group) for name, group in data_grouped)
    return pd.concat(results)

### 3.3 测试集预处理函数

In [14]:
def preprocess_test(data):
    """
    测试集预处理，所有处理都是inplace的
    """
    data.sort_values(['loadingOrder','carrierName'], inplace=True)
    
    # next position
    data.loc[:,'nextLongitude'] = data.groupby('loadingOrder')['longitude'].shift(-1)
    data.loc[:,'nextLatitude'] = data.groupby('loadingOrder')['latitude'].shift(-1)
    
    # direction
    data.loc[:, 'direction'] /= 100
    empty = data[data['direction']==-0.01]
    direct = bearing_array(empty['latitude'], empty['longitude'], empty['nextLatitude'], empty['nextLongitude'])
    data.loc[:, 'direction'].replace(-0.01, direct, inplace=True)
    
    # time stamp
    data.loc[:,'timestamp'] = data['timestamp'].apply(get_timeStamp)
    data.loc[:,'nextTimestamp'] = data.groupby('loadingOrder')['timestamp'].shift(-1)
    
    # distance
    data.loc[:,'distance'] = haversine_array(data['latitude'], data['longitude'], data['nextLatitude'], data['nextLongitude'])
    
    # speed
    empty = data[data['speed']==0]
    speed = empty['distance']/(empty['nextTimestamp']-empty['timestamp'])*3600 
    data.loc[:, 'speed'].replace(0, speed, inplace=True)   

### 3.4 其他数据

In [15]:
# 把港口名称名称大写
port_data['TRANS_NODE_NAME'] = port_data['TRANS_NODE_NAME'].str.upper()

## 4 特征提取

提取特征（共14+1个）

起点终点的经纬度、滞留时间、航线距离、速度及偏角的最大/最小/均值/中位数、订单出现的次数（用于二次预处理）

### 4.1 特征提取api

In [16]:
# with open("data/country_abb.txt") as f:
#     country_list = f.readline()
# country_list = country_list.split(',')
    
def unify_name(name, mode='test'):
    """
    对nextPort命名不规范的名称进行处理
    可能的不规范命名：XX XXX > XX XXX
                      XX XXX
                      XXX > XXX
    """
    if mode == 'test':
        name = name.split('-')[-1].strip()
        return name
    
# 训练集中数据量太大，处理时间太长，暂不考虑处理
#     name = name.upper()
#     raw_name = name
#     try:
#         if '>' in name:
#             if len(name.split('>')[-1].strip()) == 1: 
#                 name = name.split('>')[0].strip()
#             else:
#                 name = name.split('>')[-1].strip()
#         if '-' in name:
#             if len(name.split('-')[-1].strip()) == 1:
#                 name = name.split('-')[0].strip()
#             else:
#                 name = name.split('-')[-1].strip()
#         if '<' in name:
#             name = name.split('<')[0].strip()
#         names = name.split()
#         if names[0] in country_list:
#             name = names[-1]
#         else:
#             name = names[0]
#         return name
#     except IndexError:
#         print(raw_name)
#         print("***********************************")
#         return raw_name

    
def find_possible_ports(name):
    """
    对name在port_data中找可能对应的名字，返回列表
    """
    # 如果恰有相同名称的直接返回index （可能仍然有多个index: 重名）
    if len(port_data[port_data['TRANS_NODE_NAME']==name]) != 0: 
        return port_data[port_data['TRANS_NODE_NAME']==name].index.to_list()
    # 否则查找名称是否有包含name的
    _possible_ports = []
    for p in port_data['TRANS_NODE_NAME'].unique():            
        if name in p:
            _possible_ports.extend(port_data[port_data['TRANS_NODE_NAME']==p].index.to_list())
    return _possible_ports
    
    
def get_test_destination(test_data,loadingOrder):
    """
    找到测试集中的出发点的经纬度
    """
    _test_data = test_data[test_data['loadingOrder']==loadingOrder]
    path = _test_data['TRANSPORT_TRACE'].unique()[0]
    des_name = unify_name(path, mode='test')
    possible_ports = find_possible_ports(des_name)
    if len(possible_ports)==0:
        return -1
    des_lon = port_data.loc[possible_ports,'LONGITUDE'].values[0]
    des_lat = port_data.loc[possible_ports,'LATITUDE'].values[0]
    return des_lon, des_lat

### 4.1 训练集特征提取函数

In [17]:
def add_feature_grouped(subset):
    ans = subset.head(1)
    
    # anchor time
    anchorTime = subset['singleAnchortime'].sum()
    ans['anchorTime'] = anchorTime
    
    # speed
    speed = subset['speed'].agg({'maxSpeed': 'max', 'minSpeed': 'min','meanSpeed':'mean','medianSpeed':'median'})
    ans['maxSpeed'],ans['minSpeed'],ans['meanSpeed'],ans['medianSpeed'] = speed
    
    # direction
    diff = subset['dirDiff'].agg({'maxDirdiff': 'max', 'minDirdiff': 'min','meanDirdiff':'mean','medianDirdiff':'median'})
    ans['maxDirdiff'],ans['minDirdiff'],ans['meanDirdiff'],ans['medianDirdiff'] = diff
    
    # distance
    first = subset.head(1)
    final = subset.tail(1)
    lastLongitude = final['longitude'].to_list()[0]
    lastLatitude = final['latitude'].to_list()[0]
    ans['lastLatitude'] = lastLatitude
    ans['lastLongitude'] = lastLongitude
    ans['manDis'] = haversine_array(ans['latitude'].to_list()[0], ans['longitude'].to_list()[0], lastLatitude, lastLongitude)
    
    # time interval
    ans['timeInterval'] = final['timestamp'].to_list()[0] - first['timestamp'].to_list()[0]
    
    ans['number'] = len(subset)
    
    feature = ['loadingOrder', 'longitude', 'latitude', 'lastLongitude', 'lastLatitude',
   'anchorTime', 'manDis', 'maxSpeed', 'minSpeed', 'meanSpeed', 'medianSpeed',
   'maxDirdiff', 'minDirdiff', 'meanDirdiff', 'medianDirdiff', 'timeInterval', 'number'] 
    
    return ans[feature]


def add_feature(data, show_progress=False):
    data_grouped = data.groupby('loadingOrder')
    if show_progress:
        feature = Parallel(n_jobs=NJOBS)(delayed(add_feature_grouped)(group) for name, group in tqdm(data_grouped))
    else:
        feature = Parallel(n_jobs=NJOBS)(delayed(add_feature_grouped)(group) for name, group in data_grouped)
    return pd.concat(feature)

### 4.3 测试集特征提取函数

In [18]:
def add_feature_test(data):
    """
    将相同订单号的数据合并并添加特征
    @param:
    data：测试数据预处理过的训练数据
    mode:训练集和测试集有不同的处理方式
    """
    df = data
    
    df['anchor']=df.apply(lambda x: 1 if x['speed'] == 0 else 0, axis=1)
    df['singleAnchortime']=(df['nextTimestamp']-df['timestamp']) * df['anchor']
    anchorTime = df.groupby('loadingOrder')['singleAnchortime'].sum().tolist()

    ans = df.groupby('loadingOrder').head(1)
    total_dis = df.groupby('loadingOrder')['distance'].sum().tolist()
    ans['totalDis']=total_dis
    group = df.groupby('loadingOrder')['speed'].agg({'maxSpeed': 'max', 'minSpeed': 'min','meanSpeed':'mean','medianSpeed':'median'}).reset_index()
    ans=ans.merge(group,on='loadingOrder',how='left')
    ans['anchorTime']=anchorTime
    ans['lastLongitude']=ans['loadingOrder'].apply(lambda x: get_test_destination(test_data,x)[0])
    ans['lastLatitude']=ans['loadingOrder'].apply(lambda x: get_test_destination(test_data,x)[1])
    ans['total_dir']=bearing_array(ans['latitude'],ans['longitude'],ans['lastLatitude'],ans['lastLongitude'])
    temp=ans[['loadingOrder','total_dir']]
    df = df.merge(temp,on = 'loadingOrder',how='left')
    df['dirDiff']= df.apply(lambda x: dir_diff(x['direction'],x['total_dir']),axis =1)
    diff = df.groupby('loadingOrder')['dirDiff'].agg({'maxDirdiff': 'max', 'minDirdiff': 'min','meanDirdiff':'mean','medianDirdiff':'median'}).reset_index()

    ans=ans.merge(diff,on='loadingOrder',how='left')
    ans['manDis']=haversine_array(ans['latitude'],ans['longitude'],ans['lastLatitude'],ans['lastLongitude'])
    feature = ['loadingOrder', 'longitude', 'latitude', 'lastLongitude', 'lastLatitude',
       'anchorTime', 'manDis', 'maxSpeed', 'minSpeed', 'meanSpeed', 'medianSpeed',
       'maxDirdiff', 'minDirdiff', 'meanDirdiff', 'medianDirdiff']
    ans = ans[feature]
    
    return ans

## 5 运行

调用以上的函数进行处理，并将处理好数据的保存到桶里

### 5.1 测试集

In [24]:
preprocess_test(test_data)
test_feature = add_feature_test(test_data)
test_feature.to_csv("data/test_feature.csv", index=False)
mox.file.copy_parallel("data/test_feature.csv", "s3://etasll2020/saved/test_feature.csv")
print("test_feature.csv saved!")

test_feature.csv saved!


### 5.2 训练集

In [26]:
loop = True
i = 0
while loop:
    try:
        i+=1
        chunk = reader.get_chunk()
        chunk.columns = ['loadingOrder','carrierName','timestamp','longitude',
                  'latitude','vesselMMSI','speed','direction','vesselNextport',
                  'vesselNextportETA','vesselStatus','vesselDatasource','TRANSPORT_TRACE']
        train_data = preprocess(chunk)        
        features = add_feature(train_data)
        filepath = "feature200_new/feature"+str(i)+".csv"
        features.to_csv(filepath)
        mox.file.copy_parallel(filepath, 's3://etasll2020/saved/'+filepath) 
        print("feature200_new/feature"+str(i)+".csv saved!")
        break
    except StopIteration:
        loop = False

feature200_new/feature1.csv saved!
