In [1]:
import pandas as pd
from haversine import haversine, Unit
import plotly.express as px
import pyspark
import pyspark.sql.functions as f
from pyspark.sql.functions import when
from pyspark.sql import *

In [2]:
spark = SparkSession.builder.master('local[*]').appName('ikay').getOrCreate()

In [None]:
df = pd.read_csv('201902_fordgobike_tripdata.csv')

In [4]:
df.head()

Unnamed: 0,duration_sec,start_time,end_time,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bike_id,user_type,member_birth_year,member_gender,bike_share_for_all_trip
0,52185,2019-02-28 17:32:10.1450,2019-03-01 08:01:55.9750,21.0,Montgomery St BART Station (Market St at 2nd St),37.789625,-122.400811,13.0,Commercial St at Montgomery St,37.794231,-122.402923,4902,Customer,1984.0,Male,No
1,42521,2019-02-28 18:53:21.7890,2019-03-01 06:42:03.0560,23.0,The Embarcadero at Steuart St,37.791464,-122.391034,81.0,Berry St at 4th St,37.77588,-122.39317,2535,Customer,,,No
2,61854,2019-02-28 12:13:13.2180,2019-03-01 05:24:08.1460,86.0,Market St at Dolores St,37.769305,-122.426826,3.0,Powell St BART Station (Market St at 4th St),37.786375,-122.404904,5905,Customer,1972.0,Male,No
3,36490,2019-02-28 17:54:26.0100,2019-03-01 04:02:36.8420,375.0,Grove St at Masonic Ave,37.774836,-122.446546,70.0,Central Ave at Fell St,37.773311,-122.444293,6638,Subscriber,1989.0,Other,No
4,1585,2019-02-28 23:54:18.5490,2019-03-01 00:20:44.0740,7.0,Frank H Ogawa Plaza,37.804562,-122.271738,222.0,10th Ave at E 15th St,37.792714,-122.24878,4898,Subscriber,1974.0,Male,Yes


###  create a column to show distance

In [5]:
df['distance'] = df.apply(lambda x:haversine((x['start_station_latitude'],x['start_station_longitude']),
                                             (x['end_station_latitude'],x['end_station_longitude'])),axis=1)

In [6]:
df['distance']=round(df['distance'],2)

In [8]:
df.head()

Unnamed: 0,duration_sec,start_time,end_time,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bike_id,user_type,member_birth_year,member_gender,bike_share_for_all_trip,distance
0,52185,2019-02-28 17:32:10.1450,2019-03-01 08:01:55.9750,21.0,Montgomery St BART Station (Market St at 2nd St),37.789625,-122.400811,13.0,Commercial St at Montgomery St,37.794231,-122.402923,4902,Customer,1984.0,Male,No,0.54
1,42521,2019-02-28 18:53:21.7890,2019-03-01 06:42:03.0560,23.0,The Embarcadero at Steuart St,37.791464,-122.391034,81.0,Berry St at 4th St,37.77588,-122.39317,2535,Customer,,,No,1.74
2,61854,2019-02-28 12:13:13.2180,2019-03-01 05:24:08.1460,86.0,Market St at Dolores St,37.769305,-122.426826,3.0,Powell St BART Station (Market St at 4th St),37.786375,-122.404904,5905,Customer,1972.0,Male,No,2.7
3,36490,2019-02-28 17:54:26.0100,2019-03-01 04:02:36.8420,375.0,Grove St at Masonic Ave,37.774836,-122.446546,70.0,Central Ave at Fell St,37.773311,-122.444293,6638,Subscriber,1989.0,Other,No,0.26
4,1585,2019-02-28 23:54:18.5490,2019-03-01 00:20:44.0740,7.0,Frank H Ogawa Plaza,37.804562,-122.271738,222.0,10th Ave at E 15th St,37.792714,-122.24878,4898,Subscriber,1974.0,Male,Yes,2.41


In [7]:
df.to_csv('fordgo_bikes.csv')

