<a href="https://colab.research.google.com/github/R1tsuko/big_data/blob/main/L1%20-%20Introduction%20to%20Apache%20Spark/BigData_Lab2_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
# Environment variables for the Java and Spark installations.
# The paths assume the Colab filesystem layout (/usr/lib/jvm, /content) — TODO confirm when running elsewhere.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

In [None]:
# Ordering matters here: findspark.init() is called before pyspark is imported.
# NOTE(review): presumably it makes the unpacked Spark distribution importable via SPARK_HOME — confirm before reordering.
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Create (or reuse) a session whose master is "local[*]".
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
# SparkContext handle; the cells below use it for the RDD API (sc.textFile).
sc = spark.sparkContext
# Last expression of the cell: display the session object.
spark

In [None]:
# Mount Google Drive at /content/drive.
# NOTE(review): the visible cells read trips.csv and stations.csv by relative path, not from the mount — confirm the mount is still needed.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Upload data files through the Colab `files` helper; the call's return value is kept in `uploaded`.
# NOTE(review): `uploaded` is not referenced by the visible cells — the files appear to be read back by name instead.
from google.colab import files
uploaded = files.upload()

Saving stations.csv to stations.csv


In [None]:
# Trips: read the raw text file, remember its header line so it can be
# excluded from the data, then split every remaining line on commas.
tripData = sc.textFile("trips.csv")
tripsHeader = tripData.first()
trips = (
    tripData
    .filter(lambda line: line != tripsHeader)
    .map(lambda line: line.split(","))
)

# Stations: same treatment as the trips file.
stationData = sc.textFile("stations.csv")
stationsHeader = stationData.first()
stations = (
    stationData
    .filter(lambda line: line != stationsHeader)
    .map(lambda line: line.split(","))
)

In [None]:
from typing import NamedTuple
from datetime import datetime

In [None]:
def initStation(stations):
    """Convert raw station rows into ``Station`` named tuples.

    Args:
        stations: iterable of rows; each row is an indexable sequence of at
            least seven strings in the order id, name, lat, long, dockcount,
            landmark, installation date (``%m/%d/%Y``).

    Yields:
        One ``Station`` per input row, with the numeric fields converted and
        the installation date parsed into a ``datetime``.

    Raises:
        ValueError: if a numeric field or the date cannot be parsed.
        IndexError: if a row has fewer than seven fields.
    """
    # Record type declared locally — presumably so the function is
    # self-contained when handed to mapPartitions (TODO confirm).
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: datetime  # parsed with strptime below, so a datetime rather than a str

    for station in stations:
        yield Station(
            station_id=int(station[0]),
            name=station[1],
            lat=float(station[2]),
            long=float(station[3]),
            dockcount=int(station[4]),
            landmark=station[5],
            installation=datetime.strptime(station[6], '%m/%d/%Y'),
        )
def initTrip(trips):
    """Convert raw trip rows into ``Trip`` named tuples, skipping malformed rows.

    Args:
        trips: iterable of rows; each row is an indexable sequence of at
            least eleven strings in the order trip id, duration, start date,
            start station name, start station id, end date, end station name,
            end station id, bike id, subscription type, zip code. Dates use
            the ``%m/%d/%Y %H:%M`` format.

    Yields:
        One ``Trip`` per row that parses cleanly. Rows with too few fields or
        with unparsable numbers or dates are skipped (best-effort parsing).
    """
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        # Build the record inside the try and yield outside it, so that only
        # parsing errors are swallowed. A handler wrapped around the yield
        # would also intercept GeneratorExit when the consumer stops early.
        try:
            record = Trip(
                trip_id=int(trip[0]),
                duration=int(trip[1]),
                start_date=datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
                start_station_name=trip[3],
                start_station_id=int(trip[4]),
                end_date=datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
                end_station_name=trip[6],
                # Converted to int to match the annotation and start_station_id.
                end_station_id=int(trip[7]),
                bike_id=int(trip[8]),
                subscription_type=trip[9],
                zip_code=trip[10],
            )
        except (ValueError, IndexError):
            # Malformed row: missing fields or unparsable number/date.
            continue
        yield record

In [None]:
# Parse the raw station rows partition by partition with initStation,
# then show the first parsed record as a sanity check.
stationsInternal = stations.mapPartitions(initStation)
stationsInternal.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [None]:
# Parse the raw trip rows partition by partition with initTrip,
# then show the first parsed record as a sanity check.
tripsInternal = trips.mapPartitions(initTrip)
tripsInternal.first()

Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214')

#### Велосипед с максимальным временем пробега

In [None]:
# Key every parsed trip by the bike that made it, giving (bike_id, trip) pairs.
tripsByBikeID = tripsInternal.keyBy(
    lambda record: record.bike_id
)

In [None]:
from operator import add

