In [1]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
import pyspark.sql as sql
from pyspark.sql.functions import col

In [2]:
try:
    sc = SparkContext.getOrCreate()
    sc.setLogLevel("ERROR")  
except:
    conf = SparkConf().setAppName("Lab1").setMaster('local[1]')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("ERROR")  
spark = SparkSession(sc)

sc

In [3]:
!hadoop fs -put * .

put: `list_of_countries_sorted_gini.txt': File exists
put: `nyctaxi.csv': File exists
put: `programming-languages.csv': File exists
put: `stations.csv': File exists
put: `trips.csv': File exists
put: `Untitled.ipynb': File exists
put: `warandsociety.txt': File exists


In [4]:
!hadoop fs -ls

Found 9 items
drwxr-xr-x   - andiyash andiyash          1 2023-12-22 20:58 .sparkStaging
-rwxr-xr-x   3 andiyash andiyash       1318 2023-12-22 15:51 Untitled.ipynb
drwxr-xr-x   - andiyash andiyash          6 2023-12-22 15:51 data
-rwxr-xr-x   3 andiyash andiyash        394 2023-12-22 15:51 list_of_countries_sorted_gini.txt
-rwxr-xr-x   3 andiyash andiyash   79500408 2023-12-22 15:51 nyctaxi.csv
-rwxr-xr-x   3 andiyash andiyash      40269 2023-12-22 15:51 programming-languages.csv
-rwxr-xr-x   3 andiyash andiyash       5647 2023-12-22 15:51 stations.csv
-rwxr-xr-x   3 andiyash andiyash   80208831 2023-12-22 16:54 trips.csv
-rwxr-xr-x   3 andiyash andiyash    5315699 2023-12-22 00:46 warandsociety.txt


In [5]:
#Чтение файлов (взял Ваш код)
tripData = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("trips.csv")

tripData


DataFrame[id: int, duration: int, start_date: timestamp, start_station_name: string, start_station_id: int, end_date: timestamp, end_station_name: string, end_station_id: int, bike_id: int, subscription_type: string, zip_code: string]

In [6]:
# Чтение данных из файла "stations.csv" и создание DataFrame stationData
stationData = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y')\
.csv("stations.csv")

stationData

DataFrame[id: int, name: string, lat: double, long: double, dock_count: int, city: string, installation_date: timestamp]

Задание №1: Найти велосипед с максимальным временем пробега

In [7]:
from pyspark.sql import functions as F
# Выполнение запроса с использованием DataFrame API
result = tripData.groupBy("bike_id").agg(F.sum("duration").alias("dur")) \
                 .orderBy(F.desc("dur")).limit(1)

# Вывод результатов
print("Результат:")
result.show()

Результат:
+-------+--------+
|bike_id|     dur|
+-------+--------+
|    535|36229902|
+-------+--------+



Задание 2: Найти наибольшее геодезическое расстояние между станциями.

In [41]:
!pip install geopy

Defaulting to user installation because normal site-packages is not writeable


In [8]:
from math import radians, cos, sin, asin, sqrt
from geopy.distance import geodesic
# Выбор необходимых столбцов для расчета расстояния
stations_coordinates = stationData.select("id", "lat", "long").withColumnRenamed("lat", "lat1").withColumnRenamed("long", "long1")
stations_coordinates.createOrReplaceTempView("stations_coordinates")

# Кросс-джойн для получения всех возможных комбинаций станций
station_combinations = spark.sql("""
    SELECT a.id as station1, b.id as station2, a.lat1, a.long1, b.lat1 as lat2, b.long1 as long2
    FROM stations_coordinates a
    CROSS JOIN stations_coordinates b
    WHERE a.id < b.id
""")

# Функция для вычисления геодезического расстояния
def calculate_distance(lat1, lon1, lat2, lon2):
    coord1 = (lat1, lon1)
    coord2 = (lat2, lon2)
    return geodesic(coord1, coord2).kilometers

# Регистрация функции UDF
calculate_distance_udf = spark.udf.register("calculate_distance", calculate_distance)

# Вычисление расстояния
result = station_combinations.withColumn("distance", calculate_distance_udf("lat1", "long1", "lat2", "long2"))

# Нахождение максимального расстояния
max_distance = result.select("station1", "station2", "distance").orderBy(col("distance").desc()).first()

print(f"Наибольшее геодезическое расстояние между станциями {max_distance['station1']} и {max_distance['station2']}: {max_distance['distance']} км")

Наибольшее геодезическое расстояние между станциями 24 и 36: 9.669526104642657 км


In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from math import radians, cos, sin, asin, sqrt




# Определение велосипеда с максимальным временем пробега
max_duration_trip = tripData.orderBy(col("duration").desc()).limit(1).select("start_station_name", "end_station_name").first()

# Фильтрация данных для велосипеда с максимальным временем пробега
filtered_joined_station = stationData \
    .filter((col("name") == max_duration_trip.start_station_name) |
            (col("name") == max_duration_trip.end_station_name))

# Создание нового DataFrame, который содержит расстояние между станциями
result_3 = filtered_joined_station.crossJoin(
    filtered_joined_station.select(col("name").alias("end_station_name"), col("lat").alias("end_lat"), col("long").alias("end_long"))
) \
.withColumn("distance", calculate_distance_udf(col("lat"), col("long"), col("end_lat"), col("end_long"))) \
.select("name", "end_station_name", "distance") \
.filter((col("name") != col("end_station_name")) & (col("distance") != 0))  # Исключаем строки, где начальная и конечная станции совпадают и расстояние равно 0

# Вывод результата
result_3.show()

+--------------------+--------------------+------------------+
|                name|    end_station_name|          distance|
+--------------------+--------------------+------------------+
|       2nd at Folsom|South Van Ness at...|2.3150845505323323|
|South Van Ness at...|       2nd at Folsom|2.3150845505323323|
+--------------------+--------------------+------------------+



Задание 4. Найти количество велосипедов в системе.

In [28]:
# Подсчет уникальных велосипедов в DataFrame tripData
bike_count = tripData.select("bike_id").distinct().count()

# Вывод результата
print(f"Количество велосипедов в системе: {bike_count}")

Количество велосипедов в системе: 700


Задание 5. Найти пользователей потративших на поездки более 3 часов.

In [33]:
# Группировка данных по клиентам и суммирование длительности поездок
grouped_data = tripData.groupBy("id").agg({"duration": "sum"}).withColumnRenamed("sum(duration)", "sum_duration")

# Фильтрация данных для клиентов, потративших на поездки более 3 часов
filtered_data = grouped_data.filter(col("sum_duration") > 10800)

# Вывод результата
print("Результат:")
filtered_data.show()


Результат:
+------+------------+
|    id|sum_duration|
+------+------------+
|  6654|       17751|
| 22097|       21686|
| 22223|       15619|
| 30654|       13479|
| 34759|       17959|
| 43688|       22504|
| 88666|       21964|
| 88674|       13726|
|105536|       19854|
|143153|       20649|
|146988|       44084|
|189310|       21785|
|431881|       28377|
|431018|       12301|
|427387|       12612|
|418759|       15526|
|418461|       15103|
|410754|       16743|
|386707|       14313|
|305619|       12412|
+------+------------+
only showing top 20 rows



In [32]:
sc.stop()