In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.types import *
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import pickle
import math
import json
import re
from tqdm import tqdm
#conf = SparkConf().setAppName("baogong toturial") 
conf = SparkConf().setAppName("baogong toturial") \
    .set("spark.driver.maxResultSize", "16g") \
    .set('spark.driver.memory','32g') \
    .set("spark.sql.parquet.binaryAsString", "true") \
    .set("spark.files.overwrite", "true") \
    .set("spark.akka.frameSize", "60") \
    .set("spark.hadoop.validateOutputSpecs", "false") \
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps") \
    .set("spark.storage.memoryFraction", "0.8") \
    .set("spark.kryoserializer.buffer.max", "2000m")

sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
#sqlContext.setConf("fs.defaultFS", "hdfs://mgjcluster")
sqlContext.sql("set spark.sql.hive.convertMetastoreOrc=true")
sqlContext.sql("set hive.exec.orc.split.strategy=ETL")


import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Layer
from tensorflow.keras.regularizers import l2

from sklearn.model_selection import train_test_split

  from ._conv import register_converters as _register_converters


In [2]:
def get_white_data(date1, date2):
    sql = """
        select *
        from baogong_uuid_white_simulator_url
        --tablesample(bucket 1 out of 8 on rand())
        where visit_date>='{}'
        and visit_date<='{}'
        --limit 500000
    """.format(date1, date2)
    data = sqlContext.sql(sql).toPandas()
    return data

def get_black_data(date1, date2):
    sql = """
        select *
        from baogong_uuid_black_simulator_url
        where visit_date>='{}'
        and visit_date<='{}'
    """.format(date1, date2)
    data = sqlContext.sql(sql).toPandas()
    return data

def get_data(date1, date2):
    sql = """
        select uuid_did, words, times
        from baogong_uuid_sequence_deeplearning_page_data_app
        where visit_date>='{}'
        and visit_date<='{}'
    """.format(date1, date2)
    data = sqlContext.sql(sql).toPandas()
    return data

In [3]:
words_dic = np.load('./words_dic.npy').item()
words_dic['UNK'] = len(words_dic)+1
words_dic['UNKOWN'] = 0 #masking

white_data = get_white_data('2019-09-25', '2019-09-25')
black_data = get_black_data('2019-06-01', '2019-09-25')

In [4]:
white_data['label'] = 0
black_data['label'] = 1
data = pd.concat([white_data, black_data], axis = 0, ignore_index=True)

data['len'] = data['words'].apply(lambda x: len(x.split('::')))

data1 = data[data['len']>=5]
data1['label'].value_counts()

data_deep = data1[['words', 'times', 'label']]
data_deep['times'] = data_deep['times'].fillna('0')
data_deep = data_deep.fillna('UNK')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  # This is added back by InteractiveShellApp.init_path()


In [5]:
def time_diff_parse(times):
    time = list(map(lambda x: int(x), times))
    head_time = time[:-1]
    tail_time = time[1:]
    time_diff = []
    for head, tail in zip(head_time, tail_time):
        time_diff.append(head - tail)
    time_diff.append(999)
    time_diff_result = []
    for diff in time_diff:
        if diff <= 0:
            time_diff_result.append(1)  #与padding 0做区分
        elif diff < 2:
            time_diff_result.append(2)
        elif diff < 4:
            time_diff_result.append(3)
        elif diff < 8:
            time_diff_result.append(4)
        elif diff < 16:
            time_diff_result.append(5)
        elif diff < 32:
            time_diff_result.append(6)
        elif diff < 64:
            time_diff_result.append(7)
        elif diff < 128:
            time_diff_result.append(8)
        elif diff < 1000:
            time_diff_result.append(9)
        else:
            time_diff_result.append(10)
    return time_diff_result

