# ДОМАШНЕЕ ЗАДАНИЕ 2. Анализ поездок посредством Spark DataFrame API

**Дисциплина:** Методы обработки больших данных  
**Студент:** Попов Я.Ю. 
**Группа:** ИУ6-31М

### Задание 1:

- определите для каждой станции количество начала поездок и количество завершения поездок
- сопоставьте станции с кварталами города (zones) и определите суммы количества начала и завершения для каждого квартала
- выведите по убыванию количества поездок и
- отобразите в виде картограмм (Choropleth).


In [1]:
!pip install shapely folium geopandas



In [2]:
!pip install pygeos rtree



## Инициализация Spark и загрузка данных

In [1]:
# Импорт необходимых библиотек
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, desc, when, coalesce
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, LongType

# Инициализация SparkSession
spark = SparkSession.builder \
    .appName("CitiBike Analysis") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

# Определение схемы для данных о поездках
schema = StructType([
    StructField("tripduration", LongType(), True),
    StructField("starttime", StringType(), True),
    StructField("stoptime", StringType(), True),
    StructField("start_station_id", IntegerType(), True),
    StructField("start_station_name", StringType(), True),
    StructField("start_station_latitude", DoubleType(), True),
    StructField("start_station_longitude", DoubleType(), True),
    StructField("end_station_id", IntegerType(), True),
    StructField("end_station_name", StringType(), True),
    StructField("end_station_latitude", DoubleType(), True),
    StructField("end_station_longitude", DoubleType(), True),
    StructField("bikeid", IntegerType(), True),
    StructField("usertype", StringType(), True),
    StructField("birth_year", IntegerType(), True),
    StructField("gender", IntegerType(), True)
])

# Загрузка данных
trips_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .schema(schema) \
    .csv("JC-201902-citibike-tripdata.csv")

# Кеширование данных для оптимизации производительности
trips_df.cache()



DataFrame[tripduration: bigint, starttime: string, stoptime: string, start_station_id: int, start_station_name: string, start_station_latitude: double, start_station_longitude: double, end_station_id: int, end_station_name: string, end_station_latitude: double, end_station_longitude: double, bikeid: int, usertype: string, birth_year: int, gender: int]

## 1. Анализ количества поездок по станциям

In [2]:
# Анализ станций начала поездок
start_stations = trips_df.groupBy(
    "start_station_id", 
    "start_station_name", 
    "start_station_latitude", 
    "start_station_longitude"
).agg(
    count("*").alias("num_starts")
).withColumnRenamed("start_station_id", "station_id") \
 .withColumnRenamed("start_station_name", "station_name") \
 .withColumnRenamed("start_station_latitude", "station_latitude") \
 .withColumnRenamed("start_station_longitude", "station_longitude")

# Анализ станций завершения поездок
end_stations = trips_df.groupBy(
    "end_station_id", 
    "end_station_name", 
    "end_station_latitude", 
    "end_station_longitude"
).agg(
    count("*").alias("num_ends")
).withColumnRenamed("end_station_id", "station_id") \
 .withColumnRenamed("end_station_name", "station_name") \
 .withColumnRenamed("end_station_latitude", "station_latitude") \
 .withColumnRenamed("end_station_longitude", "station_longitude")

# Объединение данных о станциях
stations = start_stations.join(
    end_stations,
    ["station_id", "station_name", "station_latitude", "station_longitude"],
    "full_outer"
).fillna(0)

# Отображение результатов для всех уникальных станций
print("Количество станций в наборе данных:", stations.count())
print("Статистика по всем станциям:")
stations.select("station_name", "num_starts", "num_ends").orderBy(desc("num_starts")).show(100, truncate=False)

Количество станций в наборе данных: 52
Статистика по всем станциям:
+-----------------------+----------+--------+
|station_name           |num_starts|num_ends|
+-----------------------+----------+--------+
|Grove St PATH          |2447      |2774    |
|Sip Ave                |1082      |1020    |
|Hamilton Park          |957       |984     |
|Newport PATH           |760       |737     |
|Exchange Place         |757       |904     |
|Harborside             |676       |696     |
|Brunswick St           |512       |452     |
|Newark Ave             |502       |544     |
|City Hall              |497       |525     |
|Marin Light Rail       |491       |513     |
|Jersey & 6th St        |478       |362     |
|Jersey & 3rd           |473       |454     |
|Brunswick & 6th        |452       |400     |
|Van Vorst Park         |441       |374     |
|Columbus Drive         |433       |428     |
|Morris Canal           |431       |356     |
|Journal Square         |406       |371     |
|Paulus Hook

## 2. Сопоставление станций с кварталами города

In [29]:
import geopandas as gpd
from shapely.geometry import Point, Polygon, MultiPolygon
from shapely.ops import unary_union
import pandas as pd
import numpy as np

