## Лабораторная работа №1 по СОБД
## Выполнение разведочного анализа больших данных с использованием фреймворка Apache Spark
## Часть 1
## Датасет "NYC Yellow Taxi Trip Data"
### Объём: 6.88 Гб 

## Подключение необходимых библиотек

In [2]:
from pyspark.sql import SparkSession
from functools import reduce
from pyspark import SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F 
import math

## Инициализация PySpark и создание сессии

In [3]:
conf = SparkConf()
conf.setAppName("Trip Data Analysis")
conf.setMaster("local[*]")  # Локальный режим (все ядра контейнера)
conf.set("spark.hadoop.fs.defaultFS", "hdfs://hadoop-namenode:9820")  # Указываем HDFS
conf.set("spark.driver.memory", "4g")
conf.set("spark.driver.cores", "2")
conf.set("spark.executor.memory", "12g")
conf.set("spark.executor.cores", "8")
conf.set("spark.sql.adaptive.enabled", "true")  # Оптимизация запросов

spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

## Обзор датасета
Для исследования используется датасет 
<h3  style=\"text-align:center;\"><a href=\"https://www.kaggle.com/datasets/elemento/nyc-yellow-taxi-trip-data/data\">NYC Yellow Taxi Trip Data</a></h3>

Датасет уже загружен по адресу "hdfs:///user/vladislav_vaganov/datasets". Датасет состоит из 4-х частей: tripdata_01.csv, tripdata_02.csv, tripdata_03.csv, tripdata_04.csv. 
Определим схему типов данных датасета заранее для экономии вычислительных ресурсов

In [6]:
schema = StructType([
    StructField("VendorID", IntegerType(), True),                
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("pickup_longitude", DoubleType(), True),
    StructField("pickup_latitude", DoubleType(), True),
    StructField("RateCodeID", IntegerType(), True),               
    StructField("store_and_fwd_flag", StringType(), True),       
    StructField("dropoff_longitude", DoubleType(), True),
    StructField("dropoff_latitude", DoubleType(), True),
    StructField("payment_type", IntegerType(), True),            
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True)
])

Определим путь до директории датасета

In [7]:
path = "hdfs:///user/vladislav_vaganov/datasets/"

Заполним датафрейм данными из директории datasets

In [8]:
df_combined = spark.read \
    .option("inferSchema", "true") \
    .schema(schema) \
    .csv(path)

Выведем фрагмент датафрейма на экран

In [9]:
df_combined.limit(20).toPandas().style\
    .set_properties(**{'text-align': 'left', 'max-width': '0', 'white-space': 'nowrap', 'overflow': 'hidden', 'text-overflow': 'ellipsis'})\
    .set_table_styles([{'selector': 'th', 'props': [('text-align', 'left')]}])\
    .format(precision=2)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,,NaT,NaT,,,,,,store_and_fwd_flag,,,,,,,,,,
1,2.0,2015-01-15 19:05:39,2015-01-15 19:23:42,1.0,1.59,-73.99,40.75,1.0,N,-73.97,40.75,1.0,12.0,1.0,0.5,3.25,0.0,0.3,17.05
2,1.0,2015-01-10 20:33:38,2015-01-10 20:53:28,1.0,3.3,-74.0,40.72,1.0,N,-73.99,40.76,1.0,14.5,0.5,0.5,2.0,0.0,0.3,17.8
3,1.0,2015-01-10 20:33:38,2015-01-10 20:43:41,1.0,1.8,-73.96,40.8,1.0,N,-73.95,40.82,2.0,9.5,0.5,0.5,0.0,0.0,0.3,10.8
4,1.0,2015-01-10 20:33:39,2015-01-10 20:35:31,1.0,0.5,-74.01,40.71,1.0,N,-74.0,40.72,2.0,3.5,0.5,0.5,0.0,0.0,0.3,4.8
5,1.0,2015-01-10 20:33:39,2015-01-10 20:52:58,1.0,3.0,-73.97,40.76,1.0,N,-74.0,40.74,2.0,15.0,0.5,0.5,0.0,0.0,0.3,16.3
6,1.0,2015-01-10 20:33:39,2015-01-10 20:53:52,1.0,9.0,-73.87,40.77,1.0,N,-73.99,40.76,1.0,27.0,0.5,0.5,6.7,5.33,0.3,40.33
7,1.0,2015-01-10 20:33:39,2015-01-10 20:58:31,1.0,2.2,-73.98,40.73,1.0,N,-73.99,40.75,2.0,14.0,0.5,0.5,0.0,0.0,0.3,15.3
8,1.0,2015-01-10 20:33:39,2015-01-10 20:42:20,3.0,0.8,-74.0,40.73,1.0,N,-74.0,40.73,1.0,7.0,0.5,0.5,1.66,0.0,0.3,9.96
9,1.0,2015-01-10 20:33:39,2015-01-10 21:11:35,3.0,18.2,-73.78,40.64,2.0,N,-73.99,40.76,2.0,52.0,0.0,0.5,0.0,5.33,0.3,58.13