def get_convert_data(Data, max_len, low_len, words_dic):
    X_url = []
    X_time = []
    length = []
    for i in tqdm(range(Data.shape[0])):
        lines = Data.iloc[i, :]
        urls, times = lines[0].split('::'), lines[1].split('::')

        times = time_diff_parse(times)
        
        l = len(urls)
        #  对数据进行 补长 去短
        #assert len(urls) == len(times)
        if len(urls) > max_len:
            urls = urls[:max_len]
            times = times[:max_len]
        else:
            urls = urls + (max_len - len(urls)) * ['UNKOWN']  #UNKOWN->0做mask，UNK->len做未知
            times = times + (max_len - len(times)) * [0]  
        url_id = list(map(lambda x: words_dic[x] if x in words_dic else words_dic['UNK'], urls))
        time_id = times#time_diff_parse(times)
        url_id = url_id if len(url_id)==max_len else url_id + [words_dic['UNK']]*(max_len-len(url_id))
        time_id = time_id if len(time_id)==max_len else time_id + [0]*(max_len-len(time_id))
        assert len(url_id) == len(time_id) == max_len

        X_url.append(url_id)
        X_time.append(time_id)
        length.append(l)
    return X_url, X_time

In [6]:
X_url, X_time = get_convert_data(data_deep, 200, 1, words_dic)

url_train, url_test = train_test_split(np.array(X_url), random_state=11)
time_train, time_test = train_test_split(np.array(X_time), random_state=11)
y_train, y_test = train_test_split(data_deep['label'].values, random_state=11)

x_train_set_all = {"words": url_train,
                   "times": time_train,
                   }
x_test_set_all = {"words": url_test,
                  "times": time_test,
                  }

100%|██████████| 1714635/1714635 [11:43<00:00, 2436.88it/s]


In [7]:
class layerNormalization(Layer):
    def __init__(self, l2_rate, epsilon = 1e-8, **kwargs):
        self.l2_rate = l2_rate
        self.epsilon = epsilon
        super(layerNormalization, self).__init__(**kwargs)

    def build(self, input_shape):
        input_dim = input_shape[-1]
        self.w = self.add_weight(name='kernel',
                                        shape=(input_dim,),
                                        initializer='glorot_uniform',
                                        regularizer=l2(self.l2_rate),
                                        trainable=True)
        self.b = self.add_weight(name='bias',
                                     shape=(input_dim,),
                                     initializer='Zeros',
                                     trainable=True)

        super(layerNormalization, self).build(input_shape)
        
    def call(self, inputs, **kwargs):
        x0 = inputs
        mean, variance = tf.nn.moments(inputs, axes=[-1], keepdims=True)
        normalized = (inputs - mean) / ( (variance + self.epsilon) ** (.5) )
        outputs = self.w * normalized + self.b
        return outputs
    
class positionalEncoding(Layer):
    """
    inputs: 3d tensor. (N, T, E)
    maxlen: scalar. Must be >= T
    masking: Boolean. If True, padding positions are set to zeros.
    returns
    3d tensor that has the same shape as inputs.
    """
    def __init__(self, maxlen, masking=True, **kwargs):
        self.maxlen = maxlen
        self.masking = masking
        super(positionalEncoding, self).__init__(**kwargs)

    def build(self, input_shape):

        super(positionalEncoding, self).build(input_shape)
        
    def call(self, inputs, **kwargs):
        E = inputs.get_shape().as_list()[-1]
        N, T = tf.shape(inputs)[0], tf.shape(inputs)[1] # dynamic
        # position indices
        position_ind = tf.tile(tf.expand_dims(tf.range(T), 0), [N, 1]) # (N, T)

        # First part of the PE function: sin and cos argument
        position_enc = np.array([
            [pos / np.power(10000, (i-i%2)/E) for i in range(E)]
            for pos in range(self.maxlen)])

        # Second part, apply the cosine to even columns and sin to odds.
        position_enc[:, 0::2] = np.sin(position_enc[:, 0::2])  # dim 2i
        position_enc[:, 1::2] = np.cos(position_enc[:, 1::2])  # dim 2i+1
        position_enc = tf.convert_to_tensor(position_enc, tf.float32) # (maxlen, E)

        # lookup
        outputs = tf.nn.embedding_lookup(position_enc, position_ind)

        # masks
        if self.masking:
            outputs = tf.where(tf.equal(inputs, 0), inputs, outputs)
        return tf.dtypes.cast(outputs, tf.float32)
    