In [3]:
bikes=spark.read.option('header','true').csv('fordgo_bikes.csv')

###  calculate the duration in seconds for each trip

In [4]:
bikes=bikes.withColumn("start_time",f.to_timestamp("start_time"))\
.withColumn("end_time",f.to_timestamp("end_time")).withColumn("end_time",f.to_timestamp("end_time"))

In [5]:
bikes= bikes.withColumn('Duration_Secs', f.unix_timestamp("end_time") - f.unix_timestamp('start_time'))

In [6]:
bikes.select('Duration_secs').show()

+-------------+
|Duration_secs|
+-------------+
|        52185|
|        42522|
|        61855|
|        36490|
|         1586|
|         1793|
|         1147|
|         1616|
|         1571|
|         1050|
|          458|
|          506|
|         1177|
|          915|
|          395|
|          209|
|          548|
|          675|
|          558|
|          874|
+-------------+
only showing top 20 rows



### assuming each minute cost 0.35 cents calculate the cost of each trip

In [7]:
bikes=bikes.withColumn('cost',f.round((f.col('Duration_secs')/60)*0.35,2))

In [8]:
bikes.select('cost').show()

+------+
|  cost|
+------+
|304.41|
|248.05|
|360.82|
|212.86|
|  9.25|
| 10.46|
|  6.69|
|  9.43|
|  9.16|
|  6.13|
|  2.67|
|  2.95|
|  6.87|
|  5.34|
|   2.3|
|  1.22|
|   3.2|
|  3.94|
|  3.26|
|   5.1|
+------+
only showing top 20 rows



### calculate the total distance for each bike and list the top 10

