In [1]:
!pip install pyspark
import os
import sys
from pyspark.sql import SparkSession
from typing import NamedTuple
from datetime import datetime

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder.getOrCreate()

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=9a8e12c0a1bcdf9347e2619339ffef25028a33c4938530eaaa8522cee4d3ce68
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark import SparkContext, SparkConf
app_name = "lr1"
spark.stop()
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
sc = SparkContext(conf=conf)
sc

In [None]:
sc.stop()
spark.stop()

In [3]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str

    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

In [4]:
def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        try:
            yield Trip(
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            )
        except:
            pass

In [10]:
tripData = sc.textFile("data/trips.csv")

headerTrip = tripData.first()
trips = tripData.filter(lambda row: row != headerTrip).map(lambda row: row.split(",", -1))

print(trips.first())

trips_mapper = trips.mapPartitions(initTrip)


stationData = sc.textFile("data/stations.csv")

headerStation = stationData.first()
stations = stationData.filter(lambda row: row != headerStation).map(lambda row: row.split(",", -1))

print(stations.first())

stations_mapped = stations.mapPartitions(initStation)

stations_mapped.first()

['4576', '63', '', 'South Van Ness at Market', '66', '8/29/2013 14:14', 'South Van Ness at Market', '66', '520', 'Subscriber', '94127']
['2', 'San Jose Diridon Caltrain Station', '37.329732', '-121.90178200000001', '27', 'San Jose', '8/6/2013']


Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

1.	Найти велосипед с максимальным временем пробега.

In [11]:
# Использование reduce для нахождения поездки с максимальной длительностью
max_duration_trip = trips_mapper.reduce(lambda trip1, trip2: trip1 if trip1.duration > trip2.duration else trip2)

max_duration_trip.bike_id

535

2.	Найти наибольшее геодезическое расстояние между станциями.

In [14]:
from math import radians, cos, sin, asin, sqrt
from typing import Tuple

def haversine(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """
    Вычислить геодезическое расстояние между двумя точками на земле, заданными в градусах.
    """
    # Конвертировать десятичные градусы в радианы
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # Формула гаверсинуса
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * asin(sqrt(a))
    r = 6371  # Радиус Земли в километрах
    return c * r

In [15]:

stations_cartesian = stations_mapped.cartesian(stations_mapped)

def calculate_distance(record: Tuple) -> Tuple[int, int, float]:
    """
    Вычислить расстояние между двумя станциями.
    """
    station1, station2 = record
    distance = haversine(station1.long, station1.lat, station2.long, station2.lat)
    return (station1.station_id, station2.station_id, distance)

distances = stations_cartesian.map(calculate_distance)

max_distance = distances.max(key=lambda x: x[2])
print(f"Максимальное расстояние между станциями {max_distance[0]} и {max_distance[1]}: {max_distance[2]:.2f} км")

Максимальное расстояние между станциями 16 и 60: 69.92 км


3. Найти путь велосипеда с максимальным временем пробега через станции.

In [16]:
# Суммирование длительности поездок для каждого велосипеда и нахождение максимума
max_duration_bike_id, _ = trips_mapper \
    .map(lambda trip: (trip.bike_id, trip.duration)) \
    .reduceByKey(lambda a, b: a + b) \
    .max(key=lambda x: x[1])

# Фильтруем поездки для велосипеда с максимальной длительностью и сортируем по дате начала
max_bike_trips_sorted = trips_mapper \
    .filter(lambda trip: trip.bike_id == max_duration_bike_id) \
    .sortBy(lambda trip: trip.start_date)

# Выводим первую поездку для демонстрации
first_trip = max_bike_trips_sorted.first()
print(f"{first_trip.bike_id} выехал из {first_trip.start_station_name} дата {first_trip.start_date} в {first_trip.end_station_name}")

535 выехал из Post at Kearney дата 2013-08-29 19:32:00 в San Francisco Caltrain (Townsend at 4th)


4.	Найти количество велосипедов в системе.

In [17]:
# Извлекаем bike_id из каждой поездки
bike_ids = trips_mapper.map(lambda trip: trip.bike_id)

# Получаем уникальные bike_id
unique_bike_ids = bike_ids.distinct()

# Подсчитываем количество уникальных велосипедов
bike_count = unique_bike_ids.count()

print(bike_count)

700


5. Найти пользователей потративших на поездки более 3 часов

In [20]:
# Группировка данных по zip_code и вычисление максимальной продолжительности поездки в секундах
# Переименование столбца с максимальной продолжительностью в 'duration'
long_trips = trips_mapper.filter(lambda x: x.duration > (3 * 60 * 60))

unique_zip_codes = (
    long_trips
    .map(lambda trip: trip.zip_code)
    .filter(lambda zip_code: zip_code != "")
    .distinct()
)

unique_zip_codes.take(5)

['58553', '94301', '94039', '94133', '93726']