# Загрузка GeoJSON с помощью geopandas
zones_gdf = gpd.read_file("taxi_zones.json")

zones_gdf

Unnamed: 0,OBJECTID,Shape_Leng,Shape__Area,Shape__Length,geometry
0,1,23141.260616,2290477000.0,1001843.0,"MULTIPOLYGON (((-73.93384 40.77803, -73.93382 ..."


## Функция для разделения MultiPolygon на отдельные Polygon

In [30]:
def separate_multipolygons(gdf):
    rows = []
    current_id = 1
    for idx, row in gdf.iterrows():
        if row.geometry.geom_type == 'MultiPolygon':
            # Разбиваем MultiPolygon на отдельные Polygon
            for poly in row.geometry.geoms:
                new_row = row.copy()
                new_row.geometry = poly
                new_row['OBJECTID'] = current_id
                rows.append(new_row)
                current_id += 1
        else:
            # Если это уже Polygon, оставляем как есть
            rows.append(row)
    return gpd.GeoDataFrame(rows, crs=gdf.crs)

zones_sep = separate_multipolygons(zones_gdf)

print(f"Исходное количество зон: {len(zones_gdf)}")
print(f"Количество зон после разделения MultiPolygon: {len(zones_sep)}")
print("Пример структуры данных после разделения:")
print(zones_sep)

Исходное количество зон: 1
Количество зон после разделения MultiPolygon: 13
Пример структуры данных после разделения:
   OBJECTID    Shape_Leng   Shape__Area  Shape__Length  \
0         1  23141.260616  2.290477e+09   1.001843e+06   
0         2  23141.260616  2.290477e+09   1.001843e+06   
0         3  23141.260616  2.290477e+09   1.001843e+06   
0         4  23141.260616  2.290477e+09   1.001843e+06   
0         5  23141.260616  2.290477e+09   1.001843e+06   
0         6  23141.260616  2.290477e+09   1.001843e+06   
0         7  23141.260616  2.290477e+09   1.001843e+06   
0         8  23141.260616  2.290477e+09   1.001843e+06   
0         9  23141.260616  2.290477e+09   1.001843e+06   
0        10  23141.260616  2.290477e+09   1.001843e+06   
0        11  23141.260616  2.290477e+09   1.001843e+06   
0        12  23141.260616  2.290477e+09   1.001843e+06   
0        13  23141.260616  2.290477e+09   1.001843e+06   

                                            geometry  
0  POLYGON ((-

In [32]:
# Преобразование Spark DataFrame в Pandas DataFrame для работы с geopandas
stations_pd = stations.toPandas()
    
geometry = [Point(xy) for xy in zip(stations_pd['station_longitude'], stations_pd['station_latitude'])]
stations_gdf = gpd.GeoDataFrame(stations_pd, geometry=geometry, crs="EPSG:4326")

# Проверка CRS зон и приведение к единой системе координат при необходимости
if zones_sep.crs != stations_gdf.crs:
    zones_sep = zones_sep.to_crs(stations_gdf.crs)

    
# Пространственное соединение: определение, в какой зоне находится каждая станция
stations_with_zones = gpd.sjoin(stations_gdf, zones_sep[['geometry', 'OBJECTID']], how='left', predicate='within')

print(stations_with_zones) 

print(f"Количество станций, для которых найдены соответствующие кварталы: {len(stations_with_zones)}")
print(f"Количество уникальных кварталов с станциями: {stations_with_zones['OBJECTID'].nunique()}")
      
# Сопоставление станций с кварталами и подсчет сумм поездок
zone_trips = stations_with_zones.groupby('OBJECTID').agg(
    total_starts=('num_starts', 'sum'),
    total_ends=('num_ends', 'sum')
).reset_index()

# Добавляем общее количество поездок
zone_trips['total_trips'] = zone_trips['total_starts'] + zone_trips['total_ends']

# Сортируем
zone_trips_sorted = zone_trips.sort_values(by='total_trips', ascending=False)

print("\nКоличество кварталов с поездками:", len(zone_trips))
print("\nТоп-10 кварталов по количеству поездок:")
print(zone_trips_sorted.head(10))

    station_id             station_name  station_latitude  station_longitude  \
0         3183           Exchange Place         40.716247         -74.033459   
1         3184              Paulus Hook         40.714145         -74.033552   
2         3185                City Hall         40.717732         -74.043845   
3         3186            Grove St PATH         40.719586         -74.043117   
4         3187                Warren St         40.721124         -74.038051   
5         3191                 Union St         40.718211         -74.083639   
6         3192       Liberty Light Rail         40.711242         -74.055701   
7         3193             Lincoln Park         40.724605         -74.078406   
8         3194          McGinley Square         40.725340         -74.067622   
9         3195                  Sip Ave         40.730743         -74.063784   
10        3196           Riverview Park         40.744319         -74.043991   
11        3198         Heights Elevator 