## Описание столбцов датасета

| Название столбца | Тип данных | Расшифровка |
|:----------------:|:----------:|:------------|
| VendorID | integer | Идентификатор провайдера телематических данных |
| tpep_pickup_datetime | timestamp | Дата и время посадки пассажира в такси |
| tpep_dropoff_datetime | timestamp | Дата и время высадки пассажира из такси |
| passenger_count | integer | Количество пассажиров в такси |
| trip_distance | double | Расстояние поездки в милях по счетчику такси |
| pickup_longitude | double | Географические координаты места посадки (долгота) |
| pickup_latitude | double | Географические координаты места посадки (широта) |
| RateCodeID | integer | Код типа тарифа |
| store_and_fwd_flag | string | Технический флаг хранения данных в буфере |
| dropoff_longitude | double | Географические координаты места высадки (долгота) |
| dropoff_latitude | double | Географические координаты места высадки (широта) |
| payment_type | integer | Способ оплаты поездки |
| fare_amount | double | Основная стоимость поездки (базовый тариф + расстояние) |
| extra | double | Дополнительные сборы и надбавки |
| mta_tax | double | Налог Metropolitan Transportation Authority |
| tip_amount | double | Сумма чаевых водителю |
| tolls_amount | double | Стоимость платных дорог, мостов, туннелей |
| improvement_surcharge | double | Сбор на улучшение сервиса такси |
| total_amount | double | Итоговая сумма к оплате |вая сумма к оплате |говая сумма к оплате ||  Row 4   |   |ка автомобиля |"ка автомобиля |"

Преобразуем датафрейм

### Оставленные столбцы

| Название столбца        | Тип данных  | Расшифровка                                                                 | Комментарий |
|:------------------------|:------------|:----------------------------------------------------------------------------|:------------|
| `VendorID`              | integer     | Идентификатор провайдера данных                              | Может отражать различия в тарифах между компаниями |
| `tpep_pickup_datetime`  | timestamp   | Дата и время посадки пассажира в такси                                      | - |
| `passenger_count`       | integer     | Количество пассажиров в такси                                               | Может влиять на тариф |
| `trip_distance`         | double      | Расстояние поездки в милях по счетчику такси                                | Один из главных факторов стоимости |
| `pickup_longitude`      | double      | Географические координаты места посадки (долгота)                           | Позволяет определить стартовую зону |
| `pickup_latitude`       | double      | Географические координаты места посадки (широта)                            | Позволяет определить стартовую зону |
| `dropoff_longitude`     | double      | Географические координаты места высадки (долгота)                           | Позволяет оценить направление и конечную зону |
| `dropoff_latitude`      | double      | Географические координаты места высадки (широта)                            | Позволяет оценить направление и конечную зону |
| `RateCodeID`            | integer     | Код типа тарифа                                                             | Напрямую определяет базовую ставку |
| `payment_type`          | integer     | Способ оплаты поездки                                                       | - |
| `total_amount`          | integer     | Итоговая стоимость поездки                                                       | Целевая переменная |

---

### Исключённые столбцы

| Столбец                 | Причина исключения |
|:------------------------|:-------------------|
| `tpep_dropoff_datetime` | Известен только после поездки нельзя использовать при прогнозе |
| `store_and_fwd_flag`    | Технический флаг, не влияет на стоимость |
| `fare_amount`           | Часть `total_amount` |
| `extra`                 | Может включать постфактум-сборы; лучше не использовать без чёткой логики |
| `mta_tax`               | Константа |
| `tip_amount`            | Устанавливается пассажиром после поездки недоступно заранее |
| `tolls_amount`          | Зависит от фактического маршрута неизвестно до поездки |
| `improvement_surcharge` | Константа |

