## Текстовый анализ URL в задаче lookalike

In [1]:
##### Config
from pyspark import SparkConf, SparkContext, HiveContext
import re
import numpy as np
import pandas as pd
import datetime
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.classification import LogisticRegressionWithSGD, NaiveBayes, NaiveBayesModel
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.evaluation import BinaryClassificationMetrics
import hashlib
from collections import Counter

hive_config_query = '''
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled = true;
set mapreduce.map.memory.mb=4096;
set mapreduce.map.child.java.opts=-Xmx4g;
set mapreduce.task.io.sort.mb=1024;
set mapreduce.reduce.child.java.opts=-Xmx4g;
set mapreduce.reduce.memory.mb=7000;
set mapreduce.reduce.shuffle.input.buffer.percent=0.5;
set mapreduce.input.fileinputformat.split.minsize=536870912;
set mapreduce.input.fileinputformat.split.maxsize=1073741824;
set hive.optimize.ppd=true;
set hive.merge.smallfiles.avgsize=536870912;
set hive.merge.mapredfiles=true;
set hive.merge.mapfiles=true;
set hive.hadoop.supports.splittable.combineinputformat=true;
set hive.exec.reducers.bytes.per.reducer=536870912;
set hive.exec.parallel=true;
set hive.exec.max.created.files=10000000;
set hive.exec.compress.output=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=1000000;
set hive.exec.max.dynamic.partitions.pernode=100000;
set io.seqfile.compression.type=BLOCK;
set mapreduce.map.failures.maxpercent=5;
'''

# Restart Spark with the resources this notebook needs.
# NOTE(review): `sc` is assumed to be the context pre-created by the notebook kernel.
sc.stop()
conf = (SparkConf()
        .set("spark.executor.instances", 33)
        .set("spark.driver.maxResultSize", "16g")
        .set('spark.driver.memory','16g')
        .set("spark.executor.memory", '20g')
        .set("spark.yarn.executor.memoryOverhead", 2048)
       )
sc = SparkContext(conf=conf)
hc = HiveContext(sc)

# Apply the Hive session settings one statement at a time. This stays best-effort
# (a rejected setting must not abort the notebook), but every failure is reported
# instead of being silently swallowed, and blank fragments are not sent to Hive.
for q in hive_config_query.split(';'):
    q = q.strip()
    if not q:
        continue
    try:
        hc.sql(q)
    except Exception as err:
        print('Hive setting was not applied: {!r} ({})'.format(q, err))


In [2]:
#Constants
n_list = [1,2,3,4,5]
tf_size = 2 ** 20

In [3]:
init_queries = '''

CREATE FUNCTION md5 as 'onemd5.Md5';

CREATE TABLE `user_kposminin.phone_id_train`(
  `phone_num` string, 
  `id` string, 
  `approve_label` int, 
  `full_app_label` int, 
  `full_app_first_day` int, 
  `sampled` int, 
  `strong_sampled` int)
PARTITIONED BY (ymd string)
;

truncate table user_kposminin.phone_id_train;

CREATE TABLE `user_kposminin.url_text_train`(
  `phone_num` string, 
  `approve_label` int, 
  `full_app_label` int, 
  `full_app_first_day` int, 
  `sampled` int, 
  `strong_sampled` int, 
  `urls_str` string)
  partitioned by (`ymd` string)
;



CREATE TABLE `user_kposminin.phone_id_test`(
  `phone_num` string, 
  `id` string, 
  `approve_label` int, 
  `full_app_label` int, 
  `full_app_first_day` int, 
  `sampled` int, 
  `strong_sampled` int)
PARTITIONED BY (ymd string)
;

truncate table user_kposminin.phone_id_test;

CREATE TABLE `user_kposminin.url_text_test`(
  `phone_num` string, 
  `approve_label` int, 
  `full_app_label` int, 
  `full_app_first_day` int, 
  `sampled` int, 
  `strong_sampled` int, 
  `urls_str` string)
  partitioned by (`ymd` string)
;

'''

another_train_prepare_query = '''
insert overwrite table user_kposminin.phone_id_train partition (ymd) 
  select     
    m.phone_num,
    m.id,
    if(ua.id is Null,0,1) as approve_label,
    if(uf.id is Null,0,1) as full_app_label,
    nvl(uf.first_day,0) as full_app_first_day,
    m.sampled,
    if(substr(md5(concat(phone_num,'aa')),1,1) = '0', 1, 0) as strong_sampled,
    '#ymd0' as ymd
  from
    (select 
       uid_str as id,
       property_value as phone_num,
       if(substr(md5(property_value),1,3) = '000', 1, 0) as sampled
     from
       prod_dds.md_uid_property 
     where
       property_cd = 'PHONE' and
       load_src = 'LI.02'
    ) m
    left join(
      select distinct id, if(ymd = '#ymd1',1,0) as first_day
      from prod_features_liveinternet.user_action
      where action_type = 'tinkoff_platinum_approved_application'
        and ymd between '#ymd1' and '#ymd3'
    ) ua on ua.id = m.id
    left join(
      select distinct id, if(ymd = '#ymd1',1,0) as first_day
      from prod_features_liveinternet.user_action
      where action_type = 'tinkoff_platinum_complete_application'
        and ymd between '#ymd1' and '#ymd3'
    ) uf on uf.id = m.id
    where
      (sampled = 1 or (not ua.id is Null) or (not uf.id is Null))
;



insert overwrite table user_kposminin.url_text_train partition (ymd) 
select 
  u.phone_num,
  max(u.approve_label) as approve_label,
  max(u.full_app_label) as full_app_label,
  max(u.full_app_first_day) as full_app_first_day,
  max(u.sampled) as sampled,
  max(u.strong_sampled) as strong_sampled,
  concat_ws(' ',collect_list(url)) as up,
  '#ymd0' as ymd
from 
  user_kposminin.phone_id_train u
  inner join prod_raw_liveinternet.access_log v on u.id = v.id 
where
  u.ymd = '#ymd0' and 
  v.ymd = '#ymd0'
group by 
  u.phone_num
;

'''

