In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

In [2]:
import os
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [3]:
!pip install findspark
import findspark
findspark.init()

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [5]:
!pip3 install pyspark==3.0.0

Collecting pyspark==3.0.0
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.7/204.7 MB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9 (from pyspark==3.0.0)
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m17.8 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044159 sha256=c11b9cbc7821c2bc6620c972d06c79c7755d6f4cd07ce891985cfd390baa1d0d
  Stored in directory: /root/.cache/pip/wheels/b1/bb/8b/ca24d3f756f2ed967225b0871898869db676eb5846df5adc56
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0

<h1><center>Инициализация</center></h1>

In [6]:
from pyspark import SparkContext, SparkConf
import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct

In [7]:
spark = SparkSession \
    .builder \
    .appName("L1_interactive_bike_analysis") \
    .getOrCreate()

<h1><center>Загрузка данных</center></h1>

In [8]:
import os
data_path = os.path.join(os.curdir, "data")
trips_path = os.path.join("trips.csv")
stations_path = os.path.join("stations.csv")

In [9]:
trips = spark.read \
        .option("header", True) \
        .option("inferSchema", True) \
        .option("DateTimeFormat", 'M/d/y H:m') \
        .csv(trips_path)

stat = spark.read \
        .option("header", True) \
        .option("inferSchema", True) \
        .option("DateTimeFormat", 'M/d/y H:m') \
        .csv(stations_path)

<a id='Задачи для данных велопарковок Сан-Франциско (trips.csv, stations.csv):'></a>
## Задачи для данных велопарковок Сан-Франциско (trips.csv, stations.csv):
><li>Найти велосипед с максимальным временем пробега.
><li>Найти наибольшее геодезическое расстояние между станциями.
><li>Найти путь велосипеда с максимальным временем пробега через станции.
><li>Найти количество велосипедов в системе.
><li>Найти пользователей потративших на поездки более 3 часов.

<h1>1. Найти велосипед с максимальным временем пробега</h1>

In [18]:
trip_dur = trips.groupBy("bike_id").agg(sum(col("duration")).alias("duration")) #группировка велосипедов по id; подсчет времени пробега для каждого
trips_dur = trip_dur.orderBy(col("duration").desc()).first() #велосипед с макс пробегом
bike_id = trips_dur["bike_id"]
dur = trips_dur["duration"]
print(f"Велосипед {bike_id} с макс временем {dur}")

Велосипед 535 с макс временем 18611693


<h1>2. Найти наибольшее геодезическое расстояние между станциями</h1>

In [22]:
from math import sqrt, radians, sin, cos, atan2

def dist(l1, long1, l2, long2):
    R = 6373
    l1 = radians(l1)
    l2 = radians(l2)

    long1 = radians(long1)
    long2 = radians(long2)

    dist = R * (2 * atan2(sqrt(sin((l2 - l1) / 2)**2 + cos(l1) * cos(l2) * sin((long2 - long1) / 2)**2), sqrt(1 - (sin((l2 - l1) / 2)**2 + cos(l1) * cos(l2) * sin((long2 - long1) / 2)**2))))

    return dist


geod_dist = udf(dist, DoubleType())

station = stat.alias("station1").crossJoin(stat.alias("station2")) #все пары станций
station_distance = station.withColumn("geodesic_distance", geod_dist(col("station1.lat"), col("station1.long"), col("station2.lat"), col("station2.long")))#расстояние для каждой пары

dist = station_distance.selectExpr("max(geodesic_distance) as max_distance").collect()[0]["max_distance"]
print(f"Наибольшее расстояние: {dist}")

Наибольшее расстояние: 69.9428256877473


<h1>3. Найти путь велосипеда с максимальным временем пробега через станции</h1>

In [23]:
dur = trips.select("start_station_name", "end_station_name", "duration").orderBy(col("duration").desc()).first() #ищем наиболее длительную поездку через сортировку

one = dur["start_station_name"] #начальная станция
two = dur["end_station_name"] #конечная станция
time = dur["duration"] #время поездки

print(f"Самая длинная поездка: из \"{one}\" в \"{two}\" занимает {time}")

Самая длинная поездка: из "South Van Ness at Market" в "2nd at Folsom" занимает 17270400


<h1>4. Найти количество велосипедов в системе</h1>

In [24]:
count = trips.agg(countDistinct("bike_id").alias("bike_count")).collect()[0]["bike_count"]

print(f"Количество велосипедов: {count}")

Количество велосипедов: 700


<h1>5. Найти пользователей потративших на поездки более 3 часов</h1>

In [25]:
user = trips.groupBy("bike_id").sum("duration").withColumnRenamed("sum(duration)", "time")#группируем по id и считаем общее время

user.filter("time>10800").show()

+-------+-------+
|bike_id|   time|
+-------+-------+
|    471|1718831|
|    496|1679568|
|    148| 332138|
|    463|1722796|
|    540|1752835|
|    392|1789476|
|    623|2037219|
|    243| 307458|
|    516|1896751|
|     31| 407907|
|    580|1034382|
|    137|1529200|
|    251|1282980|
|    451|1695574|
|     85|1214769|
|    458|1647080|
|     65| 216922|
|    588| 266415|
|    255| 396395|
|     53| 226389|
+-------+-------+
only showing top 20 rows

