In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
import os

# Locations that the Java/Spark tooling reads from the environment.
# NOTE(review): SPARK_HOME assumes the archive was unpacked under /content — confirm for non-Colab runs.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.5.1-bin-hadoop3",
    }
)

In [None]:
import findspark

# findspark makes the Spark installation's pyspark importable,
# so it has to run before the pyspark import below.
findspark.init()

from pyspark.sql import SparkSession, functions

# Local session that uses all available cores.
session_builder = SparkSession.builder.master("local[*]")
spark = session_builder.getOrCreate()

# Eager evaluation renders a DataFrame as a formatted table when it is a cell's last expression.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

In [None]:
# Both CSV files are read without a schema, so every column is loaded as a string and
# the timestampFormat option only takes effect once a schema (or inferSchema) is supplied.
# The pattern uses the 24-hour "H" because the data holds values such as "8/29/2013 14:14";
# the 12-hour "hh" cannot represent them. It also matches the pattern used when the
# trip dates are parsed explicitly later in the notebook.
TIMESTAMP_FORMAT = 'M/d/yyyy H:mm'

trips = spark.read.option("timestampFormat", TIMESTAMP_FORMAT).csv('trips.csv', header=True, sep=",")
trips.show(5)
stations = spark.read.option("timestampFormat", TIMESTAMP_FORMAT).csv('stations.csv', header=True, sep=",")
stations.show(5)

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|           NULL|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|    NULL|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|      

In [None]:
import pyspark.sql as psql

Найти велосипед с максимальным временем пробега.

In [None]:
# Total ride duration per bike; the aggregate column keeps Spark's default name "sum(duration)".
duration_per_bike = trips.groupBy('bike_id').agg(functions.sum('duration'))
# Largest total first, then take the top row.
most_rided_bike = duration_per_bike.orderBy(functions.desc('sum(duration)')).first()
print(f"Номер велосипеда с наибольшим временем пробега: {most_rided_bike['bike_id']}. Общее время пробега: {most_rided_bike['sum(duration)']} секунд")

Номер велосипеда с наибольшим временем пробега: 535. Общее время пробега: 18611693.0 секунд


Найти наибольшее геодезическое расстояние между станциями.

In [None]:
from geopy.distance import distance  # function for computing the geodesic distance


def _geodesic_km(lat1, long1, lat2, long2):
    """Return the distance in kilometres between the points (lat1, long1) and (lat2, long2)."""
    return distance((lat1, long1), (lat2, long2)).km


dist_func = functions.udf(_geodesic_km, psql.types.DoubleType())

# Pair every station with every station that has a different name,
# compute the distance for each pair, and keep the pair that is farthest apart.
first_station = stations.alias('station1')
second_station = stations.alias('station2')
different_names = functions.col('station1.name') != functions.col('station2.name')
station_pairs = first_station.join(second_station, different_names, 'inner')
pairs_with_distance = station_pairs.withColumn(
    'distance',
    dist_func(
        functions.col('station1.lat'),
        functions.col('station1.long'),
        functions.col('station2.lat'),
        functions.col('station2.long'),
    ),
)
max_dist = pairs_with_distance.orderBy('distance', ascending=False).first()
print(f"Наибольшее расстояние составляет {max_dist['distance']} километров")


Наибольшее расстояние составляет 69.92096757764355 километров


Найти путь велосипеда с максимальным временем пробега через станции.

In [None]:
# Id of the bike with the largest total ride duration.
most_rided_bike=trips.groupBy('bike_id').agg({'duration':'sum'}).orderBy('sum(duration)',ascending=False).first()['bike_id']
print(f'Путь велосипеда с максимальным временем пробега (номер {most_rided_bike}):')
bike_trips=trips.filter(functions.col('bike_id')==most_rided_bike)

# The date columns are strings such as "8/29/2013 14:14". Some trips have no start_date;
# with a NULL sort key they would all be placed at the very beginning of the path,
# so the end_date of the same trip is used as the fallback sort key.
date_format='M/d/yyyy H:mm'
trip_time=functions.coalesce(
    functions.to_timestamp(functions.col('start_date'),date_format),
    functions.to_timestamp(functions.col('end_date'),date_format),
)

# Chronological list of (start station -> end station) hops for the selected bike.
sorted_stations=bike_trips.orderBy(trip_time).select('start_station_id','start_station_name','end_station_id','end_station_name')
sorted_stations.show()

Путь велосипеда с максимальным временем пробега (номер 535):
+----------------+--------------------+--------------+--------------------+
|start_station_id|  start_station_name|end_station_id|    end_station_name|
+----------------+--------------------+--------------+--------------------+
|              47|     Post at Kearney|            70|San Francisco Cal...|
|              70|San Francisco Cal...|            69|San Francisco Cal...|
|              69|San Francisco Cal...|            77|   Market at Sansome|
|              77|   Market at Sansome|            64|   2nd at South Park|
|              61|     2nd at Townsend|            42|    Davis at Jackson|
|              58|San Francisco Cit...|            72|Civic Center BART...|
|              72|Civic Center BART...|            47|     Post at Kearney|
|              47|     Post at Kearney|            60|Embarcadero at Sa...|
|              60|Embarcadero at Sa...|            46|Washington at Kea...|
|              46|Washingto

Найти количество велосипедов в системе.

In [None]:
# Number of distinct bike ids that appear in the trips.
# Rows with a missing bike_id are dropped first so that NULL is not counted as one more bike.
bikes_count=trips.select('bike_id').dropna().distinct().count()
print(f'В системе {bikes_count} велосипедов')

В системе 700 велосипедов


Найти пользователей, потративших на поездки более 3 часов.

In [None]:
# Riders are grouped by zip_code; the summed trip duration (seconds) is converted to hours.
hours_spent = (functions.sum('duration') / 3600).alias('duration_in_hours')
riders_data = trips.groupBy('zip_code').agg(hours_spent)

# Keep only the groups whose total ride time exceeds 3 hours.
long_riders = riders_data.where(functions.col('duration_in_hours') > 3)
long_riders.show(10)

+--------+------------------+
|zip_code| duration_in_hours|
+--------+------------------+
|   94102| 5313.339166666667|
|   95134|202.22861111111112|
|   84606|26.429166666666667|
|   80305|50.251666666666665|
|   60070| 8.033055555555556|
|   95519|            8.4175|
|   43085|3.2416666666666667|
|   91910|14.024444444444445|
|   77339|3.8091666666666666|
|   48063|3.8208333333333333|
+--------+------------------+
only showing top 10 rows

