In [2]:
import os

# The XGBoost JVM jars have to be handed to spark-submit. pyspark reads this
# variable when it launches its JVM gateway, so this cell must run before the
# first SparkSession is created. The jar paths carry no directory component;
# presumably they resolve against the notebook's working directory -- confirm.
PYSPARK_SUBMIT_ARGS = '--jars xgboost4j-spark-0.90.jar,xgboost4j-0.90.jar pyspark-shell'
os.environ.update(PYSPARK_SUBMIT_ARGS=PYSPARK_SUBMIT_ARGS)

In [3]:
import findspark

# Cluster manager that every SparkSession in this notebook connects to.
master_url = "yarn"

# Locate the Spark installation so the `pyspark` imports in later cells resolve.
findspark.init()


In [3]:
# pyspark + xgboost smoke test: confirm the sparkxgb Python wrapper (shipped as
# sparkxgb.zip) and the XGBoost jars from PYSPARK_SUBMIT_ARGS load together.
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("PySpark XGBOOST Titanic")\
        .master(master_url) \
        .getOrCreate()

try:
    # Ship the wrapper to the cluster; the import below relies on this call
    # having made the zip importable on the driver as well.
    spark.sparkContext.addPyFile("sparkxgb.zip")

    from sparkxgb import XGBoostClassifier

    # Constructing the estimator is only a wiring check; it is never fit.
    # NOTE(review): "Survival" is a placeholder left over from a Titanic
    # example and does not exist in this notebook's data.
    xgboost = XGBoostClassifier(
        featuresCol="features",
        labelCol="Survival",
        predictionCol="prediction"
    )
finally:
    # Always release this throwaway session, even if the import or the
    # constructor fails. Otherwise the next cell's getOrCreate() would likely
    # hand back this session instead of building a fresh "Training" one.
    spark.stop()

In [4]:
# Create (or reuse) the SparkSession used for loading data and training.
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.master(master_url).appName("Training")
spark = session_builder.getOrCreate()

In [5]:
# read data
import pyspark
from pyspark.sql import types
from pyspark.sql.types import StructType, StructField

def get_schema():
    """Return the explicit StructType used to parse the training CSV.

    Columns are listed in file order:
    - identifiers
    - per-user features
    - per-friend features
    - edge (user/friend pair) features
    - label / outcome columns

    Every column before the labels is declared nullable; the six label/outcome
    columns are declared non-nullable.

    NOTE(review): ``total_story_viewed`` and ``total_story_viewed_from_friend``
    are typed as strings, unlike the other ``total_*`` counters -- confirm
    against the data source.
    """
    string, integer, boolean = types.StringType, types.IntegerType, types.BooleanType

    nullable_columns = [
        # user names
        ("encoded_ghost_user_id", string),
        ("encoded_ghost_friend_user_id", string),
        # user feature
        ("user_contact_book_size", integer),
        ("user_snapchatter_in_contact_size", integer),
        ("user_story_post_score_7d", integer),
        ("user_snap_sent_score_7d", integer),
        # friend feature
        ("friend_contact_book_size", integer),
        ("friend_snapchatter_in_contact_size", integer),
        ("friend_story_post_score_7d", integer),
        ("friend_snap_sent_score_7d", integer),
        # edge feature
        ("friend_counting_score", integer),
        ("contact_book_similarity_score", integer),
        ("contact_book_normalized_similarity_score", integer),
        ("is_friend_in_users_contact_book", boolean),
    ]
    required_columns = [
        # labels
        ("friend_addition_result", boolean),
        ("friend_reciprocation_result", boolean),
        ("total_snap_sent", integer),
        ("total_snap_sent_from_friend", integer),
        ("total_story_viewed", string),
        ("total_story_viewed_from_friend", string),
    ]

    fields = [StructField(name, kind(), nullable=True) for name, kind in nullable_columns]
    fields += [StructField(name, kind(), nullable=False) for name, kind in required_columns]
    return StructType(fields)

# Load the labelled data with the explicit schema; cells containing the
# literal "MISSING_VALUE" are read as null.
data = spark.read.csv("gs://shuyi-test/reg_model/data.csv",
                      header=True,
                      nullValue="MISSING_VALUE",
                      schema=get_schema())

# Both labels are parsed as booleans; cast them to double because Spark ML
# estimators expect a numeric label column.
for label_col in ("friend_addition_result", "friend_reciprocation_result"):
    data = data.withColumn(label_col, data[label_col].cast(types.DoubleType()))

# Drop the story-engagement columns by name rather than by position, so a
# reordered or extended schema cannot silently drop the wrong columns.
data = data.drop("total_story_viewed", "total_story_viewed_from_friend")
data = data.cache()


In [6]:
# This cell is reserved for manually checking data only; it must not modify `data`.
#data.describe().show()

# na.drop() returns a new DataFrame, so count it explicitly rather than
# calling it for side effects (which would leave `data` unchanged).
row_count = data.count()
complete_row_count = data.na.drop().count()
print("rows:", row_count)
print("rows without any null:", complete_row_count)
print("rows containing a null:", row_count - complete_row_count)
data.columns

