In [34]:
import time
import gc
import numpy as np
import pandas as pd
#import lightgbm as lgb
import seaborn as sns
import matplotlib.pyplot as plt
plt.style.use('ggplot')
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
from pyspark.sql import functions
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType
from pyspark import SparkContext
from pyspark.sql import SparkSession

from scipy.sparse import hstack, csr_matrix
from sklearn.metrics import log_loss
from contextlib import contextmanager

from sklearn.preprocessing import LabelEncoder,OneHotEncoder
from sklearn.decomposition import PCA

@contextmanager
def timer(name):
    start_time = time.time()
    yield
    print('[{name} done in {time.time() - start_time:.2f} s]')

In [35]:
# ------------------------------------------------------------------------------------
# Following functions basically add groupby aggregation statistics, 
# then merge with original data frame
def equalDepthNormal(data, feature_name="null", n=200):
    # 连续特征等频归一化 分为n个bucket(每个bucket样本数相同)
    if (feature_name != "null"):
        TOTAL_NUM = len(data)
        data = (data-data.mean())/data.std()
    return data

def divide_vector(data):
    del data['item_id']
    del data['item_brand_id']
    vector_list = ['item_id_vec', 'item_brand_id_vec']
    for feat in vector_list:
        for i in range(50):
            data[feat+'_'+str(i)] = data[feat].apply(lambda x: x[i])
        del data[feat]
    return data

def add_count(df, cols, cname, value):
    df_count = pd.DataFrame(df.groupby(cols)[value].count()).reset_index()
    df_count.columns = cols + [cname]
    df = df.merge(df_count, on=cols, how='left')
    del df_count
    gc.collect()
    return df

def add_mean(df, cols, cname, value):
    df_mean = pd.DataFrame(df.groupby(cols)[value].mean()).reset_index()
    df_mean.columns = cols + [cname]
    df = df.merge(df_mean, on=cols, how='left')
    del df_mean
    gc.collect()
    return df

def add_std(df, cols, cname, value):
    df_std = pd.DataFrame(df.groupby(cols)[value].std()).reset_index()
    df_std.columns = cols + [cname]
    df = df.merge(df_std, on=cols, how='left')
    del df_std
    gc.collect()
    return df

def add_nunique(df, cols, cname, value):
    df_nunique = pd.DataFrame(df.groupby(cols)[value].nunique()).reset_index()
    df_nunique.columns = cols + [cname]
    df = df.merge(df_nunique, on=cols, how='left')
    del df_nunique
    gc.collect()
    return df
    
def add_cumcount(df, cols, cname):
    df[cname] = df.groupby(cols).cumcount() + 1
    return df

# ------------------------------------------------------------------------------------
# Following functions are dealing with predicted category and property:
# calculate the number of correctly predicted category / property and
# precision / recall

def true_predict_count(true_lst, pred_lst):
    items, cnt = true_lst.split(';'), 0
    for i in pred_lst:
        if i in items:
            cnt += 1
    return cnt

def true_predict_precision(true_lst, pred_lst):
    return true_predict_count(true_lst, pred_lst) / len(pred_lst)

def true_predict_recall(true_lst, pred_lst):
    return true_predict_count(true_lst, pred_lst) / len(true_lst.split(';'))

In [36]:
with timer('Read train and test'):
    train_data = sc.textFile("./data/train.txt")
    test_data = sc.textFile("./data/test.txt")
    train_pairs = train_data.map(lambda m: tuple(m.split(' ')))
    train_pairs.cache()
    test_pairs = test_data.map(lambda m: tuple(m.split(' ')))
    test_pairs.cache()
    # Create the DataFrame from an RDD of tuples
    train_header = ['instance_id', 'item_id', 'item_category_list', \
                    'item_property_list', 'item_brand_id', 'item_city_id', 'item_price_level', \
                    'item_sales_level', 'item_collected_level', 'item_pv_level', 'user_id', 'user_gender_id', \
                    'user_age_level', 'user_occupation_id', 'user_star_level', 'context_id', 'context_timestamp', \
                    'context_page_id', 'predict_category_property', 'shop_id', 'shop_review_num_level', \
                    'shop_review_positive_rate', 'shop_star_level', u'shop_score_service', 'shop_score_delivery', \
                    'shop_score_description', 'is_trade']
    test_header = test_pairs.collect()[0]
    train_df = spark.createDataFrame(train_pairs, train_header)
    test_df = spark.createDataFrame(test_pairs, test_header)
    train = train_df.filter("instance_id!='instance_id'")  
    test = test_df.filter("instance_id!='instance_id'")
    # sort by context_timestamp
    train = train.orderBy('context_timestamp')
    categorical_list = []
    categorical_list.append('item_category')
    split = udf(lambda x: x.split(";")[1])
    split1 = udf(lambda x: x.split(";")[i] if len(x.split(";")) > i else " ")
    train = train.select('*', split(train['item_category_list']).alias('item_category'))
    for i in range(1,3):
        categorical_list.append('item_property_'+str(i))
        train = train.select('*', split1(train['item_property_list']).alias('item_property_%d' % (i)))
    le = LabelEncoder()
    encode = udf(lambda l: le.fit_transform(l))
    for i in categorical_list:
        train = train.select('*', encode(train[i]))
    train = train.drop('item_category','item_property_1','item_property_2')
    # split train-validation with 9:1
    train_ = train.limit(430324)