# Hive SQL template that rebuilds the test tables for one snapshot day.
# Placeholders '#ymd0' (snapshot day), '#ymd1' and '#ymd3' (first and last day of
# the label window) are substituted with dates before the query is run.
# Statement 1 fills phone_id_test with phones whose md5 starts with '0' and that
# have visits on the snapshot day; statement 2 concatenates their visited URLs.
another_test_prepare_query = '''
insert overwrite table user_kposminin.phone_id_test partition (ymd) 
  select     
    m.phone_num,
    m.id,
    if(ua.id is Null,0,1) as approve_label,
    if(uf.id is Null,0,1) as full_app_label,
    nvl(uf.first_day,0) as full_app_first_day,
    m.sampled,
    if(substr(md5(concat(phone_num,'aa')),1,1) = '0', 1, 0) as strong_sampled,
    '#ymd0' as ymd
  from
    (select 
       uid_str as id,
       property_value as phone_num,
       if(substr(md5(property_value),1,1) = '0', 1, 0) as sampled
     from
       prod_dds.md_uid_property 
     where
       property_cd = 'PHONE' and
       load_src = 'LI.02' and
       substr(md5(property_value),1,1) = '0'
    ) m
    left semi join prod_raw_liveinternet.access_log v on m.id = v.id and v.ymd = '#ymd0'
    left join(
      select distinct id, if(ymd = '#ymd1',1,0) as first_day
      from prod_features_liveinternet.user_action
      where action_type = 'tinkoff_platinum_approved_application'
        and ymd between '#ymd1' and '#ymd3'
    ) ua on ua.id = m.id
    left join(
      select distinct id, if(ymd = '#ymd1',1,0) as first_day
      from prod_features_liveinternet.user_action
      where action_type = 'tinkoff_platinum_complete_application'
        and ymd between '#ymd1' and '#ymd3'
    ) uf on uf.id = m.id
;



insert overwrite table user_kposminin.url_text_test partition (ymd) 
select 
  u.phone_num,
  max(u.approve_label) as approve_label,
  max(u.full_app_label) as full_app_label,
  max(u.full_app_first_day) as full_app_first_day,
  max(u.sampled) as sampled,
  max(u.strong_sampled) as strong_sampled,
  concat_ws(' ',collect_list(url)) as up,
  '#ymd0' as ymd
from 
  user_kposminin.phone_id_test u
  inner join prod_raw_liveinternet.access_log v on u.id = v.id 
where
  u.ymd = '#ymd0' and 
  v.ymd = '#ymd0'
group by 
  u.phone_num
;



'''
    
train_query = 'select * from user_kposminin.url_text_train'


In [4]:
# I used alias to avoid confusion with the mllib library
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.mllib.linalg import SparseVector
from pyspark.ml.feature import HashingTF as MLHashingTF
from pyspark.ml.feature import IDF as MLIDF
from pyspark.sql.types import DoubleType
import urllib

In [5]:
# Assemble the train-preparation SQL for 30 snapshot days, stepping back 8 days
# per iteration. For each snapshot day the label window placeholders are set to
# the day after ('#ymd1') and three days after ('#ymd3').
train_date = datetime.date(2016, 12, 23)
qq = ''
for _ in range(30):
    day_plus_1 = train_date + datetime.timedelta(days=1)
    day_plus_3 = train_date + datetime.timedelta(days=3)
    query = another_train_prepare_query
    for placeholder, value in (('#ymd0', str(train_date)),
                               ('#ymd1', str(day_plus_1)),
                               ('#ymd3', str(day_plus_3)),
                               ('#ind', str(train_date).replace('-', ''))):
        query = query.replace(placeholder, value)
    qq = qq + query
    train_date -= datetime.timedelta(days=8)
#print(qq)

In [6]:
# Assemble the test-preparation SQL for a single snapshot day. The label window
# placeholders are the day after ('#ymd1') and three days after ('#ymd3').
test_date = datetime.date(2017, 1, 17)
qq = ''
for _ in range(1):
    day_plus_1 = test_date + datetime.timedelta(days=1)
    day_plus_3 = test_date + datetime.timedelta(days=3)
    query = another_test_prepare_query
    for placeholder, value in (('#ymd0', str(test_date)),
                               ('#ymd1', str(day_plus_1)),
                               ('#ymd3', str(day_plus_3)),
                               ('#ind', str(test_date).replace('-', ''))):
        query = query.replace(placeholder, value)
    qq = qq + query
    #test_date = test_date - datetime.timedelta(days = 2)
#print(qq)

In [7]:
# Translate ru to eng, Transform text to n_gram list, Get n_gram index

#abc = list(set(''.join([e[0] for e in hc.sql('select url from prod_raw_liveinternet.access_log v where ymd = "2017-01-10" limit 100000').collect()])))

def n_gram(s, n):
    '''Slide a window of width n over s and return every slice, left to right.

    Returns an empty list when s is shorter than n.
    '''
    grams = []
    start = 0
    last_start = len(s) - n
    while start <= last_start:
        grams.append(s[start:start + n])
        start += 1
    return grams

