In [1]:
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

Get:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,381 kB]
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:10 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [8,788 kB]
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:12 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,239 kB]
Get:13 http://archive.ubuntu.com/u

In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
!tar -xzf spark-3.5.5-bin-hadoop3.tgz
!mv spark-3.5.5-bin-hadoop3 /content/spark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark"
os.environ["PATH"] += ":/content/spark/bin"

In [4]:
!pip install -q findspark

In [5]:
import findspark
findspark.init()

In [6]:
from pyspark import SparkContext, SparkConf
import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct
from pyspark.sql.functions import radians, cos, sin, sqrt, asin

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

sc = spark.sparkContext

print(sc.version)

3.5.5


In [8]:
from google.colab import files
uploaded = files.upload()

Saving trips.csv to trips.csv


In [9]:
uploaded = files.upload()

Saving stations.csv to stations.csv


In [10]:
spark = SparkSession.builder.appName("MySparkApp").getOrCreate()

trips = spark.read.csv("trips.csv", header=True, inferSchema=True)
stations = spark.read.csv("stations.csv", header=True, inferSchema=True)

trips.printSchema()
trips.show(5, truncate=False)

stations.printSchema()
stations.show(5, truncate=False)

root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)

+----+--------+---------------+------------------------+----------------+---------------+------------------------+--------------+-------+-----------------+--------+
|id  |duration|start_date     |start_station_name      |start_station_id|end_date       |end_station_name        |end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+------------------------+----------------+---------------+------------------------+--------------+-------+-----------------+--------+
|4576|63      |NUL

1. Найти велосипед с максимальным временем пробега.

In [11]:
bike_max_duration = (
    trips
    .groupBy("bike_id") # группируем данные по bike_id
    .sum("duration")# находим суммарную продолжительность всех поездок для каждого велосипеда
    .withColumnRenamed("sum(duration)", "total_duration")
    .orderBy(col("total_duration").desc())# сортируем по убыванию
    .limit(1) # выводим максимальную продолжительность
)

bike_max_duration.show()

+-------+--------------+
|bike_id|total_duration|
+-------+--------------+
|    535|      18611693|
+-------+--------------+



2. Найти наибольшее геодезическое расстояние между станциями.

In [12]:
# создаём копию датафрейма станций, изменяя названия колонок
stations2 = stations.select(
    col("id").alias("id2"),
    col("name").alias("name2"),
    col("lat").alias("lat2"),
    col("long").alias("long2")
)

# создаём пары станций
station_pairs = stations.crossJoin(stations2)

# рассчитываем геодезическое расстояние
distance= station_pairs.withColumn(
    "distance",
    2 * 6371 * asin(sqrt(
        sin((radians(col("lat2")) - radians(col("lat"))) / 2) ** 2 +
        cos(radians(col("lat"))) * cos(radians(col("lat2"))) *
        sin((radians(col("long2")) - radians(col("long"))) / 2) ** 2
    ))
)

# выводим максимальное расстояние
max_distance = distance.orderBy(col("distance").desc()).limit(1)
max_distance.select("id", "name", "id2", "name2", "distance").show(truncate=False)


+---+--------------------------+---+----------------------+-----------------+
|id |name                      |id2|name2                 |distance         |
+---+--------------------------+---+----------------------+-----------------+
|16 |SJSU - San Salvador at 9th|60 |Embarcadero at Sansome|69.92087595428183|
+---+--------------------------+---+----------------------+-----------------+



3. Найти путь велосипеда с максимальным временем пробега через станции.

In [13]:
longest_trip = (
    trips
    .select("start_station_name", "end_station_name", "duration")
    .orderBy(col("duration").desc()) # сортируем по убыванию длительности
    .limit(1) # выводим максимальное время
)

longest_trip.show(truncate=False)

+------------------------+----------------+--------+
|start_station_name      |end_station_name|duration|
+------------------------+----------------+--------+
|South Van Ness at Market|2nd at Folsom   |17270400|
+------------------------+----------------+--------+



4. Найти количество велосипедов в системе.

In [14]:
bike_count = trips.select(countDistinct("bike_id").alias("total_bikes")) # подсчет количества уникальных значений по bike_id
bike_count.show()

+-----------+
|total_bikes|
+-----------+
|        700|
+-----------+



5. Найти пользователей потративших на поездки более 3 часов.

In [15]:
long_duration_trips = trips.filter(col("duration") > 3 * 60 * 60) #оставляем только поездки, где продолжительность больше 3х часов
long_duration_trips.show(truncate=False)

+----+--------+---------------+------------------------------------+----------------+---------------+------------------------------------+--------------+-------+-----------------+--------+
|id  |duration|start_date     |start_station_name                  |start_station_id|end_date       |end_station_name                    |end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+------------------------------------+----------------+---------------+------------------------------------+--------------+-------+-----------------+--------+
|4639|11118   |8/29/2013 15:18|Market at 4th                       |76              |8/29/2013 18:23|Market at 4th                       |76            |433    |Customer         |NULL    |
|4637|11272   |8/29/2013 15:17|Market at 4th                       |76              |8/29/2013 18:25|Market at 4th                       |76            |377    |Customer         |NULL    |
|4528|12280   |8/29/2013 13:39|Paseo de San Antonio    