## spark로 타이타닉 데이터 다뤄보기

In [77]:
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("spark-dataframe")
    .getOrCreate()
)
spark

In [78]:
# Load the Titanic CSV. The first row holds the column names, and
# inferSchema=True lets Spark detect column types instead of reading everything as strings.
filepath = "titanic_data.csv"
titanic_sdf = spark.read.csv(filepath, header=True, inferSchema=True)

In [79]:
import pyspark.sql.functions as F

In [80]:
# Number of columns in the raw DataFrame.
len(titanic_sdf.schema.fieldNames())

12

In [81]:
# Total number of rows loaded from the CSV.
n_rows = titanic_sdf.count()
n_rows

891

In [82]:
# Peek at the first three rows (long string values are truncated in the display).
titanic_sdf.show(n=3)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 3 rows



In [83]:
# Print the inferred schema: each column's name, type and nullability.
titanic_sdf.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [84]:
# Drop the name, ticket and cabin columns; none of them are used further in this notebook.
df = titanic_sdf.drop("Name", "Ticket", "Cabin")
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)



In [85]:
# Rename the English CSV headers to the Korean labels used throughout the rest of the notebook.
korean_names = {
    "PassengerId": "승객번호",
    "Survived": "생존여부",
    "Pclass": "좌석등급",
    "Sex": "성별",
    "Age": "나이",
    "SibSp": "형제,배우자수",
    "Parch": "부모,자녀수",
    "Fare": "요금",
    "Embarked": "탑승항구",
}
for english_name, korean_name in korean_names.items():
    df = df.withColumnRenamed(english_name, korean_name)

df.show(5)

+--------+--------+--------+------+----+-------------+-----------+-------+--------+
|승객번호|생존여부|좌석등급|  성별|나이|형제,배우자수|부모,자녀수|   요금|탑승항구|
+--------+--------+--------+------+----+-------------+-----------+-------+--------+
|       1|       0|       3|  male|22.0|            1|          0|   7.25|       S|
|       2|       1|       1|female|38.0|            1|          0|71.2833|       C|
|       3|       1|       3|female|26.0|            0|          0|  7.925|       S|
|       4|       1|       1|female|35.0|            1|          0|   53.1|       S|
|       5|       0|       3|  male|35.0|            0|          0|   8.05|       S|
+--------+--------+--------+------+----+-------------+-----------+-------+--------+
only showing top 5 rows



In [86]:
# Translate coded values into Korean labels. Any value not listed in a mapping
# (including NULL) is kept unchanged via otherwise(<original column>).
port_labels = {"S": "사우스햄튼", "C": "셰르부르", "Q": "퀸즈타운"}
sex_labels = {"male": "남", "female": "여"}

for target, labels in (("탑승항구", port_labels), ("성별", sex_labels)):
    mapped = None
    for code, label in labels.items():
        matches = F.col(target) == code
        mapped = F.when(matches, label) if mapped is None else mapped.when(matches, label)
    df = df.withColumn(target, mapped.otherwise(F.col(target)))

df.show(5)

+--------+--------+--------+----+----+-------------+-----------+-------+----------+
|승객번호|생존여부|좌석등급|성별|나이|형제,배우자수|부모,자녀수|   요금|  탑승항구|
+--------+--------+--------+----+----+-------------+-----------+-------+----------+
|       1|       0|       3|  남|22.0|            1|          0|   7.25|사우스햄튼|
|       2|       1|       1|  여|38.0|            1|          0|71.2833|  셰르부르|
|       3|       1|       3|  여|26.0|            0|          0|  7.925|사우스햄튼|
|       4|       1|       1|  여|35.0|            1|          0|   53.1|사우스햄튼|
|       5|       0|       3|  남|35.0|            0|          0|   8.05|사우스햄튼|
+--------+--------+--------+----+----+-------------+-----------+-------+----------+
only showing top 5 rows



# 전체 평균

In [107]:
# Overall averages across all rows (a global aggregation with no grouping keys).
mean_targets = ["생존여부", "좌석등급", "나이", "형제,배우자수", "부모,자녀수", "요금"]
df_mean = df.agg(*[F.mean(name) for name in mean_targets])

df_mean.show()

