In [1]:
import os,time,gc,re
import pyspark
import pandas as pd
import numpy as np
from datetime import datetime
import warnings
from dateutil.relativedelta import relativedelta

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext,SparkConf,SQLContext
from pyspark.sql import Window
from pyspark.conf import SparkConf
from pyspark.ml.feature import *
from pyspark.ml import pipeline
from pyspark.ml.evaluation import *
from pyspark.sql import Row
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.linalg import Vectors,VectorUDT
from pyspark.sql.functions import mean,stddev,log
from pyspark.ml.feature import MinMaxScaler,Normalizer
from pyspark.mllib.tree import GradientBoostedTrees
from pyspark.mllib.regression import LabeledPoint

from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.linalg import SparseVector
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import date_format

from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

import pyspark.sql.functions as fn
warnings.filterwarnings('ignore')

In [2]:
# Wall-clock timestamp (seconds since the epoch) taken before any Spark work starts.
ft=time.time()

In [3]:
# Spark session shared by every cell below.
# NOTE(review): 'spark.shuffle.consolidateFiles' looks like a legacy shuffle
# setting -- confirm it is still honoured by the cluster's Spark version.
spark = pyspark.sql.SparkSession.builder.appName('LQ_SKD_D111').config('spark.shuffle.consolidateFiles','true').getOrCreate()
spark.sparkContext.setLogLevel('WARN')

# Run date as 'YYYY-MM-DD'; member_info() reads this global.
today=datetime.strftime(datetime.now(),'%Y-%m-%d')
print(today)

# Names of the seven 0/1 repair-type flag columns.
# load_zhudan() declares its own identical local copy of this list.
revise_types=['claim','routine_maintenance','accessory','accident_repair','general_repair',
       'first_maintance','service_act']

2020-08-06


In [4]:
def get_miss_cols_org(dff):
    """Return the columns of ``dff`` that contain at least one NULL.

    One aggregation computes ``1 - count(col)/count(*)`` (the NULL ratio) for
    every column; a column is reported when its ratio is positive.

    Parameters
    ----------
    dff : pyspark.sql.DataFrame
        Frame to inspect.

    Returns
    -------
    list of str
        Names with a positive NULL ratio, in ``dff.columns`` order.  An empty
        frame yields an empty list.
    """
    ratios=dff.agg(*[(1-(fn.count(c)/fn.count('*'))).alias(c) for c in dff.columns]).first()
    if ratios is None:
        return []
    # On an empty frame count('*') is 0 and Spark's default (non-ANSI) division
    # returns NULL, which arrives here as None; `None>0` would raise TypeError.
    return [name for name,ratio in zip(dff.columns,ratios) if ratio is not None and ratio>0]


def get_miss_cols_new(dff):
    """Return the non-``vin`` columns holding at least one missing value.

    A value counts as missing when it is NULL, the empty string or ``'-'``.

    All columns are checked in a single aggregation instead of one Spark job
    per column, and a frame without a ``vin`` column is accepted rather than
    raising ``ValueError``.

    Parameters
    ----------
    dff : pyspark.sql.DataFrame
        Frame to inspect.

    Returns
    -------
    list of str
        Column names with at least one missing value, in ``dff.columns`` order.
    """
    cols=[c for c in dff.columns if c!='vin']
    if not cols:
        return []

    def missing_count(name):
        # Rows where the comparison itself is NULL fall through to 0, which
        # matches how filter() discarded them in the per-column version.
        is_missing=(col(name).isNull())|(col(name)=='')|(col(name)=='-')
        return fn.sum(fn.when(is_missing,1).otherwise(0)).alias(name)

    counts=dff.agg(*[missing_count(c) for c in cols]).first()
    # sum() over zero rows is NULL, so an empty frame reports nothing missing.
    return [name for name,n in zip(cols,counts) if n is not None and n>0]


def check_df(df):
    """Print three sanity counts for a visit-level DataFrame.

    Printed in order, one per line: total rows, distinct ``(vin, repair_date)``
    pairs, distinct ``vin`` values.  Returns nothing.
    """
    counters=(
        lambda: df.count(),
        lambda: df.select('vin','repair_date').dropDuplicates().count(),
        lambda: df.select('vin').distinct().count(),
    )
    # Evaluate lazily so each count is printed before the next one is computed.
    for counter in counters:
        print(counter())

#### <font color=red>1.读取主单</font>

In [5]:
def load_zhudan():
    """Load the repair master orders and reduce them to one row per visit.

    Steps
    -----
    1. Read ``clms.dws_repair_info``, keep the needed columns and only VINs
       whose first three characters are ``LSV``.
    2. Normalise types: ``repair_date`` to date, ``mileage`` renamed to ``mile``
       and cast to int, fees rounded up, ``repair_amount`` recomputed as
       ``labor_fee + part_fee``, a NULL pre-discount amount replaced by
       ``repair_amount``, each repair-type flag mapped to 1 when it equals
       ``'1'``/``'1.0'`` and 0 otherwise.
    3. Aggregate to one row per ``(vin, repair_date)``: max mileage, summed
       amounts, flags capped at 1.
    4. Attach ``belong_dealer_code``, the dealer each VIN visited on the most
       distinct dates.
    5. Keep only the earliest first-maintenance visit per VIN; visits without
       the first-maintenance flag are kept unchanged.

    Returns
    -------
    (pyspark.sql.DataFrame, pyspark.sql.DataFrame)
        ``zhudandf`` (visit table) and ``belong_dealer`` (``vin`` ->
        ``belong_dealer_code``).
    """
    base_cols=['vin','repair_date','mileage','dealer_code','repair_amount','repair_amount_pre_discount',
               'labor_fee','part_fee']
    flag_cols=['claim','routine_maintenance','accessory','accident_repair','general_repair',
               'first_maintance','service_act']
    amount_cols=['repair_amount','repair_amount_pre_discount','labor_fee','part_fee']

    zhudan=spark.sql('select * from clms.dws_repair_info').select(base_cols+flag_cols).dropDuplicates()
    zhudan=zhudan.filter(substring(col('vin'),1,3)=='LSV').\
    withColumn('repair_date',to_date(col('repair_date'))).withColumnRenamed('mileage','mile').\
    withColumn('mile',col('mile').cast('Int')).\
    withColumn('labor_fee',ceil('labor_fee')).\
    withColumn('part_fee',ceil('part_fee')).\
    withColumn('repair_amount',col('labor_fee')+col('part_fee')).\
    withColumn('repair_amount_pre_discount',ceil('repair_amount_pre_discount')).\
    withColumn('repair_amount_pre_discount',when(col('repair_amount_pre_discount').isNull(),col('repair_amount')).otherwise(col('repair_amount_pre_discount')))
    for flag in flag_cols:
        zhudan=zhudan.withColumn(flag,when(col(flag).isin(['1.0','1']),lit(1)).otherwise(lit(0)))
    # The int round-trip normalises dealer codes stored as e.g. '74309241.0'.
    zhudan=zhudan.withColumn('dealer_code',col('dealer_code').cast('int').cast('String')).dropDuplicates()

    # One row per visit.  fn.max / fn.sum are spelled out because the star
    # import shadows the Python builtins of the same name.
    visit_aggs=[fn.max('mile').alias('mile')]
    visit_aggs+=[fn.sum(c).alias(c) for c in amount_cols+flag_cols]
    df1=zhudan.groupBy(['vin','repair_date']).agg(*visit_aggs)
    # A flag summed over several orders of the same visit is capped at 1.
    # No dropDuplicates is needed here: groupBy output is unique per key.
    for flag in flag_cols:
        df1=df1.withColumn(flag,when(col(flag)>=1,lit(1)).otherwise(col(flag)))

    print('加入常去维修站号')
    belong_dealer=zhudan.select(['vin','dealer_code','repair_date']).dropDuplicates().\
    groupBy(['vin','dealer_code']).agg(count('repair_date').alias('cnt'))
    # dealer_code is a secondary sort key so that VINs with tied visit counts
    # get the same favourite dealer on every run.
    by_visits=Window.partitionBy('vin').orderBy(desc('cnt'),asc('dealer_code'))
    belong_dealer=belong_dealer.withColumn('cnt_id',row_number().over(by_visits)).\
    filter(col('cnt_id')==1).drop('cnt','cnt_id').withColumnRenamed('dealer_code','belong_dealer_code')

    zhudandf=df1.join(belong_dealer,on='vin',how='left')
    first_visit=Window.partitionBy('vin').orderBy(asc('repair_date'))
    shoubao=zhudandf.filter(col('first_maintance')==1).withColumn('time_id',row_number().over(first_visit)).\
    filter(col('time_id')==1).drop('time_id')
    feishoubao=zhudandf.filter(col('first_maintance')!=1)

    # union() is the non-deprecated spelling of unionAll(); both keep duplicates.
    zhudandf=shoubao.union(feishoubao)

    return zhudandf,belong_dealer