Исключим из датафрейма выбранные столбцы

In [12]:
df_combined = df_combined.select(
    "VendorID", "tpep_pickup_datetime", "passenger_count", "trip_distance", "pickup_longitude", "pickup_latitude",
    "dropoff_longitude", "dropoff_latitude", "RateCodeID", "payment_type",
    "total_amount")

In [13]:
df_combined.limit(20).toPandas().style\
    .set_properties(**{'text-align': 'left', 'max-width': '0', 'white-space': 'nowrap', 'overflow': 'hidden', 'text-overflow': 'ellipsis'})\
    .set_table_styles([{'selector': 'th', 'props': [('text-align', 'left')]}])\
    .format(precision=2)

Unnamed: 0,VendorID,tpep_pickup_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,RateCodeID,payment_type,total_amount
0,,NaT,,,,,,,,,
1,2.0,2015-01-15 19:05:39,1.0,1.59,-73.99,40.75,-73.97,40.75,1.0,1.0,17.05
2,1.0,2015-01-10 20:33:38,1.0,3.3,-74.0,40.72,-73.99,40.76,1.0,1.0,17.8
3,1.0,2015-01-10 20:33:38,1.0,1.8,-73.96,40.8,-73.95,40.82,1.0,2.0,10.8
4,1.0,2015-01-10 20:33:39,1.0,0.5,-74.01,40.71,-74.0,40.72,1.0,2.0,4.8
5,1.0,2015-01-10 20:33:39,1.0,3.0,-73.97,40.76,-74.0,40.74,1.0,2.0,16.3
6,1.0,2015-01-10 20:33:39,1.0,9.0,-73.87,40.77,-73.99,40.76,1.0,1.0,40.33
7,1.0,2015-01-10 20:33:39,1.0,2.2,-73.98,40.73,-73.99,40.75,1.0,2.0,15.3
8,1.0,2015-01-10 20:33:39,3.0,0.8,-74.0,40.73,-74.0,40.73,1.0,1.0,9.96
9,1.0,2015-01-10 20:33:39,3.0,18.2,-73.78,40.64,-73.99,40.76,2.0,2.0,58.13


Выведем метаданные датасета

In [14]:
df_combined.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- total_amount: double (nullable = true)



## Преобразования датасета

В полученном датафрейме есть географические признаки широты и долготы посадки/высадки пассажиров. Выполним следующие преобразования:

1. Согласно формуле Хаверсина вычислим расстояние между точками посадки/высадки пассажиров на поверхности сферы по широте и долготе.

In [15]:
def haversine_miles(lat1, lon1, lat2, lon2):
    R = 3958.8
    lat1_rad = F.radians(lat1)
    lon1_rad = F.radians(lon1)
    lat2_rad = F.radians(lat2)
    lon2_rad = F.radians(lon2)
    dlat = lat2_rad - lat1_rad
    dlon = lon2_rad - lon1_rad
    a = F.sin(dlat / 2)**2 + F.cos(lat1_rad) * F.cos(lat2_rad) * F.sin(dlon / 2)**2
    c = 2 * F.atan2(F.sqrt(a), F.sqrt(1 - a))
    return R * c

df_with_distance = df_combined.withColumn(
    "haversine_distance",
    haversine_miles(
        F.col("pickup_latitude"), F.col("pickup_longitude"),
        F.col("dropoff_latitude"), F.col("dropoff_longitude")
    )
)

2. Определим константы координаты аэропортов в Нью-Йорке для того, чтобы ввести в датафрейм признак, определяющий была ли поездка начата/закончена в области аэропортов

In [16]:
# Координаты аэропортов Нью-Йорка
JFK_LONG, JFK_LAT = -73.78222222, 40.64416667
LGA_LONG, LGA_LAT = -73.87388889, 40.77694444
EWR_LONG, EWR_LAT = -74.17472222, 40.68972222
AIRPORT_RADIUS_MILES = 0.9  # область аэропорта

Рассчитаем признак "is_airport_trip" для датафрейма:

