In [39]:
import pyspark.sql.types as typ #使用时采用typ.DoubleType()的形式
from pyspark.sql.types import DoubleType #直接使用DoubleType()
labels = [
('INFANT_ALIVE_AT_REPORT', typ.DoubleType()),
('BIRTH_PLACE', typ.StringType()),
('MOTHER_AGE_YEARS', DoubleType()),
('FATHER_COMBINED_AGE', typ.DoubleType()),
('CIG_BEFORE', typ.DoubleType()),
('CIG_1_TRI', typ.DoubleType()),
('CIG_2_TRI', typ.DoubleType()),
('CIG_3_TRI', typ.DoubleType()),
('MOTHER_HEIGHT_IN', typ.DoubleType()),
('MOTHER_PRE_WEIGHT', typ.DoubleType()),
('MOTHER_DELIVERY_WEIGHT', typ.DoubleType()),
('MOTHER_WEIGHT_GAIN', typ.DoubleType()),
('DIABETES_PRE', typ.DoubleType()),
('DIABETES_GEST', typ.DoubleType()),
('HYP_TENS_PRE', typ.DoubleType()),
('HYP_TENS_GEST', typ.DoubleType()),
('PREV_BIRTH_PRETERM', typ.DoubleType())
]
# Specifying the schema of the DataFrame
schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])
births = spark.read.csv('file:/home/hadoop/pythonwork/InfantAlive/births_train.csv',header=True,schema=schema)
births.show(2)

+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|INFANT_ALIVE_AT_REPORT|BIRTH_PLACE|MOTHER_AGE_YEARS|FATHER_COMBINED_AGE|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|MOTHER_HEIGHT_IN|MOTHER_PRE_WEIGHT|MOTHER_DELIVERY_WEIGHT|MOTHER_WEIGHT_GAIN|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|
+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|                   0.0|          1|            29.0|               99.0|       0.0|      0.0|      0.0|      0.0|            99.0|            999.0|                 999.0|              99.0|         0.0| 

In [40]:
births = births.withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE']\
                           .cast(typ.IntegerType()))
births.printSchema()

root
 |-- INFANT_ALIVE_AT_REPORT: double (nullable = true)
 |-- BIRTH_PLACE: string (nullable = true)
 |-- MOTHER_AGE_YEARS: double (nullable = true)
 |-- FATHER_COMBINED_AGE: double (nullable = true)
 |-- CIG_BEFORE: double (nullable = true)
 |-- CIG_1_TRI: double (nullable = true)
 |-- CIG_2_TRI: double (nullable = true)
 |-- CIG_3_TRI: double (nullable = true)
 |-- MOTHER_HEIGHT_IN: double (nullable = true)
 |-- MOTHER_PRE_WEIGHT: double (nullable = true)
 |-- MOTHER_DELIVERY_WEIGHT: double (nullable = true)
 |-- MOTHER_WEIGHT_GAIN: double (nullable = true)
 |-- DIABETES_PRE: double (nullable = true)
 |-- DIABETES_GEST: double (nullable = true)
 |-- HYP_TENS_PRE: double (nullable = true)
 |-- HYP_TENS_GEST: double (nullable = true)
 |-- PREV_BIRTH_PRETERM: double (nullable = true)
 |-- BIRTH_PLACE_INT: integer (nullable = true)



In [41]:
import pyspark.ml.feature as ft

# Using the OneHotEncoder to encode
encoder = ft.OneHotEncoder(
inputCol='BIRTH_PLACE_INT',
outputCol='BIRTH_PLACE_VEC')

featuresCreator = ft.VectorAssembler(
inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()]\
    ,outputCol='features')


import pyspark.ml.classification as cl
logistic = cl.LogisticRegression(maxIter=10,regParam=0.01\
                                 ,labelCol='INFANT_ALIVE_AT_REPORT')

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[encoder,featuresCreator,logistic])
births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)
pipelineModel = pipeline.fit(births_train)
predictions = pipelineModel.transform(births_test)
predictions.show(1)

+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+---------------+---------------+--------------------+--------------------+--------------------+----------+
|INFANT_ALIVE_AT_REPORT|BIRTH_PLACE|MOTHER_AGE_YEARS|FATHER_COMBINED_AGE|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|MOTHER_HEIGHT_IN|MOTHER_PRE_WEIGHT|MOTHER_DELIVERY_WEIGHT|MOTHER_WEIGHT_GAIN|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|BIRTH_PLACE_INT|BIRTH_PLACE_VEC|            features|       rawPrediction|         probability|prediction|
+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+-------------