In [6]:
# Build the visit table and the VIN -> favourite-dealer lookup.
zhudandf,belong_dealer=load_zhudan()

# Cache both frames because several later cells reuse them.
# NOTE(review): broadcast() marks a frame as small enough for broadcast joins;
# zhudandf is the large visit table -- confirm the hint is intended for it.
zhudandf=zhudandf.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
zhudandf=broadcast(zhudandf)

belong_dealer=belong_dealer.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
belong_dealer=broadcast(belong_dealer)  

加入常去维修站号


#### <font color=red>2.读取customer info1</font>

In [7]:
def customer_info():
    """Load one customer record per VIN from ``clms.dws_customer_base_info``.

    For each VIN the record with the earliest ``purchase_date`` is kept, then
    the categorical columns are normalised:

    * ``customer_type``: ``'O'`` -> ``'公司'``; ``'P'`` and anything else -> ``'个人'``.
    * ``gender_code``: ``'1'``/``'2'``/``'0'`` (also accepted with a ``.0``
      suffix); anything else -> ``'1'``.
    * ``ies_name``: mapped to a canonical model name by regex, else ``'其他'``.
    * ``family_name``: integer tier 3/2/1 derived from the canonical model
      name, 0 when no tier matches.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``vin, purchase_date, ies_name, gender_code, customer_type,
        family_name``.
    """
    cols=['vin', 'purchase_date', 'ies_name','gender_code', 'customer_type']
    cust1=spark.sql('select * from clms.dws_customer_base_info').select(cols).\
    withColumn('purchase_date',to_date(col('purchase_date'))).dropDuplicates()

    # asc() puts NULLs first in Spark, so ordering by the date alone would pick
    # a NULL purchase_date over a real one for the same VIN.  Sorting on
    # isNull() first (False < True) pushes NULL dates to the end.
    earliest_first=Window.partitionBy('vin').orderBy(col('purchase_date').isNull(),asc('purchase_date'))
    cust1=cust1.withColumn('buy_rank',row_number().over(earliest_first)).\
    filter(col('buy_rank')==1).drop('buy_rank')

    cust1=cust1.withColumn('customer_type',when(col('customer_type')=='P',lit('个人')).\
                           when(col('customer_type')=='O',lit('公司')).otherwise(lit('个人')))
    cust1=cust1.withColumn('gender_code',when(col('gender_code').isin(['1.0','1']),lit('1')).\
                           when(col('gender_code').isin(['2.0','2']),lit('2')).\
                           when(col('gender_code').isin(['0.0','0']),lit('0')).otherwise(lit('1')))

    # (regex, canonical model name); the first matching rule wins.
    model_rules=[('SUPERB|昊锐|速派|Superb','速派'),
                 ('晶锐|Fabia|FABIA','晶锐'),
                 ('柯迪亚克|Kodiaq|KODIAQ','柯迪亚克'),
                 ('野帝|YETI|Yeti','野帝'),
                 ('明锐|Octavia|OCTAVIA','明锐'),
                 ('昕锐|Rapid|RAPID','昕锐'),
                 ('柯米克|Kamiq|KAMIQ','柯米克'),
                 ('柯珞克|柯洛克|Karoq|KAROQ','柯珞克'),
                 ('昕动','昕动')]
    model_name=None
    for pattern,label in model_rules:
        hit=col('ies_name').rlike(pattern)
        model_name=when(hit,lit(label)) if model_name is None else model_name.when(hit,lit(label))
    cust1=cust1.withColumn('ies_name',model_name.otherwise(lit('其他')))

    cust1=cust1.withColumn('family_name',when(col('ies_name').rlike('柯迪亚克|速派|野帝'),lit(3)).\
                           when(col('ies_name').rlike('明锐|柯米克|柯珞克'),lit(2)).\
                           when(col('ies_name').rlike('昕锐|昕动|晶锐'),lit(1)).otherwise(lit(0)))

    return cust1

In [8]:
# Build and cache the per-VIN customer table.
cust1=customer_info()
cust1=cust1.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
cust1=broadcast(cust1)
# Row count and distinct-VIN count; they are equal when each VIN appears once.
print(cust1.count())
print(cust1.select('vin').dropDuplicates().count())

203717
203717


#### <font color=red>3.读取customer info2</font>

