In [None]:
# Import libraries
import numpy as np
import pandas as pd
import requests
import zipfile
import io
from datetime import datetime
import json
import time
from pprint import pprint
from pymongo import MongoClient

In [None]:
# Open the MongoDB connection and grab the collection holding the trips
MONGO_HOST = 'localhost'
MONGO_PORT = 27017  # 27017 local, 27027 sharding
client = MongoClient(MONGO_HOST, MONGO_PORT)
NY = client["Citibike"]["NewYork"]

In [None]:
# Document example: fetch a single document from the collection to inspect its fields
NY.find_one()

In [None]:
# n total documents (estimated count of the whole collection, no filter applied)
query1 = NY.estimated_document_count()
pprint(query1)

In [None]:
%%time
# n trips in 2019
# Only an upper bound is applied: every trip whose start time (ST) is
# before 2020-01-01 is counted.
# NOTE(review): assumes the collection holds no trips earlier than 2019 - confirm.
pipeline_2019 = [
    {"$match": {"ST": {"$lt": datetime(2020, 1, 1)}}},
    {"$count": 'trips_2019'},
]
query2 = NY.aggregate(pipeline_2019)
for doc in query2:
    pprint(doc)

In [None]:
%%time
# n trips in 2020
# Only a lower bound is applied: every trip whose start time (ST) is
# on or after 2020-01-01 is counted.
# NOTE(review): assumes the collection holds no trips later than 2020 - confirm.
pipeline_2020 = [
    {"$match": {"ST": {"$gte": datetime(2020, 1, 1)}}},
    {"$count": 'trips_2020'},
]
query3 = NY.aggregate(pipeline_2020)
for doc in query3:
    pprint(doc)

In [None]:
%%time
# n trips by gender (0=unknown; 1=males; 2=females)
# One output document per (year of start time, gender code) pair.
gender_group_key = {
    'year': {'$year': {'date': '$ST'}},
    'gender': '$G',
}
gender_pipeline = [
    {'$group': {'_id': gender_group_key, 'trips': {"$sum": 1}}},
]
query4 = NY.aggregate(gender_pipeline)
for doc in query4:
    pprint(doc)

In [None]:
%%time
# n trips per user type
# (Customer = 24-hour pass or 3-day pass user; Subscriber = Annual Member)
# One output document per (year of start time, user type) pair.
user_type_group_key = {
    'year': {'$year': {'date': '$ST'}},
    'userType': '$U',
}
user_type_pipeline = [
    {'$group': {'_id': user_type_group_key, 'trips': {"$sum": 1}}},
]
query5 = NY.aggregate(user_type_pipeline)
for doc in query5:
    pprint(doc)

In [None]:
%%time
# n trips per month
# Trips are bucketed by the "YYYY-MM" rendering of their start time.
month_bucket = {"$dateToString": {"format": "%Y-%m", "date": "$ST"}}
monthly_pipeline = [
    {"$group": {"_id": month_bucket, 'trips': {"$sum": 1}}},
]
query6 = NY.aggregate(monthly_pipeline)
# Drain the cursor into a plain list of result documents
array_month = list(query6)

In [None]:
# Build the monthly dataframe: order by month key, give the key a readable
# name and renumber the rows.
df_month = (
    pd.DataFrame(array_month)
    .sort_values(by='_id')
    .rename(columns={'_id': 'year_month'})
    .reset_index(drop=True)
)
df_month.head()

In [None]:
%%time
# data per day
def _count_where(field, value):
    """Return a $sum accumulator adding 1 for each document where `field` equals `value`."""
    return {'$sum': {'$cond': [{'$eq': [field, value]}, 1, 0]}}


# One output document per calendar day of the start time ("YYYY-MM-DD")
daily_group = {
    "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$ST"}},
    'trips': {"$sum": 1},
    'avg_duration': {"$avg": "$D"},
    'avg_birthYear': {"$avg": "$BY"},
    'males': _count_where("$G", 1),
    'females': _count_where("$G", 2),
    'customers': _count_where("$U", 'Customer'),
    'subscribers': _count_where("$U", 'Subscriber'),
    # "leisure" = trips whose S and E fields hold the same value
    'leisure': _count_where("$S", '$E'),
}
query7 = NY.aggregate([{"$group": daily_group}])
array_day = list(query7)

