## Датасет: https://www.kaggle.com/competitions/titanic/data

## Импорт библиотек

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [None]:
# ! pip install great_expectations
# import great_expectations as ge
import pandas as pd
import os



## Настройка среды

In [None]:
# Create the Spark session for this notebook, or reuse the one that is
# already running (getOrCreate), under the application name "Titanic".
spark = (
    SparkSession.builder
    .appName("Titanic")
    .getOrCreate()
)

## Загрузка данных

In [None]:
!unzip /content/titanic.zip

Archive:  /content/titanic.zip
  inflating: gender_submission.csv   
  inflating: test.csv                
  inflating: train.csv               


In [None]:
# Load the training split. The first line is a header, and Spark infers the
# column types. Inference costs an extra pass over the file. Then preview the
# first rows and the resulting schema.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/content/train.csv")
)
df.show(5)
df.printSchema()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [None]:
# Summary statistics (count, mean, stddev, min, max) for every column.
# NOTE: mean/stddev for string columns such as Ticket are still computed
# (see the output) but carry no real meaning.
summary_df = df.describe()
summary_df.show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                NULL|  NULL| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## Проверка NULL-значений (completeness)

In [None]:
# Completeness: number of NULL cells per column. F.when(...) yields a
# non-null literal only where the cell is NULL, and F.count counts only
# non-null values, so each result equals that column's NULL count.
null_count_exprs = [
    F.count(F.when(F.col(column).isNull(), column)).alias(column)
    for column in df.columns
]
df.select(*null_count_exprs).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



Много пропущенных данных о возрасте пассажиров (Age: 177 из 891) и номерах кают (Cabin: 687 из 891); в Embarked пропущено 2 значения.

## Проверка дубликатов (uniqueness)

In [None]:
# Uniqueness: list every PassengerId that occurs more than once.
# An empty result means the identifier is unique.
passenger_id_counts = df.groupBy("PassengerId").count()
passenger_id_counts.where(F.col("count") > 1).show()

+-----------+-----+
|PassengerId|count|
+-----------+-----+
+-----------+-----+



Дубликатов по PassengerId не обнаружено — все идентификаторы пассажиров уникальны.

## Проверка данных на соответствие правилу (consistency): возраст > 0

In [None]:
# Consistency rule: age must be strictly positive, so any row with Age <= 0
# violates it. The previous `< 0` filter missed Age == 0.
# Rows with NULL Age are not selected here (NULL comparisons are not true);
# missing ages are covered by the completeness check above.
df.filter(F.col("Age") <= 0).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



Все непустые значения возраста соответствуют условию (строки с NULL в Age этой проверкой не охватываются).

In [None]:
# Name, age and sex of the first 5 passengers older than 30.
older_than_30 = df.where(F.col("Age") > 30)
older_than_30.select("Name", "Age", "Sex").show(5)

+--------------------+----+------+
|                Name| Age|   Sex|
+--------------------+----+------+
|Cumings, Mrs. Joh...|38.0|female|
|Futrelle, Mrs. Ja...|35.0|female|
|Allen, Mr. Willia...|35.0|  male|
|McCarthy, Mr. Tim...|54.0|  male|
|Bonnell, Miss. El...|58.0|female|
+--------------------+----+------+
only showing top 5 rows



Смотрим имя, возраст и пол первых 5 пассажиров старше 30 лет.

In [None]:
# Per ticket class (Pclass): mean age, highest fare and number of passengers.
# F.avg skips NULL ages, so avg_age covers only passengers with a known age.
# orderBy makes the row order deterministic. groupBy alone returns the classes
# in arbitrary order (previously 1, 3, 2).
df.groupBy("Pclass").agg(
    F.avg("Age").alias("avg_age"),
    F.max("Fare").alias("max_fare"),
    F.count("PassengerId").alias("num_passengers"),
).orderBy("Pclass").show()

+------+------------------+--------+--------------+
|Pclass|           avg_age|max_fare|num_passengers|
+------+------------------+--------+--------------+
|     1|38.233440860215055|512.3292|           216|
|     3| 25.14061971830986|   69.55|           491|
|     2| 29.87763005780347|    73.5|           184|
+------+------------------+--------+--------------+



Смотрим средний возраст, максимальную стоимость билета и количество пассажиров в каждом классе билета (Pclass). Средний возраст считается только по непустым значениям Age.

In [None]:
# Rank passengers by fare within their ticket class: the most expensive fare
# in each Pclass gets rank 1. Ties share a rank, and the next rank skips ahead
# by the size of the tie.
window_spec = (
    Window
    .partitionBy("Pclass")
    .orderBy(F.col("Fare").desc())
)
fare_rank_expr = F.rank().over(window_spec)
df_with_rank = df.withColumn("fare_rank", fare_rank_expr)
df_with_rank.select("PassengerId", "Pclass", "Fare", "fare_rank").show(10)

+-----------+------+--------+---------+
|PassengerId|Pclass|    Fare|fare_rank|
+-----------+------+--------+---------+
|        259|     1|512.3292|        1|
|        680|     1|512.3292|        1|
|        738|     1|512.3292|        1|
|         28|     1|   263.0|        4|
|         89|     1|   263.0|        4|
|        342|     1|   263.0|        4|
|        439|     1|   263.0|        4|
|        312|     1| 262.375|        8|
|        743|     1| 262.375|        8|
|        119|     1|247.5208|       10|
+-----------+------+--------+---------+
only showing top 10 rows



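Делим данные на партиции по классу билета (Pclass) и внутри каждой партиции присваиваем ранг по убыванию стоимости билета (Fare) в новый столбец fare_rank. F.rank() работает так же, как RANK в SQL: строки с одинаковым значением Fare получают одинаковый ранг, а следующий ранг пропускает столько позиций, сколько строк было в группе (1, 1, 1, 4, …).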

## Реализация Data Pipeline Architecture

In [None]:
# stage: the raw data exactly as loaded. Spark DataFrames are immutable, so a
# plain reference is enough; later stages build new DataFrames from it.
df_stage = df

In [None]:
# clean: fill NULL values. Age gets the mean of the known ages. Embarked (port
# of embarkation) and Cabin get the placeholder "Unknown".
mean_age = df_stage.select(F.avg("Age")).first()[0]
fill_values = {
    "Age": mean_age,
    "Embarked": "Unknown",
    "Cabin": "Unknown",
}
df_clean = df_stage.fillna(fill_values)

In [None]:
# feature: binary flag for ML / analytics. is_adult is 1 when Age >= 18 and
# 0 otherwise. A NULL Age would also give 0, but Age is imputed in the clean
# stage.
adult_flag = F.when(F.col("Age") >= 18, 1).otherwise(0)
df_feature = df_clean.withColumn("is_adult", adult_flag)
df_feature.select("Name", "Age", "is_adult").show(10)

+--------------------+-----------------+--------+
|                Name|              Age|is_adult|
+--------------------+-----------------+--------+
|Braund, Mr. Owen ...|             22.0|       1|
|Cumings, Mrs. Joh...|             38.0|       1|
|Heikkinen, Miss. ...|             26.0|       1|
|Futrelle, Mrs. Ja...|             35.0|       1|
|Allen, Mr. Willia...|             35.0|       1|
|    Moran, Mr. James|29.69911764705882|       1|
|McCarthy, Mr. Tim...|             54.0|       1|
|Palsson, Master. ...|              2.0|       0|
|Johnson, Mrs. Osc...|             27.0|       1|
|Nasser, Mrs. Nich...|             14.0|       0|
+--------------------+-----------------+--------+
only showing top 10 rows



In [None]:
# model/reporting