In [None]:
import math
import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType

# Соколов Никита ИУ6-54Б Вариант 2

In [1]:
import math
import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType

In [2]:
# Environment configuration for the JVM that backs PySpark.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-17-openjdk'

# Put the chosen JDK's bin directory first on PATH so its `java` is used.
# Any existing copy of that entry is removed first, so re-running this cell
# does not keep growing PATH; a missing PATH is treated as empty.
java_bin = f"{os.environ['JAVA_HOME']}/bin"
current_path = os.environ.get('PATH', '')
other_path_entries = (
    [entry for entry in current_path.split(os.pathsep) if entry != java_bin]
    if current_path else []
)
os.environ['PATH'] = os.pathsep.join([java_bin, *other_path_entries])

# JVM options read by every JVM started from this process (the JVM reports
# them with "Picked up _JAVA_OPTIONS: ..." on startup).
os.environ['_JAVA_OPTIONS'] = '-Dhadoop.security.authentication=simple -Duser.name=spark'

# Build the Spark session.
# Stop a session left over from a previous run of this cell so the builder
# below starts a fresh one. On a fresh kernel `spark_session` is not defined
# yet, which is the only error deliberately ignored here.
try:
    spark_session.stop()
except NameError:
    pass

spark_session = (
    SparkSession.builder
    .appName('CitibikeAnalysis_HW2')
    .getOrCreate()
)

Picked up _JAVA_OPTIONS: -Dhadoop.security.authentication=simple -Duser.name=spark
Picked up _JAVA_OPTIONS: -Dhadoop.security.authentication=simple -Duser.name=spark
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/06 16:49:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the raw trip data: the first CSV row holds the column names and
# Spark infers each column's type from the data.
raw_bike_data = spark_session.read.csv(
    './201902-citibike-tripdata_1.csv',
    header=True,
    inferSchema=True,
)

print(f"Список столбцов: {raw_bike_data.columns}")

[Stage 1:>                                                          (0 + 8) / 8]

Список столбцов: ['tripduration', 'starttime', 'stoptime', 'start station id', 'start station name', 'start station latitude', 'start station longitude', 'end station id', 'end station name', 'end station latitude', 'end station longitude', 'bikeid', 'usertype', 'birth year', 'gender']


                                                                                

In [4]:
# Preview the first five rows of the first five columns.
first_five_columns = raw_bike_data.columns[:5]
raw_bike_data.select(*first_five_columns).show(n=5)

+------------+--------------------+--------------------+----------------+--------------------+
|tripduration|           starttime|            stoptime|start station id|  start station name|
+------------+--------------------+--------------------+----------------+--------------------+
|         219|2019-02-01 00:00:...|2019-02-01 00:03:...|          3494.0|E 115 St & Lexing...|
|         143|2019-02-01 00:00:...|2019-02-01 00:02:...|           438.0| St Marks Pl & 1 Ave|
|         296|2019-02-01 00:01:...|2019-02-01 00:06:...|          3571.0|Bedford Ave & Ber...|
|         478|2019-02-01 00:01:...|2019-02-01 00:09:...|           167.0|     E 39 St & 3 Ave|
|         225|2019-02-01 00:01:...|2019-02-01 00:05:...|          3458.0|     W 55 St & 6 Ave|
+------------+--------------------+--------------------+----------------+--------------------+
only showing top 5 rows


