In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Build (or reuse) a local Spark context. getOrCreate makes this cell safe to
# re-run: constructing a second SparkContext in the same kernel raises an error.
# If a context already exists, it is returned as-is and `conf` is not applied.
conf = SparkConf().setMaster('local').setAppName('spark_ml')
spark = SparkContext.getOrCreate(conf=conf)
# SQLContext is the entry point used below for reading data (sqlCtx.read).
# NOTE(review): SQLContext is kept only for backward compatibility; SparkSession
# is its replacement in Spark 2.0+.
sqlCtx = SQLContext(spark)
sqlCtx

<pyspark.sql.context.SQLContext at 0x28481dd16d8>

## Titanic ML

In [2]:
# Load the Titanic training CSV: the first row is the header and column
# types are inferred from the data.
TITANIC_TRAIN_CSV = './data/spark_titanic_train.csv'
titanic = (
    sqlCtx.read
    .option('header', True)
    .option('inferSchema', True)
    .csv(TITANIC_TRAIN_CSV)
)

type(titanic)

pyspark.sql.dataframe.DataFrame

In [3]:
# Show the inferred schema (column names, types, nullability).
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [4]:
# Peek at the target column and two candidate features.
titanic.select('Survived', 'Pclass', 'Embarked').show(n=20)

+--------+------+--------+
|Survived|Pclass|Embarked|
+--------+------+--------+
|       0|     3|       S|
|       1|     1|       C|
|       1|     3|       S|
|       1|     1|       S|
|       0|     3|       S|
|       0|     3|       Q|
|       0|     1|       S|
|       0|     3|       S|
|       1|     3|       S|
|       1|     2|       C|
|       1|     3|       S|
|       1|     1|       S|
|       0|     3|       S|
|       0|     3|       S|
|       0|     3|       S|
|       1|     2|       S|
|       0|     3|       Q|
|       1|     2|       S|
|       0|     3|       S|
|       1|     3|       C|
+--------+------+--------+
only showing top 20 rows



- EDA (exploratory data analysis)

In [5]:
# Class balance of the target: number of passengers per Survived value.
titanic.groupby(['Survived']).count().show(n=20)

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



In [6]:
# Survival counts broken down by Pclass.
titanic.groupby(['Pclass', 'Survived']).count().show(n=20)

+------+--------+-----+
|Pclass|Survived|count|
+------+--------+-----+
|     1|       0|   80|
|     3|       1|  119|
|     1|       1|  136|
|     2|       1|   87|
|     2|       0|   97|
|     3|       0|  372|
+------+--------+-----+



In [7]:
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit

