In [None]:
!pip install pyspark

In [1]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, StringType

# Dataset reference: https://www.kaggle.com/code/priyankabnl/fordgobike-trip-data
# getOrCreate() hands back the already-running session if there is one.
spark = (SparkSession.builder
         .appName('FordGoBike')
         .getOrCreate())
spark

In [2]:
import os

# Input CSV. Set the FORDGOBIKE_CSV environment variable to point at a local copy;
# otherwise the original shared-drive location is used.
path = os.environ.get(
    "FORDGOBIKE_CSV",
    "G:/My Drive/Develhope/develhope-Data5-Team3/Week_1/2017-fordgobike-tripdataa.csv",
)
# header=True takes column names from the first row; inferSchema=True lets Spark guess column types.
df_pyspark = spark.read.csv(path, header=True, inferSchema=True)

In [None]:
# Check which class the CSV reader returned.
df_pyspark.__class__

In [None]:
# First five rows as a list of Row objects (head(n) delegates to take(n)).
df_pyspark.take(5)


In [3]:
# Cast the four station coordinate columns to FloatType, then give the AM/PM
# marker columns (unnamed in the CSV header, so read as _c4 / _c9) real names.
for coord_col in ('start_station_longitude', 'start_station_latitude',
                  'end_station_latitude', 'end_station_longitude'):
    df_pyspark = df_pyspark.withColumn(coord_col, F.col(coord_col).cast(FloatType()))
df_pyspark = (df_pyspark
              .withColumnRenamed('_c4', 'start_am_pm')
              .withColumnRenamed('_c9', 'end_am_pm'))
