In [1]:
# Switch to the project directory; the cells below use paths relative to it
# (e.g. 'data/train-click.json').
%cd /home/joe/hack104-rec
import xgboost as xgb
# NOTE: `Row` imported here from carriage is rebound by
# `from pyspark.sql import Row` in a later cell.
from carriage import Stream, StreamTable, Row, X
from sklearn.datasets import load_svmlight_file
from pathlib import Path
import numpy as np

# SPARK_HOME is set before findspark.init() runs, and findspark.init() runs
# before `import pyspark` — presumably findspark uses the variable to make
# pyspark importable; keep this order.
%env SPARK_HOME=/opt/spark
import findspark
findspark.init()
import pyspark
# Reuse an existing SparkSession if one is active, otherwise create one.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [274]:
from pyspark.sql import functions as f
from pyspark.sql.types import (StringType, StructType, StructField, 
                               MapType, ArrayType, TimestampType,
                               IntegerType, LongType
                              )
from opencc import OpenCC 

import jieba

import functools as fnt
from pyspark.sql import Row

import urllib

def udfy(func=None, return_type=StringType()):
    """Decorator that attaches a Spark UDF to a function as ``func.udf``.

    Works both bare (``@udfy``) and parameterised
    (``@udfy(return_type=...)``). The decorated function is returned
    itself (not a wrapper), so it remains callable as plain Python; the
    Spark version is reached through the ``.udf`` attribute.

    Args:
        func: Function to decorate, or ``None`` when called with arguments.
        return_type: Spark data type declared as the UDF's return type.

    Returns:
        The decorated function, or a decorator when ``func`` is ``None``.
    """
    def attach_udf(target):
        target.udf = f.udf(target, returnType=return_type)
        return target

    return attach_udf if func is None else attach_udf(func)
    
@udfy(return_type=ArrayType(StringType()))
def tokenize(text):
    """Split ``text`` into a list of search tokens.

    The text is passed through ``openCC.convert`` and then segmented with
    ``jieba.cut_for_search``; tokens equal to a single space are dropped.

    NOTE(review): ``openCC`` is not defined in the visible cells (only the
    ``OpenCC`` class is imported) — confirm where the converter instance
    is created before running on a fresh kernel.
    """
    converted = openCC.convert(text)
    return [token for token in jieba.cut_for_search(converted) if token != ' ']

# Query-string parameters kept from each search request; every one becomes
# a string field of the struct returned by `parse_qs`.
querystring_params = ['keyword', 'ro', 'jobcat', 'area', 'isnew', 'kwop']

querystring_type = StructType(
    [StructField(name, StringType()) for name in querystring_params]
)


@udfy(return_type=querystring_type)
def parse_qs(qs):
    """Parse a URL query string into a Row with one field per known parameter.

    Parameters listed in ``querystring_params`` that are absent from ``qs``
    are filled with the empty string; parameters not in the list are dropped.
    When a parameter repeats, the last occurrence wins (``dict`` semantics).
    """
    parsed = dict(urllib.parse.parse_qsl(qs))
    return Row(**{name: parsed.get(name, '') for name in querystring_params})


# Raw click log -> typed parquet.
train_click_sdf = spark.read.json('data/train-click.json')

(
    train_click_sdf
    # `date` is cast to a number and divided by 1000 before the timestamp
    # cast (the raw value looks like epoch milliseconds).
    .withColumn('datetime', (f.col('date').cast('bigint') / 1000).cast('timestamp'))
    .withColumn('jobno', f.col('jobno').cast('bigint'))
    .withColumn('joblist', f.col('joblist').cast('array<bigint>'))
    .withColumn('query_params', parse_qs.udf('querystring'))
    .withColumn('tokens', tokenize.udf('query_params.keyword'))
    # From here on `date` holds the calendar date derived from `datetime`.
    .withColumn('date', f.to_date('datetime'))
    .withColumn('id', f.monotonically_increasing_id())
    .write
    .mode('overwrite')
    .parquet('data/train-click.pq')
)

In [277]:
# Reload the click table from parquet and check the column types that were written.
train_click_sdf = spark.read.parquet('data/train-click.pq')
train_click_sdf.printSchema()

