In [1]:
# Устанавливаем Java и PySpark в Colab
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install pyspark==3.3.2

import os
import sys

# Устанавливаем JAVA_HOME для Colab
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64'

# Импортируем PySpark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

# Создаем SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [68.9 kB]
Get:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,338 kB]
Get:9 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:10 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:11 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,665 kB]
Hit:12 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:13 http:/

In [2]:
!pip install pyspark>=3.5

In [3]:
spark.version

'3.3.2'

In [4]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-04 08:55:11--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 52.85.39.97, 52.85.39.153, 52.85.39.117, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|52.85.39.97|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-03-04 08:55:11 (235 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [13]:
df = spark.read.parquet('yellow_tripdata_2024-10.parquet')

In [14]:
df = df.repartition(4)

In [15]:
df.write.parquet('yellow/2024/10/', mode='overwrite')

In [16]:
!ls -lh yellow/2024/10/

total 100M
-rw-r--r-- 1 root root 25M Mar  4 08:57 part-00000-6475ccd2-a958-4e86-8605-e5b8c6d2ed16-c000.snappy.parquet
-rw-r--r-- 1 root root 25M Mar  4 08:57 part-00001-6475ccd2-a958-4e86-8605-e5b8c6d2ed16-c000.snappy.parquet
-rw-r--r-- 1 root root 25M Mar  4 08:57 part-00002-6475ccd2-a958-4e86-8605-e5b8c6d2ed16-c000.snappy.parquet
-rw-r--r-- 1 root root 25M Mar  4 08:57 part-00003-6475ccd2-a958-4e86-8605-e5b8c6d2ed16-c000.snappy.parquet
-rw-r--r-- 1 root root   0 Mar  4 08:57 _SUCCESS


In [17]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-04 23:19:27|  2024-10-04 23:43:28|              1|         6.07|         1|                 N|         229|         231|           1|       31.0|  1.0|    0.5|       4.

In [18]:
df.createOrReplaceTempView('yellow_2024_10')

In [20]:
spark.sql("""
    SELECT COUNT(*) AS trip_count
    FROM yellow_2024_10
    WHERE CAST(tpep_pickup_datetime AS DATE) = '2024-10-15'
""").show()

+----------+
|trip_count|
+----------+
|    128893|
+----------+



In [21]:
spark.sql("""
SELECT
    MAX((CAST(tpep_dropoff_datetime AS LONG) - CAST(tpep_pickup_datetime AS LONG)) / 3600) AS duration
FROM
    yellow_2024_10;
""").show()

+------------------+
|          duration|
+------------------+
|162.61777777777777|
+------------------+



In [22]:

from pyspark.sql.functions import col, max, unix_timestamp, round


result = spark.table("yellow_2024_10") \
    .select(
        round(
            (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 3600,
            2
        ).alias("duration")
    ) \
    .agg(max("duration").alias("max_duration"))

result.show()


+------------+
|max_duration|
+------------+
|      162.62|
+------------+



In [None]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [None]:
zones_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True)
])

In [None]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-03 15:10:15--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 52.85.39.117, 52.85.39.65, 52.85.39.153, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|52.85.39.117|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-03-03 15:10:16 (299 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [None]:
df_zones = spark.read.option("header", "true").schema(zones_schema).csv('taxi_zone_lookup.csv')

In [None]:
df_zones.createOrReplaceTempView('zones')

In [None]:
spark.sql("""
SELECT
    zones.Zone AS pickup_location_zone,
    COUNT(1)
FROM
    yellow_2024_10 yellow LEFT JOIN zones ON yellow.PULocationID = zones.LocationID
GROUP BY
    1
ORDER BY
    2 ASC;
""").show(10, False)

+---------------------------------------------+--------+
|pickup_location_zone                         |count(1)|
+---------------------------------------------+--------+
|Governor's Island/Ellis Island/Liberty Island|1       |
|Arden Heights                                |2       |
|Rikers Island                                |2       |
|Green-Wood Cemetery                          |3       |
|Jamaica Bay                                  |3       |
|Port Richmond                                |4       |
|Rossville/Woodrow                            |4       |
|West Brighton                                |4       |
|Eltingville/Annadale/Prince's Bay            |4       |
|Charleston/Tottenville                       |4       |
+---------------------------------------------+--------+
only showing top 10 rows



In [None]:
spark.sql(""" SELECT
    z.Zone AS pickup_location_zone,
    COUNT(*) AS ride_count
FROM yellow_2024_10 y
JOIN zones z ON y.PULocationID = z.LocationID
GROUP BY z.Zone
ORDER BY ride_count ASC;
""").show(10, False)

+---------------------------------------------+----------+
|pickup_location_zone                         |ride_count|
+---------------------------------------------+----------+
|Governor's Island/Ellis Island/Liberty Island|1         |
|Arden Heights                                |2         |
|Rikers Island                                |2         |
|Green-Wood Cemetery                          |3         |
|Jamaica Bay                                  |3         |
|Port Richmond                                |4         |
|Rossville/Woodrow                            |4         |
|West Brighton                                |4         |
|Eltingville/Annadale/Prince's Bay            |4         |
|Charleston/Tottenville                       |4         |
+---------------------------------------------+----------+
only showing top 10 rows

