In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import plotly.graph_objects as go
from pyspark.sql.functions import count, col,  mean as _mean, desc, date_trunc

# Obtain the Spark session (created on first use, reused afterwards) that the
# rest of the notebook reads data and runs SQL through.
spark = (
    SparkSession.builder
    .appName("Data Sprints Technical Test")
    .getOrCreate()
)

In [2]:
# Input locations, relative to the notebook's working directory.

# Trip records: one JSON file per year (2009-2012, judging by the file names).
trips_path = [
    "data/data-sample_data-nyctaxi-trips-2009-json_corrigido.json",
    "data/data-sample_data-nyctaxi-trips-2010-json_corrigido.json",
    "data/data-sample_data-nyctaxi-trips-2011-json_corrigido.json",
    "data/data-sample_data-nyctaxi-trips-2012-json_corrigido.json",
]

# CSV lookup tables for vendors and payment types.
vendor_path = "data/data-vendor_lookup-csv.csv"
payments_path = "data/data-payment_lookup-csv.csv"

In [3]:
# Load every yearly trip file into one DataFrame (Spark infers the schema)
# and print that schema for reference.
trips_reader = spark.read.format("json")
trips_data = trips_reader.load(trips_path)
trips_data.printSchema()

root
 |-- dropoff_datetime: string (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: long (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- vendor_id: string (nullable = true)



In [4]:
# Read the payment lookup CSV as raw rows and pair each row with its position
# in the file, so leading rows can be dropped by index.
payments_indexed = spark.read.csv(payments_path).rdd.zipWithIndex()

# Keep only rows whose index is greater than 1, i.e. discard the first two rows
# (presumably header/preamble lines -- confirm against the CSV), then strip the
# index again.
payments_rows = payments_indexed.filter(lambda pair: pair[1] > 1).map(lambda pair: pair[0])

# Name the two remaining columns explicitly.
payments_data = payments_rows.toDF(['payment_type', 'payment_lookup'])
payments_data.printSchema()


root
 |-- payment_type: string (nullable = true)
 |-- payment_lookup: string (nullable = true)



In [35]:
# Vendor lookup table: the first CSV line is the header and column types are
# inferred from the data.
vendors_reader = spark.read.option("header", True).option("inferSchema", True)
vendors_data = vendors_reader.csv(vendor_path)
vendors_data.printSchema()

# pandas copy of the lookup, used later to attach vendor names to results.
vendors_data_pd = vendors_data.toPandas()

root
 |-- vendor_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- current: string (nullable = true)



## Quesitos Mínimos

### 1. Qual a distância média percorrida por viagens com no máximo 2 passageiros?

In [17]:
# Mean trip distance over all trips that carried at most two passengers.
few_passenger_trips = trips_data.where(col('passenger_count') <= 2)
distances = few_passenger_trips.agg(
    _mean(col('trip_distance')).alias('average_trip_distance')
)

# The aggregate is a single row; bring it to pandas to read the value.
pd_distances = distances.toPandas()
average_distance = pd_distances['average_trip_distance'][0]

# Show the result as a single big number.
distance_indicator = go.Indicator(
    title={'text': "Average distance for trips with maximum of 2 passengers"},
    mode="number",
    value=average_distance,
    number={'suffix': " miles"},
    domain={'x': [0, 1], 'y': [0, 1]},
)
fig = go.Figure(distance_indicator)

fig.show()

### 2. Quais os 3 maiores vendors em quantidade total de dinheiro arrecadado?

In [38]:
# Total amount collected per vendor, highest first. The top three are selected
# inside Spark (limit) so only three rows are collected to the driver instead
# of collecting every vendor and slicing in pandas.
top_3_vendors = trips_data.select('vendor_id', 'total_amount')\
                          .groupBy('vendor_id').sum()\
                          .select('vendor_id', col("sum(total_amount)").alias("total"))\
                          .orderBy(desc('total'))\
                          .limit(3)

top_3_vendors_pd = top_3_vendors.toPandas()

# Attach the vendor display name. A left merge keeps a top vendor even when it
# is missing from the lookup table (an inner merge would silently drop its bar);
# in that case the vendor_id is shown instead of the name.
top_3_vendors_pd = top_3_vendors_pd.merge(
    vendors_data_pd[['vendor_id', 'name']], on='vendor_id', how='left')
top_3_vendors_pd['name'] = top_3_vendors_pd['name'].fillna(top_3_vendors_pd['vendor_id'])

# Ascending order so the largest vendor ends up at the top of the horizontal bar chart.
top_3_vendors_pd = top_3_vendors_pd.sort_values(by="total")

fig = go.Figure(go.Bar(
    x=top_3_vendors_pd.total,
    y=top_3_vendors_pd.name,
    orientation='h'))
fig.update_layout(title="Top 3 Vendors",
                  xaxis_title="Total amount",
                  yaxis_title="Vendor")
fig.show()

### 3. Faça um histograma da distribuição mensal, nos 4 anos, de corridas pagas em dinheiro

In [39]:
# Register the views the query reads from. Doing it here keeps the cell runnable
# on a fresh kernel (Restart & Run All): no earlier cell registers them.
# createOrReplaceTempView is safe to call repeatedly.
trips_data.createOrReplaceTempView("trips")
payments_data.createOrReplaceTempView("payments")

# Number of trips per calendar month, restricted to trips whose payment type
# maps to 'Cash' in the payment lookup table.
query = """
SELECT date_trunc('MONTH', trips.pickup_datetime) as trip_month,
       count(trips.pickup_datetime) as trip_count
FROM trips
JOIN payments on trips.payment_type = payments.payment_type
WHERE payments.payment_lookup = 'Cash'
GROUP BY trip_month
ORDER BY trip_month
"""

histogram_data = spark.sql(query)

# One row per month, so the result is small enough to collect.
histogram_data_pd = histogram_data.toPandas()

fig = go.Figure(go.Bar(
    x=histogram_data_pd.trip_month,
    y=histogram_data_pd.trip_count,
    orientation='v'))
fig.update_layout(title = "Monthly Distribution Between 2009 and 2012")
fig.show()

### 4. Faça um gráfico de série temporal contando a quantidade de gorjetas de cada dia nos últimos 3 meses de 2012.

In [9]:
# Register the view the query reads from, so the cell works on a fresh kernel
# (Restart & Run All) instead of relying on a later cell having been run first.
# createOrReplaceTempView is safe to call repeatedly.
trips_data.createOrReplaceTempView("trips")

# Daily total of tip_amount for pickups in October, November and December 2012.
# NOTE(review): the question heading asks for the *number* of tips per day, while
# this query sums the tip amounts -- confirm which of the two is intended.
query = """
SELECT DATE(pickup_datetime) as day,
       SUM(tip_amount) as tips
FROM trips
WHERE extract(YEAR from pickup_datetime) = 2012
  AND extract(MONTH from pickup_datetime) IN (10, 11, 12)
GROUP BY DATE(pickup_datetime)
ORDER BY DATE(pickup_datetime)
"""

tips = spark.sql(query)

# One row per day, so the result is small enough to collect.
pd_tips = tips.toPandas()

fig = go.Figure(data=go.Scatter(x=pd_tips.day, y=pd_tips.tips))
fig.update_layout(title = "Tips amount on October, November and December 2012")
fig.show()

## Quesitos bônus

### Qual o tempo médio das corridas nos dias de sábado e domingo?

In [10]:
# Trip duration in minutes: both timestamps are converted to epoch seconds,
# subtracted, and divided by 60. withColumn replaces an existing column of the
# same name, so re-running this cell is harmless.
trips_data = trips_data.withColumn(
    "trip_duration",
    (col("dropoff_datetime").cast("timestamp").cast("long")
     - col("pickup_datetime").cast("timestamp").cast("long")) / 60.)

# (Re-)register the view so the SQL below sees the new trip_duration column.
trips_data.createOrReplaceTempView("trips")

# Average duration of trips picked up on a weekend.
# DAYOFWEEK returns 1 for Sunday and 7 for Saturday. The previous DAY(...) call
# returned the day of the *month*, so it selected the 1st and 7th of each month
# rather than Saturdays and Sundays.
query = """
SELECT AVG(trip_duration) as average_trip_duration
FROM trips
WHERE DAYOFWEEK(pickup_datetime) IN (1,7)
"""

trip_duration = spark.sql(query)
pd_trip_duration = trip_duration.toPandas()

fig = go.Figure(go.Indicator(
    title = {'text': "Average duration for trips in the weekend"},
    mode = "number",
    value = pd_trip_duration.average_trip_duration[0],
    number = {'suffix': " minutes"},
    domain = {'x': [0, 1], 'y': [0, 1]}))

fig.show()

### Fazer uma visualização em mapa com latitude e longitude de pickups e dropoffs no ano de 2010

In [22]:
from pyspark.sql.functions import year

# Pickup/dropoff coordinates of trips picked up in 2010. Rows where any of the
# four coordinates equals 0 are discarded (presumably missing GPS readings --
# confirm against the data).
picked_up_in_2010 = year(col('pickup_datetime')) == 2010
has_all_coordinates = (
    (col('pickup_latitude') != 0)
    & (col('pickup_longitude') != 0)
    & (col('dropoff_latitude') != 0)
    & (col('dropoff_longitude') != 0)
)

coordinate_columns = ['pickup_latitude', 'pickup_longitude',
                      'dropoff_latitude', 'dropoff_longitude']
trip_coordinates = (
    trips_data
    .where(picked_up_in_2010)
    .where(has_all_coordinates)
    .select(*coordinate_columns)
)

# Collect the selected coordinates to pandas for plotting.
trip_coordinates_pd = trip_coordinates.toPandas()

Unnamed: 0,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude
0,40.742950,-74.004114,40.747950,-73.994712
1,40.747784,-73.996506,40.792385,-73.940449
2,40.752070,-73.951340,40.768108,-73.944535
3,40.729128,-74.001117,40.721812,-73.958122
4,40.756873,-73.976600,40.776075,-73.979757
...,...,...,...,...
984512,40.737280,-73.988602,40.721065,-73.987888
984513,40.784441,-73.947063,40.738477,-73.987610
984514,40.743763,-74.006156,40.764069,-73.974005
984515,40.783816,-73.979615,40.764176,-73.988470


In [40]:
# Plot only the first 500 trips to keep the interactive map light.
short_trip_coordinates_pd = trip_coordinates_pd[0:500]

fig = go.Figure()

# One marker layer per trip endpoint; both layers use identical styling, so they
# are built in a loop rather than as two copy-pasted calls.
for trace_name, column_prefix in (("Pickups", "pickup"), ("Dropoffs", "dropoff")):
    fig.add_trace(go.Scattermapbox(
        mode="markers",
        lon=short_trip_coordinates_pd[column_prefix + '_longitude'],
        lat=short_trip_coordinates_pd[column_prefix + '_latitude'],
        marker={'size': 10, 'opacity': 0.5},
        name=trace_name))

# Map layout. The 'center' key was previously listed twice with the same value;
# it is now given once.
fig.update_layout(
    margin={'l': 0, 't': 0, 'b': 0, 'r': 0},
    mapbox={
        'style': "open-street-map",
        'center': {'lon': -74, 'lat': 40.73},
        'zoom': 10})

fig.show()