## Решите следующие задачи для данных велопарковок Сан-Франциско (trips.csv, stations.csv):

1.	Найти велосипед с максимальным временем пробега.
2.	Найти наибольшее геодезическое расстояние между станциями.
3.	Найти путь велосипеда с максимальным временем пробега через станции.
4.	Найти количество велосипедов в системе.
5.	Найти пользователей потративших на поездки более 3 часов.

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark
from pyspark.sql import functions as F

In [3]:
spark = SparkSession.builder.appName("lr1").config("spark.sql.legacy.timeParserPolicy", "LEGACY").getOrCreate()

In [4]:
trips_df = spark.read.option("header", "true").option("inferSchema", "true").csv("trip.csv")
print(trips_df.dtypes)

stations_df = spark.read.option("header", "true").option("inferSchema", "true").csv("station.csv")
print(stations_df.dtypes)

[('id', 'int'), ('duration', 'int'), ('start_date', 'string'), ('start_station_name', 'string'), ('start_station_id', 'int'), ('end_date', 'string'), ('end_station_name', 'string'), ('end_station_id', 'int'), ('bike_id', 'int'), ('subscription_type', 'string'), ('zip_code', 'string')]
[('id', 'int'), ('name', 'string'), ('lat', 'double'), ('long', 'double'), ('dock_count', 'int'), ('city', 'string'), ('installation_date', 'string')]


In [5]:
# Найти велосипед с максимальным временем пробега.
result =(trips_df.groupBy("bike_id")
         .agg((F.sum("duration")/3600).alias("sum_duration"))
         .orderBy(F.desc("sum_duration"))
         .limit(1)
         .select("bike_id", F.round("sum_duration", 1).alias("duration"))
).collect()
print(result)


[Row(bike_id=535, duration=5169.9)]


In [6]:
# Найти наибольшее геодезическое расстояние между станциями
def calc_dist(lat1, lon1, lat2, lon2):
    R = 6371 # Радиус Земли
    lat1, lat2, lon1, lon2 = F.radians(lat1), F.radians(lat2), F.radians(lon1), F.radians(lon2)
    return R * F.acos(F.sin(lat1) * F.sin(lat2) + F.cos(lat1) * F.cos(lat2) * F.cos(lon2 - lon1))

max_distance = (stations_df.alias("station1")
                .crossJoin(stations_df.alias("station2"))
                .filter(F.col("station1.id") < F.col("station2.id"))
                .withColumn("distance", calc_dist(F.col("station1.lat"), F.col("station1.long"), F.col("station2.lat"), F.col("station2.long")))
                .agg(F.max("distance").alias("max_distance"))
                .collect()[0]["max_distance"])

print(max_distance)

69.92087595421542


In [7]:
# Найти путь велосипеда с максимальным временем пробега через станции.

# Первое задание
max_duration_bike =(trips_df.groupBy("bike_id")
         .agg((F.sum("duration")/3600).alias("sum_duration"))
         .orderBy(F.desc("sum_duration"))
         .limit(1)
         .select("bike_id", F.round("sum_duration", 1))
).collect()

bike_trips = (trips_df.filter(F.col("bike_id")==max_duration_bike[0]["bike_id"])
             .select("start_date", "start_station_name", "end_date", "end_station_name")
             .orderBy("start_date")
).collect()

print(bike_trips)

[Row(start_date='1/1/2014 13:42', start_station_name='Mechanics Plaza (Market at Battery)', end_date='1/1/2014 14:36', end_station_name='Embarcadero at Sansome'), Row(start_date='1/1/2014 18:51', start_station_name='Embarcadero at Sansome', end_date='1/1/2014 19:13', end_station_name='Market at 4th'), Row(start_date='1/1/2014 19:48', start_station_name='Market at 4th', end_date='1/1/2014 20:01', end_station_name='South Van Ness at Market'), Row(start_date='1/10/2014 20:13', start_station_name='Market at 10th', end_date='1/10/2014 20:17', end_station_name='Powell Street BART'), Row(start_date='1/10/2014 8:09', start_station_name='Embarcadero at Folsom', end_date='1/10/2014 8:19', end_station_name='San Francisco Caltrain (Townsend at 4th)'), Row(start_date='1/10/2014 8:21', start_station_name='San Francisco Caltrain (Townsend at 4th)', end_date='1/10/2014 8:31', end_station_name='Temporary Transbay Terminal (Howard at Beale)'), Row(start_date='1/10/2014 9:19', start_station_name='Tempora

In [8]:
# Найти количество велосипедов в системе.
bike_count=(trips_df.groupBy("bike_id").count().count())

print(bike_count)


700


In [9]:
# Найти пользователей потративших на поездки более 3 часов.
users = (trips_df.groupBy("zip_code")
         .agg((F.sum("duration")/3600).alias("sum_duration"))
         .filter(F.col("sum_duration")>3)
         .select("zip_code", F.round("sum_duration",1).alias("sum_duration"))
         .filter(F.col("zip_code")!="nil")
         .orderBy(F.desc("sum_duration"))
).collect()

print(users)

[Row(zip_code='94107', sum_duration=13821.4), Row(zip_code='94105', sum_duration=7110.0), Row(zip_code='94133', sum_duration=6010.5), Row(zip_code='94102', sum_duration=5313.3), Row(zip_code='94103', sum_duration=5313.2), Row(zip_code='95531', sum_duration=4797.3), Row(zip_code='94111', sum_duration=3956.9), Row(zip_code='95112', sum_duration=3539.5), Row(zip_code='94109', sum_duration=3349.2), Row(zip_code='94040', sum_duration=2168.9), Row(zip_code='94110', sum_duration=2061.6), Row(zip_code='94117', sum_duration=1917.0), Row(zip_code='94301', sum_duration=1830.7), Row(zip_code='94041', sum_duration=1743.4), Row(zip_code='94158', sum_duration=1735.6), Row(zip_code='94306', sum_duration=1541.8), Row(zip_code='94025', sum_duration=1438.4), Row(zip_code='94108', sum_duration=1424.3), Row(zip_code='94611', sum_duration=1393.0), Row(zip_code='94010', sum_duration=1333.4), Row(zip_code='94403', sum_duration=1301.8), Row(zip_code='95110', sum_duration=1270.5), Row(zip_code='94114', sum_dura