## Spark

Схема данных, датасеты

В наборе данных Iris:
	•	Сепал (sepal) – это внешняя, защитная часть цветка, которая закрывает и оберегает его до распускания.
	•	Лепесток (petal) – это внутренняя, часто яркая часть цветка, предназначенная для привлечения опылителей.

Spark DataFrame - https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html

In [5]:
import os
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType

# 1. Создаем Spark сессию
spark = SparkSession.builder \
    .appName("Пример: датасет из нескольких частей") \
    .getOrCreate()

# Задаем схему для исходного датасета Iris
iris_schema = StructType([
    StructField("sepal_length", DoubleType(), True),  # название поля, тип, nullable
    StructField("sepal_width", DoubleType(), True),
    StructField("petal_length", DoubleType(), True),
    StructField("petal_width", DoubleType(), True),
    StructField("species", StringType(), True)
])

# 2. Скачиваем датасет Iris, если файл отсутствует
iris_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
iris_file = "iris.csv"
if not os.path.exists(iris_file):
    print("Скачиваем датасет Iris...")
    response = requests.get(iris_url)
    with open(iris_file, "wb") as f:
        f.write(response.content)
    print("Скачивание завершено!")

# 3. Читаем CSV-файл в DataFrame с заданной схемой
# header - false - не читать заголовок
df_iris = spark.read.option("header", "false") \
                    .schema(iris_schema) \
                    .csv(iris_file)
# Убираем пустые строки (если они есть)
df_iris = df_iris.filter(col("species") != "")
print("Исходный датасет Iris:")
df_iris.show(5)

Исходный датасет Iris:
+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 5 rows



In [6]:
# 4. Записываем датасет Iris на диск, разделяя его по столбцу "species"
output_dir = "data/iris_parts"
df_iris.write.mode("overwrite").partitionBy("species").csv(output_dir, header=True)
print(f"Датасет Iris записан в виде нескольких частей в папку '{output_dir}'.")

# 5. Читаем объединенный датасет из нескольких частей
# Важно: читаем корневую папку без подстановочного шаблона, чтобы Spark обнаружил колонку partition ("species")
# При этом указываем схему для остальных колонок
data_schema = StructType([
    StructField("sepal_length", DoubleType(), True),
    StructField("sepal_width", DoubleType(), True),
    StructField("petal_length", DoubleType(), True),
    StructField("petal_width", DoubleType(), True)
])
df_parts = spark.read.option("header", "true").schema(data_schema).csv(output_dir)
print("Схема объединенного датасета из нескольких частей:")
df_parts.printSchema()
df_parts.show(5)

Датасет Iris записан в виде нескольких частей в папку 'data/iris_parts'.
Схема объединенного датасета из нескольких частей:
root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)

+------------+-----------+------------+-----------+--------------+
|sepal_length|sepal_width|petal_length|petal_width|       species|
+------------+-----------+------------+-----------+--------------+
|         6.3|        3.3|         6.0|        2.5|Iris-virginica|
|         5.8|        2.7|         5.1|        1.9|Iris-virginica|
|         7.1|        3.0|         5.9|        2.1|Iris-virginica|
|         6.3|        2.9|         5.6|        1.8|Iris-virginica|
|         6.5|        3.0|         5.8|        2.2|Iris-virginica|
+------------+-----------+------------+-----------+--------------+
only showing top 5 rows



In [7]:
# 6. Создаем справочный DataFrame с информацией о видах
lookup_data = [
    ("Iris-setosa", "Маленькие лепестки"),
    ("Iris-versicolor", "Средние лепестки"),
    ("Iris-virginica", "Большие лепестки")
]
df_lookup = spark.createDataFrame(lookup_data, ["species", "description"])
print("Справочный датасет:")
df_lookup.show()


Справочный датасет:
+---------------+------------------+
|        species|       description|
+---------------+------------------+
|    Iris-setosa|Маленькие лепестки|
|Iris-versicolor|  Средние лепестки|
| Iris-virginica|  Большие лепестки|
+---------------+------------------+



https://medium.com/@Prashank.jauhari/understanding-broadcast-variables-in-apache-spark-8233d35726fc#:~:text=Broadcast%20Variables%20in%20Spark%20allow,RDD%20(Resilient%20Distributed%20Dataset).  Больше про broadcast

In [8]:

# 7. Выполняем broadcast join между объединенным датасетом и справочным
df_joined = df_parts.join(broadcast(df_lookup), on="species", how="left")
print("Результат broadcast join:")
df_joined.show(5)


