In [0]:
from pyspark.sql import SparkSession

In [0]:
# Build (or reuse) the Spark session for this notebook.
# timeParserPolicy=LEGACY selects the pre-Spark-3.0 datetime parser, which the
# time-parsing cell further down relies on.
spark = (
    SparkSession.builder
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .appName("MyApp")
    .getOrCreate()
)

In [0]:
# Echo the session object as this cell's output.
spark

In [0]:
# Load the table registered under this name into a DataFrame.
df = spark.read.table("fordgobike_tripdata__1__csv")

In [0]:
# Show rows where the start_time column is greater than the end_time column.
# NOTE(review): judging by the printed rows, start_time/end_time appear to hold
# only the hour of a 12-hour clock (minutes, seconds and AM/PM sit in _c1.._c3
# and _c5.._c7), so this comparison ignores minutes and AM/PM — confirm.
df.filter('start_time > end_time').show()

+----------+---+---+---+--------+---+---+---+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+----------+-----------------+-------------+-----------+
|start_time|_c1|_c2|_c3|end_time|_c5|_c6|_c7|start_station_id|  start_station_name|start_station_latitude|start_station_longitude|end_station_id|    end_station_name|end_station_latitude|end_station_longitude|bike_id| user_type|member_birth_year|member_gender|     pyment|
+----------+---+---+---+--------+---+---+---+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+----------+-----------------+-------------+-----------+
|        12| 32|  7| AM|       1|  0|  0| AM|              20|Mechanics Monumen...|               37.7913|             -122.39905|            58|Market St at 10th St|            37.

In [0]:
from pyspark.sql.functions import concat, to_timestamp, lit, when, trim


def _add_clock_time(frame, hour_col, minute_col, second_col, meridiem_col, suffix):
    """Append string copies of the clock parts plus a parsed ``time_<suffix>`` column.

    Parameters
    ----------
    frame : pyspark.sql.DataFrame
        Frame holding the split clock columns.
    hour_col, minute_col, second_col : str
        Names of the columns holding hour, minute and second.
    meridiem_col : str
        Name of the column holding the AM/PM marker.
    suffix : str
        Tag used in the generated column names (``"start"`` or ``"end"``).

    Returns
    -------
    pyspark.sql.DataFrame
        ``frame`` with ``hour_<suffix>_str``, ``minute_<suffix>_str``,
        ``second_<suffix>_str`` and ``time_<suffix>`` appended, in that order.
    """
    frame = frame.withColumn(f'hour_{suffix}_str', frame[hour_col].cast("string"))
    frame = frame.withColumn(f'minute_{suffix}_str', frame[minute_col].cast("string"))
    frame = frame.withColumn(f'second_{suffix}_str', frame[second_col].cast("string"))

    # The hour is on a 12-hour clock, so the AM/PM marker must be part of the
    # parsed text; without it "12:32:7 AM" is read as 12:32 (noon) and sorts
    # after "1:0:0 AM". trim() guards against padding around the marker.
    clock_text = concat(
        frame[f'hour_{suffix}_str'], lit(':'),
        frame[f'minute_{suffix}_str'], lit(':'),
        frame[f'second_{suffix}_str'], lit(' '),
        trim(frame[meridiem_col].cast("string")),
    )

    # 'h' = hour of AM/PM (1-12), 'a' = AM/PM marker; single pattern letters
    # accept the un-padded values ("7", "0") present in the data.
    return frame.withColumn(f'time_{suffix}', to_timestamp(clock_text, 'h:m:s a'))


# NOTE(review): the source columns carry no date, so a trip that crosses
# midnight will still have time_end earlier than time_start.
df = _add_clock_time(df, 'start_time', '_c1', '_c2', '_c3', 'start')
df = _add_clock_time(df, 'end_time', '_c5', '_c6', '_c7', 'end')


In [0]:
# Show rows whose parsed end time is later than the parsed start time.
df.filter('time_end > time_start').show()