def n_gram_index(ngr,abc):
    '''Encode n-gram ngr as an integer, treating each character as a base-N digit.

    N is the module-level tr_abc_len. A character's digit is its position in
    abc (after lower-casing); positions above N are looked up again after
    translating the character with the module-level transl table. Characters
    missing from abc contribute the digit N - 1. The first character is the
    least significant digit.
    '''
    base = tr_abc_len
    total = 0
    for pos, ch in enumerate(ngr):
        lowered = ch.lower()
        try:
            digit = abc.index(lowered)
            if digit > base:
                digit = abc.index(lowered.translate(transl))
        except ValueError:
            # Unknown character (before or after translation): use the fallback digit.
            digit = base - 1
        total += (base ** pos) * digit
    return total

#abc = list(u'abcdefghijklmnopqrstuvwxyz0123456789 _абвгдеёжзийклмнопрстуфхцчшщъыьэюя')
#tr_abc_len = abc.index(u'а') - 1

symbols = (u"абвгдеёжзийклмнопрстуфхцчшщъыьэюя&-?%#!/\=_.-~$",
           u"abvgdeejzijklmnoprstufhzcss_y_eua           ")
transl = {ord(a):ord(b) for a, b in zip(*symbols)}

def handle_str(s,transl,n=2):
    '''Normalize a URL string into lower-case [a-z0-9] tokens.

    Steps: percent-decode s (as UTF-8, undecodable bytes dropped), lower-case it,
    map characters through transl (e.g. Russian -> Latin, letter by letter),
    collapse every run of other characters into a separator, and write each
    separator as n - 1 spaces.

    Input:
      s - text (unicode) to normalize;
      transl - dict {ord(src): ord(dst)} as accepted by unicode.translate;
      n - separator width is n - 1 spaces. The default of 2 gives single-space
          separators and keeps two-argument calls working.

    Returns the normalized unicode string.
    '''
    # urllib.unquote exists only on Python 2; unquote_to_bytes is the
    # bytes-in / bytes-out equivalent on Python 3.
    try:
        from urllib import unquote as unquote_bytes
    except ImportError:
        from urllib.parse import unquote_to_bytes as unquote_bytes

    raw = unquote_bytes(s.encode('UTF-8','ignore'))
    text = raw.decode('UTF-8','ignore').lower().translate(transl)
    text = re.sub('[^a-z0-9]+',' ',text)
    return re.sub('[ ]+',' '*(n-1),text)


In [8]:
#a = train_sample.collect()
#for i in range(len(a)):
#    print i
#    b = handle_str(a[i].up,transl)
s= u'''ok.ru/feed glistof.net/board/statusy_pro_ulybku/32 ok.ru/dk?st.cmd=anonymMain mail.ru/?from=odnoklassniki mail.ru/?from=odnoklassniki mail.ru/?from=odnoklassniki mail.ru/?from=odnoklassniki
mail.ru/?from=odnoklassniki ok.ru/ mail.ru/?from=odnoklassniki ok.ru/game/candyvalley2 ok.ru/dk?st.cmd=anonymMain
ok.ru/game/candyvalley2 ok.ru/gifts ok.ru/feed ok.ru/ ok.ru/feed ok.ru/game/candyvalley2 ok.ru/game/candyvalley
ok.ru/dk?st.cmd=anonymMain ok.ru/ my-hit.org/film/19945/ news.mail.ru/politics/27714036/?frommail=1 my-hit.org/film/19945/ news.mail.ru/politics/27714036/?frommail=1 filmogo.co/5902-odnazhdy-v-odesse-2-sezon-vse-seriyiiiiji.html 
ok.ru/game/candyvalley allserials.net/serial-3560-zapovednik-straha-1-sezon.html fast-torrent.ru/film/mezhdu-zhiznyu-i-smertyu.html ok.ru/feed bigcinema.to/series/severnyy-veter-serial.html
go.mail.ru/search?rf=1011&fm=1&q=%D1%87%D1%82%D0%BE%20%D1%82%D0%B0%D0%BA%D0%BE%D0%B5%D0%B1%D1%80%D1%8E%D0%BA%D0%B8%20%D0%BA%D1%8E%D0%BB%D0%BE%D1%82%D1%8B&sbmt=1478611919453 bigcinema.to/series/severnyy-veter-serial.html 
bolshoyvopros.ru/questions/1828070-chto-takoe-brjuki-kjuloty-kak-oni-vygljadjat-kto-ih-avtor.html go.mail.ru/search?fm=1&rf=1011&q=ex.ua ok.ru/?_erv=vaywlyirbwpynedplup ok.ru/game/vegamix ok.ru/game/piratetreasures mail.ru/?from=odnoklassniki
ok.ru/?_erv=vaywlyirbwpynedplup mail.ru/?from=odnoklassniki filmogo.co/5902-odnazhdy-v-odesse-2-sezon-vse-seriyiiiiji.html mail.ru/?from=odnoklassniki allserials.net/serial-3560-zapovednik-straha-1-sezon.html 
mail.ru/?from=odnoklassniki news.mail.ru/politics/27714037/?frommail=1 ok.ru/dk?st.cmd=anonymMain news.mail.ru/politics/27714037/?frommail=1 mail.ru/?from=odnoklassniki go.mail.ru/search?fm=1&rf=1011&q=cnfnecs ghj ek%2Cre 
mail.ru/?from=odnoklassniki statusas.ru/ulibka/33-ulibka.html mail.ru/?from=odnoklassniki go.mail.ru/search?fm=1&rf=1011&q=%D0%B1%D1%80%D1%8E%D0%BA%D0%B8 %D0%BA%D1%8E%D0%BB%D0%BE%D1%82%D1%8B mail.ru/?from=odnoklassniki
mail.ru/?from=odnoklassniki ok.ru/feed mail.ru/?from=odnoklassniki ok.ru/profile/330316254834 mail.ru/?from=odnoklassniki pogoda.mail.ru/prognoz/kiev/ mail.ru/?from=odnoklassniki pogoda.mail.ru/prognoz/kiev/
news.mail.ru/politics/27714036/?frommail=1 mail.ru/?from=odnoklassniki news.mail.ru/politics/27714036/?frommail=1 mail.ru/?from=odnoklassniki glistof.net/board/statusy_pro_ulybku/32 ok.ru/?_erv=viewlyirbwpynedra
mail.ru/?from=odnoklassniki ok.ru/ mail.ru/?from=odnoklassniki ok.ru/ bigcinema.to/series/podzemnyy-perehod-serial.html bigcinema.to/series/podzemnyy-perehod-serial.html fast-torrent.ru/film/mezhdu-zhiznyu-i-smertyu.html 
bigcinema.to/series/severnyy-veter-serial.html bigcinema.to/series/severnyy-veter-serial.html mail.ru/?from=odnoklassniki mail.ru/?from=odnoklassniki mail.ru/?from=odnoklassniki mail.ru/?from=odnoklassniki pogoda.mail.ru/prognoz/kiev/
pogoda.mail.ru/prognoz/kiev/ mail.ru/?from=odnoklassniki mail.ru/?from=odnoklassniki news.mail.ru/incident/27707964/?frommail=1 news.mail.ru/incident/27707964/?frommail=1'''