df_pyspark.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- start time hour: integer (nullable = true)
 |-- start time minute: integer (nullable = true)
 |-- start time seconds: integer (nullable = true)
 |-- start_am_pm: string (nullable = true)
 |-- end_time: string (nullable = true)
 |-- end_time hour: integer (nullable = true)
 |-- end_time minute: integer (nullable = true)
 |-- end_time seconds: integer (nullable = true)
 |-- end_am_pm: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_latitude: float (nullable = true)
 |-- start_station_longitude: float (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_latitude: float (nullable = true)
 |-- end_station_longitude: float (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- user_type: string (nullable = true)
 |-- member_birth_year: integer (nullable = true)
 |

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the numeric and string columns.
summary_stats = df_pyspark.describe()
summary_stats.show()


In [4]:
# Convert 'start time hour' from 12-hour to 24-hour form using the start_am_pm marker:
#   12 AM -> 0, 1-11 AM unchanged, 12 PM -> 12, 1-11 PM -> 13-23.
# Taking the hour modulo 12 first is what maps 12 AM to midnight. Adding 12 for PM
# and then turning 24 back into 12 would leave 12 AM as 12 (noon).
# It also makes the cell safe to re-run: an already-converted PM hour such as
# 15 gives 15 % 12 + 12 = 15 again.
# Rows whose marker is neither 'AM' nor 'PM' (including null) keep their hour as-is.
start_hour = F.col('start time hour')
start_marker = F.col('start_am_pm')
df_pyspark = df_pyspark.withColumn(
    'start time hour',
    F.when(start_marker == 'PM', start_hour % 12 + 12)
     .when(start_marker == 'AM', start_hour % 12)
     .otherwise(start_hour),
)

In [5]:
# Convert 'end_time hour' from 12-hour to 24-hour form using the end_am_pm marker:
#   12 AM -> 0, 1-11 AM unchanged, 12 PM -> 12, 1-11 PM -> 13-23.
# Taking the hour modulo 12 first is what maps 12 AM to midnight. Adding 12 for PM
# and then turning 24 back into 12 would leave 12 AM as 12 (noon).
# It also makes the cell safe to re-run: an already-converted PM hour such as
# 15 gives 15 % 12 + 12 = 15 again.
# Rows whose marker is neither 'AM' nor 'PM' (including null) keep their hour as-is.
end_hour = F.col('end_time hour')
end_marker = F.col('end_am_pm')
df_pyspark = df_pyspark.withColumn(
    'end_time hour',
    F.when(end_marker == 'PM', end_hour % 12 + 12)
     .when(end_marker == 'AM', end_hour % 12)
     .otherwise(end_hour),
)

In [None]:
# Sanity check after the AM/PM conversion: list rows whose end hour is still 24 (expected: none).
hour_24_rows = df_pyspark.where(F.col('end_time hour') == 24)
hour_24_rows.show()

In [None]:
# Zero-pad every hour/minute/second component to two characters (e.g. 7 -> '07').
# lpad yields strings, so these integer columns become string columns.
padded_parts = ('start time hour', 'start time minute', 'start time seconds',
                'end_time hour', 'end_time minute', 'end_time seconds')
for part in padded_parts:
    df_pyspark = df_pyspark.withColumn(part, F.lpad(F.col(part), 2, '0'))

In [6]:
# Build comparable 'start time' / 'end time' timestamps from zero-padded
# 'HH:mm:ss' strings assembled from the hour/minute/second components.
# NOTE(review): to_timestamp is called without a format, so Spark supplies the
# date part itself. The preview further down appears to show the run date, so
# treat only the time of day as meaningful.
start_hms = F.concat_ws(':', *[F.lpad(F.col(part), 2, '0')
                               for part in ('start time hour', 'start time minute', 'start time seconds')])
end_hms = F.concat_ws(':', *[F.lpad(F.col(part), 2, '0')
                             for part in ('end_time hour', 'end_time minute', 'end_time seconds')])
df_pyspark = (df_pyspark
              .withColumn('start time', F.to_timestamp(start_hms))
              .withColumn('end time', F.to_timestamp(end_hms)))

In [None]:
# Preview the first 20 rows (Spark's default), long values truncated.
df_pyspark.show(n=20)

In [7]:
# Rows whose end time is earlier than their start time are treated as having
# start and end recorded the wrong way round, and the two values are exchanged.
# The swap decision (_swap) and the old start (_old_start) are captured before
# either column is overwritten. Both assignments then read pre-swap values, and
# rows with equal start/end are left alone. Null timestamps make _swap null,
# so those rows are unchanged.
# NOTE(review): only the time of day is kept, so a ride that crosses midnight
# (e.g. 23:50 -> 00:10) also has end < start and is turned into a ~23h ride here.
# Confirm whether such rows should instead get one day added to 'end time'.
df_pyspark = (
    df_pyspark
    .withColumn('_swap', F.col('end time') < F.col('start time'))
    .withColumn('_old_start', F.col('start time'))
    .withColumn('start time', F.when(F.col('_swap'), F.col('end time')).otherwise(F.col('start time')))
    .withColumn('end time', F.when(F.col('_swap'), F.col('_old_start')).otherwise(F.col('end time')))
    .drop('_swap', '_old_start')
)

In [None]:
# Unused alternative (kept for reference): format 'start time' / 'end time' back into zero-padded 'HH:mm:ss' strings.
# df_pyspark=df_pyspark.withColumn('start_time',F.concat_ws(':',F.lpad(F.hour(F.col('start time')),2,'0'),F.lpad(F.minute(F.col('start time')),2,'0'),F.lpad(F.second(F.col('start time')),2,'0')))#\
    #.withColumn('end_time',F.concat_ws(':',F.lpad(F.hour('end time'),2,'0'),F.lpad(F.minute('end time'),2,'0'),F.lpad(F.second('end time'),2,'0')))

In [8]:
# Overwrite the original start_time / end_time columns with the cleaned timestamp values.
df_pyspark = (df_pyspark
              .withColumn('start_time', F.col('start time'))
              .withColumn('end_time', F.col('end time')))

In [None]:
# Column names in schema order.
[field.name for field in df_pyspark.schema.fields]

In [9]:
# Drop the helper 'start time' / 'end time' columns and the split
# hour/minute/second components; start_time / end_time now carry the timestamps.
helper_columns = ['start time', 'end time',
                  'start time hour', 'start time minute', 'start time seconds',
                  'end_time hour', 'end_time minute', 'end_time seconds']
df_pyspark = df_pyspark.drop(*helper_columns)
df_pyspark.show()

+-------------------+-----------+-------------------+---------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+----------+-----------------+-------------+-----------+
|         start_time|start_am_pm|           end_time|end_am_pm|start_station_id|  start_station_name|start_station_latitude|start_station_longitude|end_station_id|    end_station_name|end_station_latitude|end_station_longitude|bike_id| user_type|member_birth_year|member_gender|     pyment|
+-------------------+-----------+-------------------+---------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+----------+-----------------+-------------+-----------+
|2023-05-07 15:12:50|         PM|2023-05-07 16:57:40|       PM|              74|Laguna St at Haye...|             37.776436|   

In [None]:
!pip install haversine

In [10]:
from haversine import haversine


def haversine_f(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between (lat1, lon1) and (lat2, lon2).

    Returns None when any coordinate is missing. The coordinate columns are
    nullable, and passing None to haversine() would raise inside the UDF and
    fail the whole Spark task.
    """
    if None in (lat1, lon1, lat2, lon2):
        return None
    # normalize=True lets haversine() normalize out-of-range lat/lon values instead of rejecting them.
    return haversine((lat1, lon1), (lat2, lon2), unit='m', normalize=True)


# Declare the return type explicitly: F.udf defaults to StringType, which would
# make 'haversine_distance' a string column (implicit casts when summing/sorting).
haversine_udf = F.udf(haversine_f, 'double')

In [11]:
# Calculate haversine distance (Onur): pass the station coordinates to the UDF as Column objects.
# NOTE(review): the next cell recomputes the same 'haversine_distance' column
# (passing column names instead), so one of the two cells is redundant.
station_coords = [F.col(name) for name in ('start_station_latitude', 'start_station_longitude',
                                          'end_station_latitude', 'end_station_longitude')]
df_pyspark = df_pyspark.withColumn('haversine_distance', haversine_udf(*station_coords))

In [12]:
# Calculate haversine distance in meters (Uros): pass the coordinate column names straight to the UDF.
coordinate_names = ('start_station_latitude', 'start_station_longitude',
                    'end_station_latitude', 'end_station_longitude')
df_pyspark = df_pyspark.withColumn('haversine_distance', haversine_udf(*coordinate_names))

In [None]:
# Trip duration (seconds, minutes) and cost from the cleaned timestamps.
# start_time / end_time already hold timestamps here: they are copied from the
# to_timestamp results. Re-parsing them with an 'HH:mm:ss' pattern was a no-op,
# so they are used directly.
PRICE_PER_MINUTE = 0.35  # tariff applied per minute of riding; currency not stated — TODO confirm

df_pyspark = (
    df_pyspark
    # casting a timestamp to long yields epoch seconds, so the difference is the duration in seconds
    .withColumn('Diff_in_seconds', F.col('end_time').cast('long') - F.col('start_time').cast('long'))
    .withColumn('Diff_in_minutes', F.col('Diff_in_seconds') / 60)
    .withColumn('Trip_cost', F.col('Diff_in_minutes') * PRICE_PER_MINUTE)
)

df_pyspark.printSchema()


In [None]:
# Total distance ridden per bike (metres), longest first.
# .show() runs the aggregation and prints the rows. Without an action, the cell
# only displays the DataFrame's schema repr (unless eager evaluation is enabled).
# The explicit cast keeps the sum numeric even if haversine_distance is a string
# column, which is F.udf's default return type.
(df_pyspark
    .groupBy("bike_id")
    .agg(F.sum(F.col("haversine_distance").cast("double")).alias("sum_distance"))
    .sort(F.desc("sum_distance"))
    .show())

In [None]:
# Preview the first 20 rows (Spark's default), long values truncated.
df_pyspark.show(n=20)

In [13]:
# Write the cleaned DataFrame to D:/mycsv as a directory of CSV part files with
# a header row, replacing any previous output.
# NOTE(review): the saved run failed with java.lang.UnsatisfiedLinkError in
# NativeIO$Windows.access0. That points to the environment, not this call: on
# Windows, Spark's Hadoop layer typically needs winutils.exe and hadoop.dll for
# the matching Hadoop version under %HADOOP_HOME%\bin (and on PATH). Confirm the local setup.
csv_writer = df_pyspark.write.mode('overwrite').option("header", True)
csv_writer.csv('D:/mycsv')

Py4JJavaError: An error occurred while calling o181.csv.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:640)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:847)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
# Preview the first 20 rows (Spark's default), long values truncated.
df_pyspark.show(n=20)