+------------------+-----------------+-----------------+------------------+-------------------+----------------+
|     avg(생존여부)|    avg(좌석등급)|        avg(나이)|avg(형제,배우자수)|   avg(부모,자녀수)|       avg(요금)|
+------------------+-----------------+-----------------+------------------+-------------------+----------------+
|0.3838383838383838|2.308641975308642|29.69911764705882|0.5230078563411896|0.38159371492704824|32.2042079685746|
+------------------+-----------------+-----------------+------------------+-------------------+----------------+



In [108]:
# Round every overall average to one decimal place.
# Spark's F.round rounds HALF_UP to the requested number of decimals. This
# notebook imports only `F` (and later `col`) from pyspark.sql.functions, so a
# bare `round(...)` resolves to Python's builtin. That builtin raises TypeError
# on a column name, so the Spark function is called explicitly.
df_mean2 = df_mean
for avg_col in df_mean.columns:
    df_mean2 = df_mean2.withColumn(avg_col, F.round(avg_col, 1))

df_mean2.show()

+-------------+-------------+---------+------------------+----------------+---------+
|avg(생존여부)|avg(좌석등급)|avg(나이)|avg(형제,배우자수)|avg(부모,자녀수)|avg(요금)|
+-------------+-------------+---------+------------------+----------------+---------+
|          0.4|          2.3|     29.7|               0.5|             0.4|     32.2|
+-------------+-------------+---------+------------------+----------------+---------+



In [109]:
# Overall maximum and minimum of each numeric feature, emitted as max/min pairs.
range_targets = ["나이", "형제,배우자수", "부모,자녀수", "요금"]
df_mm = df.agg(*[agg_fn(name) for name in range_targets for agg_fn in (F.max, F.min)])

df_mm.show()

+---------+---------+------------------+------------------+----------------+----------------+---------+---------+
|max(나이)|min(나이)|max(형제,배우자수)|min(형제,배우자수)|max(부모,자녀수)|min(부모,자녀수)|max(요금)|min(요금)|
+---------+---------+------------------+------------------+----------------+----------------+---------+---------+
|     80.0|     0.42|                 8|                 0|               6|               0| 512.3292|      0.0|
+---------+---------+------------------+------------------+----------------+----------------+---------+---------+



In [93]:
# Bucket ages into decade bands ("0대", "10대", ...).
# The band is computed as floor(age / 10) * 10. Ages of 90 and above therefore
# get their own band ("90대"); the previous hard-coded 0-89 when-chain fell
# through and mislabelled them as missing.
# Missing ages get the literal string "null", not a SQL NULL. Later cells group
# and sort on that label.
decade = (F.floor(F.col("나이") / 10) * 10).cast("string")
df_age = df.withColumn(
    "나이대",
    F.when(F.col("나이").isNull(), "null").otherwise(F.concat(decade, F.lit("대"))),
)

df_age.show(5)

+--------+--------+--------+----+----+-------------+-----------+-------+----------+------+
|승객번호|생존여부|좌석등급|성별|나이|형제,배우자수|부모,자녀수|   요금|  탑승항구|나이대|
+--------+--------+--------+----+----+-------------+-----------+-------+----------+------+
|       1|       0|       3|  남|22.0|            1|          0|   7.25|사우스햄튼|  20대|
|       2|       1|       1|  여|38.0|            1|          0|71.2833|  셰르부르|  30대|
|       3|       1|       3|  여|26.0|            0|          0|  7.925|사우스햄튼|  20대|
|       4|       1|       1|  여|35.0|            1|          0|   53.1|사우스햄튼|  30대|
|       5|       0|       3|  남|35.0|            0|          0|   8.05|사우스햄튼|  30대|
+--------+--------+--------+----+----+-------------+-----------+-------+----------+------+
only showing top 5 rows



In [95]:
# Print the row count of each age band, one per line, from "0대" through "80대".
# The "null" band is not printed. A single groupBy pass replaces nine separate
# filter().count() actions; each of those recomputed the uncached df_age.
band_counts = {
    row["나이대"]: row["count"]
    for row in df_age.groupBy("나이대").count().collect()
}
for band in ["0대", "10대", "20대", "30대", "40대", "50대", "60대", "70대", "80대"]:
    print(band_counts.get(band, 0))

62
102
220
167
89
48
19
6
1


In [96]:
# Per-age-band averages plus the number of rows in each band.
# Note: this rebinds df_age from the row-level table to the aggregated one.
band_metrics = ["생존여부", "좌석등급", "형제,배우자수", "부모,자녀수", "요금"]
df_age = df_age.groupBy("나이대").agg(
    *[F.mean(metric) for metric in band_metrics],
    F.count("*"),
)