In [8]:
# Returns the columns that contain null values, paired with their null counts.
def null_value_count(df):
  """Count nulls per column and report only the columns that have any.

  Args:
    df: a Spark DataFrame.

  Returns:
    A list of ``(column_name, null_count)`` tuples, in ``df.columns`` order,
    for every column whose null count is greater than zero.
  """
  from pyspark.sql import functions as F

  # Guard: selecting zero aggregate columns would give nothing to unpack.
  if not df.columns:
    return []
  # A single aggregation computes every column's null count in one Spark job,
  # instead of one filter+count job per column. when() yields null for
  # non-null rows, and count() skips nulls, so only null rows are counted.
  null_counts = df.select(
      [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
  ).first()
  # Pair results by position so duplicate column names cannot collide.
  return [(c, n) for c, n in zip(df.columns, null_counts) if n > 0]

In [9]:
# (column, null count) pairs for every column that has missing values.
null_list = null_value_count(df=titanic)

In [10]:
# Display the columns with missing values and how many rows are null in each.
null_list

[('Age', 177), ('Cabin', 687), ('Embarked', 2)]

In [11]:
# What is the mean age? (The aggregate skips null ages.)
titanic.agg(mean('Age')).show()

+-----------------+
|         avg(Age)|
+-----------------+
|29.69911764705882|
+-----------------+



In [12]:
# Names embed a title (Mr., Mrs., Miss., Master., ...) that can be extracted as a feature.
titanic.select(titanic['Name']).show(n=20)

+--------------------+
|                Name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
|    Moran, Mr. James|
|McCarthy, Mr. Tim...|
|Palsson, Master. ...|
|Johnson, Mrs. Osc...|
|Nasser, Mrs. Nich...|
|Sandstrom, Miss. ...|
|Bonnell, Miss. El...|
|Saundercock, Mr. ...|
|Andersson, Mr. An...|
|Vestrom, Miss. Hu...|
|Hewlett, Mrs. (Ma...|
|Rice, Master. Eugene|
|Williams, Mr. Cha...|
|Vander Planke, Mr...|
|Masselmani, Mrs. ...|
+--------------------+
only showing top 20 rows



In [13]:
# Extract the title (e.g. "Mr", "Miss") from names like "Braund, Mr. Owen ...":
# capture group 1 of the first run of letters immediately followed by a period.
# The raw string keeps the regex escape "\." intact without Python's
# invalid-escape-sequence warning.
titanic = titanic.withColumn('initial', regexp_extract(col('Name'), r'([A-Za-z]+)\.', 1))

In [14]:
# List the distinct titles that were extracted.
titanic.select(['initial']).dropDuplicates().show(n=20)

+--------+
| initial|
+--------+
|     Don|
|    Miss|
|Countess|
|     Col|
|     Rev|
|    Lady|
|  Master|
|     Mme|
|    Capt|
|      Mr|
|      Dr|
|     Mrs|
|     Sir|
|Jonkheer|
|    Mlle|
|   Major|
|      Ms|
+--------+



In [15]:
# Collapse rare titles into a handful of groups (Miss / Mr / Mrs / Other).
# `subset` restricts the replacement to the `initial` column; without it,
# DataFrame.replace rewrites matching values in every string column.
title_groups = {
    'Mlle': 'Miss', 'Mme': 'Miss', 'Ms': 'Miss',
    'Dr': 'Mr', 'Major': 'Mr', 'Capt': 'Mr', 'Sir': 'Mr', 'Don': 'Mr',
    'Lady': 'Mrs', 'Countess': 'Mrs',
    'Jonkheer': 'Other', 'Col': 'Other', 'Rev': 'Other',
}
titanic = titanic.replace(title_groups, subset=['initial'])

In [16]:
# Confirm that only the grouped titles remain.
titanic.select(['initial']).dropDuplicates().collect()

[Row(initial='Miss'),
 Row(initial='Other'),
 Row(initial='Master'),
 Row(initial='Mr'),
 Row(initial='Mrs')]

In [17]:
# Mean age per title group.
# The column is referenced with the schema's casing ('Age'); lowercase 'age'
# only resolved because Spark's column resolution is case-insensitive by default.
titanic.groupby('initial').avg('Age').show()

+-------+------------------+
|initial|          avg(age)|
+-------+------------------+
|   Miss|             21.86|
|  Other|45.888888888888886|
| Master| 4.574166666666667|
|     Mr| 32.73960880195599|
|    Mrs|35.981818181818184|
+-------+------------------+



In [18]:
# Handling nulls: first check how often each Embarked value occurs (including null).
titanic.groupby(['Embarked']).count().show(n=20)

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+



In [19]:
# Fill missing Embarked values with 'S', the most frequent value in the counts above.
titanic = titanic.fillna('S', subset=['Embarked'])
titanic.groupby(['Embarked']).count().show(n=20)

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|       C|  168|
|       S|  646|
+--------+-----+



In [20]:
# Drop a column: Cabin is mostly null (687 of 891 rows), so it is removed.
cols_to_drop = ['Cabin']
titanic = titanic.drop(*cols_to_drop)
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- initial: string (nullable = true)



In [21]:
# Derived column: Family_Size = SibSp + Parch.
# Column names use the schema's casing ('SibSp'); lowercase 'Sibsp' only worked
# because Spark resolves column names case-insensitively by default.
titanic = titanic.withColumn('Family_Size', col('SibSp') + col('Parch'))
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- initial: string (nullable = true)
 |-- Family_Size: integer (nullable = true)



In [24]:
# Distribution of the derived Family_Size column.
titanic.groupby(['Family_Size']).count().show(n=20)

+-----------+-----+
|Family_Size|count|
+-----------+-----+
|          1|  161|
|          6|   12|
|          3|   29|
|          5|   22|
|          4|   15|
|          7|    6|
|         10|    7|
|          2|  102|
|          0|  537|
+-----------+-----+



In [25]:
# Start every row with Alone = 0; a later cell sets it to 1 where Family_Size == 0.
titanic = titanic.withColumn(colName='Alone', col=lit(0))

In [26]:
# Inspect the freshly initialised Alone column.
titanic.select(['Alone']).show(n=20)

+-----+
|Alone|
+-----+
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
+-----+
only showing top 20 rows



In [28]:
# Alone = 1 when Family_Size == 0; otherwise keep the current Alone value.
is_alone = when(col('Family_Size') == 0, 1).otherwise(col('Alone'))
titanic = titanic.withColumn('Alone', is_alone)

In [29]:
# Check the Alone flag after applying the Family_Size rule.
titanic.select(['Alone']).show(n=20)

+-----+
|Alone|
+-----+
|    0|
|    0|
|    1|
|    0|
|    1|
|    1|
|    1|
|    0|
|    0|
|    0|
|    0|
|    1|
|    1|
|    0|
|    1|
|    1|
|    0|
|    1|
|    0|
|    1|
+-----+
only showing top 20 rows



In [50]:
# Fill missing Age values with an arbitrary constant (20).
# NOTE(review): the per-title mean ages computed earlier may be a less arbitrary fill.
titanic = titanic.fillna(20, subset=['Age'])

In [51]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [52]:
# Encode the string columns Sex, Embarked and initial as numeric <col>_index
# columns (by default the most frequent label maps to 0.0).
# The StringIndexers go into the Pipeline unfitted, so Pipeline.fit does the
# fitting and the resulting PipelineModel (indexer_model) can re-apply the same
# encoding to other data. Pre-fitting them made Pipeline.fit a no-op wrapper.
# This cell needs the raw Sex/Embarked/initial columns, so it must run before
# the cell that drops them (re-running it afterwards fails with
# "Input column Sex does not exist").
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index")
            for column in ["Sex", "Embarked", "initial"]]
pipeline = Pipeline(stages=indexers)
indexer_model = pipeline.fit(titanic)
titanic = indexer_model.transform(titanic)

Py4JJavaError: An error occurred while calling o403.fit.
: org.apache.spark.SparkException: Input column Sex does not exist.
	at org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema(StringIndexer.scala:123)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema$(StringIndexer.scala:115)
	at org.apache.spark.ml.feature.StringIndexer.validateAndTransformSchema(StringIndexer.scala:145)
	at org.apache.spark.ml.feature.StringIndexer.transformSchema(StringIndexer.scala:251)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:71)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:236)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)


