In [11]:
import numpy
import time
import math

In [12]:
import glob

csv_files = glob.glob('/data/tmp/parquet/trip_data/*.csv')
csv_files = sorted(csv_files, key=lambda f: f)
print '\n'.join(csv_files)


/data/tmp/parquet/trip_data/trip_data_1.csv
/data/tmp/parquet/trip_data/trip_data_10.csv
/data/tmp/parquet/trip_data/trip_data_11.csv
/data/tmp/parquet/trip_data/trip_data_12.csv
/data/tmp/parquet/trip_data/trip_data_2.csv
/data/tmp/parquet/trip_data/trip_data_3.csv
/data/tmp/parquet/trip_data/trip_data_4.csv
/data/tmp/parquet/trip_data/trip_data_5.csv
/data/tmp/parquet/trip_data/trip_data_6.csv
/data/tmp/parquet/trip_data/trip_data_7.csv
/data/tmp/parquet/trip_data/trip_data_8.csv
/data/tmp/parquet/trip_data/trip_data_9.csv


In [13]:
ssql = SQLContext(sc)

In [None]:
%%time
dfs = []
for csv_file in csv_files:
    tstart = time.time()
    df = ssql.read.format('com.databricks.spark.csv')\
        .options(header='false', inferschema='true')\
        .load(csv_file)
    dfs.append(df)
    print '%s = %d' % (csv_file, time.time() - tstart)


In [None]:
%%time
# write all dataframes to a parquet 
for df in dfs:
    df.write.mode("append").parquet("/data/tmp/parquet/trip_data.parquet")


In [14]:
%%time
# load the parquet back 
pdf = ssql.read.parquet("/data/tmp/parquet/trip_data.parquet")

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 136 ms


In [15]:
print pdf.count()
pdf.printSchema()

173179771
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)



In [19]:
%%time
pdf.createOrReplaceTempView("trips")
df = ssql.sql('select count(*), avg(_c8), avg(_c9) from trips where _c12 > 37 and _c12 < 50')
df.show()

+--------+------------------------+------------------------+
|count(1)|avg(CAST(_c8 AS DOUBLE))|avg(CAST(_c9 AS DOUBLE))|
+--------+------------------------+------------------------+
|  533566|       808.8370323446396|      3.1116808792164283|
+--------+------------------------+------------------------+

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 6.78 s


In [None]:
%%time
# partition the dataframes in two parquets to
# test performance of 'join'

# write one half of dataframes to a parquet 1
for df in dfs[0:len(dfs)/2]:
    df.write.mode("append").parquet("/data/tmp/parquet/trip_data.parquet1")

for df in dfs[len(dfs)/2:]:
    df.write.mode("append").parquet("/data/tmp/parquet/trip_data.parquet2")

pdf1 = ssql.read.parquet("/data/tmp/parquet/trip_data.parquet1")
pdf2 = ssql.read.parquet("/data/tmp/parquet/trip_data.parquet2")
pdf1.createOrReplaceTempView("t1")
pdf2.createOrReplaceTempView("t2")


In [None]:
%%time
df = ssql.sql('select avg(t1._c8), avg(t2._c9) \
    from t1, t2 \
    where t1._c12 > 10 and t1._c12 < 37\
    and   t2._c12 > 37 and t2._c12 < 50\
    and   t1._c12 < t2._c12\
    ')
df.show()

In [None]:
%%time
df = ssql.sql('select count(*), avg(_c8), avg(_c9) from t2 where _c12 > 37 and _c12 < 41')
df.show()

In [None]:
pdf1.count()

In [None]:
%%time
# compare parquet vs csv performance
cdf = ssql.read.format('com.databricks.spark.csv')\
    .options(header='false', inferschema='true')\
    .load('/data/tmp/parquet/trip_data/*.csv')


In [None]:
%%time
cdf.createOrReplaceTempView("trips")
df = ssql.sql('select avg(_c8), avg(_c9) from trips where _c12 > 37 and _c12 < 50')
df.show()