In [1]:
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

In [6]:
appName = "spark"
# create context for spark
sc = SparkContext(master = 'local',appName= appName)

# create session for sql
    # use builder (start to build)
    # set appName (create a new task and give a name to it )
    # set config (config your new env , use SparkConf)
    # getOrCreate() to start the workflow
sess = SparkSession.\
            builder.\
            appName("dataLoader").\
            config(conf  = SparkConf()).\
            getOrCreate()


In [7]:
trainset  = sess.read.csv("../data/round1_ijcai_18_train_20180301.txt",sep = ' ',header =  True)

In [64]:
trainset.schema.fields

[StructField(instance_id,StringType,true),
 StructField(item_id,StringType,true),
 StructField(item_category_list,StringType,true),
 StructField(item_property_list,StringType,true),
 StructField(item_brand_id,StringType,true),
 StructField(item_city_id,StringType,true),
 StructField(item_price_level,StringType,true),
 StructField(item_sales_level,StringType,true),
 StructField(item_collected_level,StringType,true),
 StructField(item_pv_level,StringType,true),
 StructField(user_id,StringType,true),
 StructField(user_gender_id,StringType,true),
 StructField(user_age_level,StringType,true),
 StructField(user_occupation_id,StringType,true),
 StructField(user_star_level,StringType,true),
 StructField(context_id,StringType,true),
 StructField(context_timestamp,StringType,true),
 StructField(context_page_id,StringType,true),
 StructField(predict_category_property,StringType,true),
 StructField(shop_id,StringType,true),
 StructField(shop_review_num_level,StringType,true),
 StructField(shop_rev

# create columns

## user defined function

In [67]:
timestamp = trainset.select("context_timestamp")

In [68]:
import datetime
s = timestamp.first()


In [69]:
ti = timestamp.limit(100)
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

f = udf(lambda x: datetime.datetime.utcfromtimestamp(float(x)).day ,IntegerType())
ti.withColumn("day",f("context_timestamp")).show()

+-----------------+---+
|context_timestamp|day|
+-----------------+---+
|       1537236544| 18|
|       1537243232| 18|
|       1537211052| 17|
|       1537222670| 17|
|       1537271320| 18|
|       1537282855| 18|
|       1537280317| 18|
|       1537261120| 18|
|       1537208871| 17|
|       1537285390| 18|
|       1537263036| 18|
|       1537240361| 18|
|       1537285082| 18|
|       1537280484| 18|
|       1537253096| 18|
|       1537202495| 17|
|       1537265813| 18|
|       1537256335| 18|
|       1537267960| 18|
|       1537278755| 18|
+-----------------+---+
only showing top 20 rows



## withColumns : use multiple features

In [73]:
from pyspark.sql.functions import udf , array
from pyspark.sql.types import IntegerType
f = udf(lambda x: x[0]+x[1] , IntegerType())
tiwithHour.withColumn("new",f(array("day","hour"))).show()

+-----------------+---+----+---+
|context_timestamp|day|hour|new|
+-----------------+---+----+---+
|       1537236544| 18|   2| 20|
|       1537285082| 18|  15| 33|
|       1537283431| 18|  15| 33|
|       1537265363| 18|  10| 28|
|       1537263058| 18|   9| 27|
|       1537203951| 17|  17| 34|
|       1537267462| 18|  10| 28|
|       1537236361| 18|   2| 20|
|       1537258150| 18|   8| 26|
|       1537238066| 18|   2| 20|
|       1537237197| 18|   2| 20|
|       1537274454| 18|  12| 30|
|       1537241342| 18|   3| 21|
|       1537233194| 18|   1| 19|
|       1537232815| 18|   1| 19|
|       1537268997| 18|  11| 29|
|       1537251067| 18|   6| 24|
|       1537275785| 18|  13| 31|
|       1537275682| 18|  13| 31|
|       1537249147| 18|   5| 23|
+-----------------+---+----+---+
only showing top 20 rows



## mapping

In [36]:
timestamp.select("context_timestamp").rdd.map(lambda x: (x,1)).collect()

[(Row(context_timestamp='1537236544'), 1),
 (Row(context_timestamp='1537243232'), 1),
 (Row(context_timestamp='1537211052'), 1),
 (Row(context_timestamp='1537222670'), 1),
 (Row(context_timestamp='1537271320'), 1),
 (Row(context_timestamp='1537282855'), 1),
 (Row(context_timestamp='1537280317'), 1),
 (Row(context_timestamp='1537261120'), 1),
 (Row(context_timestamp='1537208871'), 1),
 (Row(context_timestamp='1537285390'), 1),
 (Row(context_timestamp='1537263036'), 1),
 (Row(context_timestamp='1537240361'), 1),
 (Row(context_timestamp='1537285082'), 1),
 (Row(context_timestamp='1537280484'), 1),
 (Row(context_timestamp='1537253096'), 1),
 (Row(context_timestamp='1537202495'), 1),
 (Row(context_timestamp='1537265813'), 1),
 (Row(context_timestamp='1537256335'), 1),
 (Row(context_timestamp='1537267960'), 1),
 (Row(context_timestamp='1537278755'), 1),
 (Row(context_timestamp='1537282521'), 1),
 (Row(context_timestamp='1537281674'), 1),
 (Row(context_timestamp='1537277481'), 1),
 (Row(conte

## useful method

In [12]:
## rows
ti.count()

100

In [14]:
## distinct
ti.distinct()

# or 
ti.select("context_timestamp").dropDuplicates()

DataFrame[context_timestamp: string]

In [15]:
## fillna 
ti.fillna(-1)

DataFrame[context_timestamp: string]

# groupby and join method

In [37]:
ti = timestamp.sample(False,0.1)
day = udf(lambda x: datetime.datetime.utcfromtimestamp(float(x)).day ,IntegerType())
hour = udf(lambda x: datetime.datetime.utcfromtimestamp(float(x)).hour ,IntegerType())
tiwithDay = ti.withColumn("day",day("context_timestamp"))
tiwithHour = tiwithDay.withColumn("hour",hour("context_timestamp"))


In [38]:
# simple groupby
tiwithDay.groupby("day").count().show()

+---+-----+
|day|count|
+---+-----+
| 22| 6789|
| 20| 6924|
| 19| 7021|
| 17|  684|
| 23| 6237|
| 24| 5240|
| 21| 7184|
| 18| 7917|
+---+-----+



In [29]:
# groupby and use other agg
tiwithDay.groupby("day").agg({"context_timestamp":"mean","context_timestamp":"max"}).show()

+---+----------------------+
|day|max(context_timestamp)|
+---+----------------------+
| 22|            1537660759|
| 20|            1537487999|
| 19|            1537401559|
| 17|            1537228798|
| 23|            1537747199|
| 24|            1537804763|
| 21|            1537574391|
| 18|            1537315189|
+---+----------------------+



In [48]:
import  pyspark.sql.functions  as F
tiwithHour.groupby('day','hour').agg(
    F.mean("context_timestamp"),
    F.count("context_timestamp")
    
).show()

+---+----+----------------------+------------------------+
|day|hour|avg(context_timestamp)|count(context_timestamp)|
+---+----+----------------------+------------------------+
| 23|   6|  1.5376842216923077E9|                     403|
| 24|  10|  1.5377851129103942E9|                     279|
| 24|   9|   1.537781369686567E9|                     268|
| 22|   8|  1.5376049720107527E9|                     372|
| 18|  16|   1.537287913372549E9|                     153|
| 22|   9|  1.5376085462010581E9|                     378|
| 18|   1|  1.5372342539483695E9|                     368|
| 22|   6|  1.5375976811907218E9|                     388|
| 20|  22|    1.53748297215625E9|                      64|
| 19|   2|   1.537324245409742E9|                     349|
| 22|  23|  1.5376592308915663E9|                     166|
| 20|   2|   1.537410550621134E9|                     388|
| 23|  17|  1.5377234538958333E9|                      48|
| 21|  11|   1.537529470607004E9|                     51

## groupby to support user defined function:

In [57]:

# use collect_list and collect_set
dayHour = tiwithHour.groupby('day').agg(F.collect_list("hour"))


In [61]:
dayHour.show(5)

+---+--------------------+
|day|  collect_list(hour)|
+---+--------------------+
| 22|[14, 11, 11, 13, ...|
| 20|[23, 22, 17, 16, ...|
| 19|[10, 1, 9, 5, 4, ...|
| 17|[17, 17, 22, 23, ...|
| 23|[11, 12, 13, 11, ...|
+---+--------------------+
only showing top 5 rows



In [62]:
tiwithDay.show(5)

+-----------------+---+
|context_timestamp|day|
+-----------------+---+
|       1537236544| 18|
|       1537285082| 18|
|       1537283431| 18|
|       1537265363| 18|
|       1537263058| 18|
+-----------------+---+
only showing top 5 rows



# join

In [66]:
tiwithDay.join(dayHour,tiwithDay.day == dayHour.day,how = 'left').show()

+-----------------+---+---+--------------------+
|context_timestamp|day|day|  collect_list(hour)|
+-----------------+---+---+--------------------+
|       1537236544| 18| 18|[2, 15, 15, 10, 9...|
|       1537285082| 18| 18|[2, 15, 15, 10, 9...|
|       1537283431| 18| 18|[2, 15, 15, 10, 9...|
|       1537265363| 18| 18|[2, 15, 15, 10, 9...|
|       1537263058| 18| 18|[2, 15, 15, 10, 9...|
|       1537203951| 17| 17|[17, 17, 22, 23, ...|
|       1537267462| 18| 18|[2, 15, 15, 10, 9...|
|       1537236361| 18| 18|[2, 15, 15, 10, 9...|
|       1537258150| 18| 18|[2, 15, 15, 10, 9...|
|       1537238066| 18| 18|[2, 15, 15, 10, 9...|
|       1537237197| 18| 18|[2, 15, 15, 10, 9...|
|       1537274454| 18| 18|[2, 15, 15, 10, 9...|
|       1537241342| 18| 18|[2, 15, 15, 10, 9...|
|       1537233194| 18| 18|[2, 15, 15, 10, 9...|
|       1537232815| 18| 18|[2, 15, 15, 10, 9...|
|       1537268997| 18| 18|[2, 15, 15, 10, 9...|
|       1537251067| 18| 18|[2, 15, 15, 10, 9...|
|       1537275785| 

# feature transform

## 离散变量的处理方法

      - OneHotEncoderEstimator
      - StringIndexer
      - indexToString

## 连续变量的处理方法
      
      - bucketizer
      - interaction:很牛逼：不确定是否可以处理稀疏矩阵交互
      - normalizer :数据进行归一化（保证列具有norm = 1）
      - standerdScaler: unit std and zero mean
      - minmaxScaler
      - elementwiseProduct


## one hot transformer

In [14]:
trainset.schema

StructType(List(StructField(instance_id,StringType,true),StructField(item_id,StringType,true),StructField(item_category_list,StringType,true),StructField(item_property_list,StringType,true),StructField(item_brand_id,StringType,true),StructField(item_city_id,StringType,true),StructField(item_price_level,StringType,true),StructField(item_sales_level,StringType,true),StructField(item_collected_level,StringType,true),StructField(item_pv_level,StringType,true),StructField(user_id,StringType,true),StructField(user_gender_id,StringType,true),StructField(user_age_level,StringType,true),StructField(user_occupation_id,StringType,true),StructField(user_star_level,StringType,true),StructField(context_id,StringType,true),StructField(context_timestamp,StringType,true),StructField(context_page_id,StringType,true),StructField(predict_category_property,StringType,true),StructField(shop_id,StringType,true),StructField(shop_review_num_level,StringType,true),StructField(shop_review_positive_rate,StringTyp

In [20]:
trainset.select("item_brand_id").distinct().show()

+-------------------+
|      item_brand_id|
+-------------------+
|4553478234358480210|
|3558483762596688694|
|7844054043114368242|
|4770150641133846549|
|4475274987980152756|
|2370561303120881603|
|3196245114618206915|
|3232465741406156427|
|5502112173726415383|
|3889178859023712903|
|4142193175809505979|
|1779279822658813774|
|   3718718487310364|
|5609686233480469348|
|5402555686380756010|
|2753949486266415812|
|6484835263364658312|
|4911434975952515346|
|9143916258066354406|
|2284060312728715872|
+-------------------+
only showing top 20 rows



## 对item_brand_id 进行onehot编码

In [57]:
# 都在feature 类中
from pyspark.ml.feature import OneHotEncoderEstimator,StringIndexer
from pyspark.ml import Pipeline



trainset  = sess.read.csv("../data/round1_ijcai_18_train_20180301.txt",sep = ' ',header =  True)

needCoded = ['item_brand_id','shop_id']
needCodedIdx = [ col + 'Idx' for col in needCoded]
needCodedIdxOnehot = [ i+'_OneHot' for i in needCodedIdx]
indexer = [  StringIndexer(inputCol= col , outputCol= col + 'Idx' )
               for col in needCoded
          ]
onehot  = [ OneHotEncoderEstimator(
                inputCols = [i.getOutputCol()],
                outputCols = [i.getOutputCol() + '_OneHot']
            )
            for i in indexer
]

pipeLine = Pipeline(stages  = indexer + onehot)

trainset2 =  pipeLine.fit(trainset).transform(trainset)
trainset2 = trainset2.drop(*(needCoded+needCodedIdx))

In [56]:
trainset2.show()

+-------------------+-------------------+--------------------+--------------------+-------------------+----------------+----------------+--------------------+-------------+-------------------+--------------+--------------+------------------+---------------+-------------------+-----------------+---------------+-------------------------+---------------------+-------------------------+---------------+------------------+-------------------+----------------------+--------+
|        instance_id|            item_id|  item_category_list|  item_property_list|       item_city_id|item_price_level|item_sales_level|item_collected_level|item_pv_level|            user_id|user_gender_id|user_age_level|user_occupation_id|user_star_level|         context_id|context_timestamp|context_page_id|predict_category_property|shop_review_num_level|shop_review_positive_rate|shop_star_level|shop_score_service|shop_score_delivery|shop_score_description|is_trade|
+-------------------+-------------------+-------------

## 关于编码的还原





In [76]:
from pyspark.ml.feature import IndexToString,StringIndexer
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import datetime
f = udf(lambda x: datetime.datetime.utcfromtimestamp(float(x)).day ,IntegerType())
trainset  = sess.read.csv("../data/round1_ijcai_18_train_20180301.txt",sep = ' ',header =  True)
trainset = trainset.withColumn("day",f("context_timestamp"))

brand_indexer = StringIndexer(inputCol= 'item_brand_id',outputCol='item_brand_idIdx')
trainset = brand_indexer.fit(trainset).transform(trainset)



In [77]:
brand_indexer.getInputCol(),brand_indexer.getOutputCol()

('item_brand_id', 'item_brand_idIdx')

In [86]:
brandName = IndexToString(inputCol=brand_indexer.getOutputCol(),outputCol='brand_original')
brandName.transform(trainset).select('item_brand_idIdx','brand_original').limit(2).show()

+----------------+-------------------+
|item_brand_idIdx|     brand_original|
+----------------+-------------------+
|           607.0|1975590437749032870|
|           607.0|1975590437749032870|
+----------------+-------------------+



## VectorAssembler

In [70]:
# before you build your model  , you need to assemble your vector
from pyspark.ml.feature import VectorAssembler


# 注意： vectorassembler 只会搞数值型
assemble = VectorAssembler(inputCols= needCodedIdxOnehot,outputCol='feature')
assemble.transform(trainset2).select('feature').first()

Row(feature=SparseVector(6012, {607: 1.0, 3903: 1.0}))

[StructField(instance_id,StringType,true),
 StructField(item_id,StringType,true),
 StructField(item_category_list,StringType,true),
 StructField(item_property_list,StringType,true),
 StructField(item_city_id,StringType,true),
 StructField(item_price_level,StringType,true),
 StructField(item_sales_level,StringType,true),
 StructField(item_collected_level,StringType,true),
 StructField(item_pv_level,StringType,true),
 StructField(user_id,StringType,true),
 StructField(user_gender_id,StringType,true),
 StructField(user_age_level,StringType,true),
 StructField(user_occupation_id,StringType,true),
 StructField(user_star_level,StringType,true),
 StructField(context_id,StringType,true),
 StructField(context_timestamp,StringType,true),
 StructField(context_page_id,StringType,true),
 StructField(predict_category_property,StringType,true),
 StructField(shop_review_num_level,StringType,true),
 StructField(shop_review_positive_rate,StringType,true),
 StructField(shop_star_level,StringType,true),
 

In [117]:
# user defined function in pysparkSQL.ipynb

In [131]:
from pyspark.ml.feature import SQLTransformer

userfunction = sess.udf.register(
    "userfunction",
    lambda x: "success" if x == 0 else "failure",
    "string"
)

transHour = [SQLTransformer(
    statement = "select * , userfunction(is_trade) from __THIS__"
)]

#  transHour[0].transform(trainset2)

pip  = Pipeline(stages= transHour)
pip.fit(trainset2).transform(trainset2)

DataFrame[instance_id: string, item_id: string, item_category_list: string, item_property_list: string, item_city_id: string, item_price_level: string, item_sales_level: string, item_collected_level: string, item_pv_level: string, user_id: string, user_gender_id: string, user_age_level: string, user_occupation_id: string, user_star_level: string, context_id: string, context_timestamp: string, context_page_id: string, predict_category_property: string, shop_review_num_level: string, shop_review_positive_rate: string, shop_star_level: string, shop_score_service: string, shop_score_delivery: string, shop_score_description: string, is_trade: string, item_brand_idIdx_OneHot: vector, shop_idIdx_OneHot: vector, userfunction(is_trade): string]

In [123]:
trainset2.schema.fields

[StructField(instance_id,StringType,true),
 StructField(item_id,StringType,true),
 StructField(item_category_list,StringType,true),
 StructField(item_property_list,StringType,true),
 StructField(item_city_id,StringType,true),
 StructField(item_price_level,StringType,true),
 StructField(item_sales_level,StringType,true),
 StructField(item_collected_level,StringType,true),
 StructField(item_pv_level,StringType,true),
 StructField(user_id,StringType,true),
 StructField(user_gender_id,StringType,true),
 StructField(user_age_level,StringType,true),
 StructField(user_occupation_id,StringType,true),
 StructField(user_star_level,StringType,true),
 StructField(context_id,StringType,true),
 StructField(context_timestamp,StringType,true),
 StructField(context_page_id,StringType,true),
 StructField(predict_category_property,StringType,true),
 StructField(shop_review_num_level,StringType,true),
 StructField(shop_review_positive_rate,StringType,true),
 StructField(shop_star_level,StringType,true),
 

      

## 缺失值处理
      
      

In [97]:
from pyspark.ml.feature import  Imputer
i = Imputer(inputCols=["shop_star_level"],outputCols=["shop_star_level"])
i.fit(trainset2)

IllegalArgumentException: 'requirement failed: Column shop_star_level must be of type equal to one of the following types: [DoubleType, FloatType] but was actually of type StringType.'

In [96]:
trainset2.schema.fields

[StructField(instance_id,StringType,true),
 StructField(item_id,StringType,true),
 StructField(item_category_list,StringType,true),
 StructField(item_property_list,StringType,true),
 StructField(item_city_id,StringType,true),
 StructField(item_price_level,StringType,true),
 StructField(item_sales_level,StringType,true),
 StructField(item_collected_level,StringType,true),
 StructField(item_pv_level,StringType,true),
 StructField(user_id,StringType,true),
 StructField(user_gender_id,StringType,true),
 StructField(user_age_level,StringType,true),
 StructField(user_occupation_id,StringType,true),
 StructField(user_star_level,StringType,true),
 StructField(context_id,StringType,true),
 StructField(context_timestamp,StringType,true),
 StructField(context_page_id,StringType,true),
 StructField(predict_category_property,StringType,true),
 StructField(shop_review_num_level,StringType,true),
 StructField(shop_review_positive_rate,StringType,true),
 StructField(shop_star_level,StringType,true),
 

# 关于train_test split 和 train_test concat

In [116]:
train_set , test_set = trainset.randomSplit([0.8,0.2])

# 关于表的修改

## 数据类型的修改

In [100]:
from pyspark.sql.types import DoubleType
trainset.withColumn("shop_star_level_double",trainset.shop_star_level.cast(DoubleType()))

DataFrame[instance_id: string, item_id: string, item_category_list: string, item_property_list: string, item_brand_id: string, item_city_id: string, item_price_level: string, item_sales_level: string, item_collected_level: string, item_pv_level: string, user_id: string, user_gender_id: string, user_age_level: string, user_occupation_id: string, user_star_level: string, context_id: string, context_timestamp: string, context_page_id: string, predict_category_property: string, shop_id: string, shop_review_num_level: string, shop_review_positive_rate: string, shop_star_level: string, shop_score_service: string, shop_score_delivery: string, shop_score_description: string, is_trade: string, day: int, item_brand_idIdx: double, shop_star_level_double: double]

## 数据的修改

In [105]:
trainset.groupby("user_age_level").count().show()

+--------------+------+
|user_age_level| count|
+--------------+------+
|            -1|   964|
|          1004|126131|
|          1006| 30190|
|          1007|  5217|
|          1000| 12958|
|          1005| 56608|
|          1003|169449|
|          1002| 70175|
|          1001|  6446|
+--------------+------+



In [114]:
import numpy as np
import pyspark.sql.functions as F
trainset.withColumn("new_user_age_level",F.when(
    trainset.user_age_level != -1,trainset.user_age_level
).otherwise(np.nan)).groupby("new_user_age_level").count().show()



+------------------+------+
|new_user_age_level| count|
+------------------+------+
|              1004|126131|
|              1006| 30190|
|              1007|  5217|
|              1000| 12958|
|              1005| 56608|
|               NaN|   964|
|              1003|169449|
|              1002| 70175|
|              1001|  6446|
+------------------+------+

