**Realtime-Data-Analytics-Using-Spark**

In [6]:
import pandas as pd
pd_df = pd.read_csv("Uber-Jan-Feb-FOIL.csv", header=0)
pd_df.head()

Unnamed: 0,dispatching_base_number,date,active_vehicles,trips
0,B02512,1/1/2015,190,1132
1,B02765,1/1/2015,225,1765
2,B02764,1/1/2015,3427,29421
3,B02682,1/1/2015,945,7679
4,B02617,1/1/2015,1228,9537


In [8]:
sqlc = sqlContext.read.format("com.databricks.spark.csv") \
    .options(header="true", inferschema="true").load("Uber-Jan-Feb-FOIL.csv")
sqlc.take(5)    

[Row(dispatching_base_number=u'B02512', date=u'1/1/2015', active_vehicles=190, trips=1132),
 Row(dispatching_base_number=u'B02765', date=u'1/1/2015', active_vehicles=225, trips=1765),
 Row(dispatching_base_number=u'B02764', date=u'1/1/2015', active_vehicles=3427, trips=29421),
 Row(dispatching_base_number=u'B02682', date=u'1/1/2015', active_vehicles=945, trips=7679),
 Row(dispatching_base_number=u'B02617', date=u'1/1/2015', active_vehicles=1228, trips=9537)]

**Register a TempTable to make the Spark SQL much more like SQL.**

e.g. we can use the registered temp name behave like SQL Table Name.

In [9]:
sqlc.registerTempTable("uber")
sqlc

DataFrame[dispatching_base_number: string, date: string, active_vehicles: int, trips: int]

In [10]:
sqlc.printSchema()

root
 |-- dispatching_base_number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- active_vehicles: integer (nullable = true)
 |-- trips: integer (nullable = true)



In [13]:
# Using sqlContext
sqlContext.sql("SELECT DISTINCT dispatching_base_number FROM uber").collect()

[Row(dispatching_base_number=u'B02512'),
 Row(dispatching_base_number=u'B02598'),
 Row(dispatching_base_number=u'B02682'),
 Row(dispatching_base_number=u'B02765'),
 Row(dispatching_base_number=u'B02617'),
 Row(dispatching_base_number=u'B02764')]

In [14]:
# Using pandas
pd_df.dispatching_base_number.unique()

array(['B02512', 'B02765', 'B02764', 'B02682', 'B02617', 'B02598'],
      dtype=object)

**What Bases are the busiest?**

In [17]:
sqlContext.sql("""SELECT DISTINCT(`dispatching_base_number`),
                SUM(`trips`) AS TripCount FROM uber
                GROUP BY `dispatching_base_number`
                ORDER BY TripCount DESC
                """).show()

+-----------------------+---------+
|dispatching_base_number|TripCount|
+-----------------------+---------+
|                 B02764|  1914449|
|                 B02617|   725025|
|                 B02682|   662509|
|                 B02598|   540791|
|                 B02765|   193670|
|                 B02512|    93786|
+-----------------------+---------+



**What Dates are the busiest?**

In [18]:
sqlContext.sql("""SELECT DISTINCT(date),
                SUM(`trips`) AS TripCount FROM uber
                GROUP BY date
                ORDER BY TripCount DESC LIMIT 10
                """).show()

+---------+---------+
|     date|TripCount|
+---------+---------+
|2/20/2015|   100915|
|2/14/2015|   100345|
|2/21/2015|    98380|
|2/13/2015|    98024|
|1/31/2015|    92257|
|2/15/2015|    89401|
|2/27/2015|    88806|
|2/19/2015|    88757|
|2/28/2015|    88181|
| 2/6/2015|    85940|
+---------+---------+