#print(handle_str(s,transl))


In [9]:
#sdf = hc.sql('select phone_num,label,first_day,up from user_kposminin.url_text_20161108_2 limit 100') #
#sdf.show()

def generate_ngram_stat_manual(sdf,n_list,tf_size, idf = None):
    '''
    Compute hashed TF and TF-IDF statistics of character n-grams with plain RDD operations.

    Input:
      sdf - PySpark DataFrame; its last column is the text, all other columns are carried through;
      n_list - n-gram sizes;
      tf_size - number of hash buckets;
      idf - optional RDD of ((n, bucket), idf_value) to reuse; computed from sdf when falsy.

    Returns (df_tfidf, idf):
      df_tfidf - RDD of (tuple of carried columns, sorted list of (bucket, n, tf, tf * idf));
      idf - RDD of ((n, bucket), log((rows + 1) / (count + 1))).
    '''
    
    #cols = sdf.columns
    # Row after these three maps: carried columns + [per-size n-gram lists] + [per-size n-gram counts].
    # n-grams containing a space (i.e. touching a token separator) are dropped.
    df_ngrams = (sdf
               .rdd
               .map(lambda r: list(r[:-1]) + [handle_str(r[-1],transl,n = max(n_list))])
               .map(lambda r: r[:-1] + [[[e for e in n_gram(r[-1],n) if not ' ' in e] for n in n_list]] )
               .map(lambda r: r + [[len(l) for l in r[-1]]])
            )
    
    # Row after this map: carried columns + [per-size list of (bucket, tf)].
    # Every distinct n-gram is hashed twice (md5 of the n-gram and of the n-gram + ':;Z'),
    # so it contributes two (bucket, tf) pairs; tf = count / number of n-grams of that size.
    # NOTE(review): the per-size lists are indexed with n-1, which matches their position
    # only when n_list is exactly [1, 2, ..., k] - confirm before passing any other n_list.
    df_tf = (df_ngrams
         .map(lambda r: r[:-2] + 
                [[
                    reduce(
                       lambda a,b:a+b,
                       [
                        [(int(hashlib.md5(k).hexdigest(),16) % tf_size,float(v)/r[-1][n-1] ),
                        (int(hashlib.md5(k + ':;Z').hexdigest(),16) % tf_size,float(v)/r[-1][n-1])]
                          for k,v in Counter(r[-2][n-1]).iteritems()  
                       ],
                       []
                     )      for n in n_list
                ]]
             )
         )

    # df_tf is used by count(), the idf computation and the final join.
    df_tf.cache()
    
    
    # NOTE(review): `not idf` relies on truthiness of the passed object; presumably
    # `idf is None` is what is meant - confirm.
    if not idf:
        id_num = float(df_tf.count())
        # One ((n, bucket), 1) record is emitted per (bucket, tf) pair, then summed per key
        # and turned into log((rows + 1) / (count + 1)).
        idf = (df_tf
           .flatMap(lambda r: reduce(lambda a,b:a+b,[[((n,k),1) for k,_ in r[-1][n-1]] for n in n_list],[]) )
           .reduceByKey(lambda v1,v2: v1 + v2)
           .map(lambda (k,v):(k,np.log((id_num + 1)/(v + 1))))
         )

    # Explode to ((n, bucket), carried columns + [tf]), attach idf by key, then regroup
    # per original row (keyed by the tuple of carried columns) with sorted entries.
    df_tfidf = (df_tf
            .flatMap(lambda r: reduce(lambda a,b:a+b,[[((n,k),r[:-1] + [v]) for k,v in r[-1][n-1]] for n in n_list]))
            .join(idf)
            .map(lambda ((n,k),(v,i)): (tuple(v[:-1]),(k,n,v[-1],v[-1] * i)))
            .groupByKey()
            .map(lambda (k,v):(k,sorted(v)))            
           )
    
    return df_tfidf, idf


In [10]:
def _hashed_tf(ngrams, tf_size):
    '''Return [(bucket, tf), ...] for one list of n-grams; each distinct n-gram fills two buckets.'''
    total = len(ngrams)
    pairs = []
    for gram, count in Counter(ngrams).items():
        tf = float(count) / total
        # Two hashes per n-gram: the n-gram itself and a salted copy.
        for salted in (gram, gram + ':;Z'):
            digest = hashlib.md5(salted.encode('utf-8')).hexdigest()
            pairs.append((int(digest, 16) % tf_size, tf))
    return pairs