In [17]:
# Добавляем флаг is_airport_trip
df_with_airports = df_with_distance.withColumn(
    "is_airport_trip",
    F.when(
        (haversine_miles(F.col("pickup_latitude"), F.col("pickup_longitude"), F.lit(JFK_LAT), F.lit(JFK_LONG)) <= AIRPORT_RADIUS_MILES) |
        (haversine_miles(F.col("dropoff_latitude"), F.col("dropoff_longitude"), F.lit(JFK_LAT), F.lit(JFK_LONG)) <= AIRPORT_RADIUS_MILES) |
        (haversine_miles(F.col("pickup_latitude"), F.col("pickup_longitude"), F.lit(LGA_LAT), F.lit(LGA_LONG)) <= AIRPORT_RADIUS_MILES) |
        (haversine_miles(F.col("dropoff_latitude"), F.col("dropoff_longitude"), F.lit(LGA_LAT), F.lit(LGA_LONG)) <= AIRPORT_RADIUS_MILES) |
        (haversine_miles(F.col("pickup_latitude"), F.col("pickup_longitude"), F.lit(EWR_LAT), F.lit(EWR_LONG)) <= AIRPORT_RADIUS_MILES) |
        (haversine_miles(F.col("dropoff_latitude"), F.col("dropoff_longitude"), F.lit(EWR_LAT), F.lit(EWR_LONG)) <= AIRPORT_RADIUS_MILES),
        1
    ).otherwise(0)
)

3. Обработаем признак "tpep_pickup_datetime". Рассчитаем из него следующие признаки: "pickup_hour" (час посадки), "pickup_dow" (день недели).

In [18]:
df_with_time_features = df_with_airports \
    .withColumn("pickup_hour", F.hour("tpep_pickup_datetime")) \
    .withColumn("pickup_dow", F.dayofweek("tpep_pickup_datetime"))

Исключим лишние столбцы после преобразований и приведём датафрейм к следующей структуре:

| Название столбца        | Тип данных  | Расшифровка                                                                 |
|:------------------------|:------------|:----------------------------------------------------------------------------|
| `VendorID`              | integer     | Идентификатор провайдера телематических данных                              |
| `passenger_count`       | integer     | Количество пассажиров                                                       |
| `trip_distance`         | double      | Фактическое расстояние поездки в милях                                      |
| `haversine_distance`    | double      | Расстояние по прямой между посадкой и высадкой (в милях)                    |
| `RateCodeID`            | integer     | Код типа тарифа                                                             |
| `payment_type`          | integer     | Способ оплаты                                                               |
| `is_airport_trip`       | integer     | Флаг поездки в/из аэропорта (1 = да, 0 = нет)                               |
| `pickup_hour`           | integer     | Час посадки (0–23)                                                          |
| `pickup_dow`            | integer     | День недели (1 = воскресенье, ..., 7 = суббота)                             |
| `total_amount`          | double      | Итоговая сумма к оплате

In [33]:
df_transformed = df_with_time_features.select(
    "VendorID", "passenger_count", "trip_distance", "haversine_distance","RateCodeID", "payment_type", "is_airport_trip", 
    "pickup_hour", "pickup_dow", "total_amount")

df_transformed.show()

row_count = df_combined.count()
print(f"Количество строк: {row_count}")

+--------+---------------+-------------+-------------------+----------+------------+---------------+-----------+----------+------------+
|VendorID|passenger_count|trip_distance| haversine_distance|RateCodeID|payment_type|is_airport_trip|pickup_hour|pickup_dow|total_amount|
+--------+---------------+-------------+-------------------+----------+------------+---------------+-----------+----------+------------+
|    NULL|           NULL|         NULL|               NULL|      NULL|        NULL|              0|       NULL|      NULL|        NULL|
|       2|              1|         1.59| 1.0009738583401306|         1|           1|              0|         19|         5|       17.05|
|       1|              1|          3.3|  2.438630505784401|         1|           1|              0|         20|         7|        17.8|
|       1|              1|          1.8| 1.6110715418320922|         1|           2|              0|         20|         7|        10.8|
|       1|              1|          0.5| 

Сохраним получившийся датафрейм в виде формата Parquet

Определим путь до директории паркет файлов

In [32]:
path_output = "hdfs:///user/vladislav_vaganov/datasets/parquets/tripdata_transformed.parquet"

Сохраним датафрейм в виде файла "tripdata_transformed.parquet"

In [33]:
df_transformed.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet(path_output)

Останавливаем spark-сессию

In [34]:
spark.stop()