In [2]:
from pyspark import SparkConf , SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setMaster('local').setAppName('sparkApp')
spark = SparkContext(conf=conf)

sqlCtx = SQLContext(spark)
sqlCtx

<pyspark.sql.context.SQLContext at 0x294dec712e8>

### Titanic ML

In [3]:
titanic = sqlCtx.read.csv('C:/Users/i/data/spark_titanic_train.csv',
                        header=True ,
                        inferSchema=True) # 인퍼스키마를 줘야 타입에 맞게 데이터가 로드된다
type( titanic )

pyspark.sql.dataframe.DataFrame

In [4]:
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [5]:
titanic.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [6]:
titanic.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [7]:
titanic.select(['Survived' , 'Pclass' , 'Embarked']).show()

+--------+------+--------+
|Survived|Pclass|Embarked|
+--------+------+--------+
|       0|     3|       S|
|       1|     1|       C|
|       1|     3|       S|
|       1|     1|       S|
|       0|     3|       S|
|       0|     3|       Q|
|       0|     1|       S|
|       0|     3|       S|
|       1|     3|       S|
|       1|     2|       C|
|       1|     3|       S|
|       1|     1|       S|
|       0|     3|       S|
|       0|     3|       S|
|       0|     3|       S|
|       1|     2|       S|
|       0|     3|       Q|
|       1|     2|       S|
|       0|     3|       S|
|       1|     3|       C|
+--------+------+--------+
only showing top 20 rows



In [8]:
titanic.groupBy('Sex','Survived').count().show()

+------+--------+-----+
|   Sex|Survived|count|
+------+--------+-----+
|  male|       0|  468|
|female|       1|  233|
|female|       0|   81|
|  male|       1|  109|
+------+--------+-----+



In [9]:
titanic.groupBy('Pclass', 'Survived').count().show()

+------+--------+-----+
|Pclass|Survived|count|
+------+--------+-----+
|     1|       0|   80|
|     3|       1|  119|
|     1|       1|  136|
|     2|       1|   87|
|     2|       0|   97|
|     3|       0|  372|
+------+--------+-----+



In [10]:
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit

In [11]:
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
# This function use to print feature with null values and null count 
def null_value_count(df):
  null_columns_counts = []
  numRows = df.count()
  for k in df.columns:
    nullRows = df.where(col(k).isNull()).count() #k를 피처로 인식하도록 col(k) , spark 의 문법
    if(nullRows > 0):
      temp = k,nullRows
      null_columns_counts.append(temp)
  return(null_columns_counts)

In [12]:
null_list = null_value_count(titanic)

In [13]:
null_list

[('Age', 177), ('Cabin', 687), ('Embarked', 2)]

In [14]:
sqlCtx.createDataFrame(null_list , ['column' , 'cnt']).show()

+--------+---+
|  column|cnt|
+--------+---+
|     Age|177|
|   Cabin|687|
|Embarked|  2|
+--------+---+



In [15]:
# 나이 평균을 구한다면?
titanic.select(mean('Age')).show()

+-----------------+
|         avg(Age)|
+-----------------+
|29.69911764705882|
+-----------------+



In [16]:
titanic.select('Name').show()

