# Романов Дмитрий ИУ6-54Б
# ДОМАШНЕЕ ЗАДАНИЕ 2. Анализ поездок посредством Spark DataFrame API 

### Набор данных:
- Поездки (bikeshare): [данные](https://s3.amazonaws.com/tripdata/201902-citibike-tripdata.csv.zip) | [пояснения](https://www.citibikenyc.com/system-data)
- Кварталы NYC: [данные](https://data.cityofnewyork.us/api/geospatial/d3c5-ddgc?method=export&format=GeoJSON)

### Задача 1

- определите для каждой станции количество начала поездок и количество завершения поездок
- сопоставьте станции с кварталами города (zones) и определите суммы количества начала и завершения для каждого квартала
- выведите результаты по убыванию количества поездок
- отобразите их в виде картограмм (Choropleth).

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, coalesce, lit
import geopandas as gpd
import folium
from folium.plugins import MiniMap, Draw, MousePosition
from shapely.geometry import Point
import webbrowser
import os

In [2]:
# Set SPARK_LOCAL_IP to the loopback address before the Spark session is built.
os.environ.update({"SPARK_LOCAL_IP": "127.0.0.1"})

In [3]:
# file:// URI of the trip CSV, built from the current working directory.
current_dir = os.getcwd()
csv_path = "file://" + current_dir + "/201902-citibike-tripdata_1.csv"

In [4]:
# Local Spark session (all cores), bound to 127.0.0.1, with the local
# filesystem configured as the default Hadoop filesystem.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("BikeShareAnalysis")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.hadoop.fs.defaultFS", "file:///")
    .getOrCreate()
)

# Trip records: the first row supplies column names, column types are inferred.
df = spark.read.csv(
    "201902-citibike-tripdata_1.csv",
    header=True,
    inferSchema=True,
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/10 18:01:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

1. определите для каждой станции количество начала поездок и количество завершения поездок

In [5]:
# Trips started / finished per station; a station is identified by its name
# together with its latitude and longitude.
start = (
    df.groupBy("start station name", "start station latitude", "start station longitude")
    .agg(count("*").alias("start_count"))
)
end = (
    df.groupBy("end station name", "end station latitude", "end station longitude")
    .agg(count("*").alias("end_count"))
)

# Rows of the two aggregates describe the same station when name and both
# coordinates agree.
same_station = (
    (col("start station name") == col("end station name"))
    & (col("start station latitude") == col("end station latitude"))
    & (col("start station longitude") == col("end station longitude"))
)

# The outer join keeps stations present on only one side; coalesce picks the
# non-null identity columns and turns a missing count into 0.
stations = start.join(end, same_station, how="outer").select(
    coalesce(col("start station name"), col("end station name")).alias("station_name"),
    coalesce(col("start station latitude"), col("end station latitude")).alias("lat"),
    coalesce(col("start station longitude"), col("end station longitude")).alias("lon"),
    coalesce(col("start_count"), lit(0)).alias("start_count"),
    coalesce(col("end_count"), lit(0)).alias("end_count"),
)

In [6]:
# Bring the per-station counts into pandas and add the combined trip count.
stations_pd = stations.toPandas()
stations_pd['total'] = stations_pd['start_count'].add(stations_pd['end_count'])

# One point per station, in (lon, lat) order, tagged as EPSG:4326.
geometry = [Point(lon, lat) for lon, lat in zip(stations_pd['lon'], stations_pd['lat'])]
stations_df = gpd.GeoDataFrame(stations_pd, geometry=geometry, crs="EPSG:4326")

                                                                                

2. сопоставьте станции с кварталами города (zones) и определите суммы количества начала и завершения для каждого квартала

In [7]:
# Zone polygons, reprojected to the same CRS as the station points.
zones_gdf = gpd.read_file("NYC Taxi Zones.geojson")
zones_gdf = zones_gdf.to_crs(epsg=4326)

# Left spatial join: every station is kept and receives the attributes of the
# zone it lies within (if any).
stations_zones = gpd.sjoin(
    left_df=stations_df,
    right_df=zones_gdf,
    how="left",
    predicate="within",
)

In [8]:
# Station name -> zone id, as produced by the spatial join (kept under its
# original name for any later use).
stations_zones_pd = stations_zones[["station_name", "objectid"]]

# Take the counts and the zone id from the spatial-join result itself, one row
# per joined station. Merging the zone id back onto stations_pd by
# "station_name" alone is a many-to-many join whenever a name is shared by
# several stations (stations are keyed by name + coordinates), which duplicates
# rows and inflates the per-zone sums.
stations_with_counts = stations_zones[
    ["station_name", "lat", "lon", "start_count", "end_count", "total", "objectid"]
].reset_index(drop=True)

In [9]:
# Sum the station counts inside each zone; stations without a zone id drop out
# of the groupby.
count_sums = stations_with_counts.groupby("objectid")[["start_count", "end_count", "total"]].sum()
zone_trip = count_sums.reset_index()

# Cast the zone id to float so it can be matched against the zones table.
zone_trip = zone_trip.astype({"objectid": float})

In [10]:
# Attach the per-zone sums to the zone polygons; both keys are cast to float
# so they compare equal.
zones_gdf["objectid"] = zones_gdf["objectid"].astype(float)
zones_gdf = zones_gdf.merge(zone_trip, on="objectid", how="left")

# Zones with no matched stations get 0 instead of NaN.
for count_column in ("start_count", "end_count", "total"):
    zones_gdf[count_column] = zones_gdf[count_column].fillna(0)

3. выведите по убыванию количества поездок и отобразите в виде картограмм (Choropleth).  

In [11]:
# Base map centred on [40.7128, -74.0060].
m = folium.Map(
    location=[40.7128, -74.0060],
    zoom_start=12,
    tiles='Cartodb positron',
    prefer_canvas=True,
)

In [12]:
# Attach a MiniMap plugin (created with toggle_display=True) to the main map.
minimap = MiniMap(toggle_display=True)
m.add_child(minimap)

In [13]:
m.add_child(MousePosition())

# Work on a copy and express every zone's total as a percentage of the
# largest zone total.
zones_copy = zones_gdf.copy()
busiest_zone_total = zones_copy['total'].max()
zones_copy['percentage'] = (zones_copy['total'] / busiest_zone_total) * 100

def get_color(value):
    """Return the fill colour for an intensity percentage.

    Falsy input (None, 0) is treated as 0. The bands are exclusive on
    their lower bound: above 80, above 60, above 40, above 20, and
    everything else (including values no band matches) gets the lowest
    band's colour.
    """
    value = value or 0
    color_steps = (
        (80, '#8B0000'),
        (60, '#FF0000'),
        (40, '#FF6347'),
        (20, '#FFD700'),
    )
    for threshold, hex_color in color_steps:
        if value > threshold:
            return hex_color
    return '#90EE90'

In [14]:
# Layer that collects the coloured zone polygons; shown by default.
zone_layer = folium.FeatureGroup(
    name='Интенсивность по зонам',
    show=True,
)

In [15]:
# Draw one GeoJSON feature per zone that has at least one trip; zones whose
# total equals 0 are left off the map.
for _, zone_row in zones_copy[zones_copy['total'] != 0].iterrows():
    share = zone_row['percentage']
    fill = get_color(share)

    started = int(zone_row['start_count'])
    finished = int(zone_row['end_count'])
    trips = int(zone_row['total'])

    feature = {
        'type': 'Feature',
        'geometry': zone_row.geometry.__geo_interface__,
        'properties': {
            'zone': zone_row['zone'],
            'borough': zone_row['borough'],
            'start_count': started,
            'end_count': finished,
            'total': trips,
        },
    }

    hover_tooltip = folium.GeoJsonTooltip(
        fields=['zone', 'borough', 'start_count', 'end_count', 'total'],
        aliases=['Зона:', 'Район:', 'Начало:', 'Конец:', 'Всего:'],
        localize=True,
        sticky=False,
    )

    click_popup = folium.Popup(
        f"<b>{zone_row['zone']}</b><br>"
        f"<i>{zone_row['borough']}</i><br><hr>"
        f"Начало: {started}<br>"
        f"Конец: {finished}<br>"
        f"<b>Всего: {trips}</b><br>"
        f"<i>Интенсивность: {share:.1f}%</i>",
        max_width=220,
    )

    zone_feature = folium.GeoJson(
        feature,
        # The default argument freezes this zone's colour; a plain closure
        # would see only the last value assigned in the loop.
        style_function=lambda _feature, fill=fill: {
            'fillColor': fill,
            'color': 'darkgray',
            'weight': 2,
            'fillOpacity': 0.65,
        },
        highlight_function=lambda _feature: {
            'fillColor': 'yellow',
            'color': 'black',
            'weight': 3,
            'fillOpacity': 0.8,
        },
        tooltip=hover_tooltip,
        popup=click_popup,
    )
    zone_feature.add_to(zone_layer)

In [16]:
m.add_child(zone_layer)

stations_layer = folium.FeatureGroup(
    name='Станции (размер = активность)',
    show=True,
)

# Marker radius scales linearly with a station's total: 3 at zero trips,
# 15 at the busiest station.
max_total = stations_pd['total'].max()
stations_pd_copy = stations_pd.assign(
    normalized=(stations_pd['total'] / max_total) * 12 + 3,
)

In [17]:
# Colour thresholds are computed once, outside the loop: a station is "green"
# when its start-minus-end balance exceeds half of the largest balance, "red"
# when it is below half of the smallest balance, and gray otherwise.
# (Previously the max/min over the whole column were recomputed with the
# Python builtins on every iteration.)
balance = stations_pd['start_count'] - stations_pd['end_count']
surplus_threshold = balance.max() * 0.5
deficit_threshold = balance.min() * 0.5

for idx, row in stations_pd_copy.iterrows():
    diff = row['start_count'] - row['end_count']

    if diff > surplus_threshold:
        icon_color = 'green'
    elif diff < deficit_threshold:
        icon_color = 'red'
    else:
        icon_color = 'gray'

    radius = row['normalized']

    # Popup body: a small table with starts, ends, signed balance and total.
    popup_html = f"""
    <div style='font-family: Arial; width: 240px;'>
        <h4 style='margin: 5px 0; color: #2c3e50;'>{row['station_name']}</h4>
        <hr style='margin: 5px 0;'>
        <table style='width: 100%; border-collapse: collapse;'>
            <tr style='background-color: #ecf0f1;'>
                <td style='padding: 3px;'><b>Статистика</b></td>
                <td style='padding: 3px; text-align: right;'><b>Число</b></td>
            </tr>
            <tr>
                <td style='padding: 3px;'>Начало</td>
                <td style='padding: 3px; text-align: right; color: green;'><b>{int(row['start_count'])}</b></td>
            </tr>
            <tr style='background-color: #f8f9fa;'>
                <td style='padding: 3px;'>Конец</td>
                <td style='padding: 3px; text-align: right; color: red;'><b>{int(row['end_count'])}</b></td>
            </tr>
            <tr style='background-color: #e8f4f8; font-weight: bold;'>
                <td style='padding: 5px;'>Баланс</td>
                <td style='padding: 5px; text-align: right;'>{int(diff):+d}</td>
            </tr>
            <tr style='background-color: #fff3cd; font-size: 14px;'>
                <td colspan='2' style='padding: 5px; text-align: center;'><b>ИТОГО: {int(row['total'])}</b></td>
            </tr>
        </table>
    </div>
    """

    folium.CircleMarker(
        location=[row["lat"], row["lon"]],
        radius=radius,
        fill=True,
        fillColor=icon_color,
        fillOpacity=0.5,
        weight=1.5,
        color='darkgray',
        popup=folium.Popup(popup_html, max_width=280),
        tooltip=f"<b>{row['station_name']}</b> ({int(row['total'])})"
    ).add_to(stations_layer)

stations_layer.add_to(m)

# Layer for the five zones with the highest totals; hidden until enabled.
flow_layer = folium.FeatureGroup(name='Основные потоки', show=False)
top_zones = zones_copy.nlargest(5, 'total')

In [18]:
# Mark the centroid of each top zone with a purple circle.
for _, hub in top_zones.iterrows():
    hub_center = hub.geometry.centroid
    hub_marker = folium.CircleMarker(
        location=[hub_center.y, hub_center.x],
        radius=8,
        fill=True,
        fillColor='purple',
        fillOpacity=0.8,
        weight=2,
        color='white',
        popup=f"<b>Главный центр</b><br>{hub['zone']}<br>{int(hub['total'])} поездок",
    )
    hub_marker.add_to(flow_layer)

flow_layer.add_to(m)

<folium.map.FeatureGroup at 0x11c0daf00>

In [19]:
# Fixed-position HTML legend (bottom-left): title, the five zone intensity
# bands with their colours, and the green/red station marker meaning.
# NOTE(review): station markers can also be drawn in gray, which has no
# entry in this legend.
legend_html = '''
<div style="position: fixed; 
     bottom: 20px; left: 20px; width: 300px; height: auto; 
     background-color: rgba(255, 255, 255, 0.98); 
     border: 3px solid #34495e; z-index: 9999; 
     font-size: 12px; padding: 15px; border-radius: 10px;
     box-shadow: 0 4px 8px rgba(0,0,0,0.3);">
     
<div style="text-align: center; margin-bottom: 12px;">
    <h3 style="margin: 0; color: #2c3e50; font-size: 16px;">Citibike NYC</h3>
    <p style="margin: 3px 0; color: #7f8c8d; font-size: 10px;">Февраль 2019</p>
</div>

<hr style="margin: 10px 0; border: none; border-top: 2px solid #bdc3c7;">

<p style="margin: 8px 0; font-weight: bold; color: #2c3e50;">Интенсивность поездок:</p>
<div style="display: grid; grid-template-columns: 20px 1fr; gap: 8px; margin-bottom: 10px;">
    <div style="width: 20px; height: 20px; background: #8B0000; border: 1px solid #333;"></div>
    <span>Очень высокая (80%+)</span>
    
    <div style="width: 20px; height: 20px; background: #FF0000; border: 1px solid #333;"></div>
    <span>Высокая (60-80%)</span>
    
    <div style="width: 20px; height: 20px; background: #FF6347; border: 1px solid #333;"></div>
    <span>Средняя (40-60%)</span>
    
    <div style="width: 20px; height: 20px; background: #FFD700; border: 1px solid #333;"></div>
    <span>Низкая (20-40%)</span>
    
    <div style="width: 20px; height: 20px; background: #90EE90; border: 1px solid #333;"></div>
    <span>Очень низкая (<20%)</span>
</div>

<hr style="margin: 10px 0; border: none; border-top: 2px solid #bdc3c7;">

<p style="margin: 8px 0; font-weight: bold; color: #2c3e50;">Станции:</p>
<p style="margin: 3px 0;"><span style="color: green; font-size: 18px;">●</span> Больше начало</p>
<p style="margin: 3px 0;"><span style="color: red; font-size: 18px;">●</span> Больше конец</p>
<p style="margin: 3px 0; font-size: 11px; color: #7f8c8d;">Размер = общая активность</p>

<hr style="margin: 10px 0; border: none; border-top: 1px solid #bdc3c7;">
<p style="margin: 5px 0; font-size: 10px; color: #95a5a6; text-align: center;"></p>
</div>
'''

In [20]:
# Inject the legend markup into the page and add the layer switcher.
legend_element = folium.Element(legend_html)
m.get_root().html.add_child(legend_element)

layer_control = folium.LayerControl(position='topright', collapsed=False)
layer_control.add_to(m)

# Write the map to disk and open the saved file in the default browser.
map_file = "bikeshare.html"
m.save(map_file)
webbrowser.open(f"file://{os.path.abspath(map_file)}")

True

In [21]:
# Stop the Spark session created at the top of the notebook.
spark.stop()