root
 |-- action: string (nullable = true)
 |-- date: date (nullable = true)
 |-- joblist: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- jobno: long (nullable = true)
 |-- querystring: string (nullable = true)
 |-- source: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- query_params: struct (nullable = true)
 |    |-- keyword: string (nullable = true)
 |    |-- ro: string (nullable = true)
 |    |-- jobcat: string (nullable = true)
 |    |-- area: string (nullable = true)
 |    |-- isnew: string (nullable = true)
 |    |-- kwop: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)



# train click exploded

In [229]:
%%time

train_click_sdf = spark.read.parquet('data/train-click.pq')
train_click_exploded_sdf = (
    train_click_sdf 
    .select('action', 
            'query_params',
            'tokens',
            'datetime',
            f.explode('joblist').alias('jobno'), 
            f.col('jobno').alias('jobno_action_on'),
           )
    .withColumn('jobno_action_on', f.col('jobno') == f.col('jobno_action_on'))
)

train_click_exploded_sdf.write.parquet(
    'data/train-click-exploded.pq',
    mode='overwrite',
    compression='snappy')

CPU times: user 53.8 ms, sys: 11.9 ms, total: 65.7 ms
Wall time: 22.9 s


# train click CTR

In [230]:
%%time
train_click_exploded_sdf = spark.read.parquet('data/train-click-exploded.pq')

(train_click_exploded_sdf
 .withColumn('jobno_action_on', f.when(f.col('jobno_action_on') == True, 1).otherwise(0))
 .withColumn('date', f.to_date('datetime'))
 .groupby('date', 'action', 'query_params.keyword', 'jobno')
 .agg(
     f.count(f.lit(1)).alias('impr'),
     f.sum('jobno_action_on').alias('click')
     )
 .withColumn('CTR', f.col('click') / f.col('impr'))
 .sort('action', 'impr', 'CTR', ascending=[False, False, False])
 .write.parquet(
     'data/train_click_ctr.pq', 
     mode='overwrite',
     compression='snappy', 
 )
)

CPU times: user 12.6 ms, sys: 458 µs, total: 13 ms
Wall time: 19.2 s


In [219]:
# Load the CTR table and preview its first rows.
train_click_ctr_sdf = spark.read.parquet('data/train-click-ctr.pq')
train_click_ctr_sdf.show()
# Disabled alternative: sorted preview (note it lists three sort columns but only two flags).
# .sort('action', 'impr', 'CTR', ascending=[False, False]).show()

+----------+--------+-------+--------+----+-----+---+
|      date|  action|keyword|   jobno|impr|click|CTR|
+----------+--------+-------+--------+----+-----+---+
|2018-05-08|clickJob|  store|10130298|   1|    0|0.0|
|2018-05-25|clickJob|   服飾銷售| 9517175|   1|    0|0.0|
|2018-05-31|clickJob|   .net| 7585853|   1|    0|0.0|
|2018-05-25|clickJob|     松下| 8769340|   1|    0|0.0|
|2018-05-04|clickJob|國家衛生研究院| 6176405|   1|    0|0.0|
|2018-05-25|clickJob|   業務代表| 8316937|   1|    0|0.0|
|2018-05-16|clickJob|    SAP|10092116|   1|    0|0.0|
|2018-05-25|clickJob|  汽車繪圖師| 6218526|   1|    0|0.0|
|2018-05-05|clickJob|   會計助理| 3888481|   1|    0|0.0|
|2018-05-25|clickJob| 海外儲備主管| 9114522|   1|    0|0.0|
|2018-05-02|clickJob|     光寶| 4793599|   1|    0|0.0|
|2018-05-25|clickJob|     產品| 7269869|   1|    0|0.0|
|2018-05-08|clickJob|   資訊安全| 7820423|   1|    0|0.0|
|2018-05-25|clickJob|    繪圖員| 8130755|   1|    0|0.0|
|2018-05-08|clickJob|   資訊安全| 7105052|   1|    0|0.0|
|2018-05-25|clickJob|   英文助理

# action

In [167]:
# Peek at the first raw action records (one JSON object per line).
!head data/train-action.json

