In [1]:
#!pip install -q pyspark

### Создаем Spark сессию и устанавливаем зависимости

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Titanic').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/23 15:35:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Импортирование дополнительных библиотек

In [2]:
from itertools import chain
from pyspark.sql.functions import count, mean, when, lit, create_map, regexp_extract

### Импортирование данных

In [3]:
df = spark.read.csv('tit_train.csv', header=True, inferSchema=True)

### Просмотр схемы

In [4]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [5]:
df.show(3)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 3 rows


### Отображение в Pandas

In [6]:
df.limit(20).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S
5,6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
6,7,0,1,"McCarthy, Mr. Timothy J",male,54.0,0,0,17463,51.8625,E46,S
7,8,0,3,"Palsson, Master. Gosta Leonard",male,2.0,3,1,349909,21.075,,S
8,9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27.0,0,2,347742,11.1333,,S
9,10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14.0,1,0,237736,30.0708,,C


### Выборки

In [7]:
df.select('Survived', 'Pclass', 'Age', 'Fare').show(4)

+--------+------+----+-------+
|Survived|Pclass| Age|   Fare|
+--------+------+----+-------+
|       0|     3|22.0|   7.25|
|       1|     1|38.0|71.2833|
|       1|     3|26.0|  7.925|
|       1|     1|35.0|   53.1|
+--------+------+----+-------+
only showing top 4 rows


### Описательная статистика

In [8]:
df.select('Survived', 'Pclass', 'Age', 'Fare').summary().show()

25/10/23 15:36:28 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+-------------------+------------------+------------------+-----------------+
|summary|           Survived|            Pclass|               Age|             Fare|
+-------+-------------------+------------------+------------------+-----------------+
|  count|                891|               891|               714|              891|
|   mean| 0.3838383838383838| 2.308641975308642| 29.69911764705882| 32.2042079685746|
| stddev|0.48659245426485753|0.8360712409770491|14.526497332334035|49.69342859718089|
|    min|                  0|                 1|              0.42|              0.0|
|    25%|                  0|                 2|              20.0|           7.8958|
|    50%|                  0|                 3|              28.0|          14.4542|
|    75%|                  1|                 3|              38.0|             31.0|
|    max|                  1|                 3|              80.0|         512.3292|
+-------+-------------------+------------------+------

In [9]:
print('Количество записей: \t', df.count())
print('Количество переменных: \t', len(df.columns))

Количество записей: 	 891
Количество переменных: 	 12


### EDA

#### Сколько человек выживших

In [10]:
df.groupBy('Survived').count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



#### Количественные характеристики

In [11]:
df.groupBy('Survived').mean('Fare', 'Age').show()

+--------+------------------+------------------+
|Survived|         avg(Fare)|          avg(Age)|
+--------+------------------+------------------+
|       1| 48.39540760233917|28.343689655172415|
|       0|22.117886885245877| 30.62617924528302|
+--------+------------------+------------------+



### Факторные переменные

In [12]:
df.groupBy('Survived').pivot('Sex').count().show()

+--------+------+----+
|Survived|female|male|
+--------+------+----+
|       1|   233| 109|
|       0|    81| 468|
+--------+------+----+



In [13]:
df.groupBy('Survived').pivot('Pclass').count().show()

+--------+---+---+---+
|Survived|  1|  2|  3|
+--------+---+---+---+
|       1|136| 87|119|
|       0| 80| 97|372|
+--------+---+---+---+



In [14]:
df.groupBy('Survived').pivot('SibSp').count().show()

+--------+---+---+---+---+---+----+----+
|Survived|  0|  1|  2|  3|  4|   5|   8|
+--------+---+---+---+---+---+----+----+
|       1|210|112| 13|  4|  3|NULL|NULL|
|       0|398| 97| 15| 12| 15|   5|   7|
+--------+---+---+---+---+---+----+----+



### Извлечение признаков

In [15]:
for col in df.columns:
    print(col.ljust(20), df.filter(df[col].isNull()).count())

PassengerId          0
Survived             0
Pclass               0
Name                 0
Sex                  0
Age                  177
SibSp                0
Parch                0
Ticket               0
Fare                 0
Cabin                687
Embarked             2


### Извлечение данных о титулах

In [16]:
df = df.withColumn('Title', regexp_extract(df['Name'], '([A-Za-z]+)\.', 1))
df.groupBy('Title').agg(count('Age'), mean('Age')).sort('count(Age)').show()

+--------+----------+------------------+
|   Title|count(Age)|          avg(Age)|
+--------+----------+------------------+
|     Don|         1|              40.0|
|Countess|         1|              33.0|
|    Lady|         1|              48.0|
|     Mme|         1|              24.0|
|    Capt|         1|              70.0|
|     Sir|         1|              49.0|
|Jonkheer|         1|              38.0|
|      Ms|         1|              28.0|
|     Col|         2|              58.0|
|    Mlle|         2|              24.0|
|   Major|         2|              48.5|
|     Rev|         6|43.166666666666664|
|      Dr|         6|              42.0|
|  Master|        36| 4.574166666666667|
|     Mrs|       108|35.898148148148145|
|    Miss|       146|21.773972602739725|
|      Mr|       398|32.368090452261306|
+--------+----------+------------------+