In [9]:
def member_info():
    """Load one membership record per VIN from ``clms.dws_skmsm_base_info``.

    ``start_date`` is parsed from ``yyyy/M/d`` text; for each VIN the record
    with the earliest start date is kept.  ``member_age`` is the number of days
    between ``start_date`` and the notebook-global ``today``, divided by 365
    and rounded up.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``vin, member_type, member_status, whether_bind_wechat,
        member_age``.
    """
    cols=['vin', 'member_type', 'member_status', 'whether_bind_wechat', 'start_date']
    cust2=spark.sql('select * from clms.dws_skmsm_base_info').select(cols).dropDuplicates().\
    withColumn('start_date',from_unixtime(unix_timestamp(col('start_date'),'yyyy/M/d'),'yyyy-MM-dd'))

    # A start_date that fails to parse becomes NULL, and asc() sorts NULLs
    # first; ordering on isNull() first keeps a real date ahead of a NULL one.
    first_membership=Window.partitionBy('vin').orderBy(col('start_date').isNull(),asc('start_date'))
    cust2=cust2.withColumn('member_id',row_number().over(first_membership)).\
    filter(col('member_id')==1).drop('member_id')

    # Membership age in years, rounded up.
    cust2=cust2.withColumn('member_age',ceil(datediff(to_date(lit(today)),col('start_date'))/365))
    return cust2.drop('start_date')

In [10]:
# Build and cache the per-VIN membership table.
cust2=member_info()

cust2=cust2.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
cust2=broadcast(cust2)

# Row count and distinct-VIN count; they are equal when each VIN appears once.
print(cust2.count())
print(cust2.select('vin').distinct().count())

153750
153750


#### <font color=red>4.合并客户表</font>

In [11]:
def fill_vehicle(vehicle):
    """Replace NULL or empty-string values with a per-column default.

    Every column listed in ``defaults`` must exist in ``vehicle``.  Column
    positions are unchanged because each column is overwritten in place.

    Parameters
    ----------
    vehicle : pyspark.sql.DataFrame

    Returns
    -------
    pyspark.sql.DataFrame
        Same columns, with the defaults applied.
    """
    defaults={'ies_name':'其他', 'family_name':0,
              'whether_bind_wechat':'否','gender_code':'1','customer_type':'个人',
              'province':'未知','city':'未知','city_class':'未知',
              'member_type':'非会员','member_status':'非会员','member_age':0}

    for name,default in defaults.items():
        is_blank=(col(name).isNull())|(col(name)=='')
        vehicle=vehicle.withColumn(name,when(is_blank,lit(default)).otherwise(col(name)))
    return vehicle

In [12]:
# Add the six features that are sliced out of the VIN.
def get_user(df1):
    """Append six VIN-derived columns plus their comma-joined ``user_tag``.

    Each feature is a fixed substring of ``vin`` (1-based start, length):
    ``car_body_type`` (4,1), ``gearbox`` (5,1), ``crew_protection_system``
    (6,1), ``car_class`` (7,2), ``output_year`` (10,1), ``assembly_factory``
    (11,1).  ``user_tag`` joins the six values with ``','`` in that order.
    """
    vin_slices=[('car_body_type',4,1),
                ('gearbox',5,1),
                ('crew_protection_system',6,1),
                ('car_class',7,2),
                ('output_year',10,1),
                ('assembly_factory',11,1)]
    for name,start,length in vin_slices:
        df1=df1.withColumn(name,substring('vin',start,length))

    tag_parts=[col(name) for name,_,_ in vin_slices]
    return df1.withColumn('user_tag',concat_ws(',',*tag_parts))

In [13]:
def merge_vehicle(cust1,cust2,belong_dealer):
    """Assemble the per-VIN vehicle/customer feature table.

    ``cust1``, ``cust2`` and ``belong_dealer`` are full-outer-joined on
    ``vin``.  Province and city of the favourite dealer come from
    ``clms.dws_dealer_base_info`` and the city class from ``clms.city_info``;
    both lookups are left joins.  City names on both sides are first reduced
    to a short form so they can be matched.  Missing values are then filled by
    ``fill_vehicle`` and the VIN features added by ``get_user``.

    Parameters
    ----------
    cust1, cust2, belong_dealer : pyspark.sql.DataFrame
        Per-VIN customer, membership and favourite-dealer tables.

    Returns
    -------
    pyspark.sql.DataFrame
    """
    def short_name(column,rules):
        # One when() branch per (regex, replacement) pair, tried in list order;
        # a value that matches no rule is kept unchanged.
        chain=None
        for pattern,label in rules:
            hit=col(column).rlike(pattern)
            chain=when(hit,lit(label)) if chain is None else chain.when(hit,lit(label))
        return chain.otherwise(col(column))

    vehicle=cust1.join(cust2,on=['vin'],how='outer').join(belong_dealer,on='vin',how='outer')

    # Backslashes are escaped explicitly; '\城' is an invalid escape sequence
    # that only happened to yield the same text.
    print('匹配常去经销商的省份\\城市\\城市级别信息')
    dealer_rules=[(name,name) for name in ['黔南','恩施','湘西','黔西南','喀什','阿克苏','大理','黔东南',
                                          '凉山','楚雄','西双版纳','文山','海西','巴音郭楞','德宏',
                                          '伊犁哈萨克','昌吉','延边','阿拉善']]
    # NOTE(review): 兴安盟 is rewritten to 大兴安岭, presumably to match a key
    # in clms.city_info -- confirm this mapping is intended.
    dealer_rules.append(('兴安盟','大兴安岭'))

    dealer=spark.sql('select * from clms.dws_dealer_base_info').\
    select(['province', 'city', 'dealer_invoice_code']).dropDuplicates().\
    withColumnRenamed('dealer_invoice_code','belong_dealer_code').\
    withColumn('city',regexp_replace('city','市','')).\
    withColumn('city',short_name('city',dealer_rules)).\
    withColumn('belong_dealer_code',col('belong_dealer_code').cast('Int').cast('String'))

    vehicle=vehicle.join(dealer,on='belong_dealer_code',how='left')

    city_rules=[(name,name) for name in ['大兴安岭','延边','大理','文山','德宏','楚雄','红河','西双版纳',
                                        '喀什','巴音郭楞','昌吉','阿克苏','伊犁哈萨克','黔西南','黔东南',
                                        '黔南','凉山','恩施','湘西','海西','阿拉善']]
    city=spark.sql('select * from clms.city_info').select(['city1','city_class']).dropDuplicates().\
    withColumn('city1',short_name('city1',city_rules)).\
    withColumnRenamed('city1','city').dropDuplicates()

    vehicle=vehicle.join(city,on='city',how='left')
    vehicle=fill_vehicle(vehicle)
    vehicle=get_user(vehicle)
    return vehicle

In [14]:
# Build and cache the per-VIN vehicle table.
vehicle=merge_vehicle(cust1,cust2,belong_dealer)

vehicle=vehicle.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
vehicle=broadcast(vehicle)

# Row count and distinct-VIN count; they are equal when each VIN appears once.
print(vehicle.count())
print(vehicle.select('vin').distinct().count())
# show() prints the table itself and returns None, so wrapping it in print()
# only added a stray "None" line.
vehicle.show(5)

