In [1]:
!pip install pyspark



In [5]:
from pyspark import SparkContext, SparkConf

import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct
from typing import NamedTuple
from datetime import datetime
from functools import reduce
import os
# Установите эти переменные с абсолютным путем к исполнимому файлу Python
os.environ['PYSPARK_PYTHON'] = 'C:/Users/belia/AppData/Local/Programs/Python/Python310/python.exe'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/belia/AppData/Local/Programs/Python/Python310/python.exe'

In [6]:
app_name = "LR1_Apache_Spark"
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
sc = SparkContext.getOrCreate()
sc

In [7]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str

    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        try:
            yield Trip(
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            )
        except:
            pass

In [8]:
trip_data = sc.textFile("trip.csv")
tripsHeader = trip_data.first()
trips = trip_data.filter(lambda row: row != tripsHeader).map(lambda row: row.split(",", -1))
stationData = sc.textFile("station.csv")
stationsHeader = stationData.first()
stations = stationData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(",", -1))

In [9]:
stationsIndexed = stations.keyBy(lambda station: station[0])

In [10]:
stationsIndexed.take(2)

[('2',
  ['2',
   'San Jose Diridon Caltrain Station',
   '37.329732',
   '-121.90178200000001',
   '27',
   'San Jose',
   '8/6/2013']),
 ('3',
  ['3',
   'San Jose Civic Center',
   '37.330698',
   '-121.888979',
   '15',
   'San Jose',
   '8/5/2013'])]

In [11]:
tripsByStartTerminals = trips.keyBy(lambda trip: trip[4])
tripsByEndTerminals = trips.keyBy(lambda trip: trip[7])

In [12]:
tripsByStartTerminals.take(2)

[('66',
  ['4576',
   '63',
   '8/29/2013 14:13',
   'South Van Ness at Market',
   '66',
   '8/29/2013 14:14',
   'South Van Ness at Market',
   '66',
   '520',
   'Subscriber',
   '94127']),
 ('10',
  ['4607',
   '70',
   '8/29/2013 14:42',
   'San Jose City Hall',
   '10',
   '8/29/2013 14:43',
   'San Jose City Hall',
   '10',
   '661',
   'Subscriber',
   '95138'])]

In [13]:
tripsByEndTerminals.take(2)

[('66',
  ['4576',
   '63',
   '8/29/2013 14:13',
   'South Van Ness at Market',
   '66',
   '8/29/2013 14:14',
   'South Van Ness at Market',
   '66',
   '520',
   'Subscriber',
   '94127']),
 ('10',
  ['4607',
   '70',
   '8/29/2013 14:42',
   'San Jose City Hall',
   '10',
   '8/29/2013 14:43',
   'San Jose City Hall',
   '10',
   '661',
   'Subscriber',
   '95138'])]

In [14]:
stations_mapped = stations.mapPartitions(initStation)

In [15]:
trips_mapped= trips.mapPartitions(initTrip)

## Задание 1. Найти велосипед с максимальным временем пробега

In [20]:
bike_durations = trips.map(lambda trip: (trip[8], int(trip[1])))

total_duration_per_bike = bike_durations.reduceByKey(lambda x, y: x + y)

max_duration_bike = total_duration_per_bike.reduce(lambda a, b: a if a[1] > b[1] else b)

print(f"ID велосипеда с максимальным временем пробега: {max_duration_bike[0]}, Время пробега: {max_duration_bike[1]}")

ID велосипеда с максимальным временем пробега: 535, Время пробега: 18611693


## Задание 2. Найти наибольшее геодезическое расстояние между станциями


In [25]:
import math

def haversine(lon1, lat1, lon2, lat2):
    lon1, lat1, lon2, lat2 = map(math.radians, [lon1, lat1, lon2, lat2])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    c = 2 * math.asin(math.sqrt(a))
    r = 6371
    return c * r

def calculate_distances(stations):
    import itertools
    pairs = list(itertools.combinations(stations.collect(), 2))
    max_distance = 0
    max_pair = None
    for (id1, lat1, lon1), (id2, lat2, lon2) in pairs:
        dist = haversine(float(lon1), float(lat1), float(lon2), float(lat2))
        if dist > max_distance:
            max_distance = dist
            max_pair = (id1, id2)
    return max_distance, max_pair

stations_rdd = stations.map(lambda x: (x[0], float(x[2]), float(x[3])))  
max_distance, max_pair = calculate_distances(stations_rdd)

print(f"Наибольшее расстояние :{max_distance:.2f} между станциями {max_pair[0]} и {max_pair[1]}")

The maximum distance is 69.92 km between stations 16 and 60


## Задание 3. Найти путь велосипеда с максимальным временем пробега через станции

In [27]:
bike_durations = trips.map(lambda trip: (trip[8], int(trip[1])))  
total_duration_per_bike = bike_durations.reduceByKey(lambda x, y: x + y)

max_duration_bike = total_duration_per_bike.reduce(lambda a, b: a if a[1] > b[1] else b)[0]

bike_trips = trips.filter(lambda trip: trip[8] == max_duration_bike)
longest_trip = bike_trips.reduce(lambda a, b: a if int(a[1]) > int(b[1]) else b)  # Сравниваем по длительности

print(f"Самая длинная поездка велосипеда с ID {max_duration_bike}:")
print(f"ID поездки: {longest_trip[0]}, Длительность: {longest_trip[1]} секунд,")
print(f"От станции: {longest_trip[3]} до станции: {longest_trip[6]}")

Самая длинная поездка велосипеда с ID 535:
ID поездки: 568474, Длительность: 17270400 секунд,
От станции: South Van Ness at Market до станции: 2nd at Folsom


## Задание 4. Найти количество велосипедов в системе.

In [28]:
unique_bikes = trips.map(lambda trip: trip[8]).distinct() 
bike_count = unique_bikes.count()

print(f"Количество уникальных велосипедов в системе: {bike_count}")

Количество уникальных велосипедов в системе: 700


## Задание 5. Найти пользователей потративших на поездки более 3 часов.

In [30]:
user_durations = trips.map(lambda trip: (trip[10], float(trip[1]) / 3600))

total_durations_per_user = user_durations.reduceByKey(lambda x, y: x + y)

users_over_3_hours = total_durations_per_user.filter(lambda user_duration: user_duration[1] > 3)

first_few_users = users_over_3_hours.take(5)  

print("Первые несколько пользователей, потратившие на поездки более 3 часов:")
for user, total_hours in first_few_users:
    print(f"Пользователь: {user}, Всего часов: {total_hours:.2f}")

Первые несколько пользователей, потратившие на поездки более 3 часов:
Пользователь (ZIP-код): 95138, Всего часов: 43.16
Пользователь (ZIP-код): 95060, Всего часов: 210.72
Пользователь (ZIP-код): 94109, Всего часов: 3349.20
Пользователь (ZIP-код): 94061, Всего часов: 847.05
Пользователь (ZIP-код): 94612, Всего часов: 516.89


In [66]:
sc.stop()