[{name} done in {time.time() - start_time:.2f} s]


In [41]:
with timer('Feature engineering'):
    le = LabelEncoder()
    df_full = [train, test]
    df_full_processed = []
    # get the customers (users) gender ratio for each shop
    #df_shop_gender_ratio = train.groupby(['shop_id'])['user_gender_id']\
                            #.agg([lambda x: np.mean(x == 0)])\
                            #.rename(columns={'<lambda>': 'shop_user_gender_ratio'})
    df_shop_gender_ratio = train.select('user_gender_id','shop_id').groupby('shop_id','user_gender_id').count()
    df_shop_gender_ratio.show(10)
    
    # get the average age level of customers for each shop
    df_shop_avg_age_level = train.select('user_age_level','shop_id').groupby('shop_id','user_age_level').avg()
    df_shop_avg_age_level = df_shop_avg_age_level.select('*',df_shop_avg_age_level.user_age_level.cast('int').alias('user_age_level_int'))
    df_shop_avg_age_level = df_shop_avg_age_level.agg(avg('user_age_level_int'))
    df_shop_avg_age_level.show(5)
    df_shop_avg_age_level.printSchema()
    
    for df in df_full:
        # first item_category are same for all samples
        
        #df['context_page_id'] = df['context_page_id'] % 4000
        df = df.agg(df['context_page_id'] % 4000)

+-------------------+--------------+-----+
|            shop_id|user_gender_id|count|
+-------------------+--------------+-----+
|6622647717976293081|             1|   25|
|4261109238019410948|            -1|   10|
|8083840712256217698|            -1|    4|
|4132428120266854135|             0|   90|
|6764523132891870704|            -1|    2|
|1239481181711478081|             2|   17|
|4791188645345883463|             1|   34|
| 586859544414558419|             1|    1|
|4215742678121737516|            -1|   68|
|4456737157684315890|             0|   20|
+-------------------+--------------+-----+
only showing top 10 rows

+-----------------------+
|avg(user_age_level_int)|
+-----------------------+
|      978.6990072444326|
+-----------------------+

root
 |-- avg(user_age_level_int): double (nullable = true)



