In [None]:
!pip3 install pyspark==3.0.0

In [1]:
import os
import sys
from pyspark.sql import SparkSession
from typing import NamedTuple
from datetime import datetime
from functools import reduce

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder.getOrCreate()

24/03/28 17:25:19 WARN Utils: Your hostname, dmitriy-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.71.128 instead (on interface ens33)
24/03/28 17:25:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/03/28 17:25:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
from pyspark import SparkContext, SparkConf
app_name = "Lab1"
spark.stop()
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
sc = SparkContext(conf=conf)
sc

In [3]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str
    
    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )
        
def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str
        
    for trip in trips:
        yield Trip(                             
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
        )


In [4]:
tripData = sc.textFile("trip.csv")
stationData = sc.textFile("station.csv")

In [5]:
tripData.first()

'id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code'

In [6]:
stationData.first()

'id,name,lat,long,dock_count,city,installation_date'

In [7]:
header = stationData.first()
station_data_filter = stationData.filter(lambda line: line != header)
station_data_map = station_data_filter.map(lambda line: line.split(','))
print(station_data_map.take(5))

[['2', 'San Jose Diridon Caltrain Station', '37.329732', '-121.90178200000001', '27', 'San Jose', '8/6/2013'], ['3', 'San Jose Civic Center', '37.330698', '-121.888979', '15', 'San Jose', '8/5/2013'], ['4', 'Santa Clara at Almaden', '37.333988', '-121.894902', '11', 'San Jose', '8/6/2013'], ['5', 'Adobe on Almaden', '37.331415', '-121.8932', '19', 'San Jose', '8/5/2013'], ['6', 'San Pedro Square', '37.336721000000004', '-121.894074', '15', 'San Jose', '8/7/2013']]


In [8]:
stations_mapped = station_data_map.mapPartitions(initStation)
stations_mapped.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [9]:
header = tripData.first()
trips_data_filter = tripData.filter(lambda line: line != header)
trips_data_map = trips_data_filter.map(lambda line: line.split(','))
print(trips_data_map.take(5))

[['4576', '63', '8/29/2013 14:13', 'South Van Ness at Market', '66', '8/29/2013 14:14', 'South Van Ness at Market', '66', '520', 'Subscriber', '94127'], ['4607', '70', '8/29/2013 14:42', 'San Jose City Hall', '10', '8/29/2013 14:43', 'San Jose City Hall', '10', '661', 'Subscriber', '95138'], ['4130', '71', '8/29/2013 10:16', 'Mountain View City Hall', '27', '8/29/2013 10:17', 'Mountain View City Hall', '27', '48', 'Subscriber', '97214'], ['4251', '77', '8/29/2013 11:29', 'San Jose City Hall', '10', '8/29/2013 11:30', 'San Jose City Hall', '10', '26', 'Subscriber', '95060'], ['4299', '83', '8/29/2013 12:02', 'South Van Ness at Market', '66', '8/29/2013 12:04', 'Market at 10th', '67', '319', 'Subscriber', '94103']]


In [10]:
trips_mapped = trips_data_map.mapPartitions(initTrip)

In [11]:
trips_mapped.first()

Trip(trip_id=4576, duration=63, start_date=datetime.datetime(2013, 8, 29, 14, 13), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=520, subscription_type='Subscriber', zip_code='94127')

## 1. Найти велосипед с максимальным временем пробега.

In [12]:
max_duration_trip = trips_mapped.sortBy(lambda trip: trip.duration, ascending=False).first()
print("Trip with max duration:", max_duration_trip.bike_id)

[Stage 11:>                                                         (0 + 1) / 1]

Trip with max duration: 535


                                                                                

## 2. Найти наибольшее геодезическое расстояние между станциями.

In [13]:
trips_stations = trips_mapped.filter(lambda trip: str(trip.start_station_id) != str(trip.end_station_id))

grouped_trips = trips_stations.keyBy(lambda trip: (trip.start_station_id, trip.end_station_id)).groupByKey()

average_trips = grouped_trips.mapValues(lambda trips: sum(trip.duration for trip in trips) / len(trips))

result = average_trips.map(lambda x: (x[1], x[0])).sortByKey(False).take(5)

print(result)



[(229914.0, (26, '16')), (179212.5, (32, '63')), (169308.0, (80, '36')), (156461.03603603604, (66, '62')), (101207.5, (28, '2'))]


                                                                                

## 3. Найти путь велосипеда с максимальным временем пробега через станции.

In [14]:
filtered_trips = trips_mapped.filter(lambda trip: trip.bike_id == max_duration_trip.bike_id)

sorted_trips = filtered_trips.sortBy(lambda trip: trip.start_date)

transformed_trips = sorted_trips.map(lambda trip: (trip.start_station_name, trip.end_station_name))

first_trip_path = transformed_trips.first()

first_trip_path

                                                                                

('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)')

## 4. Найти количество велосипедов в системе.

In [15]:
bike_ids_rdd = trips_mapped.map(lambda trip: trip.bike_id)

unique_bike_ids = bike_ids_rdd.distinct()

count_bikes = unique_bike_ids.count()

count_bikes

                                                                                

700

## 5. Найти пользователей потративших на поездки более 3 часов.

In [16]:
long_trips = trips_mapped.filter(lambda x: x.duration > (3 * 60 * 60))

unique_zip_codes = (
    long_trips
    .map(lambda trip: trip.zip_code)  
    .filter(lambda zip_code: zip_code != "")  
    .distinct()  
)

first_10_users = unique_zip_codes.take(10)

first_10_users

                                                                                

['58553',
 '94301',
 '94039',
 '94133',
 '93726',
 '94123',
 '4517',
 '29200',
 '45322',
 '94080']