In [1]:
from pyspark.sql.types import *

schema = StructType([
    StructField("tripduration", IntegerType()),
    StructField("starttime", DateType()),
    StructField("stoptime", DateType()),
    StructField("start station id", IntegerType()),
    StructField("start station name", StringType()),
    StructField("start station latitude", DecimalType(11, 9)),
    StructField("start station longitude", DecimalType(11, 9)),
    StructField("end station id", IntegerType()),
    StructField("end station name", StringType()),
    StructField("end station latitude", DecimalType(11, 9)),
    StructField("end station longitude", DecimalType(11, 9)),
    StructField("bikeid", IntegerType()),
    StructField("usertype", StringType()),
    StructField("birth year", IntegerType()),
    StructField("gender", IntegerType())
])

bike = sqlContext.read.format('csv').options(header='true', nullValue='\\N').schema(schema).load('/mnt/bike/newyork')

In [2]:
from pyspark.sql.functions import mean, min, max

bike.select([mean('tripduration'), min('tripduration'), max('tripduration')]).show()

In [3]:
# Stolen blatentnly from https://stackoverflow.com/a/51633111/404006 
# Monthly Data
from pyspark import *
from pyspark.sql import functions as F
aggregatebike = bike.groupBy(F.year("starttime").alias("year"), F.month("starttime").alias("month"), "bikeid").agg(F.mean("tripduration"), F.count("tripduration").alias("count")).sort(["year", "month", "count"], ascending=False)
display(aggregatebike)

year,month,bikeid,avg(tripduration),count
2019,2,35619,865.7807486631016,561
2019,2,35612,812.8541266794625,521
2019,2,35539,1002.8378378378378,518
2019,2,35647,907.5271317829456,516
2019,2,35624,897.1284584980237,506
2019,2,35804,871.9668049792531,482
2019,2,35695,823.4074844074844,481
2019,2,35559,864.2651356993737,479
2019,2,35761,857.292372881356,472
2019,2,34870,828.0170212765958,470


In [4]:
#Yearly Data
aggregatebike = bike.where(F.year("starttime") == 2018).groupBy(F.year("starttime").alias("year"), "bikeid").agg(F.mean("tripduration"), F.count("tripduration").alias("count")).sort(["year", "count"], ascending=False)
display(aggregatebike)

year,bikeid,avg(tripduration),count
2018,30657,869.991963661775,2862
2018,32124,837.6561583577712,2728
2018,32090,788.3671961581086,2707
2018,32036,995.577265973254,2692
2018,32541,892.9239089891831,2681
2018,32195,834.1423677198325,2627
2018,32606,915.9151561897417,2593
2018,30692,835.3154024767802,2584
2018,31108,825.6917701863354,2576
2018,30315,828.4202334630351,2570


In [5]:
#All Time Data
aggregatebike = bike.groupBy("bikeid").agg(F.mean("tripduration"), F.count("tripduration").alias("count")).sort("count", ascending=False)
display(aggregatebike)

bikeid,avg(tripduration),count
18104,873.9685520361991,8840
15432,1125.7282571132357,8681
17955,933.789558603204,8677
20233,926.4413737280296,8648
18442,900.39765878535,8628
17526,858.4072948328268,8554
15731,893.3093036863663,8545
20174,970.7953134153486,8535
16254,866.6826889176166,8509
16260,920.7079510703364,8502


In [6]:
#All Time Last, First Ride
sortbike = bike.select("bikeid", "starttime").where(F.col("bikeid") == 18104)
df1 = sortbike.sort("starttime", ascending=True).first()
df2 = sortbike.sort("starttime", ascending=False).first()

#Year Last First Ride
sortbike = bike.select("bikeid", "starttime").where(F.col("bikeid") == 30657)
df3 = sortbike.sort("starttime", ascending=True).first()
df4 = sortbike.sort("starttime", ascending=False).first()

#Month Last First Ride
sortbike = bike.select("bikeid", "starttime").where(F.col("bikeid") == 35619)
df5 = sortbike.sort("starttime", ascending=True).first()
df6 = sortbike.sort("starttime", ascending=False).first()

display([df1, df2, df3, df4, df5, df6])



bikeid,starttime
18104,2013-06-01
18104,2019-02-28
30657,2017-08-31
30657,2019-02-28
35619,2019-01-20
35619,2019-02-28


In [7]:
from pyspark.sql import Window
sortbike = bike.select("bikeid", "starttime").where(F.col("bikeid") == 35619)
df = sortbike.groupBy("bikeid", "starttime").count()
df = df.withColumn("daily_average", F.avg("count").over(Window.partitionBy(F.window("starttime", "30 days"))))
display(df)


bikeid,starttime,count,daily_average
35619,2019-01-20,19,19.75
35619,2019-01-29,14,19.75
35619,2019-02-10,31,19.75
35619,2019-01-28,11,19.75
35619,2019-02-09,20,19.75
35619,2019-02-11,24,19.75
35619,2019-01-25,31,19.75
35619,2019-02-07,32,19.75
35619,2019-02-12,14,19.75
35619,2019-01-26,25,19.75


In [8]:
sortbike = bike.select("bikeid", "starttime", "tripduration").where(F.col("bikeid") == 35619).where(F.year("starttime") == 2019).where(F.month("starttime") == 2)
totalmiles = ((sortbike.select("tripduration").groupby().sum("tripduration").collect()[0][0])/60/60)*7.456
totalmiles