AnalysisException: u"grouping expressions sequence is empty, and '`context_page_id`' is not an aggregate function. Wrap '()' in windowing function(s) or wrap '`context_page_id`' in first() (or first_value) if you don't care which value you get.;;\nAggregate [(cast(context_page_id#1082 as double) % cast(4000 as double)) AS (context_page_id % 4000)#1645]\n+- Project [instance_id#1065, item_id#1066, item_category_list#1067, item_property_list#1068, item_brand_id#1069, item_city_id#1070, item_price_level#1071, item_sales_level#1072, item_collected_level#1073, item_pv_level#1074, user_id#1075, user_gender_id#1076, user_age_level#1077, user_occupation_id#1078, user_star_level#1079, context_id#1080, context_timestamp#1081, context_page_id#1082, predict_category_property#1083, shop_id#1084, shop_review_num_level#1085, shop_review_positive_rate#1086, shop_star_level#1087, shop_score_service#1088, ... 6 more fields]\n   +- Project [instance_id#1065, item_id#1066, item_category_list#1067, item_property_list#1068, item_brand_id#1069, item_city_id#1070, item_price_level#1071, item_sales_level#1072, item_collected_level#1073, item_pv_level#1074, user_id#1075, user_gender_id#1076, user_age_level#1077, user_occupation_id#1078, user_star_level#1079, context_id#1080, context_timestamp#1081, context_page_id#1082, predict_category_property#1083, shop_id#1084, shop_review_num_level#1085, shop_review_positive_rate#1086, shop_star_level#1087, shop_score_service#1088, ... 9 more fields]\n      +- Project [instance_id#1065, item_id#1066, item_category_list#1067, item_property_list#1068, item_brand_id#1069, item_city_id#1070, item_price_level#1071, item_sales_level#1072, item_collected_level#1073, item_pv_level#1074, user_id#1075, user_gender_id#1076, user_age_level#1077, user_occupation_id#1078, user_star_level#1079, context_id#1080, context_timestamp#1081, context_page_id#1082, predict_category_property#1083, shop_id#1084, shop_review_num_level#1085, shop_review_positive_rate#1086, shop_star_level#1087, shop_score_service#1088, ... 8 more fields]\n         +- Project [instance_id#1065, item_id#1066, item_category_list#1067, item_property_list#1068, item_brand_id#1069, item_city_id#1070, item_price_level#1071, item_sales_level#1072, item_collected_level#1073, item_pv_level#1074, user_id#1075, user_gender_id#1076, user_age_level#1077, user_occupation_id#1078, user_star_level#1079, context_id#1080, context_timestamp#1081, context_page_id#1082, predict_category_property#1083, shop_id#1084, shop_review_num_level#1085, shop_review_positive_rate#1086, shop_star_level#1087, shop_score_service#1088, ... 7 more fields]\n            +- Project [instance_id#1065, item_id#1066, item_category_list#1067, item_property_list#1068, item_brand_id#1069, item_city_id#1070, item_price_level#1071, item_sales_level#1072, item_collected_level#1073, item_pv_level#1074, user_id#1075, user_gender_id#1076, user_age_level#1077, user_occupation_id#1078, user_star_level#1079, context_id#1080, context_timestamp#1081, context_page_id#1082, predict_category_property#1083, shop_id#1084, shop_review_num_level#1085, shop_review_positive_rate#1086, shop_star_level#1087, shop_score_service#1088, ... 6 more fields]\n               +- Project [instance_id#1065, item_id#1066, item_category_list#1067, item_property_list#1068, item_brand_id#1069, item_city_id#1070, item_price_level#1071, item_sales_level#1072, item_collected_level#1073, item_pv_level#1074, user_id#1075, user_gender_id#1076, user_age_level#1077, user_occupation_id#1078, user_star_level#1079, context_id#1080, context_timestamp#1081, context_page_id#1082, predict_category_property#1083, shop_id#1084, shop_review_num_level#1085, shop_review_positive_rate#1086, shop_star_level#1087, shop_score_service#1088, ... 5 more fields]\n                  +- Project [instance_id#1065, item_id#1066, item_category_list#1067, item_property_list#1068, item_brand_id#1069, item_city_id#1070, item_price_level#1071, item_sales_level#1072, item_collected_level#1073, item_pv_level#1074, user_id#1075, user_gender_id#1076, user_age_level#1077, user_occupation_id#1078, user_star_level#1079, context_id#1080, context_timestamp#1081, context_page_id#1082, predict_category_property#1083, shop_id#1084, shop_review_num_level#1085, shop_review_positive_rate#1086, shop_star_level#1087, shop_score_service#1088, ... 4 more fields]\n                     +- Sort [context_timestamp#1081 ASC NULLS FIRST], true\n                        +- Filter NOT (instance_id#1065 = instance_id)\n                           +- LogicalRDD [instance_id#1065, item_id#1066, item_category_list#1067, item_property_list#1068, item_brand_id#1069, item_city_id#1070, item_price_level#1071, item_sales_level#1072, item_collected_level#1073, item_pv_level#1074, user_id#1075, user_gender_id#1076, user_age_level#1077, user_occupation_id#1078, user_star_level#1079, context_id#1080, context_timestamp#1081, context_page_id#1082, predict_category_property#1083, shop_id#1084, shop_review_num_level#1085, shop_review_positive_rate#1086, shop_star_level#1087, shop_score_service#1088, ... 3 more fields]\n"