df_age.show()

+------+-------------------+------------------+-------------------+-------------------+------------------+--------+
|나이대|      avg(생존여부)|     avg(좌석등급)| avg(형제,배우자수)|   avg(부모,자녀수)|         avg(요금)|count(1)|
+------+-------------------+------------------+-------------------+-------------------+------------------+--------+
|  80대|                1.0|               1.0|                0.0|                0.0|              30.0|       1|
|  20대|               0.35|              2.45|0.32272727272727275|               0.25|27.278937272727294|     220|
|  50대| 0.4166666666666667|            1.5625| 0.2916666666666667| 0.2708333333333333| 47.93333333333334|      48|
|  10대| 0.4019607843137255|2.4705882352941178| 0.6666666666666666|0.47058823529411764| 32.53513235294118|     102|
|  70대|                0.0|1.8333333333333333|0.16666666666666666|0.16666666666666666|30.197233333333333|       6|
|  40대|0.38202247191011235|1.9662921348314606| 0.3707865168539326|0.47191011235955055| 38.00229662921

In [97]:
# Give the aggregated per-band columns readable Korean names.
age_renames = {
    "avg(생존여부)": "생존률",
    "avg(좌석등급)": "좌석등급평균",
    "avg(형제,배우자수)": "형제,배우자수평균",
    "avg(부모,자녀수)": "부모,자녀수평균",
    "avg(요금)": "요금평균",
    "count(1)": "인원수",
}
df_age2 = df_age
for old_name, new_name in age_renames.items():
    df_age2 = df_age2.withColumnRenamed(old_name, new_name)

from pyspark.sql.functions import col

# Round the averages (not the head count) to one decimal with Spark's F.round
# (HALF_UP). A bare `round` here is Python's builtin, which raises TypeError on
# a Column. The former cast("double") is dropped because avg() already returns
# a double.
for avg_name in ["생존률", "좌석등급평균", "형제,배우자수평균", "부모,자녀수평균", "요금평균"]:
    df_age2 = df_age2.withColumn(avg_name, F.round(col(avg_name), 1))

df_age2 = df_age2.orderBy("나이대")
df_age2.show()

+------+------+------------+-----------------+---------------+--------+------+
|나이대|생존률|좌석등급평균|형제,배우자수평균|부모,자녀수평균|요금평균|인원수|
+------+------+------------+-----------------+---------------+--------+------+
|   0대|   0.6|         2.6|              1.9|            1.4|    30.6|    62|
|  10대|   0.4|         2.5|              0.7|            0.5|    32.5|   102|
|  20대|   0.4|         2.5|              0.3|            0.3|    27.3|   220|
|  30대|   0.4|         2.1|              0.4|            0.3|    40.4|   167|
|  40대|   0.4|         2.0|              0.4|            0.5|    38.0|    89|
|  50대|   0.4|         1.6|              0.3|            0.3|    47.9|    48|
|  60대|   0.3|         1.5|              0.3|            0.4|    48.4|    19|
|  70대|   0.0|         1.8|              0.2|            0.2|    30.2|     6|
|  80대|   1.0|         1.0|              0.0|            0.0|    30.0|     1|
|  null|   0.3|         2.6|              0.6|            0.2|    22.2|   177|
+------+------+------------+-----------------+---------------+--------+------+

# 나이 불명

In [106]:
# Rows whose age is missing, and how many of them there are.
df_null = df.where(F.col("나이").isNull())
df_null.show(5)
print(df_null.count())

+--------+--------+--------+----+----+-------------+-----------+------+----------+
|승객번호|생존여부|좌석등급|성별|나이|형제,배우자수|부모,자녀수|  요금|  탑승항구|
+--------+--------+--------+----+----+-------------+-----------+------+----------+
|       6|       0|       3|  남|NULL|            0|          0|8.4583|  퀸즈타운|
|      18|       1|       2|  남|NULL|            0|          0|  13.0|사우스햄튼|
|      20|       1|       3|  여|NULL|            0|          0| 7.225|  셰르부르|
|      27|       0|       3|  남|NULL|            0|          0| 7.225|  셰르부르|
|      29|       1|       3|  여|NULL|            0|          0|7.8792|  퀸즈타운|
+--------+--------+--------+----+----+-------------+-----------+------+----------+
only showing top 5 rows

