In [1]:
"""
Data models for various data used in the analysis.
"""

from typing import Text, List, Dict, Union, Callable, Any
from datetime import datetime, time
from pyspark.sql import functions as F
import pyspark
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, IntegerType, StringType, DoubleType, TimestampType, LongType


# A dataframe argument may be given either as a ready-made frame or as a
# zero-argument factory returning one; Route, Trips and Ezlink call the
# factory when they are constructed.
_PandasFrameFactory = Callable[[], pd.DataFrame]
_SparkFrameFactory = Callable[[], pyspark.sql.DataFrame]
PandasDataFrame = Union[pd.DataFrame, _PandasFrameFactory]
SparkDataFrame = Union[pyspark.sql.DataFrame, _SparkFrameFactory]

# COMMAND ----------

class PandasDataFrameBase(object):
  """Common base for the pandas dataframe wrapper classes.

  Stores the wrapped frame together with a mapping from logical column
  names (the keyword argument names, e.g. ``stop_code``) to the column
  names actually used in the frame (the keyword argument values).
  """
  def __init__(self, dataframe: PandasDataFrame, **columns):
    self.columns = columns
    self.dataframe = dataframe

  def col(self, colname: Text) -> Text:
    """Translate a logical column name into the frame's real column name."""
    mapping = self.columns
    return mapping[colname]

# COMMAND ----------

class Route(PandasDataFrameBase):
  """Class to model a dataframe containing route information for bus services.

  Each row describes one stop of one service/direction: its stop code,
  sequence number, distance (km) and validity period (dt_from..dt_to).
  Every constructor argument except ``dataframe`` names the frame column
  holding that field. The frame is kept sorted by (direction, seq), and each
  column is also exposed as a Series attribute (``self.seq``, ...) taken
  from that same sorted frame.
  """
  def __init__(self, 
               dataframe: PandasDataFrame,
               service: Text, 
               direction: Text, 
               stop_code: Text, 
               seq: Text, 
               km: Text, 
               dt_from: Text, 
               dt_to: Text) -> None:
    columns = dict(service=service,
                   direction=direction,
                   stop_code=stop_code,
                   seq=seq,
                   km=km,
                   dt_from=dt_from,
                   dt_to=dt_to)
    dataframe = dataframe() if callable(dataframe) else dataframe
    # Sort once and derive everything from the sorted frame, so the Series
    # attributes below share their row order with self.dataframe.
    dataframe = dataframe.sort_values(by=[direction, seq])
    super().__init__(dataframe, **columns)

    self.service = dataframe[service]
    self.direction = dataframe[direction]
    self.stop_code = dataframe[stop_code]
    self.seq = dataframe[seq]
    self.km = dataframe[km]
    self.dt_from = dataframe[dt_from]
    self.dt_to = dataframe[dt_to]

  @staticmethod
  def from_csv(path: Text, 
               service: Text, 
               direction: Text, 
               stop_code: Text, 
               seq: Text, 
               km: Text, 
               dt_from: Text, 
               dt_to: Text,
               time_format: Text='%d/%m/%Y') -> "Route":
    """Load a Route from a csv file.

    Args:
      path: csv file to read.
      service, direction, stop_code, seq, km, dt_from, dt_to: names of the
        corresponding csv columns.
      time_format: ``strptime``-style format of the dt_from / dt_to values.

    ``service`` values are whitespace-stripped. ``stop_code`` values are read
    as text and left-padded with zeros to 5 characters.
    """
    dataframe = pd.read_csv(path,
                            converters={service: lambda s: s.strip()},
                            dtype={stop_code: str})
    # Parse the validity dates after loading: `pd.datetime`, `date_parser`
    # and `infer_datetime_format` are deprecated or removed in newer pandas.
    for column in (dt_from, dt_to):
      dataframe[column] = pd.to_datetime(dataframe[column], format=time_format)
    # .str.zfill keeps missing stop codes as NaN instead of raising.
    dataframe[stop_code] = dataframe[stop_code].str.zfill(5)

    return Route(dataframe,
                 service, 
                 direction, 
                 stop_code, 
                 seq, 
                 km, 
                 dt_from, 
                 dt_to)

  def valid_for(self, when: datetime) -> "Route":
    """Return a Route keeping only rows with dt_from <= when <= dt_to."""
    predicate = (self.dt_from <= when) & (self.dt_to >= when)
    return Route(self.dataframe.loc[predicate], **self.columns)

  def for_service(self, service: Text, direction: Text) -> "Route":
    """Return a Route for the corresponding service and direction."""
    predicate = (self.service == service) & (self.direction == direction)
    return Route(self.dataframe.loc[predicate], **self.columns)

  def to_nodes(self, name="stop_code", index="seq", order="km"):
    """Return the stops as a list of node dicts.

    Each dict has keys ``name``, ``index`` and ``order`` (read from the
    columns registered under the given logical names) plus ``group``, which
    is always 1.
    """
    # Copy so the column renaming and the new "group" column act on an
    # independent frame rather than a slice of self.dataframe.
    results = self.dataframe[[self.col(name),
                              self.col(index),
                              self.col(order)]].copy()
    results.columns = ["name", "index", "order"]
    results["group"] = 1
    return results.to_dict("records")

