In [0]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.FloatType

 
This notebook reads NYC yellow taxi trip data, transforms it, and saves it in Parquet format.

In [2]:

/** Explicit CSV schema for yellow-taxi trip files in the layout that still carries pickup and
  * dropoff longitude/latitude columns (19 columns). Spark's CSV reader applies a user schema
  * positionally (enforceSchema defaults to true), so field order must match the file's column
  * order.
  *
  * The vendor field is spelled "VendorID", the same spelling the rename cell uses, so that
  * rename keeps working even when spark.sql.caseSensitive is enabled.
  */
lazy val schema =
  StructType(Array(
    StructField("VendorID", IntegerType),
    StructField("tpep_pickup_datetime", TimestampType),
    StructField("tpep_dropoff_datetime", TimestampType),
    StructField("passenger_count", IntegerType),
    StructField("trip_distance", FloatType),
    StructField("pickup_longitude", FloatType),
    StructField("pickup_latitude", FloatType),
    StructField("RatecodeID", IntegerType),
    StructField("store_and_fwd_flag", StringType),
    StructField("dropoff_longitude", FloatType),
    StructField("dropoff_latitude", FloatType),
    StructField("payment_type", IntegerType),
    StructField("fare_amount", FloatType),
    StructField("extra", FloatType),
    StructField("mta_tax", FloatType),
    StructField("tip_amount", FloatType),
    StructField("tolls_amount", FloatType),
    StructField("improvement_surcharge", FloatType),
    StructField("total_amount", FloatType)
  ))

In [3]:
// Either copy the data to your bucket or read it from the public bucket.
// aws: after ssh into master:
// wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2016-02.csv
// aws s3 cp yellow_tripdata_2016-02.csv s3://mybucket/data/

//var df = spark.read.option("inferSchema", "false").option("header", true).schema(schema).csv("s3://bucketcjm/data/yellow_tripdata_2016-02.csv")

// Read the February 2016 file: `schema` describes the layout with pickup/dropoff
// longitude/latitude columns, and the cleaned output is written to a *2016-02 parquet path.
// NOTE(review): newer TLC files (e.g. 2018-01) replace the coordinate columns with location
// IDs, so they do not line up with `schema` and should not be read with it.
var df = spark.read
  .option("inferSchema", "false")
  .option("header", true)
  .schema(schema)
  .csv("s3://nyc-tlc/trip data/yellow_tripdata_2016-02.csv")

In [4]:
// Preview the first 20 rows of the freshly loaded data.
df.show(numRows = 20)

In [5]:
/** Removes the payment and surcharge columns that are not used for the fare analysis.
  *
  * @param dataFrame trip data as loaded with `schema`
  * @return `dataFrame` without payment_type, extra, mta_tax, tip_amount, tolls_amount,
  *         improvement_surcharge and total_amount (names that are absent are ignored by `drop`)
  */
def dropUseless(dataFrame: DataFrame): DataFrame = {
  val unusedColumns = Seq(
    "payment_type",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount"
  )
  dataFrame.drop(unusedColumns: _*)
}

In [6]:
// Strip the columns dropUseless lists.
df = df.transform(dropUseless)

In [7]:
// Rename the raw columns to shorter snake_case names, applied in order.
// NOTE(review): with spark.sql.caseSensitive=true, "VendorID" must match the schema field name exactly.
df = Seq(
  "tpep_pickup_datetime" -> "pickup_datetime",
  "tpep_dropoff_datetime" -> "dropoff_datetime",
  "VendorID" -> "vendor_id",
  "store_and_fwd_flag" -> "store_and_fwd",
  "RatecodeID" -> "rate_code"
).foldLeft(df) { case (renamed, (from, to)) => renamed.withColumnRenamed(from, to) }

In [8]:
// Render df with the notebook context's table display (`z` is provided by the notebook).
z.show(df)

In [9]:
// trip_time = dropoff minus pickup, both as Unix epoch seconds.
val end = unix_timestamp(col("dropoff_datetime")).cast("long")
val start = unix_timestamp(col("pickup_datetime")).cast("long")
df = df.withColumn("trip_time", end.minus(start))