def generate_ngram_tf_manual(sdf,n_list,tf_size):
    '''
    Compute hashed term frequencies of character n-grams with plain RDD operations.

    Input:
      sdf - PySpark DataFrame; its last column is the text, all other columns are carried through;
      n_list - n-gram sizes (any sizes; results are aligned with n_list by position);
      tf_size - number of hash buckets.

    Returns an RDD of lists: the carried columns followed by one element, a list
    with an entry per size in n_list, each entry being a list of (bucket, tf)
    pairs where tf = count / number of n-grams of that size in the row.
    '''
    max_n = max(n_list)

    def to_tf_row(r):
        text = handle_str(r[-1], transl, n=max_n)
        # n-grams containing a space touch a token separator and are dropped.
        per_size = [[g for g in n_gram(text, n) if ' ' not in g] for n in n_list]
        return list(r[:-1]) + [[_hashed_tf(grams, tf_size) for grams in per_size]]

    return sdf.rdd.map(to_tf_row)


In [12]:

! hadoop fs -rm -r hdfs://nameservice1/user/k.p.osminin/url_text_tf_train_dir

17/01/30 14:40:13 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 0 minutes.
Moved: 'hdfs://nameservice1/user/k.p.osminin/url_text_tf_train_dir' to trash at: hdfs://nameservice1/user/k.p.osminin/.Trash/Current


In [13]:
start_time = datetime.datetime.now()

train_select_query = '''
select ymd,phone_num,approve_label,full_app_label,full_app_first_day,sampled,strong_sampled,urls_str 
from user_kposminin.url_text_train

'''


df1 = hc.sql(train_select_query)
# df2, idf = generate_ngram_stat_manual(df1,n_list,tf_size) # Не проработал за выходные 
df2 = generate_ngram_tf_manual(df1, n_list, tf_size)



(df2
.saveAsTextFile('url_text_tf_train_dir')
 )

#.map(lambda (k,v):','.join([str(e) for e in k]) + ',' + ';'.join([' '.join([str(ee) for ee in e]) for e in v]) + '\n')



print('train was handled in {}.'.format(datetime.datetime.now()  - start_time))


train was handled in 1:13:41.867371.


In [None]:
start_time = datetime.datetime.now()

test_select_query = '''
select ymd,phone_num,approve_label,full_app_label,full_app_first_day,sampled,strong_sampled,urls_str 
from user_kposminin.url_text_test
'''


def _format_tfidf_row(kv):
    '''Serialize one (key, entries) pair as "k1,k2,...,e11 e12 ...;e21 e22 ...\\n".'''
    key, entries = kv
    key_part = ','.join([str(e) for e in key])
    entries_part = ';'.join([' '.join([str(ee) for ee in e]) for e in entries])
    return key_part + ',' + entries_part + '\n'


df1t = hc.sql(test_select_query)
df2t,idf = generate_ngram_stat_manual(df1t,n_list,tf_size)
# The mapped RDD is built once and saved; the earlier duplicate map whose result
# was discarded has been removed.
(df2t
.map(_format_tfidf_row)
.saveAsTextFile('./external_hdfs/url_text_test_dir')
 )

finish_time = datetime.datetime.now()
print('{}.test was handled in {}.'.format(finish_time,finish_time  - start_time))


In [None]:
#a = df2.take(5)
#len(a[0][1])
import os
(df2
.map(lambda (k,v):','.join([str(e) for e in k]) + ',' + ';'.join([' '.join([str(ee) for ee in e]) for e in v]) + '\n')
.saveAsTextFile('url_text_train_dir_tst')
 )
#df2.saveAsSequenceFile('external_hdfs/url_text_trains_seq_tst')

#os.popen('cat ./external_hdfs/url_text_train_dir_tst/* > url_text_train.txt').read()

In [None]:
! du -h external_hdfs/url_text_train.txt
#! du --help
19./200 * 1500000/30

In [None]:
#htf_method = MLHashingTF(numFeatures = tf_size, inputCol="ngram_list", outputCol="tf")
#df_tf = htf_method.transform(df_ngrams) 
#df_tf.show()



In [None]:
def generate_ngram_stat(sdf,n_list,tf_size,idf_method = None, minDocFreq = 1):
    '''
    Generates n-gram statistics of sdf.
    
    Input:
      sdf - PySpark DataFrame; its last column contains the text to analyse (type string);
      n_list - sizes of n-grams; n-grams of all sizes are put into one list per row;
      tf_size - dimension of the space n-grams are hashed into (HashingTF numFeatures);
      idf_method - fitted IDF model to reuse; a new one is fitted on sdf when None;
      minDocFreq - passed to IDF when a new model is fitted.
      
    Returns (df_data, idf_method). df_data has all sdf columns except the last + columns:
        tf_index - list of indexes of hash buckets hit by the row's n-grams;
        tf_values - list of corresponding TF values (n-gram counts);
        idf_values - list of values of the IDF-transformed vector.
    '''
    cols = sdf.columns
    max_n = max(n_list)

    def to_ngram_row(r):
        # One flat list with the n-grams of every size in n_list.
        text = handle_str(r[-1],transl,max_n)
        return list(r[:-1]) + [[gram for n in n_list for gram in n_gram(text,n)]]

    df_ngrams = (sdf
               .rdd
               .map(to_ngram_row)
               .toDF()
               .withColumnRenamed("_{}".format(len(cols)),"ngram_list"))    
    htf_method = MLHashingTF(numFeatures = tf_size, inputCol="ngram_list", outputCol="tf")
    df_tf = htf_method.transform(df_ngrams)    
    if idf_method is None:
        print('Fitting idf_method')
        idf_method = MLIDF(inputCol="tf", outputCol="idf", minDocFreq = minDocFreq).fit(df_tf)        
    df_tfidf = idf_method.transform(df_tf)
    # Drop ngram_list/tf/idf (the last three columns) and append the sparse
    # vectors unpacked into plain lists.
    df_data = (df_tfidf
             .rdd
             .map(lambda r:
                  list(r[:-3]) + 
                  [r.tf.indices.tolist(),
                  r.tf.values.tolist(),
                  r.idf.values.tolist()]
                 )
             .toDF()
             .withColumnRenamed("_{}".format(len(cols)), "tf_index")
             .withColumnRenamed("_{}".format(len(cols)+1), "tf_values")
             .withColumnRenamed("_{}".format(len(cols)+2), "idf_values")
             )
    # Restore the original names of the carried columns.
    for i in range(len(cols)):
        df_data = df_data.withColumnRenamed("_{}".format(i+1),cols[i])
    return df_data, idf_method
    
    