In [50]:
bikes.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- duration_sec: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- end_time: timestamp (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_latitude: string (nullable = true)
 |-- start_station_longitude: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_latitude: string (nullable = true)
 |-- end_station_longitude: string (nullable = true)
 |-- bike_id: string (nullable = true)
 |-- user_type: string (nullable = true)
 |-- member_birth_year: string (nullable = true)
 |-- member_gender: string (nullable = true)
 |-- bike_share_for_all_trip: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- Duration_Secs: long (nullable = true)
 |-- cost: double (nullable = true)
 |-- age: double (nullable = true)
 |-- age group: string (nullable = fals

In [37]:
total_distance=bikes.groupBy('bike_id').agg({'distance':'sum'}).withColumnRenamed('sum(distance)','TOTAL BIKE DISTANCE')\
.withColumnRenamed('bike_id','BIKE ID')

In [38]:
total_distance=total_distance.withColumn('TOTAL BIKE DISTANCE',f.round('TOTAL BIKE DISTANCE',2))
total_distance.show()

+-------+-------------------+
|BIKE ID|TOTAL BIKE DISTANCE|
+-------+-------------------+
|   6240|              52.35|
|   6194|             166.37|
|   5645|                5.4|
|   6613|              36.59|
|   1572|                9.7|
|   5325|             297.28|
|    675|               59.4|
|    829|               24.1|
|   1436|              65.22|
|   2136|              57.15|
|   4821|             291.93|
|   2069|              51.98|
|   3606|               8.41|
|   5925|             102.59|
|   4937|             178.13|
|   1159|              59.52|
|    691|               3.56|
|   1512|              30.93|
|   1090|              52.05|
|   3414|              18.55|
+-------+-------------------+
only showing top 20 rows



In [39]:
top_10_bikes=total_distance.orderBy('TOTAL BIKE DISTANCE',ascending=False).limit(10)
top_10_bikes.show()

+-------+-------------------+
|BIKE ID|TOTAL BIKE DISTANCE|
+-------+-------------------+
|   4814|              331.2|
|   4834|             324.12|
|   4794|             321.92|
|   5145|             318.38|
|   5014|             314.87|
|   4956|             313.25|
|   4422|             312.37|
|   5351|             310.78|
|   4742|             309.59|
|   4778|              308.6|
+-------+-------------------+



In [40]:
top_10=top_10_bikes.toPandas()#convert dataframe to pandas

In [41]:
top_10.dtypes

BIKE ID                 object
TOTAL BIKE DISTANCE    float64
dtype: object

In [42]:
px.scatter(top_10,x='BIKE ID',y='TOTAL BIKE DISTANCE',title='Top 10 Bikes By Distance',color='BIKE ID')

### Calculate the number of trips for each start station list top 10 and find the ratio of using as male or female

In [31]:
top_stations=bikes.groupBy('start_station_id','start_station_name').agg({'start_station_id':'count'})

In [34]:
top_stations=top_stations.withColumnRenamed('start_station_name','STATION NAME').withColumnRenamed('start_station_id','STATION ID').withColumnRenamed('count(start_station_id)','TRIPS').orderBy('TRIPS',ascending=False).limit(10)

In [35]:
top_stations.show()

+----------+--------------------+-----+
|STATION ID|        STATION NAME|TRIPS|
+----------+--------------------+-----+
|      58.0|Market St at 10th St| 3904|
|      67.0|San Francisco Cal...| 3544|
|      81.0|  Berry St at 4th St| 3052|
|      21.0|Montgomery St BAR...| 2895|
|       3.0|Powell St BART St...| 2760|
|      15.0|San Francisco Fer...| 2710|
|      30.0|San Francisco Cal...| 2703|
|       5.0|Powell St BART St...| 2327|
|      22.0|Howard St at Beal...| 2293|
|      16.0|Steuart St at Mar...| 2283|
+----------+--------------------+-----+



In [75]:
#creating a pandas dataframe for visualization
stations=top_stations.toPandas()

In [43]:
px.bar(stations,y='STATION NAME',x='TRIPS',title='TOP 10 START STATIONS',color='STATION NAME')

#### the ratio of using as male or female.

In [52]:
gender_count=bikes.groupBy('member_gender').agg({'member_gender':'count'})

In [53]:
total_count=bikes.select('member_gender').count()

In [54]:
gender_count=gender_count.withColumnRenamed('count(member_gender)','COUNTS').withColumnRenamed('member_gender','GENDER')

In [64]:
gender_ratio=gender_count.withColumn('RATIO',f.round((100*gender_count['COUNTS']/total_count),2))


In [74]:
#creating a pandas dataframe for visualization
ratio=gender_ratio.toPandas()
ratio=ratio.dropna()

In [65]:
px.pie(ratio,values='RATIO',names="GENDER",title="USER SEGMENTATION BY GENDER")

### Make a comparison to find the percentage of usage for customer and subscriber

In [25]:
user_count = bikes.groupBy('user_type').agg(f.count('user_type').alias('count_users'))

In [26]:
total_user_count = bikes.count()

In [27]:
user_percentage=user_count.withColumn('percent', f.round((100 * user_count['count_users'] / total_user_count),2))

In [28]:
user_percentage.show()

+----------+-----------+-------+
| user_type|count_users|percent|
+----------+-----------+-------+
|Subscriber|     163544|  89.17|
|  Customer|      19868|  10.83|
+----------+-----------+-------+



### Calculate the age of all users and show the relation between the distance and the age

In [66]:
import datetime




In [67]:
current_year = int(datetime.date.today().strftime('%Y'))
bikes = bikes.withColumn('age', current_year -bikes['member_birth_year'])

In [31]:
bikes.select('age').show()

+----+
| age|
+----+
|39.0|
|null|
|51.0|
|34.0|
|49.0|
|64.0|
|40.0|
|34.0|
|35.0|
|31.0|
|27.0|
|30.0|
|33.0|
|null|
|35.0|
|30.0|
|42.0|
|48.0|
|33.0|
|45.0|
+----+
only showing top 20 rows



####  relation between age and distance

In [68]:
#create a category to group the ages
bikes=bikes.withColumn('AGE GROUP',when(bikes.age <20,'under 20').when((bikes.age>20) & (bikes.age<=25),'21-25')\
                      .when((bikes.age>25) &(bikes.age<=30),'26-30').when((bikes.age>30)&( bikes.age<=35),'31-35')\
                      .when((bikes.age>35)&(bikes.age<=40),'36-40').when((bikes.age>40) &(bikes.age<=45),'41-45')\
                      .when((bikes.age>45)&(bikes.age<=50),'45-50').when((bikes.age>50) &(bikes.age<=55),'51-55')\
                      .when((bikes.age>55) &(bikes.age<=60),'56-60').when((bikes.age>60) &(bikes.age<=65),'61-65')\
                      .when((bikes.age>65) &(bikes.age<=70),'65-70').otherwise('over 70'))

In [69]:
bikes.select('AGE GROUP').show()

+---------+
|AGE GROUP|
+---------+
|    36-40|
|  over 70|
|    51-55|
|    31-35|
|    45-50|
|    61-65|
|    36-40|
|    31-35|
|    31-35|
|    31-35|
|    26-30|
|    26-30|
|    31-35|
|  over 70|
|    31-35|
|    26-30|
|    41-45|
|    45-50|
|    31-35|
|    41-45|
+---------+
only showing top 20 rows



In [70]:
age_distance=bikes.groupBy('AGE GROUP').agg({'distance':'sum'})

In [71]:
age_distance=age_distance.withColumnRenamed('sum(distance)',"DISTANCE").orderBy('DISTANCE',ascending=False)

In [76]:
#creating a pandas dataframe for visualization
distance_age=age_distance.toPandas()


Unnamed: 0,AGE GROUP,DISTANCE
0,31-35,78729.3
1,36-40,62807.02
2,26-30,51567.72
3,41-45,36531.17
4,45-50,20458.35


In [77]:
px.bar(distance_age,x='AGE GROUP',y='DISTANCE',title='AGE GROUP BY DISTANCES',color='DISTANCE')

###  Calculate the total cost for all customers and all subscribers

In [83]:
user_cost=bikes.groupBy('user_type').agg({'cost':'sum'})

In [84]:
user_cost=user_cost.withColumnRenamed('sum(cost)','TOTAL COST').withColumnRenamed('user_type','USER TYPE')
user_cost=user_cost.withColumn('TOTAL COST',f.round('TOTAL COST',2))

In [85]:
user_cost.show()

+----------+----------+
| USER TYPE|TOTAL COST|
+----------+----------+
|Subscriber| 611300.65|
|  Customer| 166076.67|
+----------+----------+



In [86]:
#creating a pandas dataframe for visualization
cost=user_cost.toPandas()

In [89]:
px.bar(cost,x='USER TYPE',y='TOTAL COST',title='COST FOR USERS AND SUBSCRIBERS',color='TOTAL COST')

### Analyze how often bikes are being rented ( morning - afternoon - evening) and at what times of day

In [90]:
bikes= bikes.withColumn('hour',f.hour('start_time'))

In [91]:
bikes=bikes.withColumn('DAY PERIOD',when(bikes.hour<12,'morning').when(bikes.hour.between(12,17),'afternoon').otherwise('evening'))

In [92]:
bikes.select('hour','DAY PERIOD').show()

+----+----------+
|hour|DAY PERIOD|
+----+----------+
|  17| afternoon|
|  18|   evening|
|  12| afternoon|
|  17| afternoon|
|  23|   evening|
|  23|   evening|
|  23|   evening|
|  23|   evening|
|  23|   evening|
|  23|   evening|
|  23|   evening|
|  23|   evening|
|  23|   evening|
|  23|   evening|
|  23|   evening|
|  23|   evening|
|  23|   evening|
|  23|   evening|
|  23|   evening|
|  23|   evening|
+----+----------+
only showing top 20 rows



In [105]:
day_time=bikes.groupBy('user_type','DAY PERIOD','hour').agg({'DAY PERIOD':'count'})

In [106]:
day_time=day_time.withColumnRenamed('user_type','USER TYPE').withColumnRenamed('hour','HOUR')\
.withColumnRenamed('count(DAY PERIOD)','COUNT').orderBy('COUNT')

In [107]:
#create pandas dataframe for visuals
day=day_time.toPandas()

In [112]:
fig = px.bar(day, x="HOUR", y="COUNT", color="DAY PERIOD", barmode="group"
         
                        )
fig.update_traces(width=1)

### What is the ratio of payment using cc or app wallet?

In [115]:
payment= spark.read.option("header","true").csv("2017-fordgobike-tripdata.csv")

In [116]:
payment.show()

+----------+--------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+----------+-----------------+-------------+-----------+
|start_time|end_time|start_station_id|  start_station_name|start_station_latitude|start_station_longitude|end_station_id|    end_station_name|end_station_latitude|end_station_longitude|bike_id| user_type|member_birth_year|member_gender|     pyment|
+----------+--------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+----------+-----------------+-------------+-----------+
|   57:39.7| 12:50.2|              74|Laguna St at Haye...|           37.77643482|            -122.426244|            43|San Francisco Pub...|          37.7787677|         -122.4159292|     96|  Customer|             1987|         Male|credit card|
|   

In [117]:
pay_count=payment.select('pyment').count()

In [43]:
pay_count

519700

In [118]:
pay_group=payment.groupBy('pyment').agg({'pyment':'count'})
pay_group=pay_group.withColumnRenamed('count(pyment)','payment count').orderBy('payment count',ascending=False)

In [119]:
pay_group.show()

+-----------+-------------+
|     pyment|payment count|
+-----------+-------------+
| app wallet|       260061|
|credit card|       259639|
+-----------+-------------+



In [142]:
pay_group=pay_group.withColumn('ratio',f.round(pay_group['payment count']/pay_count,4))\
.withColumnRenamed('pyment','PAY TYPE').withColumnRenamed('payment count','COUNT')

In [143]:
pay_group.show()

+-----------+------+------+
|   PAY TYPE| COUNT| ratio|
+-----------+------+------+
| app wallet|260061|0.5004|
|credit card|259639|0.4996|
+-----------+------+------+



### What is the preferred way to pay for customers and subscriber?

In [125]:
preferred_payment=payment.groupBy('pyment','user_type').agg({'pyment':'count'})
preferred_payment=preferred_payment.withColumnRenamed('pyment','PAY TYPE')\
.withColumnRenamed('user_type','USER TYPE')\
.withColumnRenamed('count(pyment)','COUNT').orderBy('COUNT',ascending=False)

In [126]:
preferred_payment.show()

+-----------+----------+------+
|   PAY TYPE| USER TYPE| COUNT|
+-----------+----------+------+
| app wallet|Subscriber|204727|
|credit card|Subscriber|204503|
| app wallet|  Customer| 55334|
|credit card|  Customer| 55136|
+-----------+----------+------+



In [127]:
P_PAY=preferred_payment.toPandas()

In [138]:
px.bar(P_PAY,x='PAY TYPE',color='USER TYPE',y='COUNT',barmode='group')

In [152]:
px.sunburst(P_PAY,path=['PAY TYPE','USER TYPE'],values='COUNT')


In [153]:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

windowDept = Window.partitionBy(['USER TYPE']).orderBy(col("COUNT").desc())

preferred_payment.withColumn("row",row_number().over(windowDept)) \
  .filter(col("row") == 1).drop("row") \
  .show()

+----------+----------+------+
|  PAY TYPE| USER TYPE| COUNT|
+----------+----------+------+
|app wallet|  Customer| 55334|
|app wallet|Subscriber|204727|
+----------+----------+------+



In [62]:
pd_bike=bikes.toPandas()

In [63]:
pd_bike.to_csv('fordgobike 2017 trip data.csv')

In [None]:
-