In [53]:
# Preview the rows after indexing.
titanic.show(n=20, truncate=True)

+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+-------------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|Family_Size|Alone|Sex_index|Embarked_index|initial_index|
+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+-------------+
|       0|     3|22.0|    1|    0|   7.25|          1|    0|      0.0|           0.0|          0.0|
|       1|     1|38.0|    1|    0|71.2833|          1|    0|      1.0|           1.0|          2.0|
|       1|     3|26.0|    0|    0|  7.925|          0|    1|      1.0|           0.0|          1.0|
|       1|     1|35.0|    1|    0|   53.1|          1|    0|      1.0|           0.0|          2.0|
|       0|     3|35.0|    0|    0|   8.05|          0|    1|      0.0|           0.0|          0.0|
|       0|     3|20.0|    0|    0| 8.4583|          0|    1|      0.0|           2.0|          0.0|
|       0|     1|54.0|    0|    0|51.8625|          0|    1|      0.0|           0.0|          0.0|


In [54]:
# Show the schema after indexing (the *_index columns are doubles).
titanic.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = false)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- initial_index: double (nullable = false)



In [55]:
# Drop identifiers, free text, and the raw string columns already encoded as *_index.
unused_cols = ['PassengerId', 'Name', 'Sex', 'Ticket', 'Embarked', 'initial']
titanic = titanic.drop(*unused_cols)