In [None]:
def generate_ngram_stat(sdf,n_list,tf_size,idf_method = None, minDocFreq = 1):
    '''
    Generates n-gram statistics of sdf.
    
    Input:
      sdf - PySpark DataFrame; its last column contains the text to analyse (type string);
      n_list - sizes of n-grams; n-grams of all sizes are put into one list per row;
      tf_size - dimension of the space n-grams are hashed into (HashingTF numFeatures);
      idf_method - fitted IDF model to reuse; a new one is fitted on sdf when None;
      minDocFreq - passed to IDF when a new model is fitted.
      
    Returns (df_data, idf_method). df_data has all sdf columns except the last + columns:
        tf_index - list of indexes of hash buckets hit by the row's n-grams;
        tf_values - list of corresponding TF values (n-gram counts);
        idf_values - list of values of the IDF-transformed vector.
    '''
    cols = sdf.columns
    max_n = max(n_list)

    def to_ngram_row(r):
        # A single "ngram_list" column: n-grams of every size in n_list, concatenated.
        text = handle_str(r[-1],transl,max_n)
        ngrams = []
        for n in n_list:
            ngrams.extend(n_gram(text,n))
        return list(r[:-1]) + [ngrams]

    df_ngrams = (sdf
               .rdd
               .map(to_ngram_row)
               .toDF()
               .withColumnRenamed("_{}".format(len(cols)),"ngram_list"))    
    htf_method = MLHashingTF(numFeatures = tf_size, inputCol="ngram_list", outputCol="tf")
    df_tf = htf_method.transform(df_ngrams)    
    if idf_method is None:
        print('Fitting idf_method')
        idf_method = MLIDF(inputCol="tf", outputCol="idf", minDocFreq = minDocFreq).fit(df_tf)        
    df_tfidf = idf_method.transform(df_tf)
    # Drop ngram_list/tf/idf (the last three columns) and append the sparse
    # vectors unpacked into plain lists.
    df_data = (df_tfidf
             .rdd
             .map(lambda r:
                  list(r[:-3]) + 
                  [r.tf.indices.tolist(),
                  r.tf.values.tolist(),
                  r.idf.values.tolist()]
                 )
             .toDF()
             .withColumnRenamed("_{}".format(len(cols)), "tf_index")
             .withColumnRenamed("_{}".format(len(cols)+1), "tf_values")
             .withColumnRenamed("_{}".format(len(cols)+2), "idf_values")
             )
    # Restore the original names of the carried columns.
    for i in range(len(cols)):
        df_data = df_data.withColumnRenamed("_{}".format(i+1),cols[i])
    return df_data, idf_method


In [None]:
train_sample = hc.sql('select phone_num,label,first_day,up from user_kposminin.url_text_20161108_2')
train_data,idf_method = generate_ngram_stat(train_sample, n = n, tf_size = tf_size)
print(datetime.datetime.now())

In [None]:
train_data.write.saveAsTable("user_kposminin.url_text_feat_20161108_7")
print(datetime.datetime.now())

In [None]:
test_sample = hc.sql('select phone_num,label,first_day,up from user_kposminin.url_text_20161115 where (substr(md5(phone_num),1,1) in ("0","1") or label = 1)')

from pyspark.ml.feature import IDFModel
idfm = MLIDF._new_java_obj("org.apache.spark.ml.feature.IDFModel.load", "idf_model")
idf_method = IDFModel(idfm)

test_data, _ = generate_ngram_stat(test_sample, n = n, tf_size = tf_size, idf_method = idf_method)
print(datetime.datetime.now())

In [None]:
test_data.write.saveAsTable("user_kposminin.url_text_feat_20161115_12")
print(datetime.datetime.now())

In [None]:
# Save IDF model
writer = idf_method._call_java("write")
writer.save("idf_model")

In [None]:
# Load IDF model
from pyspark.ml.feature import IDFModel
idfm = MLIDF._new_java_obj("org.apache.spark.ml.feature.IDFModel.load", "idf_model")
idf_method1 = IDFModel(idfm)


In [None]:
tf_size

In [None]:
train_data = (hc.sql('select label,first_day,tf_index from user_kposminin.url_text_feat_20161108_7')
         .map(lambda r:LabeledPoint(r.label,SparseVector(tf_size,{e:1 for e in r.tf_index})))
         )

In [None]:
#Train NaiveBayes model
train_data.cache()
modelNB = NaiveBayes.train(train_data)