+--------------------+
|                Name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
|    Moran, Mr. James|
|McCarthy, Mr. Tim...|
|Palsson, Master. ...|
|Johnson, Mrs. Osc...|
|Nasser, Mrs. Nich...|
|Sandstrom, Miss. ...|
|Bonnell, Miss. El...|
|Saundercock, Mr. ...|
|Andersson, Mr. An...|
|Vestrom, Miss. Hu...|
|Hewlett, Mrs. (Ma...|
|Rice, Master. Eugene|
|Williams, Mr. Cha...|
|Vander Planke, Mr...|
|Masselmani, Mrs. ...|
+--------------------+
only showing top 20 rows



In [17]:
type( col('Name') )
# regexp_extract(col('Name'))
titanic = titanic.withColumn('initial' , regexp_extract( col ('Name') , '([A-Za-z]+)' , 1) )

In [18]:
titanic.select(['PassengerId','initial']).show()

+-----------+-----------+
|PassengerId|    initial|
+-----------+-----------+
|          1|     Braund|
|          2|    Cumings|
|          3|  Heikkinen|
|          4|   Futrelle|
|          5|      Allen|
|          6|      Moran|
|          7|   McCarthy|
|          8|    Palsson|
|          9|    Johnson|
|         10|     Nasser|
|         11|  Sandstrom|
|         12|    Bonnell|
|         13|Saundercock|
|         14|  Andersson|
|         15|    Vestrom|
|         16|    Hewlett|
|         17|       Rice|
|         18|   Williams|
|         19|     Vander|
|         20| Masselmani|
+-----------+-----------+
only showing top 20 rows



In [19]:
titanic.select('initial').distinct().show()

+-------------+
|      initial|
+-------------+
|     Pavlovic|
|         Saad|
|      Palsson|
| Thorneycroft|
|   Johannesen|
|     Meanwell|
|      Markoff|
|       Porter|
|     Harrison|
|     Bissette|
|        Hampe|
|      Hegarty|
|Lemberopolous|
|      Ekstrom|
|       Boulos|
|       Bourke|
|     McGovern|
|     Kvillner|
|   Goldenberg|
|        Keefe|
+-------------+
only showing top 20 rows



In [20]:
titanic = titanic.replace(['Don'] , ['Other'])

In [21]:
titanic.select('initial').distinct().show()

+-------------+
|      initial|
+-------------+
|     Pavlovic|
|         Saad|
|      Palsson|
| Thorneycroft|
|   Johannesen|
|     Meanwell|
|      Markoff|
|       Porter|
|     Harrison|
|     Bissette|
|        Hampe|
|      Hegarty|
|Lemberopolous|
|      Ekstrom|
|       Boulos|
|       Bourke|
|     McGovern|
|     Kvillner|
|   Goldenberg|
|        Keefe|
+-------------+
only showing top 20 rows



In [22]:
titanic.groupby('initial').avg('age').show()

+-------------+--------+
|      initial|avg(age)|
+-------------+--------+
|     Pavlovic|    32.0|
|         Saad|    25.0|
|      Palsson|    10.5|
| Thorneycroft|    null|
|   Johannesen|    null|
|     Meanwell|    null|
|      Markoff|    35.0|
|       Porter|    47.0|
|     Harrison|    40.0|
|     Bissette|    35.0|
|        Hampe|    20.0|
|      Hegarty|    18.0|
|Lemberopolous|    34.5|
|      Ekstrom|    45.0|
|       Boulos|     9.0|
|       Bourke|    36.0|
|     McGovern|    null|
|     Kvillner|    31.0|
|   Goldenberg|    49.0|
|        Keefe|    null|
+-------------+--------+
only showing top 20 rows



In [23]:
titanic = titanic.replace(['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
               ['Miss','Miss','Miss','Mr','Mr',  'Mrs',  'Mrs',  'Other',  'Other','Other','Mr','Mr','Mr'])

In [24]:
titanic.groupby('initial').avg('age').show()

+-------------+--------+
|      initial|avg(age)|
+-------------+--------+
|     Pavlovic|    32.0|
|         Saad|    25.0|
|      Palsson|    10.5|
| Thorneycroft|    null|
|   Johannesen|    null|
|     Meanwell|    null|
|      Markoff|    35.0|
|       Porter|    47.0|
|     Harrison|    40.0|
|     Bissette|    35.0|
|        Hampe|    20.0|
|      Hegarty|    18.0|
|Lemberopolous|    34.5|
|      Ekstrom|    45.0|
|       Boulos|     9.0|
|       Bourke|    36.0|
|     McGovern|    null|
|     Kvillner|    31.0|
|   Goldenberg|    49.0|
|        Keefe|    null|
+-------------+--------+
only showing top 20 rows



In [25]:
titanic.groupby('initial').avg('age').collect()

[Row(initial='Pavlovic', avg(age)=32.0),
 Row(initial='Saad', avg(age)=25.0),
 Row(initial='Palsson', avg(age)=10.5),
 Row(initial='Thorneycroft', avg(age)=None),
 Row(initial='Johannesen', avg(age)=None),
 Row(initial='Meanwell', avg(age)=None),
 Row(initial='Markoff', avg(age)=35.0),
 Row(initial='Porter', avg(age)=47.0),
 Row(initial='Harrison', avg(age)=40.0),
 Row(initial='Bissette', avg(age)=35.0),
 Row(initial='Hampe', avg(age)=20.0),
 Row(initial='Hegarty', avg(age)=18.0),
 Row(initial='Lemberopolous', avg(age)=34.5),
 Row(initial='Ekstrom', avg(age)=45.0),
 Row(initial='Boulos', avg(age)=9.0),
 Row(initial='Bourke', avg(age)=36.0),
 Row(initial='McGovern', avg(age)=None),
 Row(initial='Kvillner', avg(age)=31.0),
 Row(initial='Goldenberg', avg(age)=49.0),
 Row(initial='Keefe', avg(age)=None),
 Row(initial='Leader', avg(age)=49.0),
 Row(initial='Kraeff', avg(age)=None),
 Row(initial='van', avg(age)=40.5),
 Row(initial='Cairns', avg(age)=None),
 Row(initial='Horgan', avg(age)=Non

In [26]:
titanic.filter(titanic['Age'] == 48).select('initial').show()

+--------+
| initial|
+--------+
|Anderson|
| Milling|
|    Duff|
|  Harper|
|  Taylor|
|    Ford|
|  Herman|
|  Jensen|
|   Swift|
+--------+



In [27]:
# null 처리 방법
titanic.groupby('Embarked').count().show()
titanic = titanic.na.fill({'Embarked' : 'S'})

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+



In [28]:
titanic.groupby('Embarked').count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|       C|  168|
|       S|  646|
+--------+-----+



In [29]:
titanic = titanic.drop('Cabin')

In [30]:
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- initial: string (nullable = true)



In [31]:
# 파생 컬럼 만드는 방법
titanic = titanic.withColumn('Family_Size' , col('SibSp')+col('Parch'))

In [32]:
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- initial: string (nullable = true)
 |-- Family_Size: integer (nullable = true)



In [33]:
titanic.select('Family_Size').show()

+-----------+
|Family_Size|
+-----------+
|          1|
|          1|
|          0|
|          1|
|          0|
|          0|
|          0|
|          4|
|          2|
|          1|
|          2|
|          0|
|          0|
|          6|
|          0|
|          0|
|          5|
|          0|
|          1|
|          0|
+-----------+
only showing top 20 rows



In [34]:
titanic.groupby('Family_Size').count().show()

+-----------+-----+
|Family_Size|count|
+-----------+-----+
|          1|  161|
|          6|   12|
|          3|   29|
|          5|   22|
|          4|   15|
|          7|    6|
|         10|    7|
|          2|  102|
|          0|  537|
+-----------+-----+



In [35]:
titanic = titanic.withColumn('Alone' , lit(0))

In [36]:
titanic.select('Alone').show()

+-----+
|Alone|
+-----+
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
+-----+
only showing top 20 rows



In [37]:
# when ~ otherwise -> if ~ else 와 비슷한 함수
titanic = titanic.withColumn('Alone' , when(titanic['Family_Size'] == 0, 1).otherwise(titanic['Alone']))

In [38]:
titanic.select('Alone').show()

+-----+
|Alone|
+-----+
|    0|
|    0|
|    1|
|    0|
|    1|
|    1|
|    1|
|    0|
|    0|
|    0|
|    0|
|    1|
|    1|
|    0|
|    1|
|    1|
|    0|
|    1|
|    0|
|    1|
+-----+
only showing top 20 rows



In [39]:
titanic.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Embarked',
 'initial',
 'Family_Size',
 'Alone']

In [40]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [41]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(titanic) for column in ["Sex","Embarked","initial"]]
pipeline = Pipeline(stages=indexers)
titanic = pipeline.fit(titanic).transform(titanic)

In [42]:
titanic.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-----------+-----------+-----+---------+--------------+-------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|    initial|Family_Size|Alone|Sex_index|Embarked_index|initial_index|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-----------+-----------+-----+---------+--------------+-------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|     Braund|          1|    0|      0.0|           0.0|         58.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|    Cumings|          1|    0|      1.0|           1.0|        406.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|       S|  

In [43]:
titanic.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-----------+-----------+-----+---------+--------------+-------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|    initial|Family_Size|Alone|Sex_index|Embarked_index|initial_index|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-----------+-----------+-----+---------+--------------+-------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|     Braund|          1|    0|      0.0|           0.0|         58.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|    Cumings|          1|    0|      1.0|           1.0|        406.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|       S|  

In [44]:
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- initial: string (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = false)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- initial_index: double (nullable = false)



In [54]:
titanic = titanic.na.fill({'Age' : 20})

In [55]:
titanic = titanic.drop("PassengerId","Name","Ticket","Cabin","Embarked","Sex","initial")

In [56]:
titanic.show()

+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+-------------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|Family_Size|Alone|Sex_index|Embarked_index|initial_index|
+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+-------------+
|       0|     3|22.0|    1|    0|   7.25|          1|    0|      0.0|           0.0|         58.0|
|       1|     1|38.0|    1|    0|71.2833|          1|    0|      1.0|           1.0|        406.0|
|       1|     3|26.0|    0|    0|  7.925|          0|    1|      1.0|           0.0|        473.0|
|       1|     1|35.0|    1|    0|   53.1|          1|    0|      1.0|           0.0|         93.0|
|       0|     3|35.0|    0|    0|   8.05|          0|    1|      0.0|           0.0|         94.0|
|       0|     3|20.0|    0|    0| 8.4583|          0|    1|      0.0|           2.0|         48.0|
|       0|     1|54.0|    0|    0|51.8625|          0|    1|      0.0|           0.0|        620.0|


In [57]:
titanic.columns[1:]

['Pclass',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Family_Size',
 'Alone',
 'Sex_index',
 'Embarked_index',
 'initial_index']

In [58]:
from pyspark.ml.feature import VectorAssembler

In [59]:
feature = VectorAssembler(inputCols = titanic.columns[1:], outputCol = 'features')
feature_vector = feature.transform(titanic)

In [60]:
feature_vector.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = false)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- initial_index: double (nullable = false)
 |-- features: vector (nullable = true)



In [61]:
# 데이터 세트를 나누는 작업
trainData , testData = feature_vector.randomSplit([.8 , .2] , seed=100)

In [62]:
# type(trainData)
trainData

DataFrame[Survived: int, Pclass: int, Age: double, SibSp: int, Parch: int, Fare: double, Family_Size: int, Alone: int, Sex_index: double, Embarked_index: double, initial_index: double, features: vector]

- 모델링
- Spark ML(DTC , LR , RFC , GDTC , NB , SVM)

In [72]:
# LogisticRegression
# 데이터의 범주가 0 , 1  사이의 값으로 예측하는 분류 알고리즘

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol='Survived' , featuresCol='features')
lr_model = lr.fit(trainData) #모델 생성
lr_pred = lr_model.transform(testData) #예측

In [66]:
# type ( lr_pred )
lr_pred.printSchema()
lr_pred.select('prediction' , 'Survived' , 'features').show()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = false)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- initial_index: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|(10,[0,1,6,9],[1....|
|       0.0|       0|(10,[0,1,4,6,9],[...|
|       0.0|       0|(10,[0,1,4,6,9],[...|
|       1.0|       0|[1.0,20.0,0.0,0.0...|
|       1.0|       0|(10,[0,1,4,6,9],[...|
|       1.0|       0|[1.0,20.0,0.

In [68]:
# 평가
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol= 'Survived', 
                                              predictionCol= 'prediction', 
                                              metricName='accuracy')

In [73]:
acc = evaluator.evaluate(lr_pred)
print('acc : ' , acc) 
print('err : ' , 1.0 - acc)

acc :  0.8
err :  0.19999999999999996
