In [None]:
import pandas as pd
import numpy as np
import os
import gc
from tqdm import tqdm_notebook # check the progressbar in the python. 
import glob # check the file name in fold. 

In [None]:
# findspark.init() puts the local Spark installation's Python packages on
# sys.path, so it has to run before the first `import pyspark`; otherwise the
# import fails on a fresh kernel when pyspark is not installed as a package.
import findspark
findspark.init()

import pyspark

# Last expression of the cell: shows the Spark home directory that was found.
findspark.find()

In [None]:
# C:\Users\User\Documents\R&D Challenge2019
# Dataset directory; every entry matched by `path + '*'` is later read as a CSV log file.
# NOTE(review): the comment above spells the folder "R&D Challenge2019" while the
# path below uses "R_D Challenge2019" — confirm which one exists on disk.
path = 'C:/Users/User/Documents/R_D Challenge2019/Challenge19_GameBot_Preliminary/dataset/'

In [None]:
# glob returns directory entries in arbitrary, OS-dependent order; sorting makes
# the file order (and therefore the order in which the logs are unioned later)
# reproducible across runs and machines.
read_file_lst = sorted(glob.glob(path + '*'))
read_file_lst[:2]

In [None]:
from pyspark import SparkContext

# getOrCreate() reuses an already-running context, so re-running this cell no
# longer fails with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [None]:
import functools 

def unionAll(dfs):
    """Stack a sequence of DataFrames row-wise, folding from left to right.

    Every frame after the first is re-selected into the column order of the
    running result before being appended, so frames whose columns are ordered
    differently still line up. An empty `dfs` raises TypeError (from reduce).
    """
    def _append(stacked, incoming):
        # Align the incoming frame to the accumulated column order, then append.
        aligned = incoming.select(stacked.columns)
        return stacked.union(aligned)

    return functools.reduce(_append, dfs)

In [None]:
from pyspark.sql import SQLContext
from pyspark import SparkFiles
from pyspark.sql.functions import lit

sqlContext = SQLContext(sc)
for i in tqdm_notebook(read_file_lst):
    # Original author's note: read.csv needs a full "C:/Users/..."-style path,
    # because by default paths are resolved against the Spark install location.
    # "Date" is a constant column (lit) holding characters [-8:-4] of the file
    # path, i.e. the four characters in front of a 4-character extension.
    # (A derived column would instead look like
    #  df.withColumn("age_square", col("age")**2).)
    df_temp = (
        sqlContext.read
        .csv(SparkFiles.get(i), header=True, inferSchema=True)
        .withColumn("Date", lit(i[-8:-4]))
    )

    # The first file seeds the running total; later files are appended row-wise.
    df_total = df_temp if i == read_file_lst[0] else unionAll([df_total, df_temp])

    del df_temp
    gc.collect()

In [None]:
import pyspark.sql.functions as f
# userItem=df.groupby('userId').agg(f.expr('count(distinct item)').alias('n_item'))
# df_total.groupBy("log_id").agg(f.expr('count(distinct item)').alias('log_id'))
# df_total.select("actor_account").distinct().count()

In [None]:
# Folder containing the two account list files read below.
i = 'C:/Users/User/Documents/R_D Challenge2019/Challenge19_GameBot_Preliminary/'
train_label, test_label = (
    sqlContext.read.csv(SparkFiles.get(i + label_file), header=True, inferSchema=True)
    for label_file in ('labeled_accounts.csv', 'test_accounts.csv')
)

In [None]:
# (row count, column count) of the train table, then of the test table.
for label_table in (train_label, test_label):
    print((label_table.count(), len(label_table.columns)))

### Feature Engineering

Player information features
- Actor (o)
- A_Acc (o)
- login_day_count 
- logout_day_count
- playtime (o)
- playtime_per_day
- avg_money (o)
- login_count (o)
- ip_count (o)
- max_level (o)

In [None]:
# For developing the code, the first idea was to work on a 0.01% sample:
# df_total_sample = df_total.rdd.takeSample(False, 0.0001, seed = 1234)

# Sampling scans every row and is slow, and this subset is only needed for
# debugging, so a cheap slice is taken instead.
# Option 1. With an index-like column, use between():
#   e.g. df_total.where(col("actor").between(886535, 8865350))
# NOTE(review): the filter below keeps rows whose `actor` value lies in
# [800000, 800100]; it is an actor-id range, not "the first 100k rows".
df_total_sample = df_total.filter(df_total['actor'].between(800000, 800100))