#masks = tf.math.equal(x, 0)
def mask(inputs, key_masks=None, type=None):
    """
    https://github.com/Kyubyong/transformer/blob/master/modules.py
    """
    padding_num = -2 ** 32 + 1
    if type in ("k", "key", "keys"):
        key_masks = tf.dtypes.cast(key_masks, tf.float32)
        key_masks = tf.tile(key_masks, [tf.shape(inputs)[0] // tf.shape(key_masks)[0], 1]) # (h*N, seqlen)
        key_masks = tf.expand_dims(key_masks, 1)  # (h*N, 1, seqlen)
        outputs = inputs + key_masks * padding_num
    elif type in ("f", "future", "right"):
        diag_vals = tf.ones_like(inputs[0, :, :])  # (T_q, T_k)
        tril = tf.linalg.LinearOperatorLowerTriangular(diag_vals).to_dense()  # (T_q, T_k)
        future_masks = tf.tile(tf.expand_dims(tril, 0), [tf.shape(inputs)[0], 1, 1])  # (N, T_q, T_k)

        paddings = tf.ones_like(future_masks) * padding_num
        outputs = tf.where(tf.equal(future_masks, 0), paddings, inputs)#True的都填充了paddings，false填充了inputs
    else:
        print("Check if you entered type correctly!")
        
    return outputs

def scaled_dot_product_attention(Q, K, V, masks, key_masks,
                                 causality=False, dropout_rate=0.,
                                 training=True):
    """"
    Q: Packed queries. 3d tensor. [N, T_q, d_k].
    K: Packed keys. 3d tensor. [N, T_k, d_k].
    V: Packed values. 3d tensor. [N, T_k, d_v]
    """
    dk = Q.get_shape().as_list()[-1]
    outputs = tf.matmul(Q, tf.transpose(K, (0, 2, 1)))/dk**0.5
    if masks:
        outputs = mask(outputs, key_masks=key_masks, type="key")
    # causality or future blinding masking
    if causality:
        outputs = mask(outputs, type="future")
    
    # softmax
    outputs = tf.nn.softmax(outputs) 
    # dropout
    outputs = keras.layers.Dropout(rate=dropout_rate)(outputs)

    # weighted sum (context vectors)
    outputs = tf.matmul(outputs, V)  # (N, T_q, d_v)
    return outputs

class mutiHeadAttention(Layer):
    def __init__(self, masks, causality, dropout_rate, training, num_heads, size_per_head, **kwargs):
        self.masks = masks
        #self.key_masks = key_masks
        self.causality = causality
        self.dropout_rate = dropout_rate
        self.training = training
        self.num_heads = num_heads
        self.size_per_head = size_per_head
 
        super(mutiHeadAttention, self).__init__(**kwargs)
        
    def build(self, input_shape):
        input_shape = self.num_heads * self.size_per_head
        
        self.q_dense = keras.layers.Dense(input_shape, use_bias=True)
        self.k_dense = keras.layers.Dense(input_shape, use_bias=True)
        self.v_dense = keras.layers.Dense(input_shape, use_bias=True)
        
        super(mutiHeadAttention, self).build(input_shape)
    
    def call(self, inputs):
        q, k, v , input1 = inputs
        
        if self.masks:
            key_masks = tf.equal(input1, 0)
        else:
            key_masks = None
        Q = self.q_dense(q)
        K = self.k_dense(k)
        V = self.v_dense(v)
        
        Q_ = tf.concat(tf.split(Q, self.num_heads, axis=2), axis=0) # (num_heads*N, T_q, size_per_head)
        K_ = tf.concat(tf.split(K, self.num_heads, axis=2), axis=0) # (num_heads*N, T_k, size_per_head)
        V_ = tf.concat(tf.split(V, self.num_heads, axis=2), axis=0) # (num_heads*N, T_k, size_per_head)
        
        # Attention
        outputs = scaled_dot_product_attention(Q_, K_, V_, self.masks, key_masks, 
                                               self.causality, self.dropout_rate, self.training)

        # Restore shape
        outputs = tf.concat(tf.split(outputs, self.num_heads, axis=0), axis=2 ) # (N, T_q, d_model)
              
        # Residual connection
        outputs += q
        return outputs

class hanAttention(Layer):
    """https://arxiv.org/pdf/1707.00896.pdf"""
    def __init__(self, masks, dropout_rate, l2_rate, **kwargs):
        self.masks = masks
        self.dropout_rate = dropout_rate
        self.l2_rate = l2_rate
        super(hanAttention, self).__init__(**kwargs)
        
    def build(self, input_shape):
        #print ('my shape is,',input_shape[0])
        self.w = self.add_weight(name='kernel',
                                        shape=(int(input_shape[0][-1]),),
                                        initializer='glorot_uniform',
                                        regularizer=l2(self.l2_rate),
                                        trainable=True
                                )
        
        self.dense = keras.layers.Dense(int(input_shape[0][-1]), use_bias=True)
        
        super(hanAttention, self).build(input_shape)
    
    def call(self, inputs):
        h, input1 = inputs #h [none, 200, 2*lstmShape]
        dk = h.get_shape().as_list()[-1]
        
        if self.masks:
            key_masks = tf.equal(input1, 0)
        else:
            key_masks = None
        hw = self.dense(h)
        outputs = tf.matmul(hw, tf.reshape(self.w, (dk,-1)))  #[none, 200, 2*lstmShape]*[2*lstmShape, 1]
        outputs = mask(tf.transpose(outputs, (0, 2, 1)), key_masks=key_masks, type="key")
        outputs = tf.nn.softmax(outputs)
        
        outputs = keras.layers.Dropout(rate=self.dropout_rate)(outputs)
        outputs = tf.matmul(outputs, h) #[none, 1, 200] * [none, 200, 2*lstmShape] 
        return outputs

In [10]:
def attentionModel():
    
    inputs1 = keras.layers.Input(shape = (200, ), name = 'words')
    embd1 = keras.layers.Embedding(output_dim=64, input_dim=len(words_dic)+1, input_length=200)(inputs1)
    inputs2 = keras.layers.Input(shape = (200, ), name = 'times')
    embd2 = keras.layers.Embedding(output_dim=64, input_dim=10+1, input_length=200)(inputs2)
    
    #embd1 += positionalEncoding(200, True)(embd1)
    att_words = mutiHeadAttention(masks = True, causality = False, dropout_rate = 0.1, 
        training = True, num_heads = 8, size_per_head = 8)([embd1, embd1, embd1, inputs1])
    #att_words = layerNormalization(0.001, epsilon = 1e-8)(att_words)
    
    att_times = mutiHeadAttention(masks = True, causality = False, dropout_rate = 0.1, 
        training = True, num_heads = 8, size_per_head = 8)([embd2, embd2, embd2, inputs2])
    #att_times = layerNormalization(0.001, epsilon = 1e-8)(att_times)
    
    att = keras.layers.concatenate(inputs=[att_words, att_times], axis=1)
    att = keras.layers.Flatten()(att)
    dense = keras.layers.Dense(16, activation='relu')(att)
    
    output = keras.layers.Dense(1, activation='sigmoid')(dense)
    
    model_att = keras.Model(inputs=[inputs1, inputs2], outputs=[output])
    model_att.compile(loss='binary_crossentropy', optimizer='adam',
                            metrics=[keras.metrics.Precision(), keras.metrics.Recall()])
    #print (model_att.summary())
    return model_att

def lstmModel():

    inputs1 = keras.layers.Input(shape = (200, ), name = 'words')
    embd1 = keras.layers.Embedding(output_dim=64, input_dim=len(words_dic)+1, input_length=200)(inputs1)
    inputs2 = keras.layers.Input(shape = (200, ), name = 'times')
    embd2 = keras.layers.Embedding(output_dim=64, input_dim=10+1, input_length=200)(inputs2)
    
    lstm_words = keras.layers.LSTM(64)(embd1)
    lstm_times = keras.layers.LSTM(64)(embd2)
    
    lstm = keras.layers.concatenate(inputs=[lstm_words, lstm_times], axis=1)
    lstm = keras.layers.Flatten()(lstm)
    dense = keras.layers.Dense(16, activation='relu')(lstm)
    
    output = keras.layers.Dense(1, activation='sigmoid')(dense)
    
    model_lstm = keras.Model(inputs=[inputs1, inputs2], outputs=[output])
    model_lstm.compile(loss='binary_crossentropy', optimizer='adam',
                            metrics=['accuracy',keras.metrics.Precision(), keras.metrics.Recall()])
    #print (model_lstm.summary())
    return model_lstm

def RCnnModel():

    inputs1 = keras.layers.Input(shape = (200, ), name = 'words')
    embd1 = keras.layers.Embedding(output_dim=64, input_dim=len(words_dic)+1, input_length=200)(inputs1)
    inputs2 = keras.layers.Input(shape = (200, ), name = 'times')
    embd2 = keras.layers.Embedding(output_dim=64, input_dim=10+1, input_length=200)(inputs2)
    
    steps_hState1, steps_reverse_hState1 = keras.layers.Bidirectional(
        keras.layers.LSTM(64, return_sequences=True, dropout = 0.3), merge_mode = None)(embd1)
    steps_hState2, steps_reverse_hState2 = keras.layers.Bidirectional(
        keras.layers.LSTM(64, return_sequences=True, dropout = 0.3), merge_mode = None)(embd2)
    
    shape = [tf.shape(steps_hState1)[0], 1, tf.shape(steps_hState1)[2]]
    c_left1 = tf.concat([tf.zeros(shape), steps_hState1[:, :-1]], axis=1)
    c_right1 = tf.concat([steps_reverse_hState1[:, 1:], tf.zeros(shape)], axis=1)
    
    c_left2 = tf.concat([tf.zeros(shape), steps_hState2[:, :-1]], axis=1)
    c_right2 = tf.concat([steps_reverse_hState2[:, 1:], tf.zeros(shape)], axis=1)
    
    lstm_words = keras.layers.concatenate(inputs=[c_left1, embd1, c_right1], axis=2)
    lstm_times = keras.layers.concatenate(inputs=[c_left2, embd2, c_right2], axis=2)
    
    dense_words = keras.layers.Dense(64, activation='relu')(lstm_words)
    dense_times = keras.layers.Dense(64, activation='relu')(lstm_times)
    
    dense = keras.layers.concatenate([tf.reduce_max(dense_words, axis = 1), 
                                      tf.reduce_max(dense_times, axis = 1)], axis = 1)
    dropout = keras.layers.Dropout(0.3)(dense)
    output = keras.layers.Dense(1, activation='sigmoid')(dropout)
    
    model_rcnn = keras.Model(inputs=[inputs1, inputs2], outputs=[output])
    model_rcnn.compile(loss='binary_crossentropy', optimizer='adam',
                            metrics=['accuracy',keras.metrics.Precision(), keras.metrics.Recall()])
    #print (model_lstm.summary())
    return model_rcnn

def HAttention():
    inputs1 = keras.layers.Input(shape = (200, ), name = 'words')
    embd1 = keras.layers.Embedding(output_dim=64, input_dim=len(words_dic)+1, input_length=200)(inputs1)
    inputs2 = keras.layers.Input(shape = (200, ), name = 'times')
    embd2 = keras.layers.Embedding(output_dim=64, input_dim=10+1, input_length=200)(inputs2)
    
    steps_hState1, steps_reverse_hState1 = keras.layers.Bidirectional(keras.layers.LSTM(64, return_sequences=True), merge_mode = None)(embd1)
    steps_hState2, steps_reverse_hState2 = keras.layers.Bidirectional(keras.layers.LSTM(64, return_sequences=True), merge_mode = None)(embd2)
    
    lstm_words = keras.layers.concatenate(inputs=[steps_hState1, steps_reverse_hState1], axis=2)
    lstm_times = keras.layers.concatenate(inputs=[steps_hState2, steps_reverse_hState2], axis=2)
    
    att_words = hanAttention(masks = True, dropout_rate = 0.1, l2_rate = 0.001)([lstm_words, inputs1])
    att_times = hanAttention(masks = True, dropout_rate = 0.1, l2_rate = 0.001)([lstm_times, inputs1])
    
    dense = keras.layers.concatenate([tf.reduce_max(att_words, axis = 1), 
                                      tf.reduce_max(att_times, axis = 1)], axis = 1)
    dense = keras.layers.Dense(32, activation='relu')(dense)
    output = keras.layers.Dense(1, activation='sigmoid')(dense)
    
    model_han = keras.Model(inputs=[inputs1, inputs2], outputs=[output])
    model_han.compile(loss='binary_crossentropy', optimizer='adam',
                            metrics=['accuracy',keras.metrics.Precision(), keras.metrics.Recall()])
    #print (model_lstm.summary())
    return model_han


In [11]:
early_stopping = keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)
check_point_path_noatt = "./model/my_att_model.h5"
model_checkpoint_noatt = keras.callbacks.ModelCheckpoint(check_point_path_noatt, verbose=1, save_best_only=True,
                                         save_weights_only=True)
att_model = attentionModel()#lstmModel()#attentionModel()
att_model.fit(x_train_set_all, y_train, \
                validation_data=(x_test_set_all, y_test), \
                epochs=5, batch_size=128, shuffle=True, \
                callbacks=[early_stopping, model_checkpoint_noatt])

W1021 11:58:52.421134 140034245318464 deprecation.py:323] From /home/baogong/anaconda3/lib/python3.6/site-packages/tensorflow/python/ops/math_grad.py:1250: add_dispatch_support.<locals>.wrapper (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


Train on 1285976 samples, validate on 428659 samples
Epoch 1/5
 124160/1285976 [=>............................] - ETA: 4:11:43 - loss: 0.0224 - precision: 0.8991 - recall: 0.6619

KeyboardInterrupt: 

In [12]:
data1['label'].value_counts()

0    1689966
1      24669
Name: label, dtype: int64

In [None]:
# def label_smoothing(inputs, epsilon=0.1):
#     V = inputs.get_shape().as_list()[-1] # number of channels
#     return ((1-epsilon) * inputs) + (epsilon / V)

In [None]:
def get_white_data1(date1, date2):
    sql = """
        select *
        from baogong_uuid_white_simulator_url
        --tablesample(bucket 1 out of 8 on rand())
        where visit_date>='{}'
        and visit_date<='{}'
        --limit 500000
    """.format(date1, date2)
    data = sqlContext.sql(sql).toPandas()
    return data
###valadation
val_black = get_black_data('2019-09-26', '2019-09-30')
val_white = get_white_data1('2019-09-26', '2019-09-26')
#val_data = get_data('2019-09-26', '2019-09-26')

val_white['label'] = 0
val_black['label'] = 1
val_data = pd.concat([val_white, val_black], axis = 0, ignore_index=True)

val_data['len'] = val_data['words'].apply(lambda x: len(x.split('::')))

val_data1 = val_data[val_data['len']>=5]
val_data1['label'].value_counts()

In [None]:
val_data_deep = val_data1[['words', 'times', 'label']]
val_data_deep['times'] = val_data_deep['times'].fillna('0')
val_data_deep = val_data_deep.fillna('UNK')

val_X_url, val_X_time = get_convert_data(val_data_deep, 200, 1, words_dic)

url_val = np.array(val_X_url)
time_val = np.array(val_X_time)
y_val = val_data_deep['label'].values

x_val_set_all = {"words": url_val,
                   "times": time_val,
                   }

In [None]:
pred_val = att_model.predict(x_val_set_all, verbose=1)

In [None]:
val_data_deep['pred'] = pred_val[:,0]

In [None]:
val_data_deep[val_data_deep['label']==1].shape, val_data_deep[val_data_deep['pred']>=0.99].shape

In [None]:
val_data_deep[(val_data_deep['label']==1)&(val_data_deep['pred']>=0.99)].shape

In [None]:
14.0/33, 14.0/108

In [None]:
val_data_deep['uuid_did'] = val_data1['uuid_did']