## Лабораторная работа № 1 
## Выполнение разведочного анализа больших данных с использованием фреймворка Apache Spark

### Часть 1

В данной части работы рассмотрены:
* базовые преобразования данных;

Подключим необходимые библиотеки.

In [1]:
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.sql.functions import (
    regexp_replace,
    regexp_extract_all,
    col,
    lit,
)


Сформируем объект конфигурации для `Apache Spark`, указав необходимые параметры.

In [2]:
def create_spark_configuration() -> SparkConf:
    """
    Создает и конфигурирует экземпляр SparkConf для приложения Spark.

    Returns:
        SparkConf: Настроенный экземпляр SparkConf.
    """
    # Получаем имя пользователя
    user_name = os.getenv("USER")
    
    conf = SparkConf()
    conf.setAppName("lab 1 Test")
    conf.setMaster("local[*]")
    conf.set("spark.executor.memory", "12g")
    conf.set("spark.executor.cores", "8")
    conf.set("spark.executor.instances", "2")
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.driver.cores", "2")

    return conf

Создаём сам объект конфигурации.

In [3]:
conf = create_spark_configuration()

Создаём и выводим на экран сессию `Apache Spark`. 

In [4]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

Для исследования будем использовать датасет `"New York City Taxi Trips"`, расположенный на платформе `Kaggle` по адресу https://www.kaggle.com/datasets/dhruvildave/new-york-city-taxi-trips-2019/data.

Датасет включает в себя информацию о 84,5 миллионах поездок желтого такси Нью-Йорка в 2019 году.

Указываем путь для файла с данными.

In [5]:
path = "/home/user/work/csvtaxi/tripdata.csv"

Заполняем датафрейм данными из файла.

In [6]:
df = (spark.read.format("csv")
      .option("header", "true")
      .load(path)
)

Выводим фрагмент датафрейма на экран.

In [7]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|     1.0|2019-01-01 00:46:...| 2019-01-01 00:53:...|            1.0|          1.5|       1.0|                 N|       151.0|       239.0|         1.0|        7.0|  0.5|    0.5|      1.65|         0.0|                  0.3

Очевидно, что в целях сохранения ясности изложения и сокращения расчетного времени имеет смысл рассматривать не все солбцы датасета. Оставим следующие колонки, удалив остальные:

| Название столбца  | Расшифровка |
| ------------- | ------------- |
| vendorid               | Идентификационный номер компании перевозчика. 1 = Creative Mobile Technologies, LLC; 2 = VeriFone Inc. |
| tpep_pickup_datetime        | Дата и время включения счетчика  |
| tpep_dropoff_datetime       | Дата и время отключения счетчика |
| passenger_count             | Количество пассажиров в транспортном средстве. Это значение вводится водителем |
| trip_distance     | Пройденное расстояние в милях, указанное таксометром |
| pulocationid         | Зона такси TLC, в которой был включен таксометр |
| dolocationid     | Зона такси TLC, в которой был отключен таксометр |
| ratecodeid           | Код окончательного тарифа, действующий в конце поездки. 1 = Стандартный тариф. 2 = Аэропорт Кеннеди. 3 = Ньюарк. 4 = Нассау или Вестчестер. 5 = Договорная стоимость. 6 = Групповая поездка |
| store_and_fwd_flag         | Этот флаг указывает, была ли запись о поездке сохранена в памяти транспортного средства перед отправкой поставщику услуг(так называемое «хранение и пересылка»), поскольку транспортное средство не было подключено к серверу. Y = поездка с сохранением и пересылкой. N = поездка не с сохранением и пересылкой. |
| payment_type     | Числовой код, указывающий, как пассажир оплатил поездку. 1 = Кредитная карта. 2 = Наличные. 3 = Без комиссии. 4 = Оспаривание. 5 = Неизвестно. 6 = Аннулированная поездка. |
| fare_amount   | Стоимость проезда с учетом времени и расстояния, рассчитанная по счетчику. |
| extra            | Различные дополнительные сборы и доплаты. Сюда входят только сборы за час пик 0,50 и 1 доллар США за ночь. |
| mta_tax             | Налог в размере 0,50 долларов |
| improvement_surcharge      | Доплата за благоустройство такси в размере 0,30 доллара |
| tip_amount      | Сумма чаевых — это поле автоматически заполняется для чаевых по кредитной карте |
| tolls_amount      | Общая сумма всех оплаченных дорожных сборов за поездку |
| total_amount      | Общая сумма, взимаемая с пассажиров. Не включает наличные чаевые. |
| congestion_surcharge      | Доплата за пробки |

In [8]:
df = df.select(
    "vendorid", "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "trip_distance", "ratecodeid", "store_and_fwd_flag", "payment_type", "fare_amount",
    "extra", "mta_tax", "improvement_surcharge", "tip_amount", "congestion_surcharge"
)