# Option 2. Add an index column and slice on it:
#   e.g. from pyspark.sql.functions import monotonically_increasing_id
#   df.withColumn("id", monotonically_increasing_id()).show()

In [None]:
# (row count, column count) of the debugging subset.
sample_shape = (df_total_sample.count(), len(df_total_sample.columns))
print(sample_shape)

Feature name : login_count

In [None]:
import pyspark.sql.functions as func  # Spark column helpers (e.g. countDistinct)
# Use df.filter() to keep only the rows of interest.

# Original author's note: in the Aion data the same actor_account can appear
# under different actor values. The submission is keyed by actor_account, so
# every aggregation groups on it (renamed to "account" here).
df_total_sample = df_total_sample.withColumnRenamed("actor_account", "account")

# login_count = number of log_id 103 rows per account.
# (An aggregate can also be renamed with .select(col("count").alias("login_count")).)
df_total_sample_agg = (
    df_total_sample
    .filter(df_total_sample['log_id'] == 103)
    .groupBy('account')
    .agg({'account': 'count'})
    .withColumnRenamed("count(account)", "login_count")
)

# Left joins keep every labelled account, including those with no matching rows.
train_label = train_label.join(df_total_sample_agg, on=['account'], how='left')
test_label = test_label.join(df_total_sample_agg, on=['account'], how='left')

Feature name : Day_unique_count 

In [None]:
# Day_unique_count = number of distinct Date values seen per account.
df_total_sample_agg = (
    df_total_sample
    .groupby(df_total_sample.account)
    .agg(func.countDistinct('Date'))
    .withColumnRenamed("count(DISTINCT Date)", "Day_unique_count")
)
train_label = train_label.join(df_total_sample_agg, on=['account'], how='left')
test_label = test_label.join(df_total_sample_agg, on=['account'], how='left')

Feature name : ip_count

In [None]:
# ip_count = number of distinct etc_str1 values on log_id 103 rows, per account.
df_total_sample_agg = (
    df_total_sample
    .filter(df_total_sample['log_id'] == 103)
    .groupBy('account')
    .agg(func.countDistinct('etc_str1'))
    .withColumnRenamed("count(DISTINCT etc_str1)", "ip_count")
)
train_label = train_label.join(df_total_sample_agg, on=['account'], how='left')
test_label = test_label.join(df_total_sample_agg, on=['account'], how='left')

Feature name : max_level

In [None]:
# max_level = largest etc_num2 value on log_id 160 rows, per account.
df_total_sample_agg = (
    df_total_sample
    .filter(df_total_sample['log_id'] == 160)
    .groupBy('account')
    .agg({'etc_num2': 'max'})
    .withColumnRenamed("max(etc_num2)", "max_level")
)
train_label = train_label.join(df_total_sample_agg, on=['account'], how='left')
test_label = test_label.join(df_total_sample_agg, on=['account'], how='left')

Feature name : playtime

In [None]:
# playtime = sum of etc_num7 over log_id 104 rows, per account.
df_total_sample_agg = (
    df_total_sample
    .filter(df_total_sample['log_id'] == 104)
    .groupBy('account')
    .agg({'etc_num7': 'sum'})
    .withColumnRenamed("sum(etc_num7)", "playtime")
)
train_label = train_label.join(df_total_sample_agg, on=['account'], how='left')
test_label = test_label.join(df_total_sample_agg, on=['account'], how='left')

Feature name : sum_money

In [None]:
# sum_money = sum of etc_num1 over log_id 104 rows, per account.
df_total_sample_agg = (
    df_total_sample
    .filter(df_total_sample['log_id'] == 104)
    .groupBy('account')
    .agg({'etc_num1': 'sum'})
    .withColumnRenamed("sum(etc_num1)", "sum_money")
)
train_label = train_label.join(df_total_sample_agg, on=['account'], how='left')
test_label = test_label.join(df_total_sample_agg, on=['account'], how='left')

In [None]:
# (row count, column count) of the training table at this point.
train_shape = (train_label.count(), len(train_label.columns))
print(train_shape)

multiple groupby : https://stackoverflow.com/questions/36251004/pyspark-aggregation-on-multiple-columns

Player action features
- Sit (count / ratio / per_day_count)
- Exp_get
- item_get
- money_get
- abyss_get
- Exp_repair (count / per_day_count)
- Use_portal
- Killed_bypc
- Killed_bynpc
- Teleport
- Reborn

