In [None]:
import numpy as np
import pandas as pd
from pandas import Series , DataFrame
import matplotlib.pyplot as plt
import plotly.express as px

In [None]:
import glob
glob.glob('C:\\Users\\Ivan Francis\\Desktop\\GMU\\AIT 580\\Project_AIT580\\Excel_sheets\\JC*.csv')

In [None]:
all_dfs = []
for one_filename in glob.glob('C:\\Users\\Ivan Francis\\Desktop\\GMU\\AIT 580\\Project_AIT580\\Excel_sheets\\JC*.csv'):
    print(f'Loading {one_filename}')
    new_df = pd.read_csv(one_filename,
                         usecols=['ride_id','rideable_type','started_at','ended_at','start_station_name','start_station_id','end_station_name',
                                  'end_station_id','start_lat','start_lng','end_lat','end_lng','member_casual','birth year','gender'])
    all_dfs.append(new_df)

In [None]:
len(all_dfs)

In [None]:
Citibike =pd.concat(all_dfs)

In [None]:
Citibike.shape

In [None]:
first_row = Citibike.head(1)
last_row = Citibike.tail(1)

In [None]:
first_row

In [None]:
last_row

In [None]:
import missingno as msg
msg.matrix(Citibike)

In [None]:
#Checking the missing values with respect to each column in the dataset.
Citibike.isnull().sum()

In [None]:
#Checking for the datatypes of each column in the dataset.
Citibike.info()


We observe that the start time and end time are object datatypes. For your analysis we need to convert them to date/time format.

In [None]:
Citibike['started_at'] = pd.to_datetime(Citibike['ended_at'])
Citibike['ended_at'] = pd.to_datetime(Citibike['ended_at'])

We also need to check if the the station id's are consistent with the station name.

In [None]:
print('Total Unique Station ID:', len(pd.unique(Citibike['start_station_id'])), '\n',
      'Total Unique Start Station Names:', len(pd.unique(Citibike['start_station_name'])))

In [None]:
print('Total Unique Station ID:', len(pd.unique(Citibike['end_station_id'])), '\n',
      'Total Unique End Station Names:', len(pd.unique(Citibike['end_station_name'])))

Now for the Stations we see that some station names have different Id's. However, it is acceptable to have different ids rather than different names for the same id because we would expect the same name for a given id, but there may be a new id generated for the stations. As a result, this will not have much effect on the aggregate.

In [None]:
Citibike['start_year'] = Citibike['started_at'].dt.year
Citibike['weekday'] = Citibike['started_at'].dt.day_name()
Citibike['hour'] = Citibike['started_at'].dt.hour

In [None]:
Citibike_agg = Citibike.groupby(['weekday', 'hour'], as_index=False).agg(
    trip_count= pd.NamedAgg(column='start_station_id', aggfunc='count')
)

In [None]:
plot_1 = px.line(Citibike_agg, x='hour', y='trip_count', color='weekday',
               title='Citibike Trips by Weekdays and for each Hour of the day from March 2021 to 2023',
               category_orders={'weekday':['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']},
               labels={'hour':'Time', 'trip_count':'Trip Count', 'weekday':'Day of Week'},
               color_discrete_sequence=px.colors.qualitative.Set1
)

In [None]:
plot_1.update_traces(line={'width':3})
plot_1.update_layout(hovermode='x')
plot_1.update_xaxes(tickmode='array', tickvals=[0, 3, 5, 8, 11, 14, 17, 20, 23],
                 ticktext=['12 am', '3 am', '5 am', '8 am', '11 am', '2 pm', '5 pm', '8 pm', '11 pm'])

plot_1.show()

In [None]:
grouped = Citibike.groupby(['weekday', 'hour', 'member_casual'], as_index=False)['ride_id'].count()


In [None]:
fig = px.line(grouped, x='hour', y='ride_id', color='member_casual',
              facet_col='weekday', facet_col_wrap=3,
              title='Citibike Trips by Hour of Day and User Type from the Period of March 2021-2023',
              category_orders={'weekday':['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']},
              labels={'hour':'Time of the Day', 'ride_id':'Trip Count', 'member_casual':'User Type'},
              color_discrete_sequence=['#1f77b4', '#ff7f0e'])