匹配常去经销商的省份\城市\城市级别信息
206870
206870
+----+------------------+-----------------+-------------+--------+-----------+-------------+-----------+-----------+-------------+-------------------+----------+--------+----------+-------------+-------+----------------------+---------+-----------+----------------+------------+
|city|belong_dealer_code|              vin|purchase_date|ies_name|gender_code|customer_type|family_name|member_type|member_status|whether_bind_wechat|member_age|province|city_class|car_body_type|gearbox|crew_protection_system|car_class|output_year|assembly_factory|    user_tag|
+----+------------------+-----------------+-------------+--------+-----------+-------------+-----------+-----------+-------------+-------------------+----------+--------+----------+-------------+-------+----------------------+---------+-----------+----------------+------------+
|  南京|          74309241|LSV2C60Z9HN054328|   2018-06-22|    柯迪亚克|          1|           个人|          3|        非会员|          非会

In [15]:
# Columns of the vehicle table that still contain NULLs after fill_vehicle().
mis1=get_miss_cols_org(vehicle)
print(mis1)

['belong_dealer_code', 'purchase_date']


In [16]:
# Compare the vehicle table's columns with the column list (and order) that is
# written to clms.skd_vehicle below.
cols=vehicle.columns
tongyi_cols=['vin', 'purchase_date', 'ies_name', 'gender_code', 'customer_type', 'family_name', 'member_type',
             'member_status','member_age', 'whether_bind_wechat', 'province','city', 'belong_dealer_code', 'city_class',
             'car_body_type', 'gearbox', 'crew_protection_system', 'car_class', 'output_year', 'assembly_factory', 'user_tag']
# Despite the names, these are differences: same1 = columns present in the
# frame but not in the list, same2 = columns in the list but not in the frame.
# Both print [] when the two agree.
same1=[x for x in cols if x not in tongyi_cols]
print(same1)
same2=[x for x in tongyi_cols if x not in cols]
print(same2)

[]
[]


In [17]:
# Put the columns into the target table's order and re-check VIN uniqueness.
vehicle=vehicle.select(tongyi_cols).dropDuplicates()
print(vehicle.count())
print(vehicle.select('vin').dropDuplicates().count())

206870
206870


In [18]:
# Persist the vehicle table to Hive.  The insert maps columns by position
# ("select *"), so the frame's column order must match the DDL below.
# createOrReplaceTempView replaces registerTempTable, deprecated since Spark 2.0.
vehicle.createOrReplaceTempView('vehicle_reg')
spark.sql('drop table if exists clms.skd_vehicle')
spark.sql('create table if not exists clms.skd_vehicle (vin string,purchase_date date,ies_name string,\
gender_code string,customer_type string,family_name int,member_type string,member_status string,member_age int,\
whether_bind_wechat string,province string,city string,belong_dealer_code string,city_class string,\
car_body_type string,gearbox string,crew_protection_system string,car_class string,output_year string,\
assembly_factory string,user_tag string)')
spark.sql('insert overwrite table clms.skd_vehicle select * from vehicle_reg')

DataFrame[]

In [19]:
# Eyeball check: print the distinct values of every categorical column.
print('检查字段')
check_cols=['ies_name', 'gender_code', 'customer_type', 'family_name', 'member_type','member_status',
            'member_age', 'whether_bind_wechat', 'province','city', 'belong_dealer_code', 'city_class',
            'car_body_type', 'gearbox', 'crew_protection_system', 'car_class', 'output_year', 'assembly_factory', 'user_tag']
for x in check_cols:
    print(x)
    # One Spark job per column; collect() brings the distinct values to the driver.
    print([y[0] for y in vehicle.select(x).distinct().collect()])
    print('-'*30)

检查字段
ies_name
['晶锐', '野帝', '柯米克', '昕锐', '明锐', '柯珞克', '速派', '其他', '柯迪亚克', '昕动']
------------------------------
gender_code
['0', '1', '2']
------------------------------
customer_type
['公司', '个人']
------------------------------
family_name
[1, 3, 2, 0]
------------------------------
member_type
['非会员', '钻石卡', '金卡', '绿卡']
------------------------------
member_status
['非会员', '冻结', '有效', '注销']
------------------------------
member_age
[0, 7, 6, 9, 5, 1, 10, 3, 12, 8, 11, 2, 4]
------------------------------
whether_bind_wechat
['否', '是']
------------------------------
province
['未知', '河南', '江苏', '山东', '上海', '浙江']
------------------------------
city
['南京', '无锡', '未知', '苏州', '杭州', '嘉兴', '德州', '郑州', '上海']
------------------------------
belong_dealer_code
['74308081', '74309021', '74309241', '74309251', None, '74310031', '74310291', '74314141', '74308161', '74319141']
------------------------------
city_class
['二线城市', '未知', '四线城市', '一线城市', '二线发达城市']
------------------------------
car_body_type

#### <font color=red>5.合并主单和用户自身特征</font>

In [20]:
# The raw revise_type column was never cleaned, so the repair type of each
# visit is rebuilt from the cleaned 0/1 flag columns instead.
def concat_revise_type(a,b,c,d,e,f,g):
    """Join the names of the repair-type flags that equal 1.

    The arguments are the flags for, in order: claim, routine_maintenance,
    accessory, accident_repair, general_repair, first_maintance, service_act.

    Returns
    -------
    str
        Comma-separated names of the set flags in the order above, or ``''``
        when none is set.
    """
    labelled_flags=(
        (a,'claim'),
        (b,'routine_maintenance'),
        (c,'accessory'),
        (d,'accident_repair'),
        (e,'general_repair'),
        (f,'first_maintance'),
        (g,'service_act'),
    )
    return ','.join(label for flag,label in labelled_flags if flag==1.0)

In [21]:
def merge_zhudan_veh(zhudandf,vehicle):
    """Attach vehicle features to every visit and derive ``repair_type``.

    Steps: left-join ``vehicle`` (without ``belong_dealer_code``, which
    ``zhudandf`` already carries) onto the visits; keep only visits dated
    strictly after the purchase date; build ``repair_type`` as the
    comma-joined names of the flags equal to 1; add ``baoyang_times``, the
    number of distinct ``(repair_date, mile)`` visits per VIN.

    Progress counts are printed along the way.

    Parameters
    ----------
    zhudandf : pyspark.sql.DataFrame
        Visit table from ``load_zhudan``.
    vehicle : pyspark.sql.DataFrame
        Per-VIN feature table.

    Returns
    -------
    pyspark.sql.DataFrame
    """
    veh1=vehicle.drop('belong_dealer_code')

    df=zhudandf.join(veh1,on=['vin'],how='left').\
    withColumn('repair_date',to_date(col('repair_date'))).\
    withColumn('purchase_date',to_date(col('purchase_date')))

    # NOTE(review): broadcast() hints that the frame is small enough for
    # broadcast joins; this is the full visit table -- confirm it is intended.
    df=df.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
    df=broadcast(df)

    print(df.count())
    print(df.select('vin','repair_date').dropDuplicates().count())

    org_vin=df.select('vin').distinct().count()
    print('原始数据中,一共有多少个vin:',org_vin)

    # Rows with a NULL purchase_date are dropped too (the comparison is NULL).
    df1=df.filter(col('repair_date')>col('purchase_date'))
    print(df1.count())

    new_vin=df1.select('vin').distinct().count()
    print('去掉购车时间比修理日期还要晚的数据以后,一共有多少个vin:',new_vin)

    print('相当于有%d的vin的购车时间比修理日期还要晚,并且被直接清洗掉了的'%(org_vin-new_vin))

    print('其中有部分的数据是没有repair_type,因为pattern不完整的原因,不过还好都是索赔和内部车辆维修的,因此不做进一步处理了')
    # Native replacement for the former Python UDF: when() without otherwise()
    # yields NULL for an unset flag, and concat_ws() skips NULLs (returning ''
    # when every flag is unset), so the result is the same string.
    flag_cols=['claim','routine_maintenance','accessory','accident_repair','general_repair',
               'first_maintance','service_act']
    df1=df1.withColumn('repair_type',concat_ws(',',*[when(col(c)==1,lit(c)) for c in flag_cols]))

    df1=df1.dropDuplicates()
    print('开始baoyang数据trick清洗')
    tmp=df1.select('vin','repair_date','mile').dropDuplicates().groupBy('vin').agg(count('repair_date').alias('baoyang_times'))
    df1=df1.join(tmp,on=['vin'],how='left')

    return df1