In [None]:
def action_features(train, test, df, name, count, ratio, per_day_count, log_id, etc_num):
    """Left-join per-account activity features for one log type onto train/test.

    Parameters
    ----------
    train, test : DataFrame
        Label tables with an ``account`` column; the features are left-joined
        onto them, so accounts without matching rows get nulls.
    df : DataFrame
        Event log with ``log_id``, ``account`` and ``Date`` columns.
    name : str
        Prefix of the generated columns.
    count : bool
        When truthy, add ``{name}_count``: number of rows with the given
        ``log_id`` per account.
    ratio : bool
        Accepted for compatibility. Only a progress message is printed; no
        ratio feature is computed yet (TODO).
    per_day_count : bool
        When truthy, add ``{name}_per_day_count``: the rows are counted per
        (account, Date) and those daily counts are averaged per account.
    log_id : int
        ``log_id`` value selecting the rows to aggregate.
    etc_num : str
        Currently unused; reserved for the ratio feature.

    Returns
    -------
    tuple
        ``(train, test)`` with the requested feature columns appended.
    """
    events = df.filter(df['log_id'] == log_id)

    if per_day_count:
        print("Processing per_day_count features...")
        per_day_col = "{}_per_day_count".format(name)
        daily = (
            events
            .groupBy(['account', 'Date'])
            .agg({'account': 'count'})
            .withColumnRenamed("count(account)", per_day_col)
        )
        # Spark names a dict-style 'mean'/'avg' aggregate "avg(<col>)". Renaming
        # from "mean(<col>)" matches nothing and silently leaves the feature
        # called "avg(<name>_per_day_count)", so rename from the avg(...) name.
        df_agg = (
            daily
            .groupBy(['account'])
            .agg({per_day_col: 'avg'})
            .withColumnRenamed("avg({})".format(per_day_col), per_day_col)
        )
        train = train.join(df_agg, ['account'], how='left')
        test = test.join(df_agg, ['account'], how='left')

    if count:
        print("Processing count features...")
        df_agg = (
            events
            .groupBy('account')
            .agg({'account': 'count'})
            .withColumnRenamed("count(account)", "{}_count".format(name))
        )
        train = train.join(df_agg, ['account'], how='left')
        test = test.join(df_agg, ['account'], how='left')

    if ratio:
        # TODO: ratio feature (per-account total divided by a sum over
        # `etc_num`) is not implemented; only the message is emitted.
        print("Processing ratio features...")

    print("Processing end")
    return train, test

In [None]:
# Actions that request count, ratio and per-day-count features:
# (column prefix, log_id), processed in this order.
for action_name, action_log_id in [
    ('sit', 118),        # sit information
    ('Exp_get', 143),    # Exp_get information
    ('item_get', 225),   # item_get information
    ('money_get', 187),  # money_get information
    ('abyss_get', 156),  # abyss_get information
]:
    train_label, test_label = action_features(
        train_label, test_label, df_total_sample,
        name=action_name, log_id=action_log_id, etc_num='actor_account',
        count=True, ratio=True, per_day_count=True,
    )

In [None]:
# Actions that request count and per-day-count features only (ratio=False):
# (column prefix, log_id), processed in this order.
for action_name, action_log_id in [
    ('Exp_repair', 148),    # Exp_repair information
    ('Use_portal', 151),    # Use_portal information
    ('Killed_bypc', 137),   # Killed_bypc information
    ('Killed_bynpc', 138),  # Killed_bynpc information
    ('Teleport', 142),      # Teleport information
    ('Reborn', 145),        # Reborn information
]:
    train_label, test_label = action_features(
        train_label, test_label, df_total_sample,
        name=action_name, log_id=action_log_id, etc_num='actor_account',
        count=True, ratio=False, per_day_count=True,
    )

In [None]:
# (row count, column count) of the training table at this point.
train_shape = (train_label.count(), len(train_label.columns))
print(train_shape)

Group activities features
- Avg_PartyTime
- GuildAct_count (계산 못하겠음)
- GuildJoin_count

In [None]:
#Avg_PartyTime
# Mean of etc_num7 over log_id 127 rows, per account.
# Spark names this aggregate "avg(etc_num7)". The earlier rename targeted
# "mean(etc_num12)", which matched no column, so the feature was never actually
# called Avg_PartyTime.
# NOTE(review): the old rename mentioned etc_num12 while the aggregate reads
# etc_num7 — confirm which column holds the party time for log_id 127.
df_total_sample_agg = (
    df_total_sample
    .filter(df_total_sample['log_id'] == 127)
    .groupBy('account')
    .agg({'etc_num7': 'avg'})
    .withColumnRenamed("avg(etc_num7)", "Avg_PartyTime")
)
train_label = train_label.join(df_total_sample_agg, ['account'], how='left')
test_label = test_label.join(df_total_sample_agg, ['account'], how='left')