# COMMAND ----------

class Trips(PandasDataFrameBase):
  """Class to model a pandas dataframe of source-destination for a service.

  Logical columns: ``src``/``dst`` (boarding/alighting stop),
  ``src_seq``/``dst_seq`` (their sequence numbers on the route) and ``pax``
  (passenger count). Each constructor argument names the frame column that
  holds that field.
  """
  def __init__(self, 
               dataframe: PandasDataFrame, 
               src: Text, 
               src_seq: Text,
               dst: Text, 
               dst_seq: Text,
               pax: Text) -> None:
    if callable(dataframe):
      dataframe = dataframe()
    super().__init__(dataframe,
                     src=src,
                     src_seq=src_seq,
                     dst=dst,
                     dst_seq=dst_seq,
                     pax=pax)

  def to_edges(self, source: Text = "src_seq", target: Text = "dst_seq", value: Text = "pax"):
    """Return the trips as a list of edge dicts.

    Each dict has ``"source"`` and ``"target"`` (as ``int``) and ``"value"``
    (as ``float``), taken from the columns registered under the logical
    names ``source``, ``target`` and ``value``.
    """
    wanted = [self.col(source), self.col(target), self.col(value)]
    renamed = self.dataframe[wanted].rename(
      columns=dict(zip(wanted, ["source", "target", "value"])))

    # to_dict hands back numpy scalars (e.g. numpy.int64); cast them to
    # builtin int/float explicitly.
    def as_edge(record):
      return {"source": int(record.get("source")),
              "target": int(record.get("target")),
              "value": float(record.get("value"))}

    return list(map(as_edge, renamed.to_dict("records")))
    


# COMMAND ----------