In [10]:
// get day of week and hour: derive calendar features from the pickup timestamp (added in
// this order, since is_weekend reads day_of_week), then drop both raw timestamps.
// Spark's dayofweek numbers days 1 = Sunday ... 7 = Saturday, so is_weekend is 1 for 1 and 7.
val datetime = col("pickup_datetime")
df = Seq(
  "year" -> year(datetime),
  "month" -> month(datetime),
  "day" -> dayofmonth(datetime),
  "day_of_week" -> dayofweek(datetime),
  "is_weekend" -> col("day_of_week").isin(1, 7).cast("int"),
  "hour" -> hour(datetime)
).foldLeft(df) { case (acc, (name, expr)) => acc.withColumn(name, expr) }
  .drop("dropoff_datetime", "pickup_datetime")

In [11]:
// Inspect the current schema of df.
df.schema


In [12]:
// filter out anomalous values:
//  - short trips with suspiciously high fares,
//  - trip_distance, fare_amount and passenger_count outside plausible open ranges,
//  - pickup/dropoff points outside the box longitude (-75, -73), latitude (40, 42).
// The predicates are ANDed into one filter: a row survives only if every predicate is true,
// which keeps exactly the same rows as one filter call per predicate.
df = df.filter(
  Seq(
    !($"trip_distance" < 1 and $"fare_amount" > 15),
    !($"trip_distance" < 10 and $"fare_amount" > 40),
    $"trip_distance" > 0 and $"trip_distance" < 100,
    $"fare_amount" > 0 and $"fare_amount" < 100,
    $"passenger_count" > 0 and $"passenger_count" < 6,
    $"pickup_longitude" > -75 and $"pickup_longitude" < -73,
    $"dropoff_longitude" > -75 and $"dropoff_longitude" < -73,
    $"pickup_latitude" > 40 and $"pickup_latitude" < 42,
    $"dropoff_latitude" > 40 and $"dropoff_latitude" < 42
  ).reduce(_ and _)
)

In [13]:
// Preview after the value-range filters.
df.show(numRows = 20)

In [14]:
// filter out anomalous trip_time values (seconds): trips shorter than 1000 s with fares
// above 40, and any duration outside the open interval (10, 40000).
df = df.filter(
  !($"trip_time" < 1000 and $"fare_amount" > 40) and
    $"trip_time" > 10 and
    $"trip_time" < 40000
)

In [15]:
// Preview after the trip_time filters.
df.show(numRows = 20)

In [16]:
// calculate haversine distance
/** Adds a column with the haversine (great-circle) distance between each trip's pickup and
  * dropoff coordinates.
  *
  * Computes d = 2R * asin(sqrt(0.5 - cos(dLat)/2 + cos(lat1) * cos(lat2) * (1 - cos(dLon))/2)),
  * converting angles from degrees to radians (assumes the coordinate columns are in degrees).
  *
  * @param dataFrame   trips with pickup_latitude, pickup_longitude, dropoff_latitude and
  *                    dropoff_longitude columns
  * @param earthRadius sphere radius R; the distance comes out in the same unit. The default
  *                    6367.0 reproduces the previously hard-coded factor 12734 (= 2 * 6367),
  *                    i.e. roughly Earth's radius in km
  * @param outputCol   name of the distance column to add or replace (default "h_distance")
  * @return `dataFrame` with the distance column
  */
def addHDistance(
    dataFrame: DataFrame,
    earthRadius: Double = 6367.0,
    outputCol: String = "h_distance"): DataFrame = {
  val P = math.Pi / 180 // degrees -> radians
  val lat1 = col("pickup_latitude")
  val lon1 = col("pickup_longitude")
  val lat2 = col("dropoff_latitude")
  val lon2 = col("dropoff_longitude")
  // Same expression shape as before (differences taken before the radian conversion) so the
  // numeric results are unchanged.
  val internalValue = (lit(0.5)
    - cos((lat2 - lat1) * P) / 2
    + cos(lat1 * P) * cos(lat2 * P) * (lit(1) - cos((lon2 - lon1) * P)) / 2)
  val hDistance = lit(2 * earthRadius) * asin(sqrt(internalValue))
  dataFrame.withColumn(outputCol, hDistance)
}

In [17]:
// Attach the haversine distance column h_distance.
df = df.transform(frame => addHDistance(frame))

In [18]:
// Apply the distance/fare plausibility rules to the haversine distance h_distance.
// NOTE(review): the h_distance > 1 bound already removes every row the first rule targets,
// so that rule is redundant.
df = df.filter(
  Seq(
    !($"h_distance" < 1 and $"fare_amount" > 15),
    !($"h_distance" < 10 and $"fare_amount" > 40),
    $"h_distance" > 1 and $"h_distance" < 100
  ).reduce(_ and _)
)

