In [1]:
from pyspark import SparkContext, SparkConf

app_name = "lab1"
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
sc = SparkContext(conf=conf)




In [2]:
!hadoop fs -put /mnt/data /data

In [3]:
!hadoop fs -ls /data

Found 1 items
drwxr-xr-x   - root root          0 2022-12-17 08:55 /data/data


In [4]:
warandpeace = sc.textFile("warandsociety.txt")
warandpeace.count()

12851

In [5]:
nilFile = sc.textFile("nil")

In [6]:
warandpeace.take(10)

['Лев Николаевич Толстой',
 'Война и мир. Книга 1',
 '',
 'Война и мир – 1',
 '',
 ' ',
 ' http://www.lib.ru',
 '',
 'Аннотация ',
 '']

In [7]:
linesWithWar = warandpeace.filter(lambda x: "война" in x)
linesWithWar.first()

"– Еh bien, mon prince. Genes et Lucques ne sont plus que des apanages, des поместья, de la famille Buonaparte. Non, je vous previens, que si vous ne me dites pas, que nous avons la guerre, si vous vous permettez encore de pallier toutes les infamies, toutes les atrocites de cet Antichrist (ma parole, j'y crois) – je ne vous connais plus, vous n'etes plus mon ami, vous n'etes plus мой верный раб, comme vous dites. [Ну, что, князь, Генуа и Лукка стали не больше, как поместьями фамилии Бонапарте. Нет, я вас предупреждаю, если вы мне не скажете, что у нас война, если вы еще позволите себе защищать все гадости, все ужасы этого Антихриста (право, я верю, что он Антихрист) – я вас больше не знаю, вы уж не друг мой, вы уж не мой верный раб, как вы говорите.] Ну, здравствуйте, здравствуйте. Je vois que je vous fais peur, [Я вижу, что я вас пугаю,] садитесь и рассказывайте."

In [8]:
def time(f):    
    import time
    t = time.process_time()
    f()
    print(f"Elapsed time: {int((time.process_time() - t)*1e9)} ns")
linesWithWar.cache()
time(lambda: linesWithWar.count() )
time(lambda: linesWithWar.count() )

Elapsed time: 12721399 ns
Elapsed time: 10819499 ns


In [9]:
wordCounts = linesWithWar.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

In [10]:
wordCounts.top(5, lambda x: x[1])

[('и', 250), ('что', 152), ('не', 117), ('в', 108), ('–', 92)]

In [11]:
trips_data = sc.textFile("trips.csv")
stations_data = sc.textFile("stations.csv")

In [12]:
from typing import NamedTuple
from datetime import datetime
from functools import reduce

def initStations(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str
    
    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )
        