class Ezlink(object):
  """Class to model the ezlink data.

  Wraps a Spark dataframe of ezlink bus journeys. Each constructor argument
  other than ``dataframe``/``meta`` names the dataframe column holding that
  field. The mapping is kept in ``self.columns``, so filtered copies can be
  built with ``Ezlink(new_df, **self.columns)``. Each column is also exposed
  as a ``pyspark.sql.Column`` attribute (``self.src``, ``self.tap_in_time``,
  ...).

  ``meta`` records which filters (``in_days_of_week``, ``is_service``,
  ``within_time_range``) have already been applied to this object.
  """
  def __init__(self, 
               dataframe: SparkDataFrame,
               src: Text,
               dst: Text,
               year: Text,
               bus_id: Text,
               trip_id: Text,
               journey_id: Text,
               travel_mode: Text,
               service: Text,
               direction: Text,
               km: Text,
               tap_in_time: Text,
               tap_out_time: Text,
               date: Text,
               meta: Dict = None
              ) -> None:
    """
    Args:
      dataframe: Spark dataframe, or a zero-argument callable returning one.
      src ... date: names of the corresponding columns in ``dataframe``.
      meta: filter meta-data, typically inherited from a parent Ezlink. It is
        copied, so annotating this object never changes the parent's meta.
    """
    self.dataframe = dataframe() if callable(dataframe) else dataframe
    self.columns = dict(src=src,
                        dst=dst,
                        year=year,
                        bus_id=bus_id,
                        trip_id=trip_id,
                        journey_id=journey_id,
                        travel_mode=travel_mode,
                        service=service,
                        direction=direction,
                        km=km,
                        tap_in_time=tap_in_time,
                        tap_out_time=tap_out_time,
                        date=date,
                        meta=dict(meta) if meta else dict())
    self.src = F.col(src)
    self.dst = F.col(dst)
    self.year = F.col(year)
    self.bus_id = F.col(bus_id)
    self.trip_id = F.col(trip_id)
    self.journey_id = F.col(journey_id)
    self.travel_mode = F.col(travel_mode)
    self.service = F.col(service)
    self.direction = F.col(direction)
    self.km = F.col(km)
    self.tap_in_time = F.col(tap_in_time)
    self.tap_out_time = F.col(tap_out_time)
    self.date = F.col(date)
    # Same dict object as self.columns["meta"], so annotate() updates both.
    self.meta = self.columns["meta"]
    # Memoised result of num_days(); None until first computed.
    self._num_days = None

  def cache(self):
    """Cache the underlying Spark dataframe and return it."""
    return self.dataframe.cache()

  def take(self, n: int):
    """Return the first ``n`` rows of the underlying Spark dataframe."""
    return self.dataframe.take(n)

  def annotate(self, key: str, value: Any) -> "Ezlink":
    """Record ``key=value`` in this object's meta-data and return ``self``."""
    self.columns["meta"][key] = value
    return self

  def in_days_of_week(self, days: List[Text], input_time_column=None) -> "Ezlink":
    """Return an Ezlink restricted to journeys on the given days of the week.

    Args:
      days: full English day names, e.g. ``["Monday", "Tuesday"]``.
      input_time_column: column (name or ``Column``) whose timestamp decides
        the day; defaults to the tap-in time.

    A ride starting before 03:00 is attributed to the previous day, so a
    Monday 02:30 ride counts as a Sunday-night ride.

    Raises:
      RuntimeError: if a day-of-week filter was already applied.
    """
    if self.meta.get("in_days_of_week"):
      raise RuntimeError("Ezlink is already in_days_of_week={}"
                         .format(self.meta.get("in_days_of_week")))
    # Test against None explicitly: a pyspark Column cannot be evaluated as
    # a bool, so `if input_time_column:` would raise for a Column argument.
    if input_time_column is None:
      input_time_column = self.tap_in_time
    dayofweek = (F.when(F.hour(input_time_column) < 3,
                        F.date_format(F.date_add(input_time_column, -1), "EEEE"))
                 .otherwise(F.date_format(input_time_column, "EEEE")))
    dataframe = (self.dataframe
                 .withColumn('dayofweek', dayofweek)
                 .filter(F.col('dayofweek').isin(days))
                 .drop('dayofweek'))
    return (Ezlink(dataframe, **self.columns)
            .annotate("in_days_of_week", days))

  def for_service(self, service: Text, direction: Text) -> "Ezlink":
    """Return an Ezlink restricted to one service and direction.

    Raises:
      RuntimeError: if this object is already filtered to the same
        (service, direction).
    """
    if self.meta.get("is_service") == (service, direction):
      raise RuntimeError("Ezlink is already is_service={}"
                         .format(self.meta.get("is_service")))

    dataframe = (self.dataframe
                 .filter(self.service == service)
                 .filter(self.direction == direction))
    return (Ezlink(dataframe, **self.columns)
            .annotate("is_service", (service, direction)))

  def within_time_range(self, start_time: time, end_time: time) -> "Ezlink":
    """Return ezlink journeys that board and/or alight within a time-of-day range.

    Both bounds are inclusive and compared as "HH:mm:ss" strings, so a range
    that crosses midnight (start_time > end_time) matches nothing.

    Raises:
      RuntimeError: if a time-range filter was already applied.
    """
    if self.meta.get("within_time_range"):
      raise RuntimeError("Ezlink is already within_time_range={}"
                         .format(self.meta.get("within_time_range")))

    start_time_str = start_time.strftime("%H:%M:%S")
    end_time_str = end_time.strftime("%H:%M:%S")
    predicate_board = (F.date_format(self.tap_in_time, "HH:mm:ss")
                       .between(start_time_str, end_time_str))
    predicate_alight = (F.date_format(self.tap_out_time, "HH:mm:ss")
                        .between(start_time_str, end_time_str))
    dataframe = self.dataframe.filter(predicate_board | predicate_alight)

    return (Ezlink(dataframe, **self.columns)
            .annotate("within_time_range", (start_time, end_time)))

  def _match_trips(self, route: Route):
    """Count passengers per boarding/alighting stop pair and attach route sequence numbers.

    Returns ``(joined, route_df)``:
      * ``route_df`` is ``route.dataframe`` indexed by stop code, keeping only
        the seq column.
      * ``joined`` is a pandas frame with columns ``destination, source, pax,
        source_seq, destination_seq, stops_travelled`` (in that order). It has
        one row per (source, destination) pair whose alighting stop comes
        after the boarding stop on the route.
    """
    stop_code = route.col("stop_code")
    seq = route.col("seq")

    # Passenger count per (boarding stop, alighting stop), collected to pandas.
    ods = (self.dataframe
           .withColumn("pax", F.lit(1))
           .groupBy(self.src.alias("source"),
                    self.dst.alias("destination"))
           .agg(F.count("pax").alias("pax"))
           .toPandas()[["source", "destination", "pax"]])

    # Sequence number(s) of each stop. A stop code appearing more than once
    # on the route yields one row per occurrence.
    route_df = route.dataframe.set_index(stop_code)[[seq]]

    # Attach the boarding stop's sequence number (NaN if not on the route).
    joined = ods.set_index("source").join(route_df, how="left")
    joined.columns = ["destination", "pax", "source_seq"]
    # The join drops the index name; restore it before reset_index.
    joined.index = joined.index.rename("source")
    joined = joined.reset_index()

    # Attach the alighting stop's sequence number.
    joined = joined.set_index("destination").join(route_df, how="left")
    joined.columns = ["source", "pax", "source_seq", "destination_seq"]
    joined.index = joined.index.rename("destination")
    joined = joined.reset_index()

    # Keep forward journeys only; rows with a NaN seq also fail this test.
    joined["stops_travelled"] = joined["destination_seq"] - joined["source_seq"]
    joined = joined.loc[joined["stops_travelled"] > 0]
    # A stop repeated on the route can match several seq combinations for
    # the same pair; keep only the shortest so its passengers count once.
    joined = joined.loc[joined.groupby(["source", "destination"])["stops_travelled"].idxmin()]
    return joined, route_df

  def get_trips(self, route: Route):
    """Return the source-destination passenger counts along ``route`` as Trips."""
    joined, _ = self._match_trips(route)
    return Trips(joined.drop("stops_travelled", axis=1).astype({"source_seq": int, "destination_seq": int}),
                 src="source",
                 src_seq="source_seq",
                 dst="destination",
                 dst_seq="destination_seq",
                 pax="pax")

  def get_trips_amit(self, route: Route):
    """Return ``(joined, route_df)``: the raw matched trips and the indexed route.

    Same matching as get_trips, but returns the pandas frames directly. See
    ``_match_trips`` for the exact columns.
    """
    return self._match_trips(route)

  def tap_in_by_hour(self) -> pyspark.sql.DataFrame:
    """
    Return a dataframe with the number of tap ins per hour of day ('hr', 'pax').
    """
    return (self.dataframe
            .select(F.hour(self.tap_in_time).alias('hr'))
            .groupBy(F.col('hr'))
            .agg(F.count('hr').alias('pax'))
            .sort(F.col('hr')))

  def num_days(self) -> int:
    """
    Number of distinct (year, day-of-year of tap-in) combinations.

    Computed on first call and memoised. Reassigning ``self.dataframe``
    afterwards does not refresh the memoised value.
    """
    if self._num_days is None:
      self._num_days = (self.dataframe
                        .select(self.year,
                                F.dayofyear(self.tap_in_time).alias('day'))
                        .distinct()
                        .count())
    return self._num_days

