In [1]:
!pip3 install pyspark==3.0.0

Collecting pyspark==3.0.0
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.7/204.7 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9 (from pyspark==3.0.0)
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m15.1 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044159 sha256=9453139688632b5bbddab9429375bf1b5dbae6d1628c550de6c2cecabe72bd10
  Stored in directory: /root/.cache/pip/wheels/b1/bb/8b/ca24d3f756f2ed967225b0871898869db676eb5846df5adc56
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0

In [2]:
import pyspark.sql as sql
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions
import requests
from geopy.distance import geodesic

In [3]:
try:
    sc = SparkContext.getOrCreate()
    sc.setLogLevel("ERROR")
except:
    conf = SparkConf().setAppName("lb1").setMaster('local[1]')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("ERROR")

spark = SparkSession(sc)

sc

In [4]:
with open("stations.csv", "wb") as f:
    request = requests.get("https://git.ai.ssau.ru/tk/big_data/raw/branch/bachelor/data/stations.csv")
    f.write(request.content)

with open("trips.csv", "wb") as f:
    request = requests.get("https://git.ai.ssau.ru/tk/big_data/raw/branch/bachelor/data/trips.csv")
    f.write(request.content)

In [5]:
tripData = spark.read\
  .option("header", True)\
  .option("inferSchema", True)\
  .option("timestampFormat", 'M/d/y H:m')\
  .csv("trips.csv")

tripData

DataFrame[id: int, duration: int, start_date: timestamp, start_station_name: string, start_station_id: int, end_date: timestamp, end_station_name: string, end_station_id: int, bike_id: int, subscription_type: string, zip_code: string]

In [6]:
stationData = spark.read\
  .option("header", True)\
  .option("inferSchema", True)\
  .option("timestampFormat", 'M/d/y')\
  .csv("stations.csv")

stationData

DataFrame[id: int, name: string, lat: double, long: double, dock_count: int, city: string, installation_date: timestamp]

#**Задание №1: Найти велосипед с максимальным временем пробега**

In [8]:
tripData\
  .groupBy("bike_id")\
  .agg(
        functions\
        .sum("duration")\
        .alias("dur")
      )\
  .orderBy(functions.desc("dur"))\
  .limit(1)\
  .show()

+-------+--------+
|bike_id|     dur|
+-------+--------+
|    535|18611693|
+-------+--------+



#**Задание 2: Найти наибольшее геодезическое расстояние между станциями.**


In [9]:
stations_coordinates = stationData\
  .select("id", "lat", "long")\
  .withColumnRenamed("lat", "lat1")\
  .withColumnRenamed("long", "long1")\
  .createOrReplaceTempView("stations_coordinates")\


station_combinations = spark.sql("""
    SELECT a.id as station1, b.id as station2, a.lat1, a.long1, b.lat1 as lat2, b.long1 as long2
    FROM stations_coordinates a
    CROSS JOIN stations_coordinates b
    WHERE a.id < b.id
""")

calculate_distance_udf = spark.udf.register(
    "calculate_distance",
     lambda lat1, lon1, lat2, lon2: geodesic((lat1, lon1), (lat2, lon2)).kilometers
)

print(
  station_combinations\
    .withColumn("distance", calculate_distance_udf("lat1", "long1", "lat2", "long2"))\
    .select("station1", "station2", "distance").orderBy(col("distance").desc())\
    .first()\
    ['distance']
)

9.669526104642657


#**Задание 3. Найти путь велосипеда с максимальным временем пробега через станции.**

In [10]:
max_dur_trip = tripData\
    .orderBy(col("duration").desc())\
    .limit(1)\
    .select("start_station_name", "end_station_name")\
    .first()

filtered_joined_station = stationData.filter((col("name") == max_dur_trip.start_station_name) | (col("name") == max_dur_trip.end_station_name))

filtered_joined_station\
  .crossJoin(filtered_joined_station.select(col("name").alias("end_station_name"), col("lat").alias("end_lat"), col("long").alias("end_long")))\
  .withColumn("distance", calculate_distance_udf(col("lat"), col("long"), col("end_lat"), col("end_long")))\
  .select("name", "end_station_name", "distance")\
  .filter((col("name") != col("end_station_name")) & (col("distance") != 0))\
  .show()

+--------------------+--------------------+------------------+
|                name|    end_station_name|          distance|
+--------------------+--------------------+------------------+
|       2nd at Folsom|South Van Ness at...|2.3150845505323323|
|South Van Ness at...|       2nd at Folsom|2.3150845505323323|
+--------------------+--------------------+------------------+



#**Задание 4. Найти количество велосипедов в системе.**

In [11]:
print(
  tripData\
    .select("bike_id")\
    .distinct()\
    .count()
)

700


#**Задание 5. Найти пользователей потративших на поездки более 3 часов.**


In [12]:
tripData\
  .groupBy("id")\
  .agg({"duration": "sum"})\
  .withColumnRenamed("sum(duration)", "sum_duration")\
  .filter(col("sum_duration") > 60*60*3)\
  .show()

+------+------------+
|    id|sum_duration|
+------+------------+
|  6654|       17751|
| 22097|       21686|
| 22223|       15619|
| 30654|       13479|
| 34759|       17959|
| 43688|       22504|
| 88666|       21964|
| 88674|       13726|
|105536|       19854|
|143153|       20649|
|146988|       44084|
|189310|       21785|
|431881|       28377|
|431018|       12301|
|427387|       12612|
|418759|       15526|
|418461|       15103|
|410754|       16743|
|386707|       14313|
|305619|       12412|
+------+------------+
only showing top 20 rows