In [None]:
# Build the daily dataframe, parse the day key into real timestamps,
# order chronologically and export it.
df_day = pd.DataFrame(array_day)
df_day = (
    df_day
    .assign(_id=pd.to_datetime(df_day['_id']))
    .sort_values(by='_id')
    .reset_index(drop=True)
    .rename(columns={'_id': 'date'})
)
df_day.to_csv('data_per_day.csv', index=False)

In [None]:
%%time
# n trips per week
# One output document per (year, week number) of the start time.
week_group_key = {
    'year': {'$year': {'date': '$ST'}},
    'week': {'$week': {'date': '$ST'}},
}
query8 = NY.aggregate([
    {"$group": {"_id": week_group_key, 'trips': {"$sum": 1}}},
])
# Flatten each result document into a [year, week, trips] row
array_week = []
for doc in query8:
    key = doc.get('_id')
    array_week.append([key.get('year'), key.get('week'), doc.get('trips')])

In [None]:
# Creating a dataframe
# $week returns the week of the year for a date as a number between 0 and 53.
# Weeks begin on Sundays, and week 1 begins with the first Sunday of the year.
# Days preceding the first Sunday of the year are in week 0.
#
# Rows are ordered by week and then by year: sorting on 'week' alone leaves the
# relative order of the years inside each week unspecified, so the table could
# change between runs. The year tie-breaker makes the ordering reproducible.
df_week = (
    pd.DataFrame(array_week, columns=['year', 'week', 'trips'])
    .sort_values(by=['week', 'year'])
    .reset_index(drop=True)
)
df_week.head()

In [None]:
%%time
# n trips in a specific week (38 in this case)
# ISO-8601 week numbering (Monday-based) is used here so that "week 38" covers
# the same days as the hard-coded ranges of the per-station week-38 query:
# 2019-09-16..2019-09-22 and 2020-09-14..2020-09-20.
# The Sunday-based $week operator would instead select 2019-09-22..28 and
# 2020-09-20..26, i.e. a different set of trips.
TARGET_ISO_WEEK = 38
query9 = NY.aggregate([
    # Keep only trips whose start time falls in the target ISO week
    {'$match': {'$expr': {'$eq': [{'$isoWeek': '$ST'}, TARGET_ISO_WEEK]}}},
    # Count them per ISO week-numbering year
    {"$group": {"_id": {'year': {'$isoWeekYear': '$ST'}}, 'trips': {"$sum": 1}}},
])
for result in query9:
    pprint(result)

In [None]:
%%time
# week 38: n trips per station/hour (know in advance date intervals)
week38_ranges = [
    {"ST": {"$gte": datetime(2019, 9, 16), "$lt": datetime(2019, 9, 23)}},
    {"ST": {"$gte": datetime(2020, 9, 14), "$lt": datetime(2020, 9, 21)}},
]
# One output document per (S value, "YYYY-MM-DDTHH" hour of the start time)
station_hour_group = {
    "_id": {
        'station': "$S",
        'datetime': {"$dateToString": {"format": "%Y-%m-%dT%H", "date": "$ST"}},
    },
    'trips': {"$sum": 1},
    'avg_duration': {"$avg": "$D"},
    'avg_birthYear': {"$avg": "$BY"},
    'males': {'$sum': {'$cond': [{'$eq': ["$G", 1]}, 1, 0]}},
    'females': {'$sum': {'$cond': [{'$eq': ["$G", 2]}, 1, 0]}},
    'customers': {'$sum': {'$cond': [{'$eq': ["$U", 'Customer']}, 1, 0]}},
    'subscribers': {'$sum': {'$cond': [{'$eq': ["$U", 'Subscriber']}, 1, 0]}},
}
query10 = NY.aggregate(
    [
        {"$match": {'$or': week38_ranges}},
        {"$group": station_hour_group},
    ],
    allowDiskUse=True,
)
array_week38 = []
for doc in query10:
    key = doc.pop('_id')
    # Rows are positional: the _id fields first, then the remaining fields in
    # the order the result document carries them.
    array_week38.append([*key.values(), *doc.values()])