#### <font color=red>6.提取保养数据,并进行清洗</font>

In [22]:
# Visit table enriched with vehicle features; cached for the cleaning steps below.
df1=merge_zhudan_veh(zhudandf,vehicle)

df1=df1.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
df1=broadcast(df1)

832449
832449
原始数据中,一共有多少个vin: 168639
815739
去掉购车时间比修理日期还要晚的数据以后,一共有多少个vin: 165067
相当于有3572的vin的购车时间比修理日期还要晚,并且被直接清洗掉了的
其中有部分的数据是没有repair_type,因为pattern不完整的原因,不过还好都是索赔和内部车辆维修的,因此不做进一步处理了
开始baoyang数据trick清洗


In [23]:
def rebuild_data(df1):
    """Split the visits into model data, rule data and non-maintenance data.

    A visit is a maintenance ("baoyang") visit when ``first_maintance`` or
    ``routine_maintenance`` equals 1.

    Model data is built from the maintenance visits as follows:

    1. Keep vehicles purchased on or after 2013-01-01.
    2. For each visit compute the previous visit of the same VIN (purchase
       date and mileage 0 when there is none), ``daydiff`` and ``milediff``;
       drop visits with ``daydiff <= 7``.
    3. Count the remaining visits per VIN and filter by that count:
       1 visit keeps everything; 2 visits keep ``daydiff`` >= half the VIN's
       mean ``daydiff``; 3 or more visits keep ``daydiff > 14``.
    4. Drop every VIN that has any ``milediff <= 0`` and keep only
       ``customer_type == '个人'``.

    Rule data is every maintenance visit whose VIN did not make it into the
    model data.

    Parameters
    ----------
    df1 : pyspark.sql.DataFrame
        Output of ``merge_zhudan_veh``.

    Returns
    -------
    (baoyang_model, baoyang_rule, nobaoyang) : tuple of pyspark.sql.DataFrame
    """
    baoyang=df1.filter((col('first_maintance')==1)|(col('routine_maintenance')==1))
    nobaoyang=df1.filter((col('first_maintance')!=1)&(col('routine_maintenance')!=1))

    print('1.baoyang中取出购车时间>=2013/1/1的')
    buy_start='2013-01-01'
    # buy_start is left on `baoyang` (and so on baoyang_rule), as before.
    baoyang=baoyang.withColumn('buy_start',to_date(lit(buy_start)))
    baoyang_model_1=baoyang.filter(col('purchase_date')>=col('buy_start')).drop('buy_start')

    a=baoyang.select('vin').distinct().count()
    print('有多少vin有做过保养:',a)
    b=baoyang_model_1.select('vin').distinct().count()
    # The label used to read "有多少vin有首保", left over from a removed
    # first-maintenance filter; b counts VINs bought on or after buy_start.
    print('有多少vin的购车时间>=2013/1/1:',b)
    # Guard the ratio so an input without maintenance visits does not crash.
    print('对应占比是:',b/a if a else 0.0)

    # lag(..., -1) over a descending order reads the next row in that order,
    # i.e. the chronologically previous visit of the same VIN.
    newest_first=Window.partitionBy('vin').orderBy(desc('repair_date'))
    baoyang_model_1=baoyang_model_1.dropDuplicates()
    baoyang_model_1=baoyang_model_1.withColumn('last_repair_date',lag(col('repair_date'),-1).over(newest_first)).\
    withColumn('last_mile',lag(col('mile'),-1).over(newest_first)).\
    withColumn('last_repair_date',when(col('last_repair_date').isNull(),col('purchase_date')).otherwise(col('last_repair_date'))).\
    withColumn('last_mile',when(col('last_mile').isNull(),lit(0)).otherwise(col('last_mile'))).\
    withColumn('daydiff',datediff(col('repair_date'),col('last_repair_date'))).\
    withColumn('milediff',col('mile')-col('last_mile'))

    baoyang_model_1=baoyang_model_1.filter(col('daydiff')>7.0)

    # Recount the visits after the filter; drop() ignores a missing column.
    baoyang_model_1=baoyang_model_1.drop('baoyang_times')
    tmp=baoyang_model_1.select('vin','repair_date').dropDuplicates().groupBy('vin').agg(count('repair_date').alias('baoyang_times'))
    baoyang_model_1=baoyang_model_1.join(tmp,on=['vin'],how='left')

    # One visit: the daydiff>7 filter above already applies, nothing more to do.
    df11=baoyang_model_1.filter(col('baoyang_times')==1)
    df11=df11.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
    df11=broadcast(df11)

    # Two visits: keep those at least half as long as the VIN's mean interval.
    df22=baoyang_model_1.filter(col('baoyang_times')==2)
    tmp=df22.select('vin','repair_date','daydiff').dropDuplicates().groupBy('vin').agg(mean('daydiff').alias('daydiff_habit'))
    df22=df22.join(tmp,on=['vin'],how='left').withColumn('right_habit',col('daydiff_habit')*0.5)
    df22=df22.filter(col('daydiff')>=col('right_habit'))
    df22=df22.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
    df22=broadcast(df22)

    # Three or more visits: require more than 14 days since the previous one.
    df33=baoyang_model_1.filter(col('baoyang_times')>=3)
    df33=df33.filter(col('daydiff')>14)
    df33=df33.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
    df33=broadcast(df33)

    df11=df11.drop('baoyang_times')
    df22=df22.drop('right_habit','daydiff_habit','baoyang_times')
    df33=df33.drop('baoyang_times')

    # union() matches columns by position, so select a common column list first.
    same=[c for c in df11.columns if c in df22.columns and c in df33.columns]
    baoyang_model_2=df11.select(same).union(df22.select(same)).union(df33.select(same))
    baoyang_model_2=baoyang_model_2.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
    baoyang_model_2=broadcast(baoyang_model_2)

    wrong_mile_vin=baoyang_model_2.filter(col('milediff')<=0).select('vin').dropDuplicates()
    print('公里数差<=0的vin的脏数据有:',wrong_mile_vin.count())

    print('取出没有公里数差的数据')
    # Anti-join: drop every visit of a VIN that has a non-positive mileage delta.
    baoyang_model_2=baoyang_model_2.join(wrong_mile_vin,on=['vin'],how='left_anti')

    print('取出个人用户')
    baoyang_model_2=baoyang_model_2.filter(col('customer_type')=='个人')
    baoyang_model_2=baoyang_model_2.drop('last_repair_date','last_mile',
                                     'baoyang_times','repair_year', 'daydiff', 'milediff').dropDuplicates()

    # Rule data: maintenance visits whose VIN is absent from the model data.
    baoyang_model_vin=baoyang_model_2.select('vin').distinct()
    baoyang_rule=baoyang.join(baoyang_model_vin,on=['vin'],how='left_anti')

    print('(1)其中baoyang_model需要重新run_last_run')
    print('(2)其中baoyang_rule需要进行同样的天数清洗')
    print('(3)其中nobaoyang仅仅代表是没有保养的数据,其中的vin可能有保养记录,也可能没有保养记录')

    return baoyang_model_2,baoyang_rule,nobaoyang