# Sum the trip durations per bike, then order bikes from the largest total to the smallest.
bike_duratons_sorted = (
    tripsByBikeID
    .mapValues(lambda ride: ride.duration)
    .reduceByKey(add)
    .sortBy(lambda pair: pair[1], ascending=False)
)

In [None]:
# Five bikes with the largest summed duration, as (bike_id, total_duration) pairs.
bike_duratons_sorted.take(5)

[(535, 18611693),
 (466, 3933272),
 (613, 2409014),
 (526, 2253019),
 (415, 2248886)]

#### Найти наибольшее геодезическое расстояние между станциями.

In [None]:
import pyproj
# Geodesic calculator on the WGS84 ellipsoid; finddist calls its inv() method.
geod = pyproj.Geod(ellps='WGS84')
def finddist(coord1, coord2):
    """Return the geodesic distance in meters between two points.

    Each argument is indexed as ``(lat, long)``: element 1 is handed to
    ``geod.inv`` as the longitude and element 0 as the latitude.
    """
    lon_a, lat_a = coord1[1], coord1[0]
    lon_b, lat_b = coord2[1], coord2[0]
    # geod.inv returns three values; only the third one (the distance) is used.
    _fwd_azimuth, _back_azimuth, meters = geod.inv(lon_a, lat_a, lon_b, lat_b)
    return meters

In [None]:
# Project every station to its (lat, long) pair.
rdd = stationsInternal.map(lambda x: (x.lat, x.long))
# Compute the distance for every ordered pair of stations and take the maximum
# with RDD.max(), instead of collecting all n*n distances to the driver and
# sorting them just to read the last element.
rdd.cartesian(rdd).map(lambda pair: finddist(pair[0], pair[1])).max()

69920.96757764355

#### Найти путь велосипеда с максимальным временем пробега через станции.

In [None]:
# The bike with the largest total ride time is 535.
# Start dates are converted to milliseconds in the hope that sorting would be faster; it was not.
# NOTE(review): this rebinds `trips`, which earlier held the raw split rows of trips.csv.
trips = (
    tripsInternal
    .filter(lambda ride: ride.bike_id == 535)
    .map(lambda ride: (int(ride.start_date.timestamp() * 1000), ride.start_station_id))
    .sortByKey()
    .map(lambda keyed: keyed[1])
)

In [None]:
# First ten start-station ids of bike 535's trips, ordered by trip start time.
trips.take(10)

[47, 70, 69, 77, 61, 58, 72, 47, 60, 46]

#### Найти количество велосипедов в системе.

In [None]:
# Distinct bike ids that appear in the parsed trips.
bike_ids = (
    tripsInternal
    .map(lambda ride: ride.bike_id)
    .distinct()
)

In [None]:
# Number of distinct bikes seen in the trips data.
bike_ids.count()

700

#### Найти пользователей, потративших на поездки более 3 часов.

In [None]:
three_hours_in_sec = 3 * 60 * 60
# Users are approximated by zip code: sum the trip durations per zip code
# and keep the zip codes whose total exceeds three hours.
# NOTE(review): trips with an empty zip code are grouped under the '' key.
(
    tripsInternal
    .map(lambda ride: (ride.zip_code, ride.duration))
    .reduceByKey(add)
    .filter(lambda pair: pair[1] > three_hours_in_sec)
    .keys()
    .collect()
)

['95060',
 '94109',
 '94061',
 '94612',
 '95138',
 '94123',
 '94133',
 '94960',
 '94131',
 '',
 '1719',
 '94965',
 '94025',
 '95123',
 '8540',
 '94703',
 '95070',
 '94501',
 '94108',
 '94065',
 '94040',
 '10010',
 '94518',
 '20002',
 '94556',
 '94301',
 '10514',
 '98034',
 '78209',
 '77459',
 '97330',
 '94005',
 '94063',
 '58553',
 '94039',
 '94611',
 '94920',
 '94903',
 '15238',
 '97217',
 '94582',
 '94565',
 '98101',
 '93726',
 '30318',
 '45322',
 '78230',
 '33154',
 '4517',
 '29200',
 '94080',
 '92808',
 '94119',
 '89448',
 '20008',
 '92124',
 '11106',
 '2138',
 '91605',
 '85251',
 '60622',
 '94104',
 '90230',
 '94305',
 '90049',
 '91706',
 '98403',
 '5024',
 '89138',
 '11515',
 '28277',
 '34990',
 '94803',
 '92663',
 '91801',
 '95003',
 '8545',
 '95472',
 '29910',
 '91304',
 '94024',
 '11358',
 '60657',
 '91745',
 '78757',
 '11206',
 '95351',
 '33629',
 '80435',
 '60647',
 '94949',
 '1742',
 '60056',
 '92805',
 '53703',
 '21202',
 '94947',
 '91205',
 '95120',
 '85282',
 '76107',
 '