+----------+---+---+---+--------+---+---+---+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+----------+-----------------+-------------+-----------+--------------+----------------+----------------+-------------------+------------+--------------+--------------+-------------------+
|start_time|_c1|_c2|_c3|end_time|_c5|_c6|_c7|start_station_id|  start_station_name|start_station_latitude|start_station_longitude|end_station_id|    end_station_name|end_station_latitude|end_station_longitude|bike_id| user_type|member_birth_year|member_gender|     pyment|hour_start_str|minute_start_str|second_start_str|         time_start|hour_end_str|minute_end_str|second_end_str|           time_end|
+----------+---+---+---+--------+---+---+---+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---

In [0]:
# Order each pair of timestamps: the later one becomes time_end_new and the
# earlier one becomes time_start_new. When the comparison is not true
# (including when either side is null) both expressions fall back to time_start.
later_of_pair = (
    when(df.time_end > df.time_start, df.time_end)
    .otherwise(df.time_start)
    .alias("time_end_new")
)
earlier_of_pair = (
    when(df.time_end < df.time_start, df.time_end)
    .otherwise(df.time_start)
    .alias("time_start_new")
)

# Keep every existing column, append the ordered pair, then discard the
# unordered originals.
df_new = df.select("*", later_of_pair, earlier_of_pair).drop('time_start', 'time_end')
df_new.show()

+----------+---+---+---+--------+---+---+---+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+----------+-----------------+-------------+-----------+--------------+----------------+----------------+------------+--------------+--------------+-------------------+-------------------+
|start_time|_c1|_c2|_c3|end_time|_c5|_c6|_c7|start_station_id|  start_station_name|start_station_latitude|start_station_longitude|end_station_id|    end_station_name|end_station_latitude|end_station_longitude|bike_id| user_type|member_birth_year|member_gender|     pyment|hour_start_str|minute_start_str|second_start_str|hour_end_str|minute_end_str|second_end_str|       time_end_new|     time_start_new|
+----------+---+---+---+--------+---+---+---+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---

In [0]:
from pyspark.sql.functions import udf
from math import radians, sin, cos, sqrt, atan2

# define a Haversine function
def haversine(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in kilometres between two points.

    Parameters
    ----------
    lat1, lon1 : float or None
        Latitude and longitude of the first point, in degrees.
    lat2, lon2 : float or None
        Latitude and longitude of the second point, in degrees.

    Returns
    -------
    float or None
        Distance in km using an Earth radius of 6371 km, or ``None`` when any
        coordinate is ``None`` (so a null coordinate yields a null distance
        instead of raising inside the UDF).
    """
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None

    R = 6371 # radius of the earth in km
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    # Rounding can push `a` marginally outside [0, 1] (e.g. near-antipodal
    # points), which would make sqrt(1-a) raise a math domain error.
    a = min(1.0, max(0.0, a))
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    distance = R * c
    return distance

# register the function as a UDF
# The return type is declared explicitly: udf() defaults to StringType, which
# would leave "distance" as text rather than a number usable in arithmetic,
# comparisons and aggregations.
haversine_udf = udf(haversine, "double")

# create a new column with the distance (km) between the start and end stations
df_new = df_new.withColumn(
    "distance",
    haversine_udf(
        "start_station_latitude",
        "start_station_longitude",
        "end_station_latitude",
        "end_station_longitude",
    ),
)


In [0]:
# Print the leading rows of df_new, now including the distance column.
df_new.show()

+----------+---+---+---+--------+---+---+---+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+----------+-----------------+-------------+-----------+--------------+----------------+----------------+------------+--------------+--------------+-------------------+-------------------+------------------+
|start_time|_c1|_c2|_c3|end_time|_c5|_c6|_c7|start_station_id|  start_station_name|start_station_latitude|start_station_longitude|end_station_id|    end_station_name|end_station_latitude|end_station_longitude|bike_id| user_type|member_birth_year|member_gender|     pyment|hour_start_str|minute_start_str|second_start_str|hour_end_str|minute_end_str|second_end_str|       time_end_new|     time_start_new|          distance|
+----------+---+---+---+--------+---+---+---+----------------+--------------------+----------------------+-----------------------+--------------+-------