In [24]:
# Split the visits into the three subsets and tag each with its origin.
baoyang_model,baoyang_rule,nobaoyang=rebuild_data(df1)

# 'mark' records which subset a row came from once the three are unioned.
baoyang_model=baoyang_model.withColumn('mark',lit('模型预测'))
baoyang_rule=baoyang_rule.withColumn('mark',lit('规则预测'))
nobaoyang=nobaoyang.withColumn('mark',lit('非保养数据'))

# Cache each subset; all three are reused by the following cells.
baoyang_model=baoyang_model.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
baoyang_model=broadcast(baoyang_model)

baoyang_rule=baoyang_rule.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
baoyang_rule=broadcast(baoyang_rule)

nobaoyang=nobaoyang.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
nobaoyang=broadcast(nobaoyang)

1.baoyang中取出购车时间>=2013/1/1的
有多少vin有做过保养: 147334
有多少vin有首保: 111163
对应占比是: 0.7544965859882987
公里数差<=0的vin的脏数据有: 400
取出没有公里数差的数据
取出个人用户
(1)其中baoyang_model需要重新run_last_run
(2)其中baoyang_rule需要进行同样的天数清洗
(3)其中nobaoyang仅仅代表是没有保养的数据,其中的vin可能有保养记录,也可能没有保养记录


In [25]:
# Columns present in all three subsets, in baoyang_model's order.
# unionAll matches columns by position, hence the common select() first.
same=[x for x in [y for y in baoyang_model.columns if y in baoyang_rule.columns] if x in nobaoyang.columns]
fin_zhudan=baoyang_model.select(same).unionAll(baoyang_rule.select(same)).unionAll(nobaoyang.select(same))

In [26]:
# Column list (and order) written to clms.skd_zhudan below.
# This rebinds tongyi_cols, which held the vehicle table's list earlier.
tongyi_cols=['vin','repair_date','mile','repair_amount','repair_amount_pre_discount','labor_fee',
             'part_fee','claim','routine_maintenance','accessory','accident_repair','general_repair',
             'first_maintance','service_act','belong_dealer_code','city','customer_type',
             'purchase_date','ies_name','province','family_name','city_class','member_type',
             'whether_bind_wechat','car_body_type','gearbox','crew_protection_system','car_class',
             'output_year','assembly_factory','user_tag','repair_type','mark',
             'gender_code', 'member_status', 'member_age']
cols=fin_zhudan.columns
# Despite the names, these are differences: same1 = expected but absent from
# the frame, same2 = in the frame but not expected.  Both print [] on a match.
same1=[x for x in tongyi_cols if x not in cols]
print(same1)
same2=[x for x in cols if x not in tongyi_cols]
print(same2)

[]
[]


In [27]:
# Put the columns into the target table's order and remove exact duplicates.
fin_zhudan=fin_zhudan.select(tongyi_cols).dropDuplicates()

In [28]:
print(fin_zhudan.count())
# show() prints the table itself and returns None, so wrapping it in print()
# only added a stray "None" line.
fin_zhudan.show(3)

811821
+-----------------+-----------+-----+-------------+--------------------------+---------+--------+-----+-------------------+---------+---------------+--------------+---------------+-----------+------------------+----+-------------+-------------+--------+--------+-----------+----------+-----------+-------------------+-------------+-------+----------------------+---------+-----------+----------------+------------+--------------------+----+-----------+-------------+----------+
|              vin|repair_date| mile|repair_amount|repair_amount_pre_discount|labor_fee|part_fee|claim|routine_maintenance|accessory|accident_repair|general_repair|first_maintance|service_act|belong_dealer_code|city|customer_type|purchase_date|ies_name|province|family_name|city_class|member_type|whether_bind_wechat|car_body_type|gearbox|crew_protection_system|car_class|output_year|assembly_factory|    user_tag|         repair_type|mark|gender_code|member_status|member_age|
+-----------------+-----------+-----+

In [29]:
# Persist the visit table to Hive.  The insert maps columns by position
# ("select *"), so the frame's column order must match the DDL below.
# createOrReplaceTempView replaces registerTempTable, deprecated since Spark 2.0.
fin_zhudan.createOrReplaceTempView('fin_zhudan_reg')
spark.sql('drop table if exists clms.skd_zhudan')
spark.sql('create table if not exists clms.skd_zhudan (vin string, repair_date date, mile double,\
repair_amount double,repair_amount_pre_discount double, labor_fee double, part_fee double, claim double,\
routine_maintenance double,accessory double, accident_repair double, general_repair double,\
first_maintance double, service_act double,belong_dealer_code string, city string, customer_type string,\
purchase_date date,ies_name string, province string, family_name int, city_class string, member_type string,\
whether_bind_wechat string, car_body_type string, gearbox string, crew_protection_system string,\
car_class string, output_year string, assembly_factory string, user_tag string, repair_type string,\
mark string,gender_code string,member_status string,member_age int)')
spark.sql('insert overwrite table clms.skd_zhudan select * from fin_zhudan_reg')

DataFrame[]

#### <font color=blue>3.统一特征工程以后的baoyang_model数据保存</font>

In [30]:
# Disabled: reads baoyang_model back from the Hive table clms.skd_baoyang_model and
# previews 5 rows. Presumably only needed when the in-memory DataFrame is not
# available -- TODO confirm before re-enabling.
# baoyang_model=spark.sql('select * from clms.skd_baoyang_model')
# print(baoyang_model.show(5))

