# New York Taxi Trip Analysis

## Load taxi trip data

In [None]:
// Read the raw trip records. The first CSV line supplies the column names.
// `inferSchema` is not set, so Spark's default applies and every column is read as a string.
val rawDataDF = spark.read
  .option("header", "true")
  .csv("taxis.csv")

## Define Processing functions

### Function to calculate the great-circle (air) distance between two coordinates

In [None]:
import java.lang.Math; 
/** Great-circle distance between two points, computed with the haversine formula.
  *
  * @param departureLat latitude of the start point, in decimal degrees
  * @param departureLon longitude of the start point, in decimal degrees
  * @param arrivalLat   latitude of the end point, in decimal degrees
  * @param arrivalLon   longitude of the end point, in decimal degrees
  * @return distance in kilometres on a sphere of radius 6371 km
  */
def travelDistanceInKM(departureLat: Double, departureLon: Double, arrivalLat: Double, arrivalLon: Double): Double = {
  val earthRadiusKm = 6371.0
  val deltaLat = Math.toRadians(arrivalLat - departureLat)
  val deltaLon = Math.toRadians(arrivalLon - departureLon)
  val sinHalfLat = Math.sin(deltaLat / 2)
  val sinHalfLon = Math.sin(deltaLon / 2)
  val a = sinHalfLat * sinHalfLat +
    sinHalfLon * sinHalfLon * Math.cos(Math.toRadians(departureLat)) * Math.cos(Math.toRadians(arrivalLat))
  // Mathematically a <= 1, but rounding can push it just above 1 for near-antipodal
  // points. sqrt(1 - a) would then be NaN, so clamp it.
  val h = Math.min(1.0, a)
  val centralAngle = 2 * Math.atan2(Math.sqrt(h), Math.sqrt(1 - h))
  earthRadiusKm * centralAngle
}

### Function to derive the day of the week from a timestamp

In [None]:
import java.text.SimpleDateFormat;
/** Returns the abbreviated English day-of-week name ("Mon", "Tue", ...) of a timestamp.
  *
  * @param dateTime timestamp formatted as `yyyy-MM-dd HH:mm:ss` (24-hour clock)
  * @return the abbreviated day name, always in English
  */
def travelTimetoDay(dateTime: String): String = {
  // HH = hour of day (0-23). The previous `hh` is the 1-12 AM/PM hour, which only
  // worked here because lenient parsing happened not to change the date.
  val parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", java.util.Locale.ENGLISH)
  // Pin the locale so day labels do not depend on each executor's default JVM locale.
  val dayFormat = new SimpleDateFormat("E", java.util.Locale.ENGLISH)
  // Parse and format in the same fixed zone so local DST transitions cannot shift the result.
  val utc = java.util.TimeZone.getTimeZone("UTC")
  parser.setTimeZone(utc)
  dayFormat.setTimeZone(utc)
  dayFormat.format(parser.parse(dateTime))
}

### Function to derive the 4-hour time range from a timestamp

In [None]:
/** Maps a timestamp to the 4-hour slot of the day it falls in.
  *
  * @param dateTime timestamp formatted as `yyyy-MM-dd HH:mm:ss` (24-hour clock)
  * @return one of "0h-4h", "4h-8h", "8h-12h", "12h-16h", "16h-20h", "20h-24h"
  */
def travelTimetoRange(dateTime: String): String = {
  // HH (hour of day, 0-23), not hh (AM/PM hour, 1-12). With hh, "12:xx" parsed as
  // 00:xx, so every noon trip was counted in the "0h-4h" slot.
  val parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", java.util.Locale.ENGLISH)
  val hourFormat = new SimpleDateFormat("H", java.util.Locale.ENGLISH)
  // Parse and re-format in one fixed zone. Otherwise a local DST gap (e.g. a zone that
  // jumps 03:00 -> 04:00) would move a trip into the next slot.
  val utc = java.util.TimeZone.getTimeZone("UTC")
  parser.setTimeZone(utc)
  hourFormat.setTimeZone(utc)
  val hourOfDay = hourFormat.format(parser.parse(dateTime)).toInt
  val slotStart = (hourOfDay / 4) * 4
  s"${slotStart}h-${slotStart + 4}h"
}

## Register the processing functions as Spark UDFs for use on the input DataFrame

In [None]:
// Wrap each plain Scala helper as a Spark UDF. `register` makes it callable by name from
// Spark SQL and returns a handle that can be applied to Columns in DataFrame expressions.
val travelDistance = spark.udf.register(
  "travelDistance",
  (fromLat: Double, fromLon: Double, toLat: Double, toLon: Double) =>
    travelDistanceInKM(fromLat, fromLon, toLat, toLon))
val travelDay = spark.udf.register(
  "travelDay",
  (pickup: String) => travelTimetoDay(pickup))
val travelTimeRange = spark.udf.register(
  "travelTimeRange",
  (pickup: String) => travelTimetoRange(pickup))

## Add derived columns to the input DataFrame

In [None]:
// Append per-trip features to the raw data.
// The column names (spaces included) are referenced verbatim by later queries.
val processedDataDF = {
  val pickupTime = col("pickup_datetime")
  // trip_duration is divided by this to obtain hours, i.e. it is treated as seconds.
  val secondsPerHour = 60 * 60
  rawDataDF
    .withColumn("duration in hours", col("trip_duration") / secondsPerHour)
    .withColumn(
      "distance in km",
      travelDistance(
        col("pickup_latitude"), col("pickup_longitude"),
        col("dropoff_latitude"), col("dropoff_longitude")))
    // Average speed, derived from the two columns added just above.
    .withColumn("speed in km/h", col("distance in km") / col("duration in hours"))
    .withColumn("day", travelDay(pickupTime))
    .withColumn("timerange", travelTimeRange(pickupTime))
}

## Results

### Travel speeds

In [None]:
// Keep only the trip id and its computed average speed.
val speedDataDF = processedDataDF.select(col("id"), col("speed in km/h"))

In [None]:
// Display the first 10 rows of trip speeds.
speedDataDF.show(numRows = 10)

### Number of trips per day of the week

In [None]:
// Count trips for each day-of-week label.
val travelsPerDayDF = processedDataDF.groupBy(col("day")).count()

In [None]:
// Display up to 10 rows of per-day trip counts.
travelsPerDayDF.show(numRows = 10)

### Number of trips per time range

In [None]:
// Count trips for each time-range label.
val travelsPerHours = processedDataDF.groupBy(col("timerange")).count()

In [None]:
// Display up to 10 rows of per-time-range trip counts.
travelsPerHours.show(numRows = 10)

### Total distance per day of the week

In [None]:
// Sum the computed trip distances for each day-of-week label.
val distancePerDayDF = processedDataDF
  .groupBy("day")
  .sum("distance in km")

In [None]:
// Display up to 10 rows of per-day total distances.
distancePerDayDF.show(numRows = 10)