{"jobno":"10000835","date":"1527566791000","action":"viewJob","source":"web"}
{"jobno":"10000835","date":"1526357145000","action":"viewJob","source":"web"}
{"jobno":"10000835","date":"1526359940000","action":"viewJob","source":"mobileWeb"}
{"jobno":"10000835","date":"1526877314000","action":"viewJob","source":"web"}
{"jobno":"10000835","date":"1527653404000","action":"viewJob","source":"web"}
{"jobno":"10000835","date":"1525659300000","action":"viewJob","source":"mobileWeb"}
{"jobno":"10000835","date":"1525659953000","action":"viewJob","source":"app","device":"android"}
{"jobno":"10000835","date":"1525658788000","action":"viewJob","source":"web"}
{"jobno":"10000835","date":"1526011986000","action":"viewJob","source":"web"}
{"jobno":"10000835","date":"1525779400000","action":"viewJob","source":"web"}


In [214]:
# Explicit all-string, nullable schema for the action log; `device` appears
# only on some of the sampled records, so it is declared here up front.
schema = StructType([
    StructField(name, StringType(), True)
    for name in ('jobno', 'date', 'action', 'source', 'device')
])

train_action_sdf = spark.read.json('data/train-action.json', schema=schema)

# Cast the string fields to their working types and persist as parquet.
(
    train_action_sdf
    .select(
        f.col('jobno').cast('int'),
        (f.col('date').cast('bigint') / 1000).cast('timestamp').alias('datetime'),
        'action',
        'source',
        'device',
    )
    .write
    .mode('overwrite')
    .parquet('data/train-action.pq')
)

In [221]:
# Reload the typed action table and confirm its schema.
train_action_sdf = spark.read.parquet('data/train-action.pq')
train_action_sdf.printSchema()