In [31]:
# List the columns baoyang_model carries before the features below are (re)built.
model_columns=baoyang_model.columns
print(model_columns)

['vin', 'repair_date', 'mile', 'repair_amount', 'repair_amount_pre_discount', 'labor_fee', 'part_fee', 'claim', 'routine_maintenance', 'accessory', 'accident_repair', 'general_repair', 'first_maintance', 'service_act', 'belong_dealer_code', 'purchase_date', 'ies_name', 'gender_code', 'customer_type', 'family_name', 'member_type', 'member_status', 'member_age', 'whether_bind_wechat', 'province', 'city', 'city_class', 'car_body_type', 'gearbox', 'crew_protection_system', 'car_class', 'output_year', 'assembly_factory', 'user_tag', 'repair_type', 'mark']


In [32]:
print('重新清洗了数据,所以需要重新跑last_run')

# Per-vehicle windows over the visit dates.
# NOTE(review): rows sharing the same (vin, repair_date) are ordered arbitrarily
# inside these windows -- confirm that upstream cleaning leaves one row per visit.
vin_oldest_first=Window.partitionBy('vin').orderBy(fn.asc('repair_date'))
vin_newest_first=Window.partitionBy('vin').orderBy(fn.desc('repair_date'))

# baoyang_times: number of distinct visit dates per vehicle.
# A stale copy is dropped first so the cell can be re-run on the same frame.
if 'baoyang_times' in baoyang_model.columns:
    baoyang_model=baoyang_model.drop('baoyang_times')
tmp=(baoyang_model.select('vin','repair_date').dropDuplicates()
     .groupBy('vin').agg(fn.count('repair_date').alias('baoyang_times')))
baoyang_model=baoyang_model.join(tmp,on=['vin'],how='left')

# baoyang_id: 1-based sequence number of each row within its vehicle, oldest first.
if 'baoyang_id' in baoyang_model.columns:
    baoyang_model=baoyang_model.drop('baoyang_id')
baoyang_model=baoyang_model.withColumn('baoyang_id',fn.row_number().over(vin_oldest_first))

baoyang_model=(
    baoyang_model
    # With newest-first ordering, a lag offset of -1 reads the following row,
    # i.e. the vehicle's previous (older) visit.
    .withColumn('last_repair_date',fn.lag(fn.col('repair_date'),-1).over(vin_newest_first))
    .withColumn('last_mile',fn.lag(fn.col('mile'),-1).over(vin_newest_first))
    # No previous value: fall back to the purchase date and to a mileage of 0.
    .withColumn('last_repair_date',fn.coalesce(fn.col('last_repair_date'),fn.col('purchase_date')))
    .withColumn('last_mile',fn.coalesce(fn.col('last_mile'),fn.lit(0)))
    # Days and mileage since the previous visit, and the resulting mileage per day.
    .withColumn('daydiff',fn.datediff(fn.col('repair_date'),fn.col('last_repair_date')))
    .withColumn('milediff',fn.col('mile')-fn.col('last_mile'))
    .withColumn('dayofmile',fn.col('milediff')/fn.col('daydiff'))
)

# MEMORY_AND_DISK is the non-deprecated name of the level previously requested via
# the MEMORY_AND_DISK_SER alias (PySpark always stores serialized data).
baoyang_model=baoyang_model.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
# NOTE(review): broadcast() only attaches a join hint; no join on this frame is
# visible in the following cells -- confirm whether the hint is still wanted.
baoyang_model=fn.broadcast(baoyang_model)

重新清洗了数据,所以需要重新跑last_run


In [33]:
# Newest-first window per vehicle; a lag offset of 1 therefore reads the next (newer) visit.
vin_newest_first=Window.partitionBy('vin').orderBy(fn.desc('repair_date'))

# ISO weekday of the visit: 1=Monday ... 7=Sunday, 0 when repair_date is null.
# fn.dayofweek() numbers days 1=Sunday ... 7=Saturday, hence the (+5) % 7 + 1 shift.
# This replaces date_format(..., 'u'), whose pattern letter is rejected by Spark 3.x,
# and yields the same values as the former string-to-int mapping.
# Requires Spark >= 2.3 (dayofweek).
iso_weekday=fn.coalesce(((fn.dayofweek(fn.col('repair_date'))+5)%7)+1,fn.lit(0)).cast('int')

baoyang_model=(
    baoyang_model
    # Age and usage of the vehicle at the time of the visit, measured from purchase.
    .withColumn('repair_daydiff',fn.datediff(fn.col('repair_date'),fn.col('purchase_date')))
    .withColumn('repair_milediff',fn.col('mile')-0)
    .withColumn('repair_dayofmile',fn.col('repair_milediff')/fn.col('repair_daydiff'))
    # `today` is the run date string defined at the top of the notebook.
    .withColumn('today',fn.to_date(fn.lit(today)))
    .withColumn('buy_year',fn.year('purchase_date'))
    .withColumn('buy_mon',fn.month('purchase_date'))
    # Vehicle age as of the run date, in 180-day bins (rounded up).
    .withColumn('car_age_bin',fn.ceil((fn.datediff(fn.col('today'),fn.col('purchase_date')))/180))
    # Month index of the purchase counted from January 2013 (= 1).
    .withColumn('buy_id',(fn.col('buy_year')-2013)*12+fn.col('buy_mon'))
    .withColumn('year',fn.year(fn.col('repair_date')))
    .withColumn('mon',fn.month(fn.col('repair_date')))
    .withColumn('day',fn.dayofmonth(fn.col('repair_date')))
    # xun: third of the month -- days 1-10 -> 1, days 11-20 -> 2, everything else -> 3.
    .withColumn('xun',fn.when(fn.col('day').between(1,10),fn.lit(1))
                .when(fn.col('day').between(11,20),fn.lit(2)).otherwise(fn.lit(3)))
    .withColumn('dayofweek',iso_weekday)
    # Monday-Friday count as workdays (public holidays are not considered here).
    .withColumn('is_workday',fn.when(fn.col('dayofweek').isin([1,2,3,4,5]),fn.lit(1)).otherwise(fn.lit(0)))
)

# Disabled: ordinal encoding of the city tier labels in city_class (0 = unknown/other).
# baoyang_model=baoyang_model.withColumn('city_class',when(col('city_class')=='三线城市',lit(4)).\
#                                        when(col('city_class')=='一线城市',lit(7)).\
#                                        when(col('city_class')=='二线发达城市',lit(6)).\
#                                        when(col('city_class')=='二线城市',lit(5)).\
#                                        when(col('city_class')=='四线城市',lit(3)).\
#                                        when(col('city_class')=='六线城市',lit(1)).\
#                                        when(col('city_class')=='五线城市',lit(2)).\
#                                        when(col('city_class')=='未知',lit(0)).otherwise(lit(0)))

# print([x[0] for x in baoyang_model.select('city_class').distinct().collect()])