In [None]:
# Creating and saving a dataframe
week38_columns = [
    'station_id', 'dateTime', 'trips', 'avg_duration', 'avg_birthYear',
    'males', 'females', 'customers', 'subscribers',
]
df_week38 = pd.DataFrame(array_week38, columns=week38_columns)
# Parse the hour strings, then order by station and time before exporting
df_week38 = (
    df_week38
    .assign(dateTime=pd.to_datetime(df_week38['dateTime']))
    .sort_values(by=['station_id', 'dateTime'])
    .reset_index(drop=True)
)
df_week38.to_csv('week38.csv', index=False)

In [None]:
%%time
# n stations in 2019
# Distinct S values among trips that started before 2020-01-01
started_before_2020 = {"ST": {"$lt": datetime(2020, 1, 1)}}
query11 = NY.find(started_before_2020).distinct('S')
len(query11)

In [None]:
%%time
# n stations in 2020
# Distinct S values among trips that started on or after 2020-01-01
started_from_2020 = {"ST": {"$gte": datetime(2020, 1, 1)}}
query12 = NY.find(started_from_2020).distinct('S')
len(query12)

In [None]:
%%time
# trips data in september, considering start stations
september_start_ranges = [
    {"ST": {"$gte": datetime(2019, 9, 1), "$lt": datetime(2019, 10, 1)}},
    {"ST": {"$gte": datetime(2020, 9, 1), "$lt": datetime(2020, 10, 1)}},
]
# Group by the S field and the "YYYY-MM-DDTHH" hour of the start time
start_station_hour = {
    'station': "$S",
    'datetime': {"$dateToString": {"format": "%Y-%m-%dT%H", "date": "$ST"}},
}
query13 = NY.aggregate(
    [
        {"$match": {'$or': september_start_ranges}},
        {"$group": {"_id": start_station_hour, 'trips': {"$sum": 1}}},
    ],
    allowDiskUse=True,
)
# Flatten each result document into a [station, datetime, trips] row
array_sept_start = []
for doc in query13:
    key = doc.get('_id')
    array_sept_start.append([key.get('station'), key.get('datetime'), doc.get('trips')])

In [None]:
# reading stations file (set right working directory)
# The path is relative, so 'stations.csv' must sit in the kernel's working directory.
stations=pd.read_csv('stations.csv')
stations.head()

In [None]:
# Cleaning and pre-processing
# Parse the hour strings once and derive every calendar feature from them.
df_sept_start = pd.DataFrame(array_sept_start, columns=['station_id', 'dateTime', 'trips'])
start_stamps = pd.to_datetime(df_sept_start['dateTime'], format="%Y-%m-%dT%H")
df_sept_start = df_sept_start.assign(
    my_dates=start_stamps,
    year=start_stamps.dt.year,
    day_of_week=start_stamps.dt.day_name(),
    Hour=start_stamps.dt.hour,
    type='start',  # tags these rows as counts taken on the S (start) field
)
df_sept_start.head()

In [None]:
%%time
# trips data in september, considering end stations
# The time filter and the hour bucket still use the start time (ST);
# only the grouping field switches to E.
september_end_ranges = [
    {"ST": {"$gte": datetime(2019, 9, 1), "$lt": datetime(2019, 10, 1)}},
    {"ST": {"$gte": datetime(2020, 9, 1), "$lt": datetime(2020, 10, 1)}},
]
end_station_hour = {
    'station': "$E",
    'datetime': {"$dateToString": {"format": "%Y-%m-%dT%H", "date": "$ST"}},
}
query14 = NY.aggregate(
    [
        {"$match": {'$or': september_end_ranges}},
        {"$group": {"_id": end_station_hour, 'trips': {"$sum": 1}}},
    ],
    allowDiskUse=True,
)
# Flatten each result document into a [station, datetime, trips] row
array_sept_end = []
for doc in query14:
    key = doc.get('_id')
    array_sept_end.append([key.get('station'), key.get('datetime'), doc.get('trips')])

