# Better Biking - Real-time city-wide bike re-distribution by incentivizing riders to drop bikes at high-demand locations

## Team Members
- Anubhav Jain (aj3187)
- Ayush Jain (aj3152)

### Project Introduction
In this project, we identify high-demand bike stations in real time and incentivize riders to drop their bikes at these stations when they are nearby (within a 0.8-mile radius in the code below).

Our code lets users enter their location as a latitude and longitude and returns a list of nearby stations along with the discount offered at each one.

If a user wants to know the discounts at a different time, they can enter that time and we predict the discounts that would be offered then.

### Data Sources
We use the real-time data feeds that Citi Bike publishes in the General Bikeshare Feed Specification (GBFS) format (https://github.com/MobilityData/gbfs/blob/master/gbfs.md) to fetch the following data streams:
* Citi Bike station status (https://gbfs.citibikenyc.com/gbfs/en/station_status.json) - provides real-time bike and e-bike availability for each station
* Citi Bike station information (https://gbfs.citibikenyc.com/gbfs/en/station_information.json) - provides station details such as the station id, name, location and total capacity

### Imports

```scala
import okhttp3.{Headers, OkHttpClient, Request, Response}
import org.apache.spark.sql.functions._ // col, udf, explode, lit, when, max, min, from_unixtime, hour, minute, abs, least
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.util.parsing.json.JSON
import spark.implicits._                // enables $"col" syntax and .toDS / .toDF on local collections

// Handle to HDFS, used later to list the archived station_status snapshots
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
```

### Get data from URL

This function takes a URL, executes an HTTP GET request against it, and returns the response body wrapped in an `Option`, which is `None` if the request could not be completed.
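For comparison, here is a minimal sketch of the same Option-returning GET using the JDK's built-in `java.net.http` client instead of OkHttp. This is a swapped-in alternative, not what the notebook uses; it assumes Java 11 or later, and the object name `HttpGetSketch` and the 10-second timeouts are illustrative choices.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.time.Duration
import scala.util.control.NonFatal

object HttpGetSketch {
  // One shared client; the timeout values are assumptions, not taken from the notebook
  private val client: HttpClient =
    HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(10)).build()

  /** GET `url`; Some(body) for a 2xx response, None on any non-fatal failure. */
  def httpGet(url: String): Option[String] =
    try {
      val request = HttpRequest.newBuilder(URI.create(url)) // throws on a malformed URL
        .header("Accept", "application/json")
        .timeout(Duration.ofSeconds(10))
        .GET()
        .build()
      val response = client.send(request, HttpResponse.BodyHandlers.ofString())
      if (response.statusCode() / 100 == 2) Some(response.body()) else None
    } catch {
      case NonFatal(_) => None
    }
}
```

A malformed URL such as `"not-a-url"` yields `None` without touching the network, so callers only need to handle the `Option`.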

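```scala
import scala.util.control.NonFatal

// One shared client: OkHttp clients hold their own connection and thread pools
val httpClient: OkHttpClient = new OkHttpClient()

def ExecuteHttpGet(url: String): Option[String] = {
  // We expect JSON back, so advertise that with an Accept header
  val headers = new Headers.Builder()
    .add("Accept", "application/json")
    .build()

  try {
    val request = new Request.Builder()
      .url(url)
      .headers(headers)
      .build()

    val response: Response = httpClient.newCall(request).execute()
    try {
      // Only return the body of a successful (2xx) response
      if (response.isSuccessful()) Option(response.body()).map(_.string()) else None
    } finally {
      response.close() // release the underlying connection
    }
  } catch {
    case NonFatal(_) => None
  }
}
```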

### Getting Station Status Data from Citi-Bike

This function gets the station status data from Citi Bike by requesting https://gbfs.citibikenyc.com/gbfs/en/station_status.json.

We parse the response as JSON and use Spark's DataFrame API to keep only the stations that meet all of the following conditions:
- **`is_renting` is true (1)**: the station is currently renting out bikes
- **`is_installed` is true (1)**: the station currently has a dock installed
- **`is_returning` is true (1)**: the station currently accepts returned bikes
- **`station_status` is 'active'**: the station is currently active

We then select the columns `station_id`, the unique id associated with each station, and `num_bikes_available`, the number of bikes currently available at that station.

We then return the data as a DataFrame.
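To make the four conditions concrete outside Spark, here is a minimal pure-Scala sketch that applies the same predicate to an in-memory list. The `StationStatus` case class and its camel-case field names are hypothetical stand-ins for the GBFS fields, and the flags are modelled as 0/1 integers because that is what `GetStatusDF` compares against.

```scala
object StatusFilterSketch {
  // Hypothetical, simplified model of one entry in station_status.json's data.stations
  final case class StationStatus(
      stationId: String,
      numBikesAvailable: Int,
      isInstalled: Int,
      isRenting: Int,
      isReturning: Int,
      stationStatus: String)

  // All four conditions must hold for a station to be usable
  def isUsable(s: StationStatus): Boolean =
    s.isRenting == 1 && s.isInstalled == 1 && s.isReturning == 1 && s.stationStatus == "active"

  // Equivalent of the final select: (station_id, num_bikes_available)
  def select(all: Seq[StationStatus]): Seq[(String, Int)] =
    all.filter(isUsable).map(s => s.stationId -> s.numBikesAvailable)
}
```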

```scala
def GetStatusDF(): org.apache.spark.sql.DataFrame = {
  val url = "https://gbfs.citibikenyc.com/gbfs/en/station_status.json"
  // Fail loudly instead of parsing an empty string, which would yield a DataFrame with no `data` column
  val jsonStr = ExecuteHttpGet(url).getOrElse(sys.error(s"Could not fetch $url"))
  val statusDfRaw = spark.read.json(Seq(jsonStr).toDS)
  statusDfRaw
    .select(explode($"data.stations").as("station")) // one row per station
    .select("station.*")
    .filter($"is_renting" === 1 && $"is_installed" === 1 && $"is_returning" === 1 &&
            $"station_status" === "active")
    .select("station_id", "num_bikes_available")
}
```

### Getting Station Information Data from Citi-Bike

This function gets the station information data from Citi Bike by requesting https://gbfs.citibikenyc.com/gbfs/en/station_information.json.

We parse the response as JSON and use Spark's DataFrame API to keep only the stations that meet the following conditions:
- **`capacity` is not 0**: a station with zero capacity cannot accept returns
- **neither `lat` nor `lon` is 0**: stations with a latitude or longitude of 0 are virtual stations

We then select the following columns and return the data as a DataFrame:

- `station_id`
- `capacity`
- `lon`
- `lat`
- `name`
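The same rule can be written as a plain predicate. This is a minimal sketch assuming a hypothetical `StationInfo` case class that mirrors the selected columns; a station is dropped if its capacity is 0 or if either coordinate is 0, matching the `||` in `GetInfoDF`.

```scala
object InfoFilterSketch {
  // Hypothetical, simplified model of one entry in station_information.json's data.stations
  final case class StationInfo(stationId: String, capacity: Int, lon: Double, lat: Double, name: String)

  // Keep stations that can accept returns and have real coordinates
  def isPhysicalDock(s: StationInfo): Boolean =
    s.capacity != 0 && s.lon != 0.0 && s.lat != 0.0

  def select(all: Seq[StationInfo]): Seq[StationInfo] = all.filter(isPhysicalDock)
}
```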

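```scala
def GetInfoDF(): org.apache.spark.sql.DataFrame = {
  val url = "https://gbfs.citibikenyc.com/gbfs/en/station_information.json"
  // Fail loudly instead of parsing an empty string, which would yield a DataFrame with no `data` column
  val jsonStr = ExecuteHttpGet(url).getOrElse(sys.error(s"Could not fetch $url"))
  val infoDfRaw = spark.read.json(Seq(jsonStr).toDS)
  infoDfRaw
    .select(explode($"data.stations").as("station")) // one row per station
    .select("station.*")
    // drop zero-capacity stations and virtual stations with a 0 coordinate
    .filter(!($"capacity" === 0 || $"lon" === 0 || $"lat" === 0))
    .select("station_id", "capacity", "lon", "lat", "name")
}
```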


### Calculating Discounts

The function `getDiscounts` takes a station status DataFrame and the latitude and longitude of the user to calculate discounts.

We first take the input status DataFrame and join it with the current information DataFrame on the column `station_id`.

Then, we calculate the availability of bikes at each station as `num_bikes_available / capacity`.

The `calculateDistance` function takes the latitudes and longitudes of two locations as arguments and returns the distance between them in miles. We use this function to calculate the distance of each station from the user.
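`calculateDistance` implements the haversine great-circle formula. Here it is as a plain Scala function, outside a Spark UDF, so it can be checked directly; the object name `HaversineSketch` is illustrative, and it uses the same mean Earth radius of 3958.8 miles.

```scala
object HaversineSketch {
  val EarthRadiusMiles = 3958.8 // same mean radius as calculateDistance

  /** Great-circle distance in miles between two points given in degrees. */
  def distanceMiles(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    val dLat = math.toRadians(lat2 - lat1)
    val dLon = math.toRadians(lon2 - lon1)
    val a = math.pow(math.sin(dLat / 2), 2) +
      math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) * math.pow(math.sin(dLon / 2), 2)
    2 * EarthRadiusMiles * math.atan2(math.sqrt(a), math.sqrt(1 - a))
  }
}
```

As a sanity check, one degree of latitude along a meridian comes out at roughly 69.09 miles.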

We then keep only the stations that have an availability below 40% and lie within a 0.8-mile radius of the user.

For each remaining station we compute a reward that is directly proportional to the distance (users should get larger discounts for dropping bikes at stations that are farther away) and inversely proportional to the availability at that station. To avoid dividing by zero, an availability of 0 is treated as 0.05.

We min-max normalize the reward into the range 10% to 20% to obtain the discount, and add an extra 5% for stations with an availability of 10% or less, so those stations receive between 15% and 25%.
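The selection and scoring steps above can be traced on a small in-memory example. This is a minimal sketch over a plain Scala collection rather than a Spark DataFrame; the `Candidate` case class (standing in for one row of the joined status and information data, with the distance already computed) and the object name are hypothetical. It uses the same thresholds and also guards the single-station case, where max and min reward are equal.

```scala
object DiscountSketch {
  // Hypothetical stand-in for one joined status + info row, distance precomputed
  final case class Candidate(stationId: String, bikes: Long, capacity: Long, distanceMiles: Double)

  val MaxAvailability = 0.4
  val MaxDistanceMiles = 0.8
  val LowAvailability = 0.1

  // Larger when farther away and when emptier; 0 availability is treated as 0.05
  def reward(availability: Double, distance: Double): Double =
    distance / (if (availability == 0) 0.05 else availability)

  /** (stationId, discount percent), highest discount first. */
  def discounts(cands: Seq[Candidate]): Seq[(String, Double)] = {
    val kept = cands
      .map(c => (c, c.bikes.toDouble / c.capacity))
      .filter { case (c, avail) => c.distanceMiles < MaxDistanceMiles && avail < MaxAvailability }
    if (kept.isEmpty) Seq.empty
    else {
      val rewards = kept.map { case (c, avail) => reward(avail, c.distanceMiles) }
      val (lo, hi) = (rewards.min, rewards.max)
      val range = if (hi > lo) hi - lo else 1.0 // avoid 0/0 with a single candidate
      kept.zip(rewards).map { case ((c, avail), r) =>
        val base = (r - lo) / range * 10 // in [0, 10]
        c.stationId -> (if (avail > LowAvailability) base + 10 else base + 15)
      }.sortBy(-_._2)
    }
  }
}
```

With stations A (0/20 bikes, 0.5 mi), B (6/20, 0.6 mi), C (10/20, 0.2 mi) and D (2/20, 1.0 mi), C is dropped for availability and D for distance; A gets the top reward and the low-availability bonus (25%), while B gets the base 10%.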


```scala
// Selection thresholds, named so the values described above are easy to find
val MAX_AVAILABILITY = 0.4           // keep stations that are less than 40% full
val MAX_DISTANCE_MILES = 0.8         // keep stations within 0.8 miles of the user
val LOW_AVAILABILITY_THRESHOLD = 0.1 // stations at or below 10% get an extra 5%

// Haversine great-circle distance in miles between two (lat, lon) points given in degrees
val calculateDistance = udf((lat1: Double, lon1: Double, lat2: Double, lon2: Double) => {
  val AVERAGE_RADIUS_OF_EARTH_MILES = 3958.8
  val sinLat = Math.sin(Math.toRadians(lat1 - lat2) / 2)
  val sinLng = Math.sin(Math.toRadians(lon1 - lon2) / 2)
  val a = sinLat * sinLat +
    Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) * sinLng * sinLng
  val c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a))
  AVERAGE_RADIUS_OF_EARTH_MILES * c
})

// Reward grows with distance and shrinks with availability;
// an empty station uses 0.05 instead of 0 to avoid dividing by zero
val calculateReward = udf((availability: Double, distance: Double) =>
  distance / (if (availability == 0) 0.05 else availability))

// Display helpers: values are truncated (floored) to two decimal places
val formatDiscount = udf((value: Double) => f"${math.floor(value * 100) / 100}%.2f%%")
val formatLocation = udf((lat: Double, lon: Double) => s"$lat,$lon")
val formatDistance = udf((distance: Double) => f"${math.floor(distance * 100) / 100}%.2f miles")
val formatAvail    = udf((value: Double) => f"${math.floor(value * 10000) / 100}%.2f%%")

def getDiscounts(statusDf: org.apache.spark.sql.DataFrame,
                 sourceLat: Double,
                 sourceLon: Double): org.apache.spark.sql.DataFrame = {
  val infoDf = GetInfoDF()

  // Join bike counts (live or predicted) with station metadata, then score each station
  val candidates = statusDf.join(infoDf, Seq("station_id"), "inner")
    .withColumn("availability", col("num_bikes_available") / col("capacity"))
    .withColumn("distance",
      calculateDistance(col("lat"), col("lon"), lit(sourceLat), lit(sourceLon)))
    .withColumn("reward", calculateReward(col("availability"), col("distance")))
    .filter(col("distance") < MAX_DISTANCE_MILES && col("availability") < MAX_AVAILABILITY)

  // Min-max normalize the reward. Guard against no candidates (max/min are null)
  // and a single candidate (max == min), which would otherwise divide by zero
  val stats = candidates.agg(max("reward"), min("reward")).first()
  val (minR, rangeR) =
    if (stats.isNullAt(0)) (0.0, 1.0)
    else {
      val (maxR, lowR) = (stats.getDouble(0), stats.getDouble(1))
      (lowR, if (maxR > lowR) maxR - lowR else 1.0)
    }
  val normalized = (col("reward") - minR) / rangeR * 10 // in [0, 10]

  val finalDf = candidates
    .withColumn("discountVal",
      when(col("availability") > LOW_AVAILABILITY_THRESHOLD, normalized + 10) // 10% to 20%
        .otherwise(normalized + 15))                                          // 15% to 25%
    .withColumn("discount", formatDiscount(col("discountVal")))
    .withColumn("location", formatLocation(col("lat"), col("lon")))
    .withColumn("distance", formatDistance(col("distance")))
    .withColumn("availability", formatAvail(col("availability")))
    .sort(col("discountVal").desc)
    .select("station_id", "name", "location", "num_bikes_available", "capacity",
            "distance", "availability", "discount")

  z.show(finalDf) // render as a Zeppelin table
  finalDf
}
```


 
### Demo of calculating discounts at current time

A user can enter their current location in the text box in the next cell. We then call `GetStatusDF()` to get the current status of the stations and calculate discounts by calling `getDiscounts(currentStatus, user_lat, user_lon)`.
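Free-text input is easy to get wrong, so it can help to validate it before running any Spark job. This is a minimal sketch of such a check; `parseLocation` is a hypothetical helper, not part of the notebook, and it rejects anything that is not two numbers within the valid latitude and longitude ranges.

```scala
import scala.util.Try

object LocationInputSketch {
  /** Parses "lat, lon" in degrees; None if malformed or out of range. */
  def parseLocation(input: String): Option[(Double, Double)] =
    input.split(",").map(_.trim) match {
      case Array(latStr, lonStr) =>
        for {
          lat <- Try(latStr.toDouble).toOption
          lon <- Try(lonStr.toDouble).toOption
          if lat >= -90 && lat <= 90 && lon >= -180 && lon <= 180
        } yield (lat, lon)
      case _ => None
    }
}
```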

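```scala
// Zeppelin renders a text box; expected input: "<latitude>, <longitude>"
val coordinates = String.valueOf(z.textbox("latitude, longitude")).split(",").map(_.trim)
require(coordinates.length == 2, "Enter the location as: latitude, longitude")

val user_lat = coordinates(0).toDouble
val user_lon = coordinates(1).toDouble
```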

```scala
val currentStatus = GetStatusDF()
getDiscounts(currentStatus, user_lat, user_lon)
```

### Predicting Bike Availability at a Given Time 

The function `getPredictedStatusDf` takes a time string in the format "HH:MM" and returns a DataFrame with the predicted number of bikes available at each station at that time.

We have stored the `station_status.json` snapshots for an entire day in the directory `citibike_files/files/`. For this project we only have data for a single day; with data for an entire week, we could use the snapshots from the same weekday as the user's request to improve the accuracy of the prediction.

We keep only the files whose snapshot time is within 10 minutes of the requested time, then parse these files into a pair RDD of `(station_id, num_bikes_available)`.

We use this pair RDD to calculate the average number of bikes available at each station around the given time (rounded up to a whole bike) and return the result as a DataFrame.
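The windowing and averaging logic can be tested without HDFS or Spark. This is a minimal sketch over hypothetical in-memory snapshots, each a minute of the day paired with a map from `station_id` to `num_bikes_available`; the names `PredictionSketch`, `circularDiff` and `predict` are illustrative. It compares times of day circularly so a query for 00:00 also picks up a 23:55 snapshot, and rounds the average up with `math.ceil` as the notebook does.

```scala
object PredictionSketch {
  val MinutesPerDay = 24 * 60

  /** "HH:MM" -> minutes since midnight. */
  def parseHHMM(s: String): Int = {
    val parts = s.split(":").map(_.trim.toInt)
    require(parts.length == 2, s"expected HH:MM, got '$s'")
    val (h, m) = (parts(0), parts(1))
    require(h >= 0 && h < 24 && m >= 0 && m < 60, s"invalid time '$s'")
    h * 60 + m
  }

  /** Minutes between two times of day, wrapping around midnight. */
  def circularDiff(a: Int, b: Int): Int = {
    val d = math.abs(a - b) % MinutesPerDay
    math.min(d, MinutesPerDay - d)
  }

  /** Average bikes per station over snapshots within the window, rounded up. */
  def predict(snapshots: Seq[(Int, Map[String, Long])], queryMinute: Int,
              windowMinutes: Int = 10): Map[String, Long] =
    snapshots
      .filter { case (t, _) => circularDiff(t, queryMinute) < windowMinutes }
      .flatMap { case (_, counts) => counts.toSeq }
      .groupBy { case (id, _) => id }
      .map { case (id, xs) => id -> math.ceil(xs.map(_._2).sum.toDouble / xs.size).toLong }
}
```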

```scala
def getPredictedStatusDf(inputTime: String): org.apache.spark.sql.DataFrame = {
  // All archived station_status.json snapshots (one day's worth)
  val files = fs.listStatus(new Path("/user/aj3187_nyu_edu/citibike_files/files/"))
    .map(_.getPath.toString).toSeq
  val queryTime = inputTime.split(":").map(_.trim)
  val timeInMinutes = queryTime(0).toInt * 60 + queryTime(1).toInt

  // The snapshot's Unix timestamp is read from a fixed offset in the full path string,
  // so this relies on the naming scheme used when the files were saved
  val matchingFiles = files.toDF("files")
    .withColumn("time", from_unixtime($"files".substr(64, 10)))
    .withColumn("totalMinutes", hour($"time") * 60 + minute($"time"))
    // Compare times of day circularly so that 23:55 and 00:02 are 7 minutes apart
    .withColumn("diff", abs($"totalMinutes" - timeInMinutes))
    .filter(least($"diff", lit(24 * 60) - $"diff") < 10)
    .select("files")
    .as[String]
    .collect()

  require(matchingFiles.nonEmpty, s"No snapshots found within 10 minutes of $inputTime")

  // Each snapshot file holds one station_status JSON document on a single line
  val stationBikes = sc.textFile(matchingFiles.mkString(",")).flatMap { line =>
    val stations = for {
      root <- JSON.parseFull(line).collect { case m: Map[String, Any] @unchecked => m }
      data <- root.get("data").collect { case m: Map[String, Any] @unchecked => m }
      list <- data.get("stations").collect { case l: List[Map[String, Any]] @unchecked => l }
    } yield list
    stations.getOrElse(Nil).flatMap { s =>
      for {
        id    <- s.get("station_id").map(_.toString)
        bikes <- s.get("num_bikes_available").collect { case n: Double => n } // JSON numbers parse as Double
      } yield id -> bikes
    }
  }

  // Average per station on the cluster (instead of collecting to the driver), rounding up
  stationBikes
    .mapValues(bikes => (bikes, 1L))
    .reduceByKey { case ((b1, n1), (b2, n2)) => (b1 + b2, n1 + n2) }
    .mapValues { case (total, n) => math.ceil(total / n).toLong }
    .toDF("station_id", "num_bikes_available")
}
```

  



 
### Demo of calculating discounts at a given time

A user can enter a time in the text box in the next cell. We then call `getPredictedStatusDf(inputTime)` to get the predicted status of the stations at that time and calculate discounts by calling `getDiscounts(predictedStatus, user_lat, user_lon)`.

```scala
// Zeppelin renders a text box; expected input: "HH:MM"
val inputTime = String.valueOf(z.textbox("time to drop (HH:MM)")).trim
```

```scala
val predictedStatus = getPredictedStatusDf(inputTime)
getDiscounts(predictedStatus, user_lat, user_lon)
```