Результат broadcast join:
+--------------+------------+-----------+------------+-----------+----------------+
|       species|sepal_length|sepal_width|petal_length|petal_width|     description|
+--------------+------------+-----------+------------+-----------+----------------+
|Iris-virginica|         6.3|        3.3|         6.0|        2.5|Большие лепестки|
|Iris-virginica|         5.8|        2.7|         5.1|        1.9|Большие лепестки|
|Iris-virginica|         7.1|        3.0|         5.9|        2.1|Большие лепестки|
|Iris-virginica|         6.3|        2.9|         5.6|        1.8|Большие лепестки|
|Iris-virginica|         6.5|        3.0|         5.8|        2.2|Большие лепестки|
+--------------+------------+-----------+------------+-----------+----------------+
only showing top 5 rows



In [9]:
# 8. Демонстрируем репартитионирование объединенного датасета.
df_parts = df_parts.withColumn("sepal_int", col("sepal_length").cast("integer"))
df_repartitioned = df_parts.repartition(20, col("sepal_int"))
print("Количество партиций после репартитионирования:", df_repartitioned.rdd.getNumPartitions())

Количество партиций после репартитионирования: 20


Spark UI https://spark.apache.org/docs/3.5.3/web-ui.html


Что такое партиционирование?

В Apache Spark данные распределяются между узлами кластера в виде RDD (резилиентных распределённых наборов данных) или DataFrame. Каждый такой набор данных разбивается на «партиции» – логические блоки данных, которые обрабатываются параллельно. Количество партиций напрямую влияет на уровень параллелизма: каждая партиция обрабатывается отдельной задачей (task).

In [10]:
# 9. Сохраняем репартитиционированный датасет в формате Parquet с разделением по столбцу "species"
parquet_output = "output/iris_parquet_multi"
df_repartitioned.write.mode("overwrite").partitionBy("species").parquet(parquet_output)
print(f"Датасет сохранен в формате Parquet в папку '{parquet_output}'.")

# 10. Читаем сохраненный датасет из Parquet
df_restored = spark.read.parquet(parquet_output)
print("Восстановленный DataFrame из Parquet:")
df_restored.show(5, truncate=False)
print("Количество партиций во восстановленном DataFrame:", df_restored.rdd.getNumPartitions())



                                                                                

Датасет сохранен в формате Parquet в папку 'output/iris_parquet_multi'.
Восстановленный DataFrame из Parquet:
+------------+-----------+------------+-----------+---------+--------------+
|sepal_length|sepal_width|petal_length|petal_width|sepal_int|species       |
+------------+-----------+------------+-----------+---------+--------------+
|6.3         |3.3        |6.0         |2.5        |6        |Iris-virginica|
|6.3         |2.9        |5.6         |1.8        |6        |Iris-virginica|
|6.5         |3.0        |5.8         |2.2        |6        |Iris-virginica|
|6.7         |2.5        |5.8         |1.8        |6        |Iris-virginica|
|6.5         |3.2        |5.1         |2.0        |6        |Iris-virginica|
+------------+-----------+------------+-----------+---------+--------------+
only showing top 5 rows

Количество партиций во восстановленном DataFrame: 10


Parquet - колоночный формат данных
https://aws.amazon.com/ru/compare/the-difference-between-olap-and-oltp/

https://softwareengineeringdaily.com/2017/01/13/columnar-data-apache-arrow-and-parquet-with-julien-le-dem-and-jacques-nadeau/

In [11]:
# Предположим, что мы продолжаем работать с исходным DataFrame df_iris,
# который был создан ранее.

# 1. Выведем описательную статистику для числовых столбцов
print("Описательная статистика набора данных:")
df_iris.describe().show()

# 2. Вычислим взаимные корреляции между числовыми столбцами
numeric_cols = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
print("Взаимные корреляции между признаками:")
for i in range(len(numeric_cols)):
    for j in range(i+1, len(numeric_cols)):
        corr_val = df_iris.stat.corr(numeric_cols[i], numeric_cols[j])
        print(f"Корреляция между {numeric_cols[i]} и {numeric_cols[j]}: {corr_val:.3f}")

Описательная статистика набора данных:


25/03/01 09:54:07 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+-------------------+------------------+------------------+--------------+
|summary|      sepal_length|        sepal_width|      petal_length|       petal_width|       species|
+-------+------------------+-------------------+------------------+------------------+--------------+
|  count|               150|                150|               150|               150|           150|
|   mean| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672|          NULL|
| stddev|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414|          NULL|
|    min|               4.3|                2.0|               1.0|               0.1|   Iris-setosa|
|    max|               7.9|                4.4|               6.9|               2.5|Iris-virginica|
+-------+------------------+-------------------+------------------+------------------+--------------+

Взаимные корреляции между признаками:
Корреляция между sepal_length и sepal_width

In [12]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 1. Преобразуем столбец species в числовой индекс (label)
indexer = StringIndexer(inputCol="species", outputCol="label")

# 2. Собираем числовые признаки в единый столбец "features"
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")

# 3. Создаем классификатор: логистическую регрессию
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)

# 4. Формируем Pipeline, объединяя этапы преобразования и классификатора
pipeline = Pipeline(stages=[indexer, assembler, lr])

# 5. Разбиваем данные на обучающую (70%) и тестовую (30%) выборки
train_df, test_df = df_iris.randomSplit([0.7, 0.3], seed=42)

# 6. Обучаем модель
model = pipeline.fit(train_df)

# 7. Делаем прогнозы на тестовой выборке
predictions = model.transform(test_df)
print("Примеры прогнозов:")
predictions.select("species", "label", "prediction", "probability").show(20, truncate=False)

# 8. Оцениваем точность модели
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Точность модели: {:.3f}".format(accuracy))

25/03/01 09:58:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