In [17]:
title_dic = {'Mr':'Mr', 'Miss':'Miss', 'Mrs':'Mrs', 'Master':'Master', \
             'Mlle': 'Miss', 'Major': 'Mr', 'Col': 'Mr', 'Sir': 'Mr',\
             'Don': 'Mr', 'Mme': 'Miss', 'Jonkheer': 'Mr', 'Lady': 'Mrs',\
             'Capt': 'Mr', 'Countess': 'Mrs', 'Ms': 'Miss', 'Dona': 'Mrs', \
             'Dr':'Mr', 'Rev':'Mr'}

In [18]:
mapping = create_map([lit(x) for x in chain(*title_dic.items())])

df = df.withColumn('Title', mapping[df['Title']])
df.groupBy('Title').mean('Age').show()

+------+------------------+
| Title|          avg(Age)|
+------+------------------+
|  Miss|             21.86|
|Master| 4.574166666666667|
|    Mr| 33.02272727272727|
|   Mrs|35.981818181818184|
+------+------------------+



### Извлечение сведений по ворзрасту

In [19]:
def age_imputer(df, title, age):
    return df.withColumn('Age', when((df['Age'].isNull()) & (df['Title'] == title), age).otherwise(df['Age']))

In [20]:
df = age_imputer(df, 'Mr', 33.02)
df = age_imputer(df, 'Mrs', 35.98)
df = age_imputer(df, 'Miss', 21.86)
df = age_imputer(df, 'Master', 4.57)

### Добавление агрегирующих данных и удаление неиспользуемых

In [21]:
df = df.withColumn('FamilySize', df['Parch'] + df['SibSp']).drop('Parch', 'SibSp')
df = df.drop('PassengerId', 'Cabin', 'Name', 'Ticket', 'Title')

In [22]:
df.show(3)

+--------+------+------+----+-------+--------+----------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|FamilySize|
+--------+------+------+----+-------+--------+----------+
|       0|     3|  male|22.0|   7.25|       S|         1|
|       1|     1|female|38.0|71.2833|       C|         1|
|       1|     3|female|26.0|  7.925|       S|         0|
+--------+------+------+----+-------+--------+----------+
only showing top 3 rows


In [23]:
df = df.fillna({'Embarked' : 'S'})

In [24]:
for col in df.columns:
    print(col.ljust(20), df.filter(df[col].isNull()).count())

Survived             0
Pclass               0
Sex                  0
Age                  0
Fare                 0
Embarked             0
FamilySize           0


### Построение модели (Spark ML)

- Stringindexer: преобразование строковых категорий в численные
- Векторный Ассемблер: Spark API
- Логистическая регрессия, Случайный лес, GBT
- Конвейер

### Подключение библиотек из Spark ML

In [25]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

### String Indexer

In [26]:
stringIndexer = StringIndexer(inputCols=['Sex', 'Embarked'], outputCols=['SexNum', 'EmbarkedNum'])
stringIndexer_model =stringIndexer.fit(df)
df_ = stringIndexer_model.transform(df).drop('Sex', 'Embarked')
df_.show(3)

+--------+------+----+-------+----------+------+-----------+
|Survived|Pclass| Age|   Fare|FamilySize|SexNum|EmbarkedNum|
+--------+------+----+-------+----------+------+-----------+
|       0|     3|22.0|   7.25|         1|   0.0|        0.0|
|       1|     1|38.0|71.2833|         1|   1.0|        1.0|
|       1|     3|26.0|  7.925|         0|   1.0|        0.0|
+--------+------+----+-------+----------+------+-----------+
only showing top 3 rows


### Векторный ассемблер

In [27]:
vec_asmbl = VectorAssembler(inputCols=df_.columns[1:], outputCol='features')
df_ = vec_asmbl.transform(df_).select('features', 'Survived')
df_.show(3, truncate=False)

+------------------------------+--------+
|features                      |Survived|
+------------------------------+--------+
|[3.0,22.0,7.25,1.0,0.0,0.0]   |0       |
|[1.0,38.0,71.2833,1.0,1.0,1.0]|1       |
|[3.0,26.0,7.925,0.0,1.0,0.0]  |1       |
+------------------------------+--------+
only showing top 3 rows


### Разбивка данных

In [32]:
train_df, test_df = df_.randomSplit([0.7, 0.3])
train_df.show(3, truncate=False)

+--------------------+--------+
|features            |Survived|
+--------------------+--------+
|(6,[0,1],[1.0,38.0])|0       |
|(6,[0,1],[1.0,39.0])|0       |
|(6,[0,1],[1.0,40.0])|0       |
+--------------------+--------+
only showing top 3 rows


### Вычисления и метрики

In [33]:
evalutor = MulticlassClassificationEvaluator(labelCol='Survived', metricName='accuracy')


### Логистическая регрессия

In [34]:
ridge = LogisticRegression(labelCol='Survived', maxIter=100, elasticNetParam=0, regParam=0.3)

model = ridge.fit(train_df)
pred = model.transform(test_df)
evalutor.evaluate(pred)

25/10/23 16:23:38 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


0.7785977859778598