In [56]:
# Confirm that only the label (Survived) and numeric feature columns remain.
titanic.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = false)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- initial_index: double (nullable = false)



In [57]:
from pyspark.ml.feature import VectorAssembler

In [58]:
# Assemble every column except the label into a single 'features' vector.
# Excluding the label by name (rather than `titanic.columns[1:]`) does not depend
# on 'Survived' being the first column.
feature = VectorAssembler(inputCols=[c for c in titanic.columns if c != 'Survived'],
                          outputCol='features')

In [59]:
# Append the assembled 'features' vector column to every row.
feature_vector = feature.transform(dataset=titanic)
feature_vector.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = false)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- initial_index: double (nullable = false)
 |-- features: vector (nullable = true)



In [60]:
# Split the data set: 80% train / 20% test, with a fixed seed for reproducibility.
SPLIT_SEED = 100
trainData, testData = feature_vector.randomSplit(weights=[0.8, 0.2], seed=SPLIT_SEED)

In [61]:
# Preview the training split.
trainData.show(n=20)

+--------+------+----+-----+-----+--------+-----------+-----+---------+--------------+-------------+--------------------+
|Survived|Pclass| Age|SibSp|Parch|    Fare|Family_Size|Alone|Sex_index|Embarked_index|initial_index|            features|
+--------+------+----+-----+-----+--------+-----------+-----+---------+--------------+-------------+--------------------+
|       0|     1| 2.0|    1|    2|  151.55|          3|    0|      1.0|           0.0|          1.0|[1.0,2.0,1.0,2.0,...|
|       0|     1|18.0|    1|    0|   108.9|          1|    0|      0.0|           1.0|          0.0|[1.0,18.0,1.0,0.0...|
|       0|     1|19.0|    1|    0|    53.1|          1|    0|      0.0|           0.0|          0.0|(10,[0,1,2,4,5],[...|
|       0|     1|20.0|    0|    0|     0.0|          0|    1|      0.0|           0.0|          0.0|(10,[0,1,6],[1.0,...|
|       0|     1|20.0|    0|    0|     0.0|          0|    1|      0.0|           0.0|          0.0|(10,[0,1,6],[1.0,...|
|       0|     1|20.0|  

- 모델링 (Modeling)
- Spark MLlib classifiers: DTC, LR, RFC, GBTC, NB, SVM (`LinearSVC`)

In [63]:
# Logistic regression: a classification algorithm that predicts each row's
# class as a value between 0 and 1.
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol='features', labelCol='Survived')
# Fit on the training split, then score the held-out test split.
lr_model = lr.fit(trainData)
lr_pred = lr_model.transform(testData)

In [67]:
# Compare predictions with the true labels on the test split.
lr_pred.select(['prediction', 'Survived', 'features']).show(n=20)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|[1.0,19.0,3.0,2.0...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       1.0|       0|(10,[0,1,3,4,5],[...|
|       1.0|       0|[1.0,27.0,0.0,2.0...|
|       0.0|       0|(10,[0,1,2,4,5],[...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,2,4,5],[...|
|       0.0|       0|(10,[0,1,2,4,5],[...|
|       1.0|       0|(10,[0,1,6,9],[1....|
|       0.0|       0|(10,[0,1,6],[1.0,...|
|       0.0|       0|(10,[0,1,6],[1.0,...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|[1.0,50.0,1.0,0.0...|
|       0.0|       0|[1.0,51.0,0.0,1.0...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6],[2....|
|       0.0|       0|(10,[0,1,2,4,5],[...|
+----------

In [69]:
# Evaluation: accuracy, i.e. the fraction of rows whose prediction matches Survived.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    labelCol='Survived',
    predictionCol='prediction',
)

In [71]:
# Test-set accuracy and its complement (error rate).
acc = evaluator.evaluate(dataset=lr_pred)
err = 1.0 - acc
print(f'acc :  {acc}')
print(f'err :  {err}')

acc :  0.8
err :  0.19999999999999996
