# Домашнее задание № 2. 
ФИО: Алёхин Арсений Михайлович

Группа: ИУ6-54Б

### Наборы данных:
- Поездки (bikeshare): [данные](https://s3.amazonaws.com/tripdata/201902-citibike-tripdata.csv.zip) | [пояснения](https://www.citibikenyc.com/system-data)
- Кварталы NYC: [данные](https://data.cityofnewyork.us/api/geospatial/d3c5-ddgc?method=export&format=GeoJSON)

### Задание:

- определите для каждой станции количество начала поездок и количество завершения поездок
- сопоставьте станции с кварталами города (zones) и определите суммы количества начала и завершения для каждого квартала
- выведите по убыванию количества поездок и 
- отобразите в виде картограмм (Choropleth). 

### Импорты и инициализация

In [57]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType
from shapely.geometry import shape, Point
import json
import pandas as pd
import folium
import geopandas as gpd
from datetime import datetime

os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
spark = SparkSession.builder \
    .appName("NYC_Citibike_Analysis") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()

sc = spark.sparkContext

### Загрузка данных о поездках

In [58]:
df = spark.read.csv("201902-citibike-tripdata.csv", header=True, inferSchema=True)
print(f"Всего поездок: {df.count()}")
print(f"Столбцы: {df.columns}")
df.cache()


Всего поездок: 18565
Столбцы: ['tripduration', 'starttime', 'stoptime', 'start station id', 'start station name', 'start station latitude', 'start station longitude', 'end station id', 'end station name', 'end station latitude', 'end station longitude', 'bikeid', 'usertype', 'birth year', 'gender']


DataFrame[tripduration: int, starttime: timestamp, stoptime: timestamp, start station id: int, start station name: string, start station latitude: double, start station longitude: double, end station id: int, end station name: string, end station latitude: double, end station longitude: double, bikeid: int, usertype: string, birth year: int, gender: int]

### Подсчет поездок по станциям


In [64]:

# Подсчитываем начало поездок со всеми координатами
counts_start = (
    df
    .groupBy('start station id', 'start station name', 
             'start station latitude', 'start station longitude')
    .agg(F.count('*').alias('start_count'))
    .withColumnRenamed('start station id', 'station_id')
    .withColumnRenamed('start station name', 'station_name')
    .withColumnRenamed('start station latitude', 'latitude')
    .withColumnRenamed('start station longitude', 'longitude')
)

# Подсчитываем завершение поездок
counts_end = (
    df
    .groupBy('end station id')
    .agg(F.count('*').alias('end_count'))
    .withColumnRenamed('end station id', 'station_id')
)

# Объединяем
joined = (
    counts_start
    .join(counts_end, 'station_id', 'full_outer')
    .fillna(0)
)

print(f"Всего уникальных станций: {joined.count()}")
print("\nПримеры данных:")
joined.show(5, truncate=False)


joined.cache()


Всего уникальных станций: 52

Примеры данных:
+----------+--------------+-----------------+------------------+-----------+---------+
|station_id|station_name  |latitude         |longitude         |start_count|end_count|
+----------+--------------+-----------------+------------------+-----------+---------+
|3183      |Exchange Place|40.7162469       |-74.0334588       |757        |904      |
|3184      |Paulus Hook   |40.7141454       |-74.0335519       |405        |439      |
|3185      |City Hall     |40.7177325       |-74.043845        |497        |525      |
|3186      |Grove St PATH |40.71958611647166|-74.04311746358871|2447       |2774     |
|3187      |Warren St     |40.7211236       |-74.03805095      |360        |356      |
+----------+--------------+-----------------+------------------+-----------+---------+
only showing top 5 rows



26/01/08 17:55:16 WARN CacheManager: Asked to cache already cached data.


DataFrame[station_id: int, station_name: string, latitude: double, longitude: double, start_count: bigint, end_count: bigint]

### GeoPandas сопоставление с зонами

In [60]:
# Конвертируем Spark DataFrame в Pandas
stations_pdf = joined.toPandas()

print(f"Всего станций для обработки: {len(stations_pdf)}")
print(f"\nДиапазон координат станций:")
print(f"  Latitude:  {stations_pdf['latitude'].min():.4f} - {stations_pdf['latitude'].max():.4f}")
print(f"  Longitude: {stations_pdf['longitude'].min():.4f} - {stations_pdf['longitude'].max():.4f}")

# Создаем GeoDataFrame со станциями
geometry = [Point(xy) for xy in zip(stations_pdf.longitude, stations_pdf.latitude)]
stations_gdf = gpd.GeoDataFrame(stations_pdf, geometry=geometry)
stations_gdf.set_crs(epsg=4326, inplace=True)

print(f"GeoDataFrame создан с CRS: {stations_gdf.crs}\n")

# Загружаем зоны NYC
zones_gdf = gpd.read_file("NYC Taxi Zones.geojson").to_crs(epsg=4326)

print(f"Загружено {len(zones_gdf)} зон из NYC Taxi Zones.geojson")
print(f"Границы зон NYC:")
print(f"  Latitude:  {zones_gdf.geometry.bounds.miny.min():.4f} - {zones_gdf.geometry.bounds.maxy.max():.4f}")
print(f"  Longitude: {zones_gdf.geometry.bounds.minx.min():.4f} - {zones_gdf.geometry.bounds.maxx.max():.4f}\n")

# Пространственный join - ищем в каких зонах находятся станции
joined_gdf = gpd.sjoin(stations_gdf, zones_gdf, how="left", predicate="within")

# Проверяем результаты
found_zones = joined_gdf['zone'].notna().sum()
not_found = joined_gdf['zone'].isna().sum()

# Если все станции вне зон - используем ближайшие зоны
if found_zones == 0:
    print("Все станции в Jersey City - применяю метод ближайших зон\n")
    
    nearest_zones = []
    for idx, station in stations_gdf.iterrows():
        distances = zones_gdf.geometry.distance(station.geometry)
        nearest_idx = distances.idxmin()
        nearest_zones.append(zones_gdf.loc[nearest_idx, 'zone'])
    
    joined_gdf['zone'] = nearest_zones
    found_zones = len([z for z in nearest_zones if z is not None])

stations_gdf = None  # Очищаем память
print("Примеры найденных сопоставлений:")
print(joined_gdf[['station_name', 'zone', 'start_count', 'end_count']].head(15))
print()


  PyArrow >= 4.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


Всего станций для обработки: 52

Диапазон координат станций:
  Latitude:  0.0000 - 40.7487
  Longitude: -74.0836 - 0.0000
GeoDataFrame создан с CRS: EPSG:4326

Загружено 263 зон из NYC Taxi Zones.geojson
Границы зон NYC:
  Latitude:  40.4961 - 40.9155
  Longitude: -74.2556 - -73.7000

Все станции в Jersey City - применяю метод ближайших зон

Примеры найденных сопоставлений:
          station_name                                           zone  \
0       Exchange Place                              Battery Park City   
1          Paulus Hook  Governor's Island/Ellis Island/Liberty Island   
2            City Hall  Governor's Island/Ellis Island/Liberty Island   
3        Grove St PATH  Governor's Island/Ellis Island/Liberty Island   
4            Warren St  Governor's Island/Ellis Island/Liberty Island   
5             Union St  Governor's Island/Ellis Island/Liberty Island   
6   Liberty Light Rail  Governor's Island/Ellis Island/Liberty Island   
7         Lincoln Park  Governor's Isla


  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf.geometry.distance(station.geometry)

  distances = zones_gdf

### Суммируем по зонам и сортируем

In [65]:
# Заполняем NaN значения нулями (для станций вне зон)
joined_gdf['start_count'] = joined_gdf['start_count'].fillna(0).astype(int)
joined_gdf['end_count'] = joined_gdf['end_count'].fillna(0).astype(int)

# Группируем по zones и суммируем поездки
zones_stats = joined_gdf.groupby('zone')[['start_count', 'end_count']].sum().reset_index()
zones_stats['total_trips'] = zones_stats['start_count'] + zones_stats['end_count']

# Сортируем по убыванию количества поездок
zones_stats = zones_stats.sort_values('total_trips', ascending=False)

print(f"Всего уникальных зон с поездками: {len(zones_stats)}\n")

print(zones_stats.head(20).to_string(index=False))


Всего уникальных зон с поездками: 6

                                         zone  start_count  end_count  total_trips
Governor's Island/Ellis Island/Liberty Island        15108      14946        30054
                            Battery Park City         1755       1947         3702
                                    Hudson Sq         1128       1163         2291
                Meatpacking/West Village West          472        417          889
                    West Chelsea/Hudson Yards          102         91          193
                                 Far Rockaway            0          1            1


### Подготовка данных для карты

In [67]:
# Мержим данные о поездках со свойствами зон для карты
zones_map_data = zones_gdf.merge(zones_stats, on='zone', how='left').fillna(0)

print(f"Данные для карты содержат {len(zones_map_data)} зон")
print(f"Зоны с поездками: {len(zones_stats)}\n")

# Проверяем статистику
print("Статистика по поездкам:")
total_all = zones_map_data['total_trips'].sum()
print(f"  Минимум поездок в зоне: {zones_map_data['total_trips'].min():.0f}")
print(f"  Максимум поездок в зоне: {zones_map_data['total_trips'].max():.0f}")
print(f"  Среднее поездок в зоне: {zones_map_data['total_trips'].mean():.0f}")
print(f"  Всего поездок: {total_all:.0f}\n")


Данные для карты содержат 263 зон
Зоны с поездками: 6

Статистика по поездкам:
  Минимум поездок в зоне: 0
  Максимум поездок в зоне: 30054
  Среднее поездок в зоне: 370
  Всего поездок: 97238



### Choropleth карта

In [68]:
# Центр карты (центр NYC)
map_center = [40.73, -73.95]

# Создаем базовую карту
m = folium.Map(location=map_center, zoom_start=11, tiles="cartodbpositron")

# Добавляем Choropleth слой
folium.Choropleth(
    geo_data=zones_map_data,
    name='choropleth',
    data=zones_map_data,
    columns=['zone', 'total_trips'],
    key_on='feature.properties.zone',
    fill_color='YlOrRd',
    fill_opacity=0.7,
    line_opacity=0.2,
    legend_name='Общее количество поездок'
).add_to(m)

# Добавляем интерактивные подсказки
folium.GeoJson(
    zones_map_data,
    style_function=lambda x: {'opacity': 0, 'fillOpacity': 0},
    tooltip=folium.GeoJsonTooltip(
        fields=['zone', 'borough', 'start_count', 'end_count', 'total_trips'],
        aliases=['Квартал', 'Округ', 'Начало', 'Завершение', 'Всего'],
        localize=True
    )
).add_to(m)

# Добавляем контроль слоев
folium.LayerControl().add_to(m)

# Сохраняем карту
m.save("task1_choropleth_map.html")
print("  Откройте файл в браузере для просмотра интерактивной карты\n")

  Откройте файл в браузере для просмотра интерактивной карты



In [69]:
spark.stop()