2524366
2524366


['encoded_ghost_user_id',
 'encoded_ghost_friend_user_id',
 'user_contact_book_size',
 'user_snapchatter_in_contact_size',
 'user_story_post_score_7d',
 'user_snap_sent_score_7d',
 'friend_contact_book_size',
 'friend_snapchatter_in_contact_size',
 'friend_story_post_score_7d',
 'friend_snap_sent_score_7d',
 'friend_counting_score',
 'contact_book_similarity_score',
 'contact_book_normalized_similarity_score',
 'is_friend_in_users_contact_book',
 'friend_addition_result',
 'friend_reciprocation_result',
 'total_snap_sent',
 'total_snap_sent_from_friend']

In [8]:
# Train a random forest model as a baseline, predicting friend_addition_result
# from the user, friend and edge features.
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

assembler = VectorAssembler(
    inputCols=["user_contact_book_size", "user_snapchatter_in_contact_size", "user_story_post_score_7d", "user_snap_sent_score_7d",
               "friend_contact_book_size", "friend_snapchatter_in_contact_size", "friend_story_post_score_7d", "friend_snap_sent_score_7d",
               "friend_counting_score", "contact_book_similarity_score", "contact_book_normalized_similarity_score", "is_friend_in_users_contact_book",
              ],
    outputCol="features")

# Fixed seed so re-running the notebook reproduces the same forest.
rf = RandomForestClassifier(featuresCol="features", labelCol="friend_addition_result", seed=42)

pipeline = Pipeline(stages=[assembler, rf])
# Keep the fitted PipelineModel so later cells can score or inspect it.
rf_model = pipeline.fit(data)
rf_model


PipelineModel_c9c0771ff1c2

In [7]:
# Train a xgboost model on the same features as the random forest baseline.
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler
from pyspark.ml import Pipeline
spark.sparkContext.addPyFile("sparkxgb.zip") # read xgboost pyspark client lib
from sparkxgb import XGBoostClassifier

assembler = VectorAssembler(
    inputCols=["user_contact_book_size", "user_snapchatter_in_contact_size", "user_story_post_score_7d", "user_snap_sent_score_7d",
               "friend_contact_book_size", "friend_snapchatter_in_contact_size", "friend_story_post_score_7d", "friend_snap_sent_score_7d",
               "friend_counting_score", "contact_book_similarity_score", "contact_book_normalized_similarity_score", "is_friend_in_users_contact_book",
              ],
    outputCol="features")

xgboost = XGBoostClassifier(
    # binary:logistic is XGBoost's documented objective for 0/1 classification
    # (same logistic loss as reg:logistic, but framed as a classifier).
    objective="binary:logistic",
    maxDepth=3,
    # NOTE(review): VectorAssembler may emit sparse vectors whose zeros are
    # implicit and therefore seen as missing by XGBoost. Setting missing=0.0
    # keeps explicit zeros consistent with that, but it also means genuine
    # zero feature values are treated as missing -- confirm this is intended.
    missing=float(0.0),
    featuresCol="features",
    labelCol="friend_addition_result",
)

# Fit outside a Pipeline so the assembled frame `td` and the raw
# XGBoostClassificationModel (used by the cells below) stay available.
td = assembler.transform(data)
trained_raw_model = xgboost.fit(td)


In [8]:
# Score the training data itself (there is no hold-out split yet), so these
# predictions say nothing about generalisation.
result = trained_raw_model.transform(td)
result.select(["friend_addition_result", "rawPrediction", "probability", "prediction"]).show()

+----------------------+--------------------+--------------------+----------+
|friend_addition_result|       rawPrediction|         probability|prediction|
+----------------------+--------------------+--------------------+----------+
|                   0.0|[0.36648640036582...|[0.59060969948768...|       0.0|
|                   0.0|[0.44295924901962...|[0.60896393656730...|       0.0|
|                   0.0|[0.49293401837348...|[0.62079739570617...|       0.0|
|                   0.0|[0.36648640036582...|[0.59060969948768...|       0.0|
|                   0.0|[0.36648640036582...|[0.59060969948768...|       0.0|
|                   1.0|[0.44295924901962...|[0.60896393656730...|       0.0|
|                   0.0|[0.36648640036582...|[0.59060969948768...|       0.0|
|                   0.0|[0.36648640036582...|[0.59060969948768...|       0.0|
|                   0.0|[0.36648640036582...|[0.59060969948768...|       0.0|
|                   1.0|[0.36648640036582...|[0.59060969948768..

In [11]:
# Save the underlying XGBoost booster to local disk. The path is relative;
# presumably it resolves against the driver's working directory -- confirm.
MODEL_OUTPUT_PATH = "outputmodel.xgboost"
booster = trained_raw_model.nativeBooster
booster.saveModel(MODEL_OUTPUT_PATH)