In [9]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+-----------+-----+-------+---------------------+----------+--------------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|improvement_surcharge|tip_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+-----------+-----+-------+---------------------+----------+--------------------+
|     1.0|2019-01-01 00:46:...| 2019-01-01 00:53:...|            1.0|          1.5|       1.0|                 N|         1.0|        7.0|  0.5|    0.5|                  0.3|      1.65|                NULL|
|     1.0|2019-01-01 00:59:...| 2019-01-01 01:18:...|            1.0|          2.6|       1.0|                 N|         1.0|       14.0|  0.5|    0.5|                  0.

Выведем на экран метаданные датасета.

In [10]:
df.printSchema()

root
 |-- vendorid: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- ratecodeid: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)



In [11]:
df = df.sample(False, 0.2)

Видно, что все столбцы датасета содержат строковый тип данных, что не соответствует ожиданиям. Выполним преобразования типов данных некоторых столбцов.

In [12]:
from pyspark.sql import functions as F
def transform_dataframe(data: DataFrame) -> DataFrame:
    """
    Преобразует столбцы DataFrame в указанные значения и типы данных и
    выполняет необходимые преобразования.

    Args:
        data (DataFrame): Исходный DataFrame.

    Returns:
        DataFrame: Преобразованный DataFrame.
    """
    # Преобразуем столбцы в соответствующие значения
    data = data.withColumn(
    "vendorid",
    F.when(col("vendorid") == "1.0", "Creative Mobile Technologies, LLC")
    .when(col("vendorid") == "2.0", "VeriFone Inc")
    .otherwise(None)
       )
    data = data.withColumn(
    "ratecodeid", 
    F.when(F.col("ratecodeid") == "1.0", "Standard rate")
     .when(F.col("ratecodeid") == "2.0", "JFK")
     .when(F.col("ratecodeid") == "3.0", "Newark")
     .when(F.col("ratecodeid") == "4.0", "Nassau or Westchester")
     .when(F.col("ratecodeid") == "5.0", "Negotiated fare")
     .when(F.col("ratecodeid") == "6.0", "Group ride")
     .otherwise(None)
       )
    data = data.withColumn(
    "payment_type", 
    F.when(F.col("payment_type") == "1.0", "Credit card")
     .when(F.col("payment_type") == "2.0", "Cash")
     .when(F.col("payment_type") == "3.0", "No charge")
     .when(F.col("payment_type") == "4.0", "Dispute")
     .when(F.col("payment_type") == "5.0", "Unknown")
     .when(F.col("payment_type") == "6.0", "Voided trip")
     .otherwise(None)
       )
    data = data.withColumn(
    "store_and_fwd_flag", 
    F.when(F.col("store_and_fwd_flag") == "Y", True)
     .when(F.col("store_and_fwd_flag") == "N", False)
     .otherwise(None)
       )
    # Преобразуем столбцы в соответствующие типы данных
    data = data.withColumn("tpep_pickup_datetime",
                           col("tpep_pickup_datetime").cast("Timestamp"))
    data = data.withColumn("tpep_dropoff_datetime",
                           col("tpep_dropoff_datetime").cast("Timestamp"))
    data = data.withColumn("passenger_count",
                           col("passenger_count").cast("Integer"))
    data = data.withColumn("trip_distance",
                           col("trip_distance").cast("Double"))
    data = data.withColumn("fare_amount",
                           col("fare_amount").cast("Double"))
    data = data.withColumn("extra",
                           col("extra").cast("Double"))
    data = data.withColumn("mta_tax",
                           col("mta_tax").cast("Double"))
    data = data.withColumn("improvement_surcharge",
                           col("improvement_surcharge").cast("Double"))
    data = data.withColumn("tip_amount",
                           col("tip_amount").cast("Double"))
    data = data.withColumn("congestion_surcharge",
                           col("congestion_surcharge").cast("Double"))

    return data

In [13]:
df = transform_dataframe(df)

In [14]:
df.show()

+--------------------+--------------------+---------------------+---------------+-------------+-------------+------------------+------------+-----------+-----+-------+---------------------+----------+--------------------+
|            vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|   ratecodeid|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|improvement_surcharge|tip_amount|congestion_surcharge|
+--------------------+--------------------+---------------------+---------------+-------------+-------------+------------------+------------+-----------+-----+-------+---------------------+----------+--------------------+
|Creative Mobile T...| 2019-01-01 00:46:40|  2019-01-01 00:53:20|              1|          1.5|Standard rate|             false| Credit card|        7.0|  0.5|    0.5|                  0.3|      1.65|                NULL|
|        VeriFone Inc| 2018-11-28 16:29:37|  2018-11-28 16:33:43|              5|          0.0|          JFK|   

In [15]:
df.printSchema()

root
 |-- vendorid: string (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- ratecodeid: string (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [16]:
df.write.parquet("/home/user/work/csvtaxi/csvtaxi6.parquet")

In [17]:
spark.stop()