In [42]:
# 评估模型的性能
import pyspark.ml.evaluation as ev

# 使用BinaryClassificationEvaluator来检验模型的表现，
# rawPredictionCol可以是由评估器产生的rawPrediction列，也可以是probability列
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol='probability',
labelCol='INFANT_ALIVE_AT_REPORT')

print(evaluator.evaluate(predictions,{evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(predictions,{evaluator.metricName: 'areaUnderPR'}))

0.7342616552067652
0.587468578936341


In [43]:
test = spark.read.csv('file:/home/hadoop/pythonwork/InfantAlive/births_test.csv',header=True,schema=schema)
test.show(2)

+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|INFANT_ALIVE_AT_REPORT|BIRTH_PLACE|MOTHER_AGE_YEARS|FATHER_COMBINED_AGE|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|MOTHER_HEIGHT_IN|MOTHER_PRE_WEIGHT|MOTHER_DELIVERY_WEIGHT|MOTHER_WEIGHT_GAIN|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|
+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|                  null|          1|            29.0|               34.0|       0.0|      0.0|      0.0|      0.0|            66.0|            195.0|                 199.0|               4.0|         0.0| 

In [44]:
test1 = test.withColumn('BIRTH_PLACE_INT', test['BIRTH_PLACE']\
                           .cast(typ.IntegerType()))
test1.printSchema()

root
 |-- INFANT_ALIVE_AT_REPORT: double (nullable = true)
 |-- BIRTH_PLACE: string (nullable = true)
 |-- MOTHER_AGE_YEARS: double (nullable = true)
 |-- FATHER_COMBINED_AGE: double (nullable = true)
 |-- CIG_BEFORE: double (nullable = true)
 |-- CIG_1_TRI: double (nullable = true)
 |-- CIG_2_TRI: double (nullable = true)
 |-- CIG_3_TRI: double (nullable = true)
 |-- MOTHER_HEIGHT_IN: double (nullable = true)
 |-- MOTHER_PRE_WEIGHT: double (nullable = true)
 |-- MOTHER_DELIVERY_WEIGHT: double (nullable = true)
 |-- MOTHER_WEIGHT_GAIN: double (nullable = true)
 |-- DIABETES_PRE: double (nullable = true)
 |-- DIABETES_GEST: double (nullable = true)
 |-- HYP_TENS_PRE: double (nullable = true)
 |-- HYP_TENS_GEST: double (nullable = true)
 |-- PREV_BIRTH_PRETERM: double (nullable = true)
 |-- BIRTH_PLACE_INT: integer (nullable = true)



In [45]:
predictions1 = pipelineModel.transform(test1)
predictions1.show(1)

Py4JJavaError: An error occurred while calling o2394.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 254.0 failed 1 times, most recent failure: Lost task 0.0 in stage 254.0 (TID 229, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (vector) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: requirement failed: BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes: x.size = 22, y.size = 24
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.ml.linalg.BLAS$.dot(BLAS.scala:104)
	at org.apache.spark.ml.classification.LogisticRegressionModel$$anonfun$33.apply(LogisticRegression.scala:998)
	at org.apache.spark.ml.classification.LogisticRegressionModel$$anonfun$33.apply(LogisticRegression.scala:997)
	at org.apache.spark.ml.classification.LogisticRegressionModel.predictRaw(LogisticRegression.scala:1154)
	at org.apache.spark.ml.classification.LogisticRegressionModel.predictRaw(LogisticRegression.scala:927)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:117)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:116)
	... 19 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3273)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (vector) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: java.lang.IllegalArgumentException: requirement failed: BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes: x.size = 22, y.size = 24
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.ml.linalg.BLAS$.dot(BLAS.scala:104)
	at org.apache.spark.ml.classification.LogisticRegressionModel$$anonfun$33.apply(LogisticRegression.scala:998)
	at org.apache.spark.ml.classification.LogisticRegressionModel$$anonfun$33.apply(LogisticRegression.scala:997)
	at org.apache.spark.ml.classification.LogisticRegressionModel.predictRaw(LogisticRegression.scala:1154)
	at org.apache.spark.ml.classification.LogisticRegressionModel.predictRaw(LogisticRegression.scala:927)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:117)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:116)
	... 19 more