In [5]:
@F.udf(DoubleType())
def get_geo_distance(lat1_deg, lon1_deg, lat2_deg, lon2_deg):
    """Great-circle distance between two points in kilometres (haversine formula).

    Registered as a Spark UDF with a ``DoubleType`` result; call it with four
    column names/Columns holding coordinates in decimal degrees.

    Args:
        lat1_deg: Latitude of the first point, in degrees.
        lon1_deg: Longitude of the first point, in degrees.
        lat2_deg: Latitude of the second point, in degrees.
        lon2_deg: Longitude of the second point, in degrees.

    Returns:
        The distance in km as a float, or ``None`` (NULL in the resulting
        column) when any of the coordinates is missing.
    """
    # Mean Earth radius (km).
    R_EARTH = 6371.0

    # A NULL coordinate would make math.radians raise TypeError and fail the
    # whole Spark task; yield NULL for that row instead.
    if any(value is None for value in (lat1_deg, lon1_deg, lat2_deg, lon2_deg)):
        return None

    # Degrees -> radians.
    phi1, lam1 = math.radians(lat1_deg), math.radians(lon1_deg)
    phi2, lam2 = math.radians(lat2_deg), math.radians(lon2_deg)

    # Coordinate differences.
    d_phi = phi2 - phi1
    d_lam = lam2 - lam1

    # Haversine of the central angle.
    hav_theta = math.sin(d_phi / 2) ** 2 + \
        math.cos(phi1) * math.cos(phi2) * math.sin(d_lam / 2) ** 2

    # Rounding can push the value marginally outside [0, 1] (e.g. for
    # near-antipodal points), which would make math.sqrt raise ValueError.
    # Explicit comparisons are used so a NaN input still propagates as NaN
    # instead of being silently clamped.
    if hav_theta > 1.0:
        hav_theta = 1.0
    elif hav_theta < 0.0:
        hav_theta = 0.0

    angular_dist = 2 * math.atan2(math.sqrt(hav_theta), math.sqrt(1 - hav_theta))

    return R_EARTH * angular_dist

In [6]:
# Build the trip sample and compute each trip's distance.
# Round trips (start station == end station, compared by station name)
# are excluded.
coordinate_columns = [
    F.col(source_name).alias(short_name)
    for source_name, short_name in (
        ('start station latitude', 'lat_start'),
        ('start station longitude', 'lon_start'),
        ('end station latitude', 'lat_end'),
        ('end station longitude', 'lon_end'),
    )
]
is_not_round_trip = F.col('start station name') != F.col('end station name')

trips_with_dist = (
    raw_bike_data
    .select(*coordinate_columns, 'start station name', 'end station name')
    .where(is_not_round_trip)
    .withColumn(
        'dist_km',
        get_geo_distance('lat_start', 'lon_start', 'lat_end', 'lon_end'),
    )
)

trips_with_dist.select('lat_start', 'lat_end', 'dist_km').show(5)

[Stage 3:>                                                          (0 + 1) / 1]

+-----------------+-----------------+-------------------+
|        lat_start|          lat_end|            dist_km|
+-----------------+-----------------+-------------------+
|        40.797911|       40.8014866| 0.4301604184631463|
|      40.72779126|       40.7284186| 0.1436399429400831|
|        40.676368|        40.678045| 0.8217251348316926|
|       40.7489006|      40.75640548| 1.4431216853335014|
|40.76309387270797|40.76132983124814|0.23194966788238217|
+-----------------+-----------------+-------------------+
only showing top 5 rows


                                                                                

In [7]:
# Summary statistics of trip distance (km) over the filtered trips:
# maximum, mean, sample standard deviation and an approximate median
# (percentile_approx at 0.5).
distance_aggregations = [
    F.max('dist_km').alias('Максимальная_Дистанция'),
    F.avg('dist_km').alias('Средняя_Дистанция'),
    F.stddev_samp('dist_km').alias('Стандартное_Отклонение'),
    F.expr('percentile_approx(dist_km, 0.5)').alias('Медиана'),
]
stats_df = trips_with_dist.agg(*distance_aggregations)

stats_df.show()



+----------------------+------------------+----------------------+------------------+
|Максимальная_Дистанция| Средняя_Дистанция|Стандартное_Отклонение|           Медиана|
+----------------------+------------------+----------------------+------------------+
|    15.326431486694263|1.6403749148455133|     1.288677175706375|1.2628745712441507|
+----------------------+------------------+----------------------+------------------+



                                                                                