In [2]:
# resources and variable initialization
# Input files, opened by relative path (presumably from the notebook's
# working directory -- confirm).
EZLINK = "ezlink-201702-bus.csv"
ROUTE = "lta_scheduled_bus_routes_for_feb2017.csv"

# Keyword arguments for Route.from_csv: each logical column name maps to the
# column name in the ROUTE csv; time_format is the strptime format of the
# dt_from / dt_to values.
route_schema = dict(service="service", 
                    direction="direction", 
                    stop_code="BusStopCode", 
                    seq="BusStopSequence", 
                    km="km", 
                    dt_from="dt_from", 
                    dt_to="dt_to",
                    time_format='%d/%m/%Y')

# Explicit Spark schema for reading the EZLINK csv.
# NOTE(review): keep the field order identical to the csv header listed
# below -- with an explicit schema Spark's CSV reader is expected to assign
# fields to columns by position; confirm for the Spark version in use.
schema = StructType([
    StructField('ALIGHTING_STOP_STN', StringType()),
    StructField('BOARDING_STOP_STN', StringType()),
    StructField('BUS_REG_NUM', StringType()),
    StructField('Bus_Trip_Num', StringType()),
    StructField('Direction', IntegerType()),
    StructField('JOURNEY_ID', LongType()),
    StructField('Ride_Distance', DoubleType()),
    StructField('TRAVEL_MODE', StringType()),
    StructField('Year', IntegerType()),
    StructField('tap_in_time', TimestampType()),
    StructField('tap_out_time', TimestampType()),    
    StructField('Srvc_Number', StringType()),    
    StructField('Date', DateType()),
])