In [19]:
// round off location values: all four coordinate columns to 3 decimal places.
df = Seq("pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude")
  .foldLeft(df)((acc, c) => acc.withColumn(c, round(acc(c), 3)))

In [20]:
// Round h_distance to 3 decimal places.
df = df.withColumn("h_distance", round(df("h_distance"), 3))

In [21]:
// Preview after rounding.
df.show(numRows = 20)

In [22]:
// Summary statistics (count, mean, stddev, min, max) for distance, fare and duration.
df.describe("h_distance", "fare_amount", "trip_time").show()

In [23]:
// Re-check the schema of df before registering it for SQL and writing it to parquet.
df.schema


In [24]:
// Register df as the temp view "taxi" so the %%sql cells can query it.
df.createOrReplaceTempView("taxi")


In [25]:
%%sql
-- Average fare and haversine distance (h_distance) per pickup hour.
select hour, avg(fare_amount), avg(h_distance)
from taxi
group by hour order by hour

In [26]:
// Persist the cleaned trips as Parquet, replacing any previous output at this path.
df.write
  .mode("overwrite")
  .format("parquet")
  .save("s3://bucketcjm/data/pyellow_tripdata_2016-02")


In [27]:
// Release df's cached blocks, if any, without waiting.
// NOTE(review): no visible cell calls df.cache()/persist(), so this is likely a no-op.
df.unpersist(blocking = false)

In [28]:
// Re-read the cleaned data from the Parquet output.
var tdf = spark.read.format("parquet").load("s3://bucketcjm/data/pyellow_tripdata_2016-02")

In [29]:
// Preview the data re-read from Parquet.
tdf.show(numRows = 20)

In [30]:
// Cache the re-read data once and expose it to SQL as "taxi".
// Caching through the DataFrame is sufficient: the temp view is backed by the same plan, and
// Spark's cache manager substitutes cached data by plan, so SQL queries on "taxi" read this
// cache too. A second spark.catalog.cacheTable("taxi") would only re-register the same plan.
tdf.cache()
tdf.createOrReplaceTempView("taxi")

In [31]:
// Summary statistics (count, mean, stddev, min, max) for the re-read data.
tdf.describe("h_distance", "fare_amount", "trip_time").show()

In [32]:
// Average fare and haversine distance per pickup hour, in hour order.
tdf.groupBy("hour")
  .agg("fare_amount" -> "avg", "h_distance" -> "avg")
  .orderBy("hour")
  .show()

In [33]:
// Show the formatted physical plan for the per-hour averages query.
tdf.groupBy("hour")
  .avg("fare_amount", "h_distance")
  .orderBy("hour")
  .explain(mode = "formatted")

In [34]:
// Trip count per pickup hour.
val df3 = tdf.groupBy("hour").count()

// Show the 5 earliest hours. An explicit `.show(5)` replaces the accidental infix form
// `...)show(5)`, which compiled only as an operator-style call and is easy to misread.
df3.orderBy(asc("hour")).show(5)

In [35]:
// Formatted physical plan of the per-hour count.
df3.explain(mode = "formatted")

In [36]:
// Trip count for every pickup hour, ascending.
tdf.groupBy("hour").count().sort("hour").show(20)

In [37]:
%%sql
-- All columns of the "taxi" view (at this point backed by tdf, the data re-read from Parquet).
select * from taxi

In [38]:
%%sql
-- Selected trip features alongside fare_amount.
select trip_distance, rate_code, fare_amount, is_weekend, day_of_week from taxi


In [39]:
%%sql
-- Average fare for each distinct trip_distance value, longest distances first.
-- NOTE(review): because rows are grouped by trip_distance, avg(trip_distance) always equals
-- trip_distance here; bucketing the distance (e.g. round(trip_distance)) may have been intended.
select trip_distance,avg(trip_distance),  avg(fare_amount)
from taxi
group by trip_distance order by avg(trip_distance) desc


In [40]:
%%sql
-- Average fare and reported trip_distance per pickup hour.
select hour, avg(fare_amount), avg(trip_distance)
from taxi
group by hour order by hour 

In [41]:
%%sql
-- Average fare and trip_distance per rate code.
select rate_code, avg(fare_amount) , avg(trip_distance)
from taxi
group by rate_code order by rate_code


In [42]:
%%sql