root
 |-- jobno: integer (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- action: string (nullable = true)
 |-- source: string (nullable = true)
 |-- device: string (nullable = true)



In [228]:
# Number of action events per (calendar date, action, source, job).
(
    train_action_sdf
    .groupby(f.to_date('datetime').alias('date'), 'action', 'source', 'jobno')
    .count()
    .write
    .mode('overwrite')
    .parquet('data/train-action-count.pq')
)

# label

In [279]:
# Build the label table: one row per (event, listed job), keeping the job's
# position within `joblist`. `action` survives only on the row whose listed
# job equals the event's `jobno`; every other row gets a null action.
train_click_sdf = spark.read.parquet('data/train-click.pq')

(
    train_click_sdf
    .select('*', f.posexplode('joblist').alias('pos', 'job'))
    # `when` without `otherwise` yields null for non-matching rows.
    .withColumn('action', f.when(f.col('job') == f.col('jobno'), f.col('action')))
    .withColumn('keyword', f.col('query_params.keyword'))
    .drop('query_params', 'querystring', 'joblist', 'jobno')
    .write
    .mode('overwrite')
    .parquet('data/train-label.pq')
)


In [302]:
# Reload the label table and confirm which columns remain after the drops.
train_label_sdf = spark.read.parquet('data/train-label.pq')
train_label_sdf.printSchema()

root
 |-- action: string (nullable = true)
 |-- date: date (nullable = true)
 |-- source: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
 |-- pos: integer (nullable = true)
 |-- job: long (nullable = true)
 |-- keyword: string (nullable = true)



In [324]:
# Average daily number of viewJob / saveJob / applyJob events per job.
train_action_count_sdf = spark.read.parquet('data/train-action-count.pq')

# Build the DataFrame first and write it in a separate statement.
# Chaining `.write.parquet(...)` into the assignment bound
# `action_count_per_day` to None (the writer's return value).
action_count_per_day = (
    train_action_count_sdf
    .groupby('jobno', 'action')
    .agg(f.min('date').alias('date_start'),
         f.max('date').alias('date_stop'),
         f.sum('count').alias('count'))
    # Inclusive day span between first and last observed date (at least 1).
    .withColumn('date_diff', f.datediff('date_stop', 'date_start') + 1)
    .withColumn('count_per_day', f.col('count') / f.col('date_diff'))
    .select('jobno', 'action', 'count_per_day')
    # One row per job, one column per action type.
    .groupby('jobno')
    .pivot('action', ['viewJob', 'saveJob', 'applyJob'])
    .mean('count_per_day')
    # A job with no events of some action gets rate 0.0 instead of null.
    .fillna(0.0)
)

action_count_per_day.write.parquet('data/action-count-per-day.pq', mode='overwrite')


In [325]:
# Reload the per-job daily action rates from parquet and preview them.
action_count_per_day = spark.read.parquet('data/action-count-per-day.pq')
action_count_per_day.show()

+--------+------------------+-------------------+-------------------+
|   jobno|           viewJob|            saveJob|           applyJob|
+--------+------------------+-------------------+-------------------+
| 5685414| 4.290322580645161|                0.0|                0.0|
| 6951559|18.870967741935484| 0.2413793103448276|0.35714285714285715|
| 7108361|22.333333333333332| 1.1538461538461537| 0.9230769230769231|
| 7522686|3.7419354838709675|                0.0|0.10526315789473684|
| 8325514| 6.580645161290323|              0.375|             0.8125|
| 8795443| 8.225806451612904| 0.6785714285714286| 0.5862068965517241|
| 9370683| 11.32258064516129| 0.5714285714285714|                0.6|
| 9666534|13.333333333333334|               0.75| 0.6666666666666666|
| 9912410|25.071428571428573| 0.6521739130434783| 1.3703703703703705|
| 9915543| 6.290322580645161| 0.4827586206896552| 0.7142857142857143|
| 8081373|               8.0| 0.6153846153846154| 0.2222222222222222|
| 9301703|21.2142857

In [None]:
# DataFrame.fillna requires a replacement value; calling it with no argument
# raises TypeError. Use 0.0, the same fill value applied when the per-day
# action rates were built.
action_count_per_day.fillna(0.0)

In [327]:
# Attach the per-job daily action rates to every label row. It is a left join,
# so label rows whose job has no recorded rates are kept (with null rates).
train_label_with_action_sdf = train_label_sdf.join(
    action_count_per_day.withColumnRenamed('jobno', 'job'),
    on='job',
    how='left',
)

In [329]:
# Check how many partitions the joined frame has.
train_label_with_action_sdf.rdd.getNumPartitions()

16

In [341]:
# Collect the label and the three action-rate features to the driver as a
# pandas DataFrame. Rows are hash-partitioned by `id` and sorted by
# (`id`, `pos`) within each partition before collection.
# Label encoding: clickJob -> 1, saveJob -> 2, applyJob -> 3, anything else
# (including a null action) -> 0.
# NOTE(review): the recorded run of this cell failed with
# `java.lang.OutOfMemoryError: GC overhead limit exceeded` inside toPandas();
# the full result may not fit in driver memory as configured.
train_label_with_action_df = (
    train_label_with_action_sdf
    .repartition(16 * 4, 'id')
    .sortWithinPartitions('id', 'pos')
    .select(
        f.when(f.col('action') == 'clickJob', 1)
         .when(f.col('action') == 'saveJob', 2)
         .when(f.col('action') == 'applyJob', 3)
         .otherwise(0)
         .alias('label'),
        'id',
        'pos',
        'viewJob',
        'saveJob',
        'applyJob',
    )
    .toPandas()
)

Py4JJavaError: An error occurred while calling o4551.collectToPython.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:282)
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:276)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.foreach(SparkPlan.scala:276)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:298)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:297)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3195)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3192)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
# Display the collected frame (this cell has no recorded execution).
train_label_with_action_df

# job

In [284]:
# Load the job postings and print records one field per line (vertical layout).
job_sdf = spark.read.json('data/job.json')
job_sdf.show(vertical=True)


-RECORD 0---------------------------
 addr_no     | 6001005004           
 custno      | 7c01228e-5bc6-479... 
 description | ※輔導國小課業內容。
※中、英語... 
 edu         | 56                   
 exp_jobcat1 | 0                    
 exp_jobcat2 | 0                    
 exp_jobcat3 | 0                    
 industry    | 1005001009           
 job         | 國小安親課輔老師             
 jobcat1     | 2016002008           
 jobcat2     | 2016002011           
 jobcat3     | 0                    
 jobno       | 4371531              
 language1   | 14444                
 language2   | 1111                 
 language3   | 1111                 
 major_cat   | 0                    
 major_cat2  | 0                    
 major_cat3  | 0                    
 need_emp    | 1                    
 need_emp1   | 2                    
 others      | 1.提供勞健保
2.待遇優
3.需... 
 period      | 2                    
 role        | 1                    
 role_status | 2049                 
 s2          | 4                    
 s