Примеры прогнозов:
+---------------+-----+----------+-----------------------------------------------------------------+
|species        |label|prediction|probability                                                      |
+---------------+-----+----------+-----------------------------------------------------------------+
|Iris-setosa    |2.0  |2.0       |[4.599705793769882E-19,4.671058980629929E-4,0.9995328941019369]  |
|Iris-setosa    |2.0  |2.0       |[2.749813503816678E-19,5.205764724954879E-4,0.9994794235275045]  |
|Iris-setosa    |2.0  |2.0       |[3.569703930131562E-22,2.6874378001183374E-5,0.9999731256219988] |
|Iris-setosa    |2.0  |2.0       |[2.3019756241931815E-19,7.122838327080171E-4,0.999287716167292]  |
|Iris-setosa    |2.0  |2.0       |[6.760792642654331E-18,0.003634416180262674,0.9963655838197374]  |
|Iris-setosa    |2.0  |2.0       |[3.0951077142726367E-19,7.033173666718958E-4,0.9992966826333282] |
|Iris-setosa    |2.0  |2.0       |[2.0653966381466125E-18,0.001252777300

## Задание - Эксперименты с настройками Spark

1.	Изменение числа партиций:
  -	Проведите эксперименты с разным количеством партиций (например, 10, 20, 50) для одного и того же датасета.
  -	Измерьте время выполнения агрегационных операций (например, groupBy и count) при разном числе партиций.
  -	Сравните и обсудите результаты.
2.	Проверка влияния параметров broadcast:
  -	Измените значение параметра spark.sql.autoBroadcastJoinThreshold и выполните join двух DataFrame.
  -	Проанализируйте, как изменяется план выполнения и время выполнения join.

  https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page - адрес датасета

In [13]:
import os
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp
import time

# Функция для скачивания файла по URL
def download_file(url, local_path):
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    print(f"Скачивание данных из {url} в {local_path}...")
    response = requests.get(url, stream=True)
    with open(local_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)
    print("Скачивание завершено.")

# Скачивание данных Yellow Taxi за 2020 год (месяцы 1 - 12)
year = "2020"
taxi_type = "yellow"
base_url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download"

for month in range(1, 13):
    month_str = f"{month:02d}"
    url = f"{base_url}/{taxi_type}/yellow_tripdata_{year}-{month_str}.csv.gz"
    local_path = f"data/nyc_taxi/{taxi_type}/{year}/yellow_tripdata_{year}-{month_str}.csv.gz"
    download_file(url, local_path)

Скачивание данных из https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-01.csv.gz в data/nyc_taxi/yellow/2020/yellow_tripdata_2020-01.csv.gz...
Скачивание завершено.
Скачивание данных из https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-02.csv.gz в data/nyc_taxi/yellow/2020/yellow_tripdata_2020-02.csv.gz...
Скачивание завершено.
Скачивание данных из https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-03.csv.gz в data/nyc_taxi/yellow/2020/yellow_tripdata_2020-03.csv.gz...
Скачивание завершено.
Скачивание данных из https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-04.csv.gz в data/nyc_taxi/yellow/2020/yellow_tripdata_2020-04.csv.gz...
Скачивание завершено.
Скачивание данных из https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-05.csv.gz в data/nyc_taxi/yellow/2020/yellow_tripdata_2020-

In [14]:
# Создаем SparkSession
spark = SparkSession.builder.appName("Эксперименты с партиционированием и broadcast").getOrCreate()

# Читаем скачанный CSV (с распаковкой, если требуется; в данном случае Spark может читать gzipped файлы напрямую)
# df_taxi = spark.read.option("header", "true").csv("data/nyc_taxi/yellow/2020/*.csv.gz")
df_taxi = spark.read.option("header", "true").csv("data/nyc_taxi/yellow/2020/yellow_tripdata_2020-01.csv.gz")

# Приведение типов: преобразуем даты и числовые поля
df_taxi = df_taxi.withColumn("tpep_pickup_datetime", to_timestamp(col("tpep_pickup_datetime"))) \
                 .withColumn("fare_amount", col("fare_amount").cast("double")) \
                 .withColumn("tip_amount", col("tip_amount").cast("double"))

# Для ускорения экспериментов берем подвыборку данных (например, 0.1% от исходного)
df_sampled = df_taxi.sample(withReplacement=False, fraction=0.001, seed=42)
print("Размер выборки:", df_sampled.count())

#############################################
# Эксперимент 1. Изменение числа партиций
#############################################

partitions_list = [1, 2, 10]
aggregation_times = {}

print("\n--- Эксперимент: Изменение числа партиций и измерение времени агрегирования ---")
for num_parts in partitions_list:
    # Репартитионируем выборку на num_parts партиций
    df_repart = df_sampled.repartition(num_parts)

    start_time = time.time()
    # Группируем по дате поездки (приводим datetime к дате) и считаем количество строк
    df_agg = df_repart.groupBy(col("tpep_pickup_datetime").cast("date").alias("pickup_date")).count().collect()
    elapsed_time = time.time() - start_time

    aggregation_times[num_parts] = elapsed_timeу
    print(f"Количество партиций: {num_parts}, время выполнения агрегирования: {elapsed_time:.3f} сек")


25/03/01 10:01:58 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

Размер выборки: 6295

--- Эксперимент: Изменение числа партиций и измерение времени агрегирования ---


                                                                                

Количество партиций: 1, время выполнения агрегирования: 5.536 сек




Количество партиций: 2, время выполнения агрегирования: 6.463 сек
CodeCache: size=131072Kb used=45540Kb max_used=45898Kb free=85531Kb
 bounds [0x0000000105ecc000, 0x0000000108bdc000, 0x000000010decc000]
 total_blobs=16424 nmethods=15136 adapters=1198
 compilation: disabled (not enough contiguous free space left)


[Stage 68:>                                                         (0 + 1) / 1]

Количество партиций: 10, время выполнения агрегирования: 6.355 сек


                                                                                

In [15]:
df_sampled.repartition(10)\
  .write.mode("overwrite").csv("taxi-test", header=True)


                                                                                

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.createOrReplaceTempView.html


In [16]:
df_sampled.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       2| 2020-01-01 00:22:56|  2020-01-01 00:37:03|              2|         7.43|         1|                 N|           7|          56|           2|       22.0|  0.5|    0.5|       0.0|           0|                  0.3

In [17]:
from pyspark.sql.functions import sum as spark_sum
df_sampled.groupBy("VendorID")\
          .agg(spark_sum("passenger_count").alias("total_passengers"))\
          .show(5)

[Stage 78:>                                                         (0 + 1) / 1]

+--------+----------------+
|VendorID|total_passengers|
+--------+----------------+
|    NULL|            NULL|
|       1|          2434.0|
|       2|          6998.0|
+--------+----------------+



                                                                                

In [18]:
df_sampled.createOrReplaceTempView("trips")
spark.sql("""
    SELECT VendorID, SUM(passenger_count) AS total_passenger_count
    FROM trips
    GROUP BY VendorID
""").show()

[Stage 81:>                                                         (0 + 1) / 1]

+--------+---------------------+
|VendorID|total_passenger_count|
+--------+---------------------+
|    NULL|                 NULL|
|       1|               2434.0|
|       2|               6998.0|
+--------+---------------------+



                                                                                

In [19]:
df_sampled.select("VendorID", col("passenger_count").cast("int").alias("passenger_count")) \
  .groupBy("VendorID") \
  .sum("passenger_count") \
  .withColumnRenamed("sum(passenger_count)", "total_passenger_count") \
  .show()

[Stage 84:>                                                         (0 + 1) / 1]

+--------+---------------------+
|VendorID|total_passenger_count|
+--------+---------------------+
|    NULL|                 NULL|
|       1|                 2434|
|       2|                 6998|
+--------+---------------------+



                                                                                

Параметр spark.sql.autoBroadcastJoinThreshold определяет максимальный размер (в байтах) таблицы, которая при выполнении операции join может быть автоматически «транслярована» (broadcast) на все узлы кластера. То есть если размер одной из таблиц меньше установленного порога, Spark автоматически передаст её на каждый executor, чтобы выполнить join локально, что позволяет избежать затратного перемешивания данных (shuffle).



In [20]:
############################################
# Эксперимент 2. Проверка влияния параметра broadcast
#############################################

# Создаем небольшой справочный DataFrame: например, уникальные типы оплаты
df_payment = df_sampled.select("payment_type").distinct()

thresholds = [-1, 10485760]  # -1: отключение broadcast, 10 МБ – типичное значение

print("\n--- Эксперимент: Влияние параметра spark.sql.autoBroadcastJoinThreshold на join ---")
for thresh in thresholds:
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", thresh)
    print(f"\nЗначение spark.sql.autoBroadcastJoinThreshold = {thresh}")

    start_time = time.time()
    # Выполняем join по столбцу "paymentType"
    df_join = df_sampled.join(df_payment, on="payment_type", how="left")
    join_count = df_join.count()
    elapsed_join = time.time() - start_time
    print(f"Время выполнения join: {elapsed_join:.3f} сек, количество строк: {join_count}")

    print("План выполнения join:")
    df_join.explain(True)

# Завершаем работу Spark
spark.stop()


--- Эксперимент: Влияние параметра spark.sql.autoBroadcastJoinThreshold на join ---

Значение spark.sql.autoBroadcastJoinThreshold = -1


                                                                                

Время выполнения join: 2.350 сек, количество строк: 6295
План выполнения join:
== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [payment_type])
:- Sample 0.0, 0.001, false, 42
:  +- Project [VendorID#2092, tpep_pickup_datetime#2128, tpep_dropoff_datetime#2094, passenger_count#2095, trip_distance#2096, RatecodeID#2097, store_and_fwd_flag#2098, PULocationID#2099, DOLocationID#2100, payment_type#2101, fare_amount#2148, extra#2103, mta_tax#2104, cast(tip_amount#2105 as double) AS tip_amount#2167, tolls_amount#2106, improvement_surcharge#2107, total_amount#2108, congestion_surcharge#2109]
:     +- Project [VendorID#2092, tpep_pickup_datetime#2128, tpep_dropoff_datetime#2094, passenger_count#2095, trip_distance#2096, RatecodeID#2097, store_and_fwd_flag#2098, PULocationID#2099, DOLocationID#2100, payment_type#2101, cast(fare_amount#2102 as double) AS fare_amount#2148, extra#2103, mta_tax#2104, tip_amount#2105, tolls_amount#2106, improvement_surcharge#2107, total_amount#2108, congestion_su

                                                                                

Время выполнения join: 2.148 сек, количество строк: 6295
План выполнения join:
== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [payment_type])
:- Sample 0.0, 0.001, false, 42
:  +- Project [VendorID#2092, tpep_pickup_datetime#2128, tpep_dropoff_datetime#2094, passenger_count#2095, trip_distance#2096, RatecodeID#2097, store_and_fwd_flag#2098, PULocationID#2099, DOLocationID#2100, payment_type#2101, fare_amount#2148, extra#2103, mta_tax#2104, cast(tip_amount#2105 as double) AS tip_amount#2167, tolls_amount#2106, improvement_surcharge#2107, total_amount#2108, congestion_surcharge#2109]
:     +- Project [VendorID#2092, tpep_pickup_datetime#2128, tpep_dropoff_datetime#2094, passenger_count#2095, trip_distance#2096, RatecodeID#2097, store_and_fwd_flag#2098, PULocationID#2099, DOLocationID#2100, payment_type#2101, cast(fare_amount#2102 as double) AS fare_amount#2148, extra#2103, mta_tax#2104, tip_amount#2105, tolls_amount#2106, improvement_surcharge#2107, total_amount#2108, congestion_su