# Step 1) Read Data from Google Cloud Storage

In [3]:
# Load the taxi trip CSV from the Cloud Storage bucket. The first row is used
# as the header, and Spark is asked to infer the column types from the data.
dfTaxi = (
    spark.read
    .format("csv")
    .option("inferschema", "true")
    .option("header", "true")
    .load("gs://concrete-domain-259908/train.csv")
)

In [40]:
# Peek at the first record to sanity-check the parsed columns.
dfTaxi.first()

Row(TRIP_ID=1372636858620000589, CALL_TYPE=u'C', ORIGIN_CALL=None, ORIGIN_STAND=None, TAXI_ID=20000589, TIMESTAMP=1372636858, DAY_TYPE=u'A', MISSING_DATA=False, POLYLINE=u'[[-8.618643,41.141412],[-8.618499,41.141376],[-8.620326,41.14251],[-8.622153,41.143815],[-8.623953,41.144373],[-8.62668,41.144778],[-8.627373,41.144697],[-8.630226,41.14521],[-8.632746,41.14692],[-8.631738,41.148225],[-8.629938,41.150385],[-8.62911,41.151213],[-8.629128,41.15124],[-8.628786,41.152203],[-8.628687,41.152374],[-8.628759,41.152518],[-8.630838,41.15268],[-8.632323,41.153022],[-8.631144,41.154489],[-8.630829,41.154507],[-8.630829,41.154516],[-8.630829,41.154498],[-8.630838,41.154489]]')

# Step 2) Save the Data

In [8]:
# Persist the DataFrame as a CSV-backed table named TaxiCSV, replacing any
# existing table of the same name.
(
    dfTaxi.write
    .format("csv")
    .option("header", "true")
    .mode("overwrite")
    .saveAsTable("TaxiCSV")
)

In [9]:
# Persist the same DataFrame as a Parquet-backed table named TaxiParquet,
# replacing any existing table of the same name.
# NOTE(review): "header" is a CSV option and presumably has no effect on the
# Parquet writer; kept so this mirrors the CSV write above - confirm.
(
    dfTaxi.write
    .format("parquet")
    .option("header", "true")
    .mode("overwrite")
    .saveAsTable("TaxiParquet")
)

# Step 3) Query Data

## Simple Queries

In [39]:
sqlContext.sql("""
SELECT COUNT(*)
FROM TaxiCSV
""").show()
%time

+--------+
|count(1)|
+--------+
| 1710670|
+--------+

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 8.11 µs


In [23]:
sqlContext.sql("""
SELECT COUNT(*)
FROM TaxiParquet
""").show()
%time

+--------+
|count(1)|
+--------+
| 1710670|
+--------+

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 6.91 µs


## Complex Queries

## Query #1: Trip counts by day type

In [41]:
sqlContext.sql(
"""
SELECT SUM(Case when DAY_TYPE = 'B' then 1 else 0 end) as Trips_On_Holiday
      , SUM(Case when DAY_TYPE = 'C' then 1 else 0 end) as Trips_Before_Holiday
      , SUM(Case when DAY_TYPE = 'A' then 1 else 0 end) as Trips_On_NormalDays
FROM TaxiCSV
"""
).show()
%time

+----------------+--------------------+-------------------+
|Trips_On_Holiday|Trips_Before_Holiday|Trips_On_NormalDays|
+----------------+--------------------+-------------------+
|               0|                   0|            1710670|
+----------------+--------------------+-------------------+

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 8.11 µs


In [42]:
sqlContext.sql(
"""
SELECT SUM(Case when DAY_TYPE = 'B' then 1 else 0 end) as Trips_On_Holiday
      , SUM(Case when DAY_TYPE = 'C' then 1 else 0 end) as Trips_Before_Holiday
      , SUM(Case when DAY_TYPE = 'A' then 1 else 0 end) as Trips_On_NormalDays
FROM TaxiParquet
"""
).show()
%time

+----------------+--------------------+-------------------+
|Trips_On_Holiday|Trips_Before_Holiday|Trips_On_NormalDays|
+----------------+--------------------+-------------------+
|               0|                   0|            1710670|
+----------------+--------------------+-------------------+

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 6.91 µs


## Query #2: Trips with and without missing data

In [43]:
sqlContext.sql("""SELECT SUM(Case when MISSING_DATA = false then 1 else 0 end) as No_Missing_Data
      , SUM(Case when MISSING_DATA = true then 1 else 0 end) as Missing_Data
FROM taxicsv""").show()
%time

+---------------+------------+
|No_Missing_Data|Missing_Data|
+---------------+------------+
|        1710660|          10|
+---------------+------------+

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 27.2 µs


In [44]:
sqlContext.sql("""SELECT SUM(Case when MISSING_DATA = false then 1 else 0 end) as No_Missing_Data
      , SUM(Case when MISSING_DATA = true then 1 else 0 end) as Missing_Data
FROM taxiparquet""").show()
%time

+---------------+------------+
|No_Missing_Data|Missing_Data|
+---------------+------------+
|        1710660|          10|
+---------------+------------+

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 7.87 µs