# Column order of the EZLINK csv header:
# ALIGHTING_STOP_STN,BOARDING_STOP_STN,BUS_REG_NUM,Bus_Trip_Num,Direction,JOURNEY_ID,Ride_Distance,TRAVEL_MODE,Year,tap_in_time,tap_out_time,Srvc_Number,Date
# Keyword arguments for Ezlink: logical column name -> ezlink csv column name.
ezlink_schema = dict(src="BOARDING_STOP_STN",
                     dst="ALIGHTING_STOP_STN",
                     year="Year",
                     bus_id="BUS_REG_NUM",
                     trip_id="Bus_Trip_Num",
                     journey_id="JOURNEY_ID",
                     travel_mode="TRAVEL_MODE",
                     service="Srvc_Number",
                     direction="Direction",
                     km="Ride_Distance",
                     tap_in_time="tap_in_time",
                     tap_out_time="tap_out_time",
                     date="Date")

# Routes are restricted to those valid on this date (via Route.valid_for).
route_valid_for_date = datetime(2017, 2, 1)
# Weekdays, as full day names; presumably meant for Ezlink.in_days_of_week.
days_of_interest = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
# Peak windows, shaped as keyword arguments for Ezlink.within_time_range.
am_peak = dict(start_time=time(7, 30), end_time=time(9, 30)) 
pm_peak = dict(start_time=time(17, 0), end_time=time(20, 0))

In [3]:
import pyspark
from pyspark.sql import SparkSession

In [4]:
# Reuse the active SparkSession, or create one with default settings.
# To request more memory, build the session from a SparkConf instead, e.g.:
#   conf = pyspark.SparkConf().setAll([('spark.executor.memory', '12g'),
#                                      ('spark.driver.memory', '8g')])
#   spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# Display the effective Spark configuration through the public API
# (rather than the private sc._conf attribute).
sc.getConf().getAll()

[('spark.app.id', 'local-1534309300342'),
 ('spark.driver.port', '60198'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.host', 'DSD-AS-9070'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell')]

In [5]:
# Bus routes from the LTA schedule, keeping only rows valid on the analysis date.
route = Route.from_csv(ROUTE, **route_schema).valid_for(route_valid_for_date)

# Raw ezlink journeys, read with the explicit schema and wrapped so columns
# can be addressed by their logical names.
ezlink_data = spark.read.csv(EZLINK, schema = schema, header = "true")
ezlink = Ezlink(ezlink_data, **ezlink_schema)

In [None]:
# The raw journeys are queried repeatedly (here and in the per-service loop).
ezlink.dataframe.cache()
# Every distinct travel date present in the data.
dates = [row[0] for row in ezlink.dataframe.select('Date').distinct().collect()]

# Every (service, direction) pair present in the ezlink data, skipping pairs
# where either value is missing.
ezlink_bus_srvc = [
  (srvc, drn)
  for srvc, drn in ezlink.dataframe.select('Srvc_Number', 'Direction').distinct().collect()
  if srvc is not None and drn is not None
]

In [None]:
def _count_buses_per_stop(trips_df, stops):
  """Return, per stop in ``stops``, the bus trips boarding there summed over days.

  For each stop this counts the distinct (Date, BUS_REG_NUM, Bus_Trip_Num)
  combinations, i.e. the distinct bus trips per day summed over all days.
  Rows with a null Date are ignored. Everything runs as one Spark job
  instead of one job per (date, stop).
  """
  rows = (trips_df
          .filter(F.col('Date').isNotNull())
          .filter(F.col('BOARDING_STOP_STN').isin(list(stops)))
          .select('Date', 'BOARDING_STOP_STN', 'BUS_REG_NUM', 'Bus_Trip_Num')
          .distinct()
          .groupBy('BOARDING_STOP_STN')
          .count()
          .collect())
  per_stop = {row['BOARDING_STOP_STN']: row['count'] for row in rows}
  return np.array([per_stop.get(stop, 0) for stop in stops], dtype=int)