def predict_proba_NB(f,model):
    '''
    Naive Bayes model prediction with probability. f is features [Sparse] vector. model is mllib.NaiveBayesModel.
    Function selects the winning class with its probability.
    Output: tuple with the selected class number as first element (type int) and its probability as second (type float).
    '''
    import numpy as np
    # Per-class log scores: log prior + feature-weighted log likelihoods.
    log_scores = [f.dot(model.theta[i]) + model.pi[i] for i in range(len(model.theta))]
    # Winning index; on ties the lowest class index wins.
    wi = max(range(len(log_scores)), key = lambda i: log_scores[i])
    # Normalize relative to the winner, so every exponent is <= 0 (no overflow).
    prob = 1./sum([np.exp(score - log_scores[wi]) for score in log_scores])
    return wi, prob

def predict_proba_NB_2(f, model):
    '''
    Naive Bayes model prediction with probability for 2-class classification.
    f is features [Sparse] vector. model is mllib.NaiveBayesModel.
    Output: probability of class 1 (type float); None (after printing a message)
    when the model does not have exactly two classes.
    '''
    import numpy as np
    if len(model.theta) != 2:
        print('Model is NOT a 2-class classifier')
        return None
    # Per-class log scores: log prior + feature-weighted log likelihoods.
    log0, log1 = [f.dot(model.theta[i]) + model.pi[i] for i in range(2)]
    # P(class 1) = e^log1 / (e^log0 + e^log1), written in terms of the difference.
    return 1./(1. + np.exp(log0 - log1))

In [None]:
print(datetime.datetime.now())

In [None]:
#LogisticRegression model
modelLR = LogisticRegressionWithSGD.train(train_data)
modelLR.clearThreshold()
print(datetime.datetime.now())

In [None]:
test_data = (hc.sql('select label,first_day,tf_index from user_kposminin.url_text_feat_20161115_12')
         .map(lambda r:LabeledPoint(r.label,SparseVector(tf_size,{e:1 for e in r.tf_index})))
         )

In [None]:
# Score every test point with both models and collect the result as a pandas
# DataFrame. `Row` is imported directly: the name `pyspark` itself is never
# bound in this notebook, so `pyspark.sql.Row` would raise NameError.
from pyspark.sql import Row
df_test = test_data.map( lambda lp: Row(
        Label = lp.label,
        NaiveBayes = float(predict_proba_NB_2(lp.features, modelNB)),
        LogisticRegression = float(modelLR.predict(lp.features))
    )).toDF().toPandas()
print(datetime.datetime.now())

In [None]:
#Build AUCROC metric and print results
# Import the submodule explicitly: a bare `import sklearn` does not guarantee
# that `sklearn.metrics` has been loaded.
import sklearn.metrics
AUCROC = {}
for c in df_test.columns:
    if c != 'Label':
        AUCROC[c] = sklearn.metrics.roc_auc_score(df_test['Label'],df_test[c])
        
print('Methods AUCROC performance on test sample ({0:.0f} samples with {1:.0f} positives):\n'.format(len(df_test),df_test['Label'].sum()) +
     '\n'.join(['{0:<30}{1:.5f}'.format(k,v) for (k,v) in AUCROC.items()]))
print(datetime.datetime.now())

In [None]:
train_conf_str = '''
# task type, support train and predict
task = train

# boosting type, support gbdt for now, alias: boosting, boost
boosting_type = gbdt

# application type, support following application
# regression , regression task
# binary , binary classification task
# lambdarank , lambdarank task
# alias: application, app
objective = binary

# eval metrics, support multi metric, delimite by ',' , support following metrics
# l1 
# l2 , default metric for regression
# ndcg , default metric for lambdarank
# auc 
# binary_logloss , default metric for binary
# binary_error
metric = auc,binary_logloss

# frequence for metric output
metric_freq = 1

# true if need output metric for training data, alias: tranining_metric, train_metric
is_training_metric = true

# number of bins for feature bucket, 255 is a recommend setting, it can save memories, and also has good accuracy. 
max_bin = 255

# training data
# if exsting weight file, should name to "binary.train.weight"
# alias: train_data, train
#data = binary.train

# validation data, support multi validation data, separated by ','
# if exsting weight file, should name to "binary.test.weight"
# alias: valid, test, test_data, 
#valid_data = binary.test

# number of trees(iterations), alias: num_tree, num_iteration, num_iterations, num_round, num_rounds
num_trees = 100

# shrinkage rate , alias: shrinkage_rate
learning_rate = 0.05

# number of leaves for one tree, alias: num_leaf
num_leaves = 63

# type of tree learner, support following types:
# serial , single machine version
# feature , use feature parallel to train
# data , use data parallel to train
# voting , use voting based parallel to train
# alias: tree
tree_learner = data

# number of threads for multi-threading. One thread will use one CPU, defalut is setted to #cpu. 
# num_threads = 8

# feature sub-sample, will random select 80% feature to train on each iteration 
# alias: sub_feature
feature_fraction = 0.8

#classes are unbalanced
is_unbalance = true

# Support bagging (data sub-sample), will perform bagging every 5 iterations
bagging_freq = 5

# Bagging farction, will random select 80% data on bagging
# alias: sub_row
bagging_fraction = 0.8

# minimal number data for one leaf, use this to deal with over-fit
# alias : min_data_per_leaf, min_data
min_data_in_leaf = 5

# minimal sum hessians for one leaf, use this to deal with over-fit
#min_sum_hessian_in_leaf = 5.0

# save memory and faster speed for sparse feature, alias: is_sparse
is_enable_sparse = true

# when data is bigger than memory size, set this to true. otherwise set false will have faster speed
# alias: two_round_loading, two_round
use_two_round_loading = true

# true if need to save data to binary file and application will auto load data from binary file next time
# alias: is_save_binary, save_binary
is_save_binary_file = false

# output model file
output_model = lgbm.model

'''

# Write the LightGBM training config into a local working directory.
lgbm_dir = './lgbm'
import os
# Create the directory without spawning a shell.
if not os.path.isdir(lgbm_dir):
    os.makedirs(lgbm_dir)