def initTrips(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str
        
    for trip in trips:
        try:
            yield Trip(                             
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            ) 
        except:
            pass

In [13]:
def GetDataFromTable(data):
    columns = data.first()
    table = data.filter(lambda row: row != columns)\
                .map(lambda row: row.split(","))
    return columns, table

In [14]:
!ls
!head trips.csv

Untitled.ipynb	    get-pip.py	nyctaxi.txt   trips.csv
docker-compose.yml  lab1_	stations.csv  warandsociety.txt
id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code
4576,63,,South Van Ness at Market,66,8/29/2013 14:14,South Van Ness at Market,66,520,Subscriber,94127
4607,,8/29/2013 14:42,San Jose City Hall,10,8/29/2013 14:43,San Jose City Hall,10,661,Subscriber,95138
4130,71,8/29/2013 10:16,Mountain View City Hall,27,8/29/2013 10:17,Mountain View City Hall,27,48,Subscriber,97214
4251,77,8/29/2013 11:29,San Jose City Hall,10,8/29/2013 11:30,San Jose City Hall,10,26,Subscriber,95060
4299,83,8/29/2013 12:02,South Van Ness at Market,66,8/29/2013 12:04,Market at 10th,67,319,Subscriber,94103
4927,103,8/29/2013 18:54,Golden Gate at Polk,59,8/29/2013 18:56,Golden Gate at Polk,59,527,Subscriber,94109
4500,109,8/29/2013 13:25,Santa Clara at Almaden,4,8/29/2013 13:27,Adobe on Almaden,5,679,Subscriber,95112
4563,111,

In [15]:
trips_data.first()

'id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code'

In [16]:
trips_headers = trips_data.first()
stations_headers = stations_data.first()

In [17]:
trips = trips_data.filter(lambda x: x != trips_headers).map(lambda x: x.split(","))

stations = stations_data.filter(lambda x: x != stations_headers).map(lambda x: x.split(","))

In [18]:
for data in trips.take(3):
    print(data)

['4576', '63', '', 'South Van Ness at Market', '66', '8/29/2013 14:14', 'South Van Ness at Market', '66', '520', 'Subscriber', '94127']
['4607', '', '8/29/2013 14:42', 'San Jose City Hall', '10', '8/29/2013 14:43', 'San Jose City Hall', '10', '661', 'Subscriber', '95138']
['4130', '71', '8/29/2013 10:16', 'Mountain View City Hall', '27', '8/29/2013 10:17', 'Mountain View City Hall', '27', '48', 'Subscriber', '97214']


In [19]:
for data in stations.take(3):
    print(data)

['2', 'San Jose Diridon Caltrain Station', '37.329732', '-121.90178200000001', '27', 'San Jose', '8/6/2013']
['3', 'San Jose Civic Center', '37.330698', '-121.888979', '15', 'San Jose', '8/5/2013']
['4', 'Santa Clara at Almaden', '37.333988', '-121.894902', '11', 'San Jose', '8/6/2013']


In [20]:
trips_MP = trips.mapPartitions(initTrips)
trips_MP.first()

Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214')

In [21]:
stations_MP = stations.mapPartitions(initStations)
stations_MP.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [22]:
# 1. Найти велосипед с максимальным временем пробега.
bike_max_duration = trips_MP.map(lambda trip: (trip.bike_id, trip.duration))\
                  .reduceByKey(lambda a, b: a + b)\
                  .top(1, key = lambda x: x[1])

print(f'Bike id with max duration: {bike_max_duration[0][0]}')

Bike id with max duration: 535


In [23]:
# 2. Найти наибольшее геодезическое расстояние между станциями
from math import radians, cos, sin, asin, sqrt
def distance(lat1, lon1, lat2, lon2):
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    km = 6371* c
    return km

all_station = stations_MP.map(lambda station: (0, (station.station_id, station.lat, station.long)))
answer = all_station.join(all_station)\
    .map(lambda row: row[1])\
    .filter(lambda row: row[0][0]<row[1][0])\
    .map(lambda row: ((row[0][0], row[1][0]), distance(row[0][1], row[0][2], row[1][1], row[1][2])))\
    .sortBy(lambda dist: dist[1], ascending=False)
print(answer.first())

((16, 60), 4451.29398191479)


In [24]:
# 3. Найти путь велосипеда с максимальным временем пробега через станции
path = trips_MP.filter(lambda x: x.bike_id == bike_max_duration[0][0])\
    .sortBy(lambda trip: trip.start_date)\
    .map(lambda trip: (trip.start_station_name, trip.end_station_name))
print(f'{path.first()[0]}')
for station in path.collect():
    print(f' -> {station[1]}')

Post at Kearney
 -> San Francisco Caltrain (Townsend at 4th)
 -> San Francisco Caltrain 2 (330 Townsend)
 -> Market at Sansome
 -> 2nd at South Park
 -> Davis at Jackson
 -> Civic Center BART (7th at Market)
 -> Post at Kearney
 -> Embarcadero at Sansome
 -> Washington at Kearney
 -> Market at Sansome
 -> Market at Sansome
 -> 2nd at Folsom
 -> 2nd at Townsend
 -> 2nd at Townsend
 -> Embarcadero at Sansome
 -> Clay at Battery
 -> Harry Bridges Plaza (Ferry Building)
 -> Clay at Battery
 -> San Francisco Caltrain (Townsend at 4th)
 -> Steuart at Market
 -> 2nd at Townsend
 -> Harry Bridges Plaza (Ferry Building)
 -> Townsend at 7th
 -> San Francisco Caltrain (Townsend at 4th)
 -> San Francisco Caltrain 2 (330 Townsend)
 -> Townsend at 7th
 -> San Francisco Caltrain (Townsend at 4th)
 -> 2nd at South Park
 -> 5th at Howard
 -> San Francisco Caltrain 2 (330 Townsend)
 -> Mechanics Plaza (Market at Battery)
 -> Powell at Post (Union Square)
 -> Powell at Post (Union Square)
 -> 5th at Howa

 -> Steuart at Market
 -> Embarcadero at Sansome
 -> Commercial at Montgomery
 -> San Francisco Caltrain (Townsend at 4th)
 -> 2nd at Townsend
 -> Commercial at Montgomery
 -> Temporary Transbay Terminal (Howard at Beale)
 -> Embarcadero at Sansome
 -> Steuart at Market
 -> Embarcadero at Sansome
 -> Steuart at Market
 -> San Francisco Caltrain 2 (330 Townsend)
 -> 5th at Howard
 -> San Francisco Caltrain (Townsend at 4th)
 -> Harry Bridges Plaza (Ferry Building)
 -> Embarcadero at Sansome
 -> San Francisco Caltrain (Townsend at 4th)
 -> Townsend at 7th
 -> Market at 4th
 -> San Francisco Caltrain (Townsend at 4th)
 -> Embarcadero at Bryant
 -> Civic Center BART (7th at Market)
 -> Embarcadero at Vallejo
 -> Embarcadero at Sansome
 -> Harry Bridges Plaza (Ferry Building)
 -> San Francisco Caltrain (Townsend at 4th)
 -> Broadway St at Battery St
 -> Steuart at Market
 -> Market at Sansome
 -> Market at 4th
 -> Steuart at Market
 -> Embarcadero at Sansome
 -> Steuart at Market
 -> Embarc

In [25]:
# 4. Найти количество велосипедов в системе
vehicle_count = trips_MP.map(lambda trip: trip.bike_id).distinct().count()
print(vehicle_count)

700


In [26]:
# 5. Найти пользователей потративших на поездки более 3 часов
from pyspark.sql.functions import sum,avg,max,count
seconds = 3600 * 3
users = trips_MP.map(lambda trip: (trip.zip_code, trip.duration))\
    .reduceByKey(lambda a, b: a + b)\
    .filter(lambda user: user[1] > seconds)
print(f'Count users: {users.count()}')
print(users.take(10))

Count users: 3661
[('95060', 758576), ('94109', 12057128), ('94061', 3049397), ('94612', 1860796), ('95138', 155295), ('94123', 1895963), ('94133', 21637675), ('94960', 1439873), ('94131', 3143302), ('', 27723273)]