def _od_matrix(joined, stops):
  """Return the boarding x alighting passenger matrix for one service.

  Rows (boarding stop) and columns (alighting stop) are both labelled by
  ``stops``, so pairs without passengers appear as 0. A "boarding total"
  column and an "alighting total" row are appended.
  """
  size = len(stops)
  matrix = pd.DataFrame(np.zeros((size, size), dtype=int),
                        index=pd.Index(stops, name=""),
                        columns=pd.Index(stops, name=""))
  for source, destination, pax in joined[["source", "destination", "pax"]].itertuples(index=False):
    matrix.loc[source, destination] += pax
  matrix["boarding total"] = matrix.sum(axis=1)
  matrix.loc["alighting total"] = matrix.sum(axis=0)
  return matrix


data_frames = []
for service, direction in ezlink_bus_srvc:
  service_of_interest = dict(service=service, direction=direction)
  service_route = route.for_service(**service_of_interest)
  if service_route.dataframe.empty:
    continue  # no valid route rows for this service/direction
  ezlink_sub = ezlink.for_service(**service_of_interest)
  if len(ezlink_sub.dataframe.head(1)) == 0:
    continue  # no journeys recorded for this service/direction

  joined, route_df = ezlink_sub.get_trips_amit(service_route)

  # Route stops in sequence order; they label the OD matrix and the output.
  stops = list(route_df.index)
  mat_sz = len(stops)
  if mat_sz > 1 and stops[0] == stops[-1]:
    # Loop service: the first and last stop share a code. Suffix them so
    # boardings at the start (_O) and alightings at the end (_D) get their
    # own matrix row/column instead of colliding.
    first, last = stops[0], stops[-1]
    origin, terminus = first + "_O", last + "_D"
    joined.loc[joined['source'] == first, 'source'] = origin
    joined.loc[joined['destination'] == last, 'destination'] = terminus
    sub_df = ezlink_sub.dataframe
    ezlink_sub.dataframe = (sub_df
                            .withColumn('BOARDING_STOP_STN',
                                        F.when(sub_df['BOARDING_STOP_STN'] == first, origin)
                                        .otherwise(sub_df['BOARDING_STOP_STN']))
                            .withColumn('ALIGHTING_STOP_STN',
                                        F.when(sub_df['ALIGHTING_STOP_STN'] == last, terminus)
                                        .otherwise(sub_df['ALIGHTING_STOP_STN'])))
    stops[0], stops[-1] = origin, terminus

  bus_count = _count_buses_per_stop(ezlink_sub.dataframe, stops)
  od_matrix = _od_matrix(joined, stops)

  # Boardings per stop are the last column, alightings per stop the last row
  # (taken by position, matching the stop order).
  boarding = od_matrix.iloc[:mat_sz, -1].values
  alighting = od_matrix.iloc[-1, :mat_sz].values
  # Net passengers on board after each stop: running boardings minus
  # alightings. Alightings at the first stop are not subtracted (adding
  # alighting[0] back cancels them).
  net_on_board = np.cumsum(boarding - alighting) + alighting[0]

  # Select the net-passenger column by name: positional access is fragile
  # because DataFrame column order follows dict insertion order.
  data_frames.append(pd.DataFrame({
    "service": np.repeat(service, mat_sz),
    "direction": np.repeat(direction, mat_sz),
    "bus stop code": stops,
    "seq": list(range(mat_sz)),
    "bus count": bus_count,
    "Net Passengers on Bus at BusStop": net_on_board,
  }))

# pd.concat raises on an empty list; fall back to an empty frame.
route_dist_series_all = pd.concat(data_frames) if data_frames else pd.DataFrame()

In [None]:
# Number of (service, direction) pairs for which a per-stop series was built.
len(data_frames)

In [None]:
# Write the combined per-stop series (one row per service/direction/stop),
# without the pandas index.
OUTPUT_CSV = "cumulative_net_commuter_on_bus.csv"
route_dist_series_all.to_csv(OUTPUT_CSV, index = False)