baoyang_model=(
    baoyang_model
    # Mileage and date of the vehicle's next visit (null for its latest visit).
    .withColumn('next_mile',fn.lag(fn.col('mile'),1).over(vin_newest_first))
    .withColumn('next_repair_date',fn.lag(fn.col('repair_date'),1).over(vin_newest_first))
    # whether_discount: 1 when the charged amount is at least 10 below the pre-discount amount.
    .withColumn('whether_discount',
                fn.when((fn.col('repair_amount')-fn.col('repair_amount_pre_discount'))<=-10,fn.lit(1))
                .otherwise(fn.lit(0)))
)

In [34]:
# Mark the feature frame for caching so the column check, count/show and the Hive
# insert below can reuse it. MEMORY_AND_DISK is the non-deprecated name of the
# level previously requested via the MEMORY_AND_DISK_SER alias.
baoyang_model=baoyang_model.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
# NOTE(review): broadcast() only attaches a join hint; no join on this frame is
# visible in the following cells -- confirm whether the hint is still wanted.
baoyang_model=broadcast(baoyang_model)

In [35]:
# Agreed output columns for baoyang_model, in the order the Hive table expects them.
tongyi_cols=[
    # visit facts and fee breakdown
    'vin','repair_date','mile','repair_amount','repair_amount_pre_discount','labor_fee','part_fee',
    'claim','routine_maintenance','accessory','accident_repair','general_repair','first_maintance',
    'service_act',
    # dealer, location, customer and vehicle attributes
    'belong_dealer_code','city','purchase_date',
    'ies_name','province','family_name','city_class','member_type','whether_bind_wechat',
    'car_body_type','gearbox','crew_protection_system','car_class','output_year','assembly_factory',
    'user_tag','repair_type',
    # visit-history features
    'baoyang_times','last_repair_date','last_mile','daydiff',
    'milediff','dayofmile','baoyang_id','repair_daydiff','repair_milediff',
    'repair_dayofmile',
    # calendar features
    'today','buy_year','buy_mon','car_age_bin','buy_id','year','mon','day',
    'dayofweek','xun','is_workday',
    # next-visit values and discount flag
    'next_mile','next_repair_date','whether_discount',
    # member profile
    'gender_code','customer_type','member_status','member_age',
]
cols=baoyang_model.columns
# Agreed columns that the DataFrame does not provide.
same1=[name for name in tongyi_cols if name not in cols]
print(same1)
# DataFrame columns that are not part of the agreed list.
same2=[name for name in cols if name not in tongyi_cols]
print(same2)

[]
['mark']


In [36]:
# Keep only the agreed columns (in the agreed order) and drop exact duplicate rows.
baoyang_model=baoyang_model.select(*tongyi_cols).distinct()

In [37]:
# Row count of the de-duplicated frame, followed by a 5-row preview.
print(baoyang_model.count())
# DataFrame.show() prints the table itself and returns None, so wrapping it in
# print() would emit a stray "None" line after the preview.
baoyang_model.show(5)

318261
+-----------------+-----------+-----+-------------+--------------------------+---------+--------+-----+-------------------+---------+---------------+--------------+---------------+-----------+------------------+----+-------------+--------+--------+-----------+----------+-----------+-------------------+-------------+-------+----------------------+---------+-----------+----------------+------------+--------------------+-------------+----------------+---------+-------+--------+------------------+----------+--------------+---------------+-----------------+----------+--------+-------+-----------+------+----+---+---+---------+---+----------+---------+----------------+----------------+-----------+-------------+-------------+----------+
|              vin|repair_date| mile|repair_amount|repair_amount_pre_discount|labor_fee|part_fee|claim|routine_maintenance|accessory|accident_repair|general_repair|first_maintance|service_act|belong_dealer_code|city|purchase_date|ies_name|province|family

In [38]:
# Write baoyang_model to the Hive table clms.skd_baoyang_model (table is dropped and recreated).
#
# The (column, type) pairs below are the single source of truth for both the
# CREATE TABLE statement and the INSERT column list, so the insert no longer
# depends on the DataFrame's column order happening to match the DDL.
baoyang_schema=[
    ('vin','string'),('repair_date','date'),('mile','double'),
    ('repair_amount','double'),('repair_amount_pre_discount','double'),
    ('labor_fee','double'),('part_fee','double'),('claim','double'),
    ('routine_maintenance','double'),('accessory','double'),('accident_repair','double'),
    ('general_repair','double'),('first_maintance','double'),('service_act','double'),
    ('belong_dealer_code','string'),('city','string'),('purchase_date','date'),
    ('ies_name','string'),('province','string'),('family_name','int'),
    ('city_class','string'),('member_type','string'),('whether_bind_wechat','string'),
    ('car_body_type','string'),('gearbox','string'),('crew_protection_system','string'),
    ('car_class','string'),('output_year','string'),('assembly_factory','string'),
    ('user_tag','string'),('repair_type','string'),
    ('baoyang_times','double'),('last_repair_date','date'),('last_mile','double'),
    ('daydiff','double'),('milediff','double'),('dayofmile','double'),
    ('baoyang_id','int'),('repair_daydiff','double'),('repair_milediff','double'),
    ('repair_dayofmile','double'),('today','date'),('buy_year','int'),
    ('buy_mon','int'),('car_age_bin','double'),('buy_id','int'),
    ('year','int'),('mon','int'),('day','int'),('dayofweek','int'),
    ('xun','int'),('is_workday','int'),('next_mile','double'),
    ('next_repair_date','date'),('whether_discount','int'),
    ('gender_code','string'),('customer_type','string'),('member_status','string'),
    ('member_age','int'),
]
baoyang_ddl=', '.join('{} {}'.format(name,dtype) for name,dtype in baoyang_schema)
# Back-tick the names in the SELECT so none of them (year, day, ...) can be parsed as a keyword.
baoyang_select=', '.join('`{}`'.format(name) for name,dtype in baoyang_schema)

# createOrReplaceTempView replaces the deprecated registerTempTable.
baoyang_model.createOrReplaceTempView('baoyang_model_fin_reg')
spark.sql('drop table if exists clms.skd_baoyang_model')
spark.sql('create table if not exists clms.skd_baoyang_model ({})'.format(baoyang_ddl))
spark.sql('insert overwrite table clms.skd_baoyang_model select {} from baoyang_model_fin_reg'.format(baoyang_select))

DataFrame[]

In [39]:
# Disabled: converts baoyang_model to a pandas DataFrame and writes it to
# 'baoyang_model_engine.xlsx'.
# baoyangdff=baoyang_model.toPandas()
# baoyangdff.to_excel('baoyang_model_engine.xlsx',index=False,encoding='gbk')

In [40]:
# Minutes elapsed since `ft` was recorded at the top of the notebook.
elapsed_seconds=time.time()-ft
print(elapsed_seconds/60)

6.745045288403829