In [None]:
# Cleaning and pre-processing
# Parse the hour strings once and derive every calendar feature from them.
df_sept_end = pd.DataFrame(array_sept_end, columns=['station_id', 'dateTime', 'trips'])
end_stamps = pd.to_datetime(df_sept_end['dateTime'], format="%Y-%m-%dT%H")
df_sept_end = df_sept_end.assign(
    my_dates=end_stamps,
    year=end_stamps.dt.year,
    day_of_week=end_stamps.dt.day_name(),
    Hour=end_stamps.dt.hour,
    type='end',  # tags these rows as counts taken on the E (end) field
)

In [None]:
# Join start - end
# Outer join on station and hour, so a station/hour present on only one side
# is kept (the missing trips column is NaN at this point).
station_hour_keys = ['station_id', 'my_dates', 'year', 'day_of_week', 'Hour']
s = df_sept_start[station_hour_keys + ['trips']]
e = df_sept_end[station_hour_keys + ['trips']]
start_end = s.merge(e, on=station_hour_keys, how='outer', suffixes=['_start', '_end'])

In [None]:
# Join start_end - stations
# Missing values left by the outer join are replaced by 0 (in place).
start_end.fillna(0, inplace=True)
# pd.merge defaults to an inner join: rows whose station_id has no matching
# 'id' in the stations table are dropped.
start_end_stations = (
    start_end
    .merge(stations, left_on='station_id', right_on='id')
    .drop(columns='id')
)
# NOTE(review): unlike the other exports in this notebook, this one does not
# pass index=False, so the row index is written as an extra first column -
# confirm whether that is intended.
start_end_stations.to_csv('september_start_end.csv')

In [None]:
# Creating long dataframe
# Stack the start-station rows on top of the end-station rows.
# pd.concat replaces DataFrame.append, which was removed in pandas 2.0;
# as with append, the original row labels of both frames are kept.
df_sept = pd.concat([df_sept_start, df_sept_end])
df_sept.head()

In [None]:
# Grouped dataframe
# Average hourly trips per (year, station, weekday, hour, start/end type).
# Only 'trips' is averaged: calling .mean() on the whole frame would also try
# to average the string 'dateTime' column, which raises TypeError on
# pandas >= 2.0 (non-numeric columns are no longer dropped silently).
group_keys = ['year', 'station_id', 'day_of_week', 'Hour', 'type']
grouped = df_sept.groupby(group_keys, as_index=False)['trips'].mean()
# Attach the station attributes (inner join on the station id) and export
df_grouped = (
    grouped
    .merge(stations, left_on='station_id', right_on='id')
    .drop(columns='id')
    .rename(columns={'trips': 'avg_trips'})
)
df_grouped.to_csv('september.csv', index=False)

In [None]:
# Cleaning and saving long dataframe
# NOTE(review): `array_sept` is not assigned in any cell visible in this
# notebook, so on a fresh kernel this cell raises NameError unless the name is
# defined elsewhere - confirm where it is supposed to come from.
# NOTE(review): this rebinds `df_sept`, replacing the start/end frame that the
# earlier cells build under the same name.
df_sept = pd.DataFrame(array_sept, columns=['station_id','dateTime','trips','avg_duration'])
# NOTE(review): the format "%Y%m%dT%H" has no dashes, whereas the visible
# aggregations emit "%Y-%m-%dT%H" strings - confirm the format `array_sept` carries.
df_sept['my_dates'] = pd.to_datetime(df_sept['dateTime'],format="%Y%m%dT%H")
# Calendar features derived from the parsed timestamps
df_sept['year'] = df_sept['my_dates'].dt.year
df_sept['day_of_week'] = df_sept['my_dates'].dt.day_name()
df_sept['Hour'] = df_sept['my_dates'].dt.hour
# Attach station attributes; the default inner join keeps only station_ids found in `stations`
df_sept_long=pd.merge(df_sept,stations,left_on='station_id',right_on='id')
df_sept_long.drop(columns='id',inplace=True)
# Not the last expression of the cell, so this preview is not displayed
df_sept_long.head()
df_sept_long.to_csv('september_long.csv',index=False)