# HOMEWORK 2. Trip Analysis with the Spark DataFrame API
Completed by: Daniil Khanenko, IU6-54B

### Datasets:
- Trips (bikeshare): [data](https://s3.amazonaws.com/tripdata/201902-citibike-tripdata.csv.zip) | [documentation](https://www.citibikenyc.com/system-data)
- NYC neighborhoods (taxi zones): [data](https://data.cityofnewyork.us/api/geospatial/d3c5-ddgc?method=export&format=GeoJSON)

### Task 1

- for each station, determine the number of trips that start there and the number of trips that end there
- match the stations to the city's neighborhoods (zones) and compute, for each zone, the total number of trip starts and trip ends
- list the zones in descending order of trip count, and
- display the result as a choropleth map.

```python
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count
import geopandas as gpd
import pandas as pd
import folium
from shapely.geometry import Point

# Bind Spark to the loopback interface for a local run
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"

# Arrow speeds up toPandas(); without PyArrow Spark falls back to the slower path
spark = SparkSession.builder \
    .appName("NYC_Citibike_Analysis") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

trips_df = spark.read.csv("201902-citibike-tripdata_1.csv", header=True, inferSchema=True)

# Cast station IDs and coordinates explicitly instead of relying on schema inference
trips_df = trips_df.withColumn("start station id", col("start station id").cast("integer")) \
                   .withColumn("end station id", col("end station id").cast("integer")) \
                   .withColumn("start station latitude", col("start station latitude").cast("double")) \
                   .withColumn("start station longitude", col("start station longitude").cast("double")) \
                   .withColumn("end station latitude", col("end station latitude").cast("double")) \
                   .withColumn("end station longitude", col("end station longitude").cast("double"))
```


- for each station, determine the number of trips that start there and the number of trips that end there

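```python
from pyspark.sql.functions import coalesce, col, count, first

# Trip starts per station; coordinates are taken from the first matching row
starts = trips_df.groupBy(col("start station id").alias("station_id")).agg(
    count("*").alias("start_count"),
    first("start station latitude").alias("start_lat"),
    first("start station longitude").alias("start_lon"),
)

# Trip ends per station, with coordinates so end-only stations can still be located
ends = trips_df.groupBy(col("end station id").alias("station_id")).agg(
    count("*").alias("end_count"),
    first("end station latitude").alias("end_lat"),
    first("end station longitude").alias("end_lon"),
)

# Outer join keeps stations seen on only one side; only the counts are filled with 0,
# so a station never ends up at the (0, 0) coordinate
stations_stats = (
    starts.join(ends, on="station_id", how="outer")
    .withColumn("lat", coalesce(col("start_lat"), col("end_lat")))
    .withColumn("lon", coalesce(col("start_lon"), col("end_lon")))
    .drop("start_lat", "start_lon", "end_lat", "end_lon")
    .fillna(0, subset=["start_count", "end_count"])
    .withColumn("total_activity", col("start_count") + col("end_count"))
)
```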
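The per-station step is two group-by counts combined with an outer join, where a station missing on one side gets a count of 0. A minimal stdlib sketch of the same semantics, assuming a hypothetical handful of trips given as `(start station id, end station id)` pairs (the IDs are made up for illustration):

```python
from collections import Counter

def station_activity(trips):
    """Count trip starts and ends per station, mimicking groupBy + count
    on each side followed by an outer join and fillna(0)."""
    starts = Counter(start for start, _ in trips)
    ends = Counter(end for _, end in trips)
    # Union of keys plays the role of the outer join;
    # Counter returns 0 for a missing key, which is the fillna(0) step
    return {
        sid: {
            "start_count": starts[sid],
            "end_count": ends[sid],
            "total_activity": starts[sid] + ends[sid],
        }
        for sid in sorted(starts.keys() | ends.keys())
    }

# Hypothetical trips: station 3 only ever appears as a destination
trips = [(1, 2), (1, 3), (2, 1), (2, 3)]
stats = station_activity(trips)
print(stats[3])  # {'start_count': 0, 'end_count': 2, 'total_activity': 2}
```

An inner join would silently drop station 3 here, which is why the Spark version uses `how="outer"`.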


- match the stations to the city's neighborhoods (zones) and compute, for each zone, the total number of trip starts and trip ends

```python
# Collect per-station counts to the driver and build points (x = longitude, y = latitude)
stations_pdf = stations_stats.toPandas()
stations_gdf = gpd.GeoDataFrame(
    stations_pdf,
    geometry=gpd.points_from_xy(stations_pdf.lon, stations_pdf.lat),
    crs="EPSG:4326",
)

# Zone polygons, reprojected to WGS 84 so both layers share one CRS
zones_gdf = gpd.read_file("NYC Taxi Zones.geojson").to_crs(epsg=4326)

# Attach to each station the zone polygon that contains it (left join keeps unmatched stations)
joined_gdf = gpd.sjoin(stations_gdf, zones_gdf, how="left", predicate="within")

joined_gdf['start_count'] = joined_gdf['start_count'].fillna(0).astype(int)
joined_gdf['end_count'] = joined_gdf['end_count'].fillna(0).astype(int)

# Sum starts and ends per zone; stations outside every zone (zone is NaN) are dropped by groupby
zones_stats = joined_gdf.groupby('zone')[['start_count', 'end_count']].sum().reset_index()
zones_stats['total_trips'] = zones_stats['start_count'] + zones_stats['end_count']
```


Output (PyArrow is not installed, so `toPandas()` falls back to the non-Arrow conversion):

```
PyArrow >= 4.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
```
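`gpd.sjoin(..., predicate="within")` attaches to each station point the attributes of the zone polygon that contains it. A minimal stdlib sketch of that point-in-polygon test (ray casting) followed by the per-zone sums, assuming two hypothetical square zones `"Zone A"` and `"Zone B"` and made-up station counts. Real taxi zones are often multipolygons; this sketch handles single simple polygons only and, like the GeoPandas code in EPSG:4326, treats longitude/latitude as planar coordinates.

```python
from collections import defaultdict

def point_in_polygon(x, y, polygon):
    """Ray casting: cast a ray from (x, y) towards +x and count edge crossings;
    an odd count means the point is inside. Points exactly on an edge are
    not handled specially."""
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Does this edge straddle the horizontal line through y?
        if (y1 > y) != (y2 > y):
            # x-coordinate where the edge crosses that horizontal line
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside

def sum_by_zone(stations, zones):
    """stations: list of (lon, lat, start_count, end_count);
    zones: dict mapping zone name -> list of (lon, lat) vertices.
    Stations in no zone are skipped, like groupby('zone') skips NaN."""
    totals = defaultdict(lambda: [0, 0])
    for lon, lat, start_count, end_count in stations:
        for name, polygon in zones.items():
            if point_in_polygon(lon, lat, polygon):
                totals[name][0] += start_count
                totals[name][1] += end_count
                break  # assign each station to the first matching zone
    return {name: (s, e, s + e) for name, (s, e) in totals.items()}

# Hypothetical zones and stations (not from the Citi Bike data)
zones = {
    "Zone A": [(0, 0), (1, 0), (1, 1), (0, 1)],
    "Zone B": [(1, 0), (2, 0), (2, 1), (1, 1)],
}
stations = [(0.5, 0.5, 10, 12), (0.2, 0.8, 3, 1), (1.5, 0.5, 7, 7), (5.0, 5.0, 4, 4)]
result = sum_by_zone(stations, zones)
print(result)  # {'Zone A': (13, 13, 26), 'Zone B': (7, 7, 14)}
```

The `break` is a design choice of the sketch: `sjoin` would instead emit one row per matching polygon if zones overlapped, which could count a station twice.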


- list the zones in descending order of trip count

```python
zones_stats_sorted = zones_stats.sort_values(by='total_trips', ascending=False)

print(zones_stats_sorted.head(20))
```


```
                            zone  start_count  end_count  total_trips
30                  East Village        43333      43170        86503
26                  East Chelsea        38967      39395        78362
61                   Murray Hill        32572      32578        65150
83                      Union Sq        30527      31464        61991
40       Greenwich Village North        26485      27054        53539
17                  Clinton East        25364      25241        50605
80          TriBeCa/Civic Center        24744      25350        50094
51               Lower East Side        23070      23236        46306
34                      Flatiron        22976      23238        46214
36              Garment District        20281      20334        40615
88     West Chelsea/Hudson Yards        19421      19613        39034
15                  Central Park        19955      18748        38703
90     Williamsburg (North Side)        17692      18195        35887
89
```
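Ranking is a full descending sort on the derived `total_trips` column; when only the top rows are needed, pandas also offers `DataFrame.nlargest(n, 'total_trips')`. A minimal stdlib sketch of both operations, using four zones from the output above listed in arbitrary order:

```python
import heapq

# (zone, start_count, end_count) for four zones from the output above
rows = [
    ("Murray Hill", 32572, 32578),
    ("East Village", 43333, 43170),
    ("Union Sq", 30527, 31464),
    ("East Chelsea", 38967, 39395),
]

# Equivalent of zones_stats['total_trips'] = start_count + end_count
with_totals = [(zone, s, e, s + e) for zone, s, e in rows]

# Full descending sort, like sort_values(by='total_trips', ascending=False)
ranked = sorted(with_totals, key=lambda r: r[3], reverse=True)

# Top-N without sorting every row, similar in spirit to DataFrame.nlargest
top2 = heapq.nlargest(2, with_totals, key=lambda r: r[3])

for zone, s, e, total in ranked:
    print(f"{zone:<14}{s:>8}{e:>8}{total:>8}")
```

For a table with a few hundred zones the difference is negligible; `nlargest` only pays off when N is much smaller than the number of rows.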

- display the result as a choropleth map

```python
# Attach per-zone counts to the zone polygons; zones without stations get 0 trips
zones_map_data = zones_gdf.merge(zones_stats, on='zone', how='left').fillna(
    {'start_count': 0, 'end_count': 0, 'total_trips': 0}
)

# Base map centred on New York City
m = folium.Map(location=[40.73, -73.95], zoom_start=11)

# Colour each zone by total_trips; key_on matches the 'zone' property of each feature
folium.Choropleth(
    geo_data=zones_map_data,
    name='choropleth',
    data=zones_map_data,
    columns=['zone', 'total_trips'],
    key_on='feature.properties.zone',
    fill_color='YlOrRd',
    fill_opacity=0.7,
    line_opacity=0.2,
    legend_name='Total Trips by Neighborhood'
).add_to(m)

# Invisible overlay that only provides hover tooltips with the per-zone numbers
folium.GeoJson(
    zones_map_data,
    style_function=lambda x: {'opacity': 0, 'fillOpacity': 0},
    tooltip=folium.GeoJsonTooltip(
        fields=['zone', 'borough', 'start_count', 'end_count', 'total_trips'],
        aliases=['Zone', 'Borough', 'Starts', 'Ends', 'Total'],
        localize=True
    )
).add_to(m)

folium.LayerControl().add_to(m)
m.save("activity.html")
```
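When `bins` is an integer (folium's default is 6), `folium.Choropleth` splits the range of the data into that many equal-width intervals and paints each zone with the colour of its interval. A minimal stdlib sketch of that binning step, using totals from the output above plus a 0 standing in for zones without stations (the map code fills those with 0). The bin-assignment rule at the edges is an assumption of the sketch and may differ slightly from folium's internals.

```python
def equal_width_bins(values, n_bins=6):
    """Return n_bins + 1 equal-width edges spanning min(values)..max(values)."""
    lo, hi = min(values), max(values)
    width = (hi - lo) / n_bins
    return [lo + i * width for i in range(n_bins + 1)]

def bin_index(value, edges):
    """Index of the bin containing value; the last bin is closed on the right."""
    for i in range(len(edges) - 2):
        if value < edges[i + 1]:
            return i
    return len(edges) - 2

# 0 for zones without stations, the rest from the ranking output above
totals = [0, 35887, 38703, 46214, 53539, 61991, 86503]
edges = equal_width_bins(totals)
classes = [bin_index(v, edges) for v in totals]
print(classes)  # [0, 2, 2, 3, 3, 4, 5]
```

Because many zones have 0 or few trips while a handful exceed 60,000, equal-width classes leave most of the map in the lightest colour; passing an explicit list of edges (for example quantiles) to `bins=` is one way to spread the colours out.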