177


In [73]:
# Averages for rows with a missing age. Every row of df_null has a NULL age,
# so grouping by "나이" produces a single NULL group.
df_null = df_null.groupBy("나이").agg(
    F.mean("생존여부"), F.mean("좌석등급"), F.mean("형제,배우자수"), F.mean("부모,자녀수"), F.mean("요금"))

# Rename the avg(...) columns to readable Korean names.
null_renames = {
    "avg(생존여부)": "생존률",
    "avg(좌석등급)": "좌석등급평균",
    "avg(형제,배우자수)": "형제,배우자수평균",
    "avg(부모,자녀수)": "부모,자녀수평균",
    "avg(요금)": "요금평균",
}
for old_name, new_name in null_renames.items():
    df_null = df_null.withColumnRenamed(old_name, new_name)

# Round to one decimal with Spark's F.round (HALF_UP). A bare `round` here is
# Python's builtin, which raises TypeError on a column name.
for avg_name in null_renames.values():
    df_null = df_null.withColumn(avg_name, F.round(avg_name, 1))

df_null.show()

+----+------+------------+-----------------+---------------+--------+
|나이|생존률|좌석등급평균|형제,배우자수평균|부모,자녀수평균|요금평균|
+----+------+------------+-----------------+---------------+--------+
|NULL|   0.3|         2.6|              0.6|            0.2|    22.2|
+----+------+------------+-----------------+---------------+--------+



In [74]:
# Averages grouped by port of embarkation (탑승항구). Rows with no port form a NULL group.
port_metrics = ["생존여부", "좌석등급", "형제,배우자수", "부모,자녀수", "요금"]
df_emb = df.groupBy("탑승항구").agg(*(F.mean(metric) for metric in port_metrics))

df_emb.show()

+----------+-------------------+------------------+-------------------+-------------------+------------------+
|  탑승항구|      avg(생존여부)|     avg(좌석등급)| avg(형제,배우자수)|   avg(부모,자녀수)|         avg(요금)|
+----------+-------------------+------------------+-------------------+-------------------+------------------+
|사우스햄튼|0.33695652173913043|2.3509316770186337| 0.5714285714285714|0.41304347826086957| 27.07981180124218|
|      NULL|                1.0|               1.0|                0.0|                0.0|              80.0|
|  셰르부르| 0.5535714285714286|1.8869047619047619| 0.3869047619047619| 0.3630952380952381| 59.95414404761905|
|  퀸즈타운|0.38961038961038963| 2.909090909090909|0.42857142857142855|0.16883116883116883|13.276029870129872|
+----------+-------------------+------------------+-------------------+-------------------+------------------+



In [75]:
# Give the per-port averages readable Korean names.
emb_renames = {
    "avg(생존여부)": "생존률",
    "avg(좌석등급)": "좌석등급평균",
    "avg(형제,배우자수)": "형제,배우자수평균",
    "avg(부모,자녀수)": "부모,자녀수평균",
    "avg(요금)": "요금평균",
}
df_emb2 = df_emb
for old_name, new_name in emb_renames.items():
    df_emb2 = df_emb2.withColumnRenamed(old_name, new_name)

# Round to one decimal with Spark's F.round (HALF_UP). A bare `round` here is
# Python's builtin, which raises TypeError on a column name.
for avg_name in emb_renames.values():
    df_emb2 = df_emb2.withColumn(avg_name, F.round(avg_name, 1))

# Highest (rounded) survival rate first.
df_emb2 = df_emb2.orderBy(F.desc("생존률"))
df_emb = df_emb2
df_emb.show()

+----------+------+------------+-----------------+---------------+--------+
|  탑승항구|생존률|좌석등급평균|형제,배우자수평균|부모,자녀수평균|요금평균|
+----------+------+------------+-----------------+---------------+--------+
|      NULL|   1.0|         1.0|              0.0|            0.0|    80.0|
|  셰르부르|   0.6|         1.9|              0.4|            0.4|    60.0|
|  퀸즈타운|   0.4|         2.9|              0.4|            0.2|    13.3|
|사우스햄튼|   0.3|         2.4|              0.6|            0.4|    27.1|
+----------+------+------------+-----------------+---------------+--------+



In [76]:
# Stop the Spark session created at the top of the notebook.
spark.stop()