# 导入库

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types as typ
from pyspark import SparkContext
from pyspark.sql.types import StructType, StructField
import os


# 设置环境变量

In [2]:
os.environ['JAVA_HOME']='/usr/lib/jdk/jdk1.8.0_191'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark-0.90.jar,xgboost4j-0.90.jar pyspark-shell'
os.environ['SPARK_HOME']='/usr/lib/spark/spark-2.4.4-bin-hadoop2.7'
#import findspark
#findspark.init()

# pyspark初始化
- SParkSession：这是一切spark程序的入口

在 spark1.x 中，SparkContext 是 spark 的主要切入点，由于 RDD 作为主要的 API，我们通过 SparkContext 来创建和操作 RDD,
这个问题在于：
1. 不同的应用中，需要使用不同的 context，在 Streaming 中需要使用 StreamingContext，在 sql 中需要使用 sqlContext，在 hive 中需要使用 hiveContext，比较麻烦  
2. 随着 DataSet 和 DataFrame API 逐渐成为标准 API，需要为他们创建接入点，即 SparkSession   

SparkSession 是 spark2.x 引入的新概念，SparkSession 为用户提供统一的切入点，字面理解是创建会话，或者连接 spark
SparkSession.  
实际上封装了SparkContext，比如可以调用`spark.sparkContext.addPyFile("sparkxgb.zip")`, 另外也封装了 SparkConf、sqlContext，随着版本增加，可能更多。

所以我们尽量使用 SparkSession ，如果发现有些 API 不在 SparkSession 中，也可以通过 SparkSession 拿到 SparkContext 和其他 Context 等

In [4]:
# pyspark + xgboost test
spark = SparkSession\
        .builder\
        .appName("PySpark XGBOOST Titanic")\
        .master('local') \
        .getOrCreate()

# 加载数据

In [6]:
# read data
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),   #Y
    ('BIRTH_PLACE', typ.StringType()),              # onehot变量
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType())
]

# 读取时指定每一列的数据类型
schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])

df = spark.read.csv('births_transformed.csv.gz', 
                        header=True, 
                        schema=schema).withColumnRenamed('INFANT_ALIVE_AT_REPORT', 'label')

# 数据处理管道&xgboost模型

In [7]:
# Train a xgboost model
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler
from pyspark.ml import Pipeline
spark.sparkContext.addPyFile("sparkxgb.zip") # read xgboost pyspark client lib

from sparkxgb import XGBoostClassifier

assembler = VectorAssembler(
    inputCols=[c[0] for c in labels[2:]],
    outputCol="features")

xgboost = XGBoostClassifier(
    objective="reg:logistic",
    maxDepth=3,
    missing=float(0.0),
    featuresCol="features", 
    labelCol="label", 
)



# fit & predict

In [12]:
# fit on the train dataset
td = assembler.transform(df)
model = xgboost.fit(td)

# predict on the train dataset
result = model.transform(td)
print(result.columns)

['label', 'BIRTH_PLACE', 'MOTHER_AGE_YEARS', 'FATHER_COMBINED_AGE', 'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI', 'CIG_3_TRI', 'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT', 'MOTHER_DELIVERY_WEIGHT', 'MOTHER_WEIGHT_GAIN', 'DIABETES_PRE', 'DIABETES_GEST', 'HYP_TENS_PRE', 'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM', 'features', 'rawPrediction', 'probability', 'prediction']


# predict results

In [10]:
result.select(["label", "rawPrediction", "probability", "prediction"]).show(4)


+-----+--------------------+--------------------+----------+
|label|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+----------+
|    0|[0.46121883392333...|[0.61330327391624...|       0.0|
|    0|[-0.0788606479763...|[0.48029506206512...|       1.0|
|    0|[0.34962576627731...|[0.58652684092521...|       0.0|
|    0|[-0.0788606479763...|[0.48029506206512...|       1.0|
+-----+--------------------+--------------------+----------+
only showing top 4 rows



# save model

In [11]:
# save trained model to local disk
trained_raw_model.nativeBooster.saveModel("outputmodel.xgboost")