# The context manager guarantees the file is flushed and closed.
with open(lgbm_dir + '/train.conf','w') as conf_file:
    conf_file.write(train_conf_str)

In [None]:
# Prepare train data for LightGBM. Write to file.
'''
with open(lgbm_dir + '/train_data.txt','w') as f:
    for r in train_data.map(lambda lp: ' '.join([str(int(lp.label))] + [str(i) +':1' for i in lp.features.indices])).collect():
        f.write(r+'\n')
'''
print(datetime.datetime.now())
# Prepare train data with IDF values (table url_text_feat_20161108_7) for LightGBM. Save as text files.
test = (hc.sql('select label,tf_index,idf_values from user_kposminin.url_text_feat_20161108_7')
        .map(lambda r: ' '.join([str(int(r.label))] + [str(i) +':' + str(v) for i,v in zip(r.tf_index,r.idf_values)]))
        .saveAsTextFile('url_text_train_data1')
       )
#with open(lgbm_dir + '/test_data.txt','w') as f:
#    for r in test.collect():
#        f.write(r+'\n')
print(datetime.datetime.now())


In [None]:
print(datetime.datetime.now())
# Prepare sampled test data for LightGBM. Save as text files.
test = (hc.sql('select label,tf_index from user_kposminin.url_text_feat_20161115_12 where substr(md5(phone_num),1,1) in ("0","1")')
        .map(lambda r: ' '.join([str(int(r.label))] + [str(i) +':1'  for i in r.tf_index]))
        .saveAsTextFile('url_text_test_data_sampled')
       )
#with open(lgbm_dir + '/test_data.txt','w') as f:
#    for r in test.collect():
#        f.write(r+'\n')
print(datetime.datetime.now())

In [None]:
def generate_ngram_stat(sdf,n,tf_size,idf_method = None, minDocFreq = 1):
    '''
    Generates n-gram statistics of sdf.
    
    Input:
      sdf - PySpark DataFrame; its last column contains the text to analyse (type string);
      n - size of n-grams;
      tf_size - dimension of the space n-grams are hashed into (HashingTF numFeatures);
      idf_method - fitted IDF model to reuse; a new one is fitted on sdf when None;
      minDocFreq - passed to IDF when a new model is fitted.
      
    Returns (df_data, idf_method). df_data has all sdf columns except the last + columns:
        tf_index - list of indexes of hash buckets hit by the row's n-grams;
        tf_values - list of corresponding TF values (n-gram counts);
        idf_values - list of values of the IDF-transformed vector.
    '''
    cols = sdf.columns
    # handle_str requires the n-gram size as its third argument (it sets the
    # width of the token separators); the previous two-argument call raised TypeError.
    df_ngrams = (sdf
               .rdd
               .map(lambda r: list(r[:-1]) + [n_gram(handle_str(r[-1],transl,n),n)])
               .toDF()
               .withColumnRenamed("_{}".format(len(cols)),"ngram_list"))    
    htf_method = MLHashingTF(numFeatures = tf_size, inputCol="ngram_list", outputCol="tf")
    df_tf = htf_method.transform(df_ngrams)    
    if idf_method is None:
        print('Fitting idf_method')
        idf_method = MLIDF(inputCol="tf", outputCol="idf", minDocFreq = minDocFreq).fit(df_tf)        
    df_tfidf = idf_method.transform(df_tf)
    # Drop ngram_list/tf/idf (the last three columns) and append the sparse
    # vectors unpacked into plain lists.
    df_data = (df_tfidf
             .rdd
             .map(lambda r:
                  list(r[:-3]) + 
                  [r.tf.indices.tolist(),
                  r.tf.values.tolist(),
                  r.idf.values.tolist()]
                 )
             .toDF()
             .withColumnRenamed("_{}".format(len(cols)),"tf_index")
             .withColumnRenamed("_{}".format(len(cols)+1),"tf_values")
             .withColumnRenamed("_{}".format(len(cols)+2),"idf_values")
             )
    # Restore the original names of the carried columns.
    for i in range(len(cols)):
        df_data = df_data.withColumnRenamed("_{}".format(i+1),cols[i])
    return df_data, idf_method
    

In [None]:
sc.stop()
conf = (SparkConf()
        .set("spark.executor.instances", 1)
        .set("spark.driver.maxResultSize", "4g")
        .set('spark.driver.memory','4g')
       # .set("spark.executor.memory", '8g')
       # .set("spark.yarn.executor.memoryOverhead", 2048)        
       )
sc = SparkContext(conf=conf)

In [None]:


text = '''We have been helping grow the San Francisco data science community since 2014. and we are finally ready
to bring our fellowship program to New York.  We wanted to give aspiring data scientists in the tri-state area. the 
opportunity to hone their skills by building real world machine learning systems for finance and startups'''

df = sc.parallelize([[0,0,e] for e in text.split('.')]).toDF()

df.show()

In [None]:
# Demo: split every sentence of `df` into character 5-grams.
n=5
cols = df.columns
# handle_str requires the n-gram size as its third argument; the previous
# two-argument call raised TypeError.
df_ngrams = (df
               .rdd
               .map(lambda r: list(r[:-1]) + [n_gram(handle_str(r[-1],transl,n),n)])
               .toDF()
               .withColumnRenamed("_{}".format(len(cols)),"ngram_list"))  

In [None]:
df_ngrams.show()

In [None]:
tf_size = 2**20
htf_method = MLHashingTF(numFeatures = tf_size, inputCol="ngram_list", outputCol="tf")
df_tf = htf_method.transform(df_ngrams)    

In [None]:
print(str(df_tf.select('tf').collect()[0]))