In [None]:
#GuildJoin_count
# Number of log_id 605 rows per account.
df_total_sample_agg = (
    df_total_sample
    .filter(df_total_sample['log_id'] == 605)
    .groupBy('account')
    .agg({'account': 'count'})
    .withColumnRenamed("count(account)", "GuildJoin_count")
)
train_label = train_label.join(df_total_sample_agg, on=['account'], how='left')
test_label = test_label.join(df_total_sample_agg, on=['account'], how='left')

Network measures features
- Party 
- Friend
- Trade 
- mail 
- dual 

Setting 
방법1. 
- 패키지 설치 : pip install graphframes
- 패키지 로드 : from graphframes import *
- graphframes example : https://towardsdatascience.com/graphframes-in-jupyter-a-practical-guide-9b3b346cebc5

방법2. 
하드코딩 
- In_degree : 나한테 들어오는 갯수 
- out_degree : 나에서 나가는 갯수 
- Eccentricity : The eccentricity of a node $s$ is the length of the longest shortest path between $s$ and any other node $t$ of the network: $e(s) = \max_{t} d(s, t)$, where $d(s, t)$ is the shortest-path distance from $s$ to $t$.

방법3. 
- python : https://www.kirenz.com/post/2019-08-13-network_analysis/
- 이론 : http://blog.naver.com/PostView.nhn?blogId=happyrachy&logNo=221273644056&parentCategoryNo=&categoryNo=1&viewDate=&isShowPopularPosts=true&from=search
- 이론2 : https://bab2min.tistory.com/554

How to use graphframes in a Jupyter notebook by referencing graphframes.jar
- https://github.com/graphframes/graphframes/issues/104#

In [None]:
def social_network_features(train, test, df, log_id, name):
    """Left-join in-degree and out-degree features for one interaction type.

    Rows of ``df`` with the given ``log_id`` are treated as directed edges from
    ``account`` to ``target_account``.
    NOTE(review): this direction (acting account -> target) is assumed from the
    column names — confirm against the log specification.

    Parameters
    ----------
    train, test : DataFrame
        Label tables with an ``account`` column.
    df : DataFrame
        Event log with ``log_id``, ``account`` and ``target_account`` columns.
    log_id : int
        ``log_id`` value selecting the interaction rows.
    name : str
        Prefix of the generated ``{name}_in_deg`` / ``{name}_out_deg`` columns.

    Returns
    -------
    tuple
        ``(train, test)`` with both degree columns appended; accounts without
        matching rows get nulls.
    """
    edges = df.filter(df['log_id'] == log_id)

    # In-degree: how often the account appears as the target of an edge.
    # The grouping key is renamed to "account" so the result can be joined on
    # it; without the rename the join has no "account" column on the right side.
    in_deg = (
        edges
        .groupBy(['target_account'])
        .agg({'target_account': 'count'})
        .withColumnRenamed("count(target_account)", "{}_in_deg".format(name))
        .withColumnRenamed("target_account", "account")
    )

    # Out-degree: how often the account appears as the source of an edge.
    out_deg = (
        edges
        .groupBy(['account'])
        .agg({'account': 'count'})
        .withColumnRenamed("count(account)", "{}_out_deg".format(name))
    )

    for degree in (in_deg, out_deg):
        train = train.join(degree, ['account'], how='left')
        test = test.join(degree, ['account'], how='left')
    return train, test

In [None]:
# The degree features must be computed from the event log (df_total_sample).
# The previous calls passed df_total_sample_agg, which at this point is the
# GuildJoin_count aggregate and has no log_id column to filter on.
# (log_id, column prefix), processed in this order.
for network_log_id, network_prefix in [
    (126, 'p'),  # party
    (134, 'f'),  # Friend
    (158, 'd'),  # Dual
    (229, 'm'),  # Mail
    (210, 't'),  # Trade
]:
    train_label, test_label = social_network_features(
        train_label, test_label, df_total_sample,
        log_id=network_log_id, name=network_prefix,
    )

In [None]:
# Spark writes each table as a directory of part files at the given path.
# header=True keeps the feature names in the output (without it the columns are
# unlabeled); mode="overwrite" lets the notebook be re-run, since the default
# save mode raises when the target path already exists.
train_label.write.csv("train_label.csv", header=True, mode="overwrite")
test_label.write.csv("test_label.csv", header=True, mode="overwrite")