In [None]:
fig.update_traces(line={'width':3})
fig.update_layout(hovermode='x')
fig.for_each_annotation(lambda a: a.update(text="Day : " + a.text.split("=")[1]))
timestamps = ['12 am', '3 am', '6 am', '9 am', '12 pm', '3 pm', '6 pm', '9 pm']
for i in range(len(fig.layout.annotations)):
    fig.update_xaxes(title='', tickmode='array', tickvals=[0, 3, 5, 8, 11, 14, 17, 20], ticktext=timestamps, row=i//3+1, col=i%3+1)

fig.show()

Trips by Gender and Age.

In [None]:
Citibike['Age'] = 2023 - Citibike['birth year']

In [None]:
Citibike['rider_age'] = Citibike['Age'] - 7

In [None]:
print(Citibike.head())

In [None]:
age_plt = px.box(Citibike, y='rider_age', title='Age Distribution Boxplot')
age_plt.show()

The box plot tells us that there are outliers present in the data. As it is safe to assume that senior citizens in New Jersey are using the Citibike transportation system. However, the frequency of riders with the age above 100 seems very less probable. So I ignore those numbers because it is safe to assume that someone above the age of 100 cannot ride a bike.

So While visualizing the plot to understand trips on the basis of gender and riders age we follow the above.

In [None]:
Citibike['gender'] = Citibike['gender'].replace({0: 'Unknown', 1: 'Male', 2: 'Female'})

In [None]:
# group trips by age and gender and count the number of trips
trips_agg = Citibike.groupby(['rider_age', 'gender'], as_index=False)['start_station_id'].count()


In [None]:
trips_agg = Citibike.groupby(['rider_age', 'gender'], as_index=False).agg(
    trip_count=pd.NamedAgg(column='start_station_id', aggfunc='count')
)

In [None]:
# filter out ages over 100
trips_agg = trips_agg[trips_agg['rider_age'] < 100]

In [None]:
sex_age_plt = px.bar(trips_agg, x='rider_age', y='trip_count', color='gender',
              title="Total Citibike Trips distribution based on the Rider's Gender and Age",
              labels={'rider_age':'Age', 'trip_count':'Total Trips made from March 2021 - 23', 'gender':'Gender'},
              color_discrete_sequence=px.colors.qualitative.Vivid)

sex_age_plt.show()

In [None]:
import datetime
Citibike['month'] = Citibike['started_at'].apply(lambda x: x.strftime('%B'))

In [None]:
Citibike['year'] = Citibike['started_at'].apply(lambda x: x.year)

In [None]:
seasons = {
    'Winter': [12, 1, 2],
    'Spring': [3, 4, 5],
    'Summer': [6, 7, 8],
    'Autumn': [9, 10, 11]
}

In [None]:
def get_season(month):
    for season, months in seasons.items():
        if month in months:
            return season

In [None]:
Citibike['season'] = Citibike['started_at'].apply(lambda x: get_season(x.month))

In [None]:
# Group the data by season and year and count the number of rides
rides_by_season = Citibike.groupby(['season', 'year']).size().reset_index(name='count')

In [None]:
# Pivot the data to create a matrix with season as rows, year as columns, and ride count as values
pivot = rides_by_season.pivot(index='season', columns='year', values='count')

In [None]:
# Plot the pivot table as a bar graph
import seaborn as sns
import matplotlib.pyplot as plt
# Make the line plot interactive using Plotly
Season_plot = px.line(pivot.reset_index(), x='season', y=[2021, 2022, 2023], title='Seasonal Trend/Pattern for Citibike Rides from March 2021 - 23')
Season_plot.update_xaxes(title="Season")
Season_plot.update_yaxes(title='Number of Citibike Rides')
Season_plot.show()

In [None]:
sns.countplot(x='rideable_type', data=Citibike)
plt.title('Count of Different Types of Citibikes')
plt.xlabel('Citibike Types')
plt.ylabel('Total Count')

In [None]:
sns.set(style="ticks")

# create a scatterplot matrix
scatter_matrix = sns.pairplot(data=Citibike, 
                              vars=['rider_age'], 
                              hue='rideable_type', 
                              palette='bright')

scatter_matrix.fig.set_size_inches(8, 6)

# decrease the font size of the title
scatter_matrix.fig.suptitle("Scatterplot Matrix of Age by Types of Citibikes.", y=1.05, fontsize=16)

plt.show()

In [None]:
import folium 
from folium.plugins import HeatMap
import seaborn as sns

In [None]:
df = Citibike[Citibike['start_station_id'] != Citibike['end_station_id']]

In [None]:
route_counts = df.groupby(['start_station_name', 'start_lat', 'start_lng', 'end_station_name', 'end_lat', 'end_lng', 'member_casual']).size().reset_index(name='count')

In [None]:
member_routes = route_counts[route_counts['member_casual'] == 'member'].nlargest(25, 'count')
casual_routes = route_counts[route_counts['member_casual'] == 'casual'].nlargest(25, 'count')

In [None]:
nj_map = folium.Map(location=[40.7128, -74.0060], zoom_start=12)

In [None]:
for i, row in member_routes.iterrows():
    start_lat, start_lng, end_lat, end_lng = row['start_lat'], row['start_lng'], row['end_lat'], row['end_lng']
    folium.Marker([start_lat, start_lng], popup=f"{row['start_station_name']} to {row['end_station_name']} ({row['count']} rides)").add_to(nj_map)
    folium.PolyLine([(start_lat, start_lng), (end_lat, end_lng)], color='green', weight=2.5, opacity=1).add_to(nj_map)

In [None]:
for i, row in casual_routes.iterrows():
    start_lat, start_lng, end_lat, end_lng = row['start_lat'], row['start_lng'], row['end_lat'], row['end_lng']
    folium.Marker([start_lat, start_lng], popup=f"{row['start_station_name']} to {row['end_station_name']} ({row['count']} rides)").add_to(nj_map)
    folium.PolyLine([(start_lat, start_lng), (end_lat, end_lng)], color='red', weight=2.5, opacity=1).add_to(nj_map)

In [None]:
nj_map


In [None]:
df1 = Citibike[Citibike['start_station_id'] != Citibike['end_station_id']]
route_counts = df1.groupby(['start_station_name', 'start_lat', 'start_lng', 'end_station_name', 'end_lat', 'end_lng']).size().reset_index(name='count')
top_routes = route_counts.nlargest(50, 'count')
nj_map = folium.Map(location=[40.7128, -74.0060], zoom_start=12)
for i, row in top_routes.iterrows():
    start_lat, start_lng, end_lat, end_lng = row['start_lat'], row['start_lng'], row['end_lat'], row['end_lng']
    folium.Marker([start_lat, start_lng], popup=f"{row['start_station_name']} to {row['end_station_name']} ({row['count']} rides)").add_to(nj_map)
    folium.PolyLine([(start_lat, start_lng), (end_lat, end_lng)], color='blue', weight=2.5, opacity=1).add_to(nj_map)
nj_map

In [None]:
casual_df = Citibike[Citibike['member_casual']=='casual']

In [None]:
# get the counts of rideable types for casual riders
counts = casual_df['rideable_type'].value_counts()

In [None]:
plt.bar(counts.index, counts.values)
plt.title('Counts of Rideable Types for Casual Riders')
plt.xlabel('Types of Citibikes')
plt.ylabel('Total Casual Riders')
plt.show()

In [None]:
member_df = Citibike[Citibike['member_casual']=='member']

In [None]:
counts = member_df['rideable_type'].value_counts()

In [None]:
plt.bar(counts.index, counts.values)
plt.title('Counts of Rideable Types for Members')
plt.xlabel('Types of Citibikes')
plt.ylabel('Total Member Riders')
plt.show()

SPARK ANALYSIS FOR DISPLAYING SQL QUERIES

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

In [None]:
spark=SparkSession.builder.appName("OTR").config("spark.sql.caseSensitive","True").getOrCreate()

In [None]:
spark_citi = spark.createDataFrame(Citibike)

In [None]:
spark_citi.createOrReplaceTempView("Citibike_table")

In [None]:
spark.sql("SELECT * FROM Citibike_table LIMIT 5").show()

In [None]:
total_records = spark.sql("SELECT COUNT (*) as total_records FROM Citibike_table")
total_records.show()

We can see that the pandas dataframe has been succesfully loaded into a Spark SQL table.

In [None]:
t_g = spark.sql("SELECT gender, COUNT(*) as total_rides FROM Citibike_table GROUP BY gender")
t_g.show()

In [None]:
spark.sql("""
SELECT start_station_name, COUNT(*) as count
FROM Citibike_table
GROUP BY start_station_name
ORDER BY count DESC
LIMIT 10
""").show()

In [None]:
spark.sql("""
SELECT end_station_name, COUNT(*) as count
FROM Citibike_table
GROUP BY end_station_name
ORDER BY count DESC
LIMIT 10
""").show()

In [None]:
spark.sql("SELECT season, COUNT(*) as total_rides FROM Citibike_table GROUP BY season").show()

In [None]:
spark.sql("SELECT gender, ROUND(AVG(rider_age),0) as average_age FROM Citibike_table GROUP BY gender").show()

In [None]:
spark.sql("SELECT member_casual,COUNT(*) as User_count FROM Citibike_table GROUP BY member_casual").show()