# Exercise file formats speed comparison in Spark 2.0

## Start Spark session



https://spark.apache.org/docs/latest/sql-getting-started.html

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Load test data into dataframe and inspect

In [4]:
df = spark.read.load("dataset_1e7.csv", format="csv", header = "True")
df.show(10)
#print('Schema is:')
#df.printSchema()

+---+----+---------------------+-------+--------------------+
| id|name|number_family_members|country|               Covid|
+---+----+---------------------+-------+--------------------+
|  0|   1|        Crystal Smith|      3|   Equatorial Guinea|
|  1|   2|      Bradley Walters|      2|                Mali|
|  2|   3|           Chad Gross|      3|        Burkina Faso|
|  3|   4|        Kathryn Smith|      3|             Moldova|
|  4|   5|          Sarah Stout|      5|             Moldova|
|  5|   6|         Daniel Smith|      5|United States Vir...|
|  6|   7|        Ashley Martin|      2|              Norway|
|  7|   8|     Jamie Harrington|      4|               Chile|
|  8|   9|          Mark Mendez|      3|             Georgia|
|  9|  10|        Rebecca Fritz|      1|Central African R...|
+---+----+---------------------+-------+--------------------+
only showing top 10 rows



In [11]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- number_family_members: string (nullable = true)
 |-- country: string (nullable = true)
 |-- Covid: string (nullable = true)



# Write times


In [5]:
def write(fmt):
    if fmt == "json":
        df.write.json("file:///home/jovyan/work/fileformats/output/proto.json")
    elif fmt == "csv":
        df.write.csv("file:///home/jovyan/work/fileformats/output/proto.csv")
    elif fmt == "avro":
        df.write.avro("file:///home/jovyan/work/fileformats/proto.avro")
    elif fmt == "parquet":
        df.write.parquet("file:///home/jovyan/work/fileformats/output/proto.parquet")

def write_2(fmt): # Using f-strings
    df.write.format(f'{fmt}').save(f'file:///home/jovyan/work/fileformatsfile:///home/jovyan/work/fileformats/output/proto.{fmt}',header = 'true')

In [6]:
import shutil
import os
import time

if os.path.exists("./output/"):
    shutil.rmtree("./output/")
    print('dirtee deleted')
    os.makedirs("./output/")
    print('created empty dir \n')

for fmt in ['csv','json', 'parquet']:
    print(fmt)
    start = time.time()
    write_2(fmt)
    print(time.time() - start)
    print('\n')
    

csv
13.938474893569946


json
18.796042680740356


parquet
21.96593976020813




# Read times

In [9]:
def read(fmt):
    if fmt == "json":
        sdf = spark.read.option("header", "true").json("file:///home/jovyan/work/fileformats/output/proto.json")
    elif fmt == "csv":
        sdf = spark.read.option("header","true").csv("file:///home/jovyan/work/fileformats/output/proto.csv")
    elif fmt == "avro":
        sdf = spark.read.format("avro").option("header","true").load("file:///home/jovyan/work/fileformats/output/proto.avro")
    elif fmt == "parquet":
        sdf = spark.read.option("header","true").parquet("file:///home/jovyan/work/fileformats/output/proto.parquet")
    return sdf

In [10]:
for fmt in ['csv','json', 'parquet']:
    print(fmt)
    start = time.time()
    sdf = read(fmt)
  #  sdf.printSchema()
    print(time.time() - start)
    print('\n')
    

csv
0.5632686614990234


json
5.866176128387451


parquet
0.38027310371398926




# Retrieve info from files

## Insert new record

In [12]:
# https://medium.com/@mike_82447/add-a-row-to-a-spark-dataframe-4e52f869c17b
new_record = [['10000000','John Doe', '8', 'Netherlands', '0']]
new_record_df = spark.createDataFrame(new_record)


In [13]:
for fmt in ['csv','json', 'parquet']:
    print(fmt)
    start = time.time()
    sdf = read(fmt)
    sdf.union(new_record_df)
    print(time.time() - start)
    print('\n')

csv
0.5563492774963379


json
7.298802614212036


parquet
0.2456364631652832




In [18]:
sdf = read('csv')
sdf.groupBy("Covid").count().show()

+--------------------+-----+
|               Covid|count|
+--------------------+-----+
|                Chad|40892|
|            Paraguay|40814|
|            Anguilla|40878|
|               Macao|40950|
|Heard Island and ...|40972|
|               Yemen|40779|
|             Senegal|40607|
|             Tokelau|41085|
|              Sweden|40679|
|French Southern T...|40844|
|            Kiribati|40461|
|              Guyana|40817|
|              Jersey|40852|
|         Philippines|40738|
|             Eritrea|41125|
|      Norfolk Island|40816|
|            Djibouti|40710|
|               Tonga|41012|
|            Malaysia|40709|
|           Singapore|40789|
+--------------------+-----+
only showing top 20 rows



## Group by

In [14]:
for fmt in ['csv','json', 'parquet']:
    print(fmt)
    sdf = read(fmt)
   # sdf.printSchema()
    start = time.time()
    sdf.groupBy("Covid").count().show()
    print(time.time() - start)
    print('\n')

csv
+--------------------+-----+
|               Covid|count|
+--------------------+-----+
|                Chad|40892|
|            Paraguay|40814|
|            Anguilla|40878|
|               Macao|40950|
|Heard Island and ...|40972|
|               Yemen|40779|
|             Senegal|40607|
|             Tokelau|41085|
|              Sweden|40679|
|French Southern T...|40844|
|            Kiribati|40461|
|              Guyana|40817|
|              Jersey|40852|
|         Philippines|40738|
|             Eritrea|41125|
|      Norfolk Island|40816|
|            Djibouti|40710|
|               Tonga|41012|
|            Malaysia|40709|
|           Singapore|40789|
+--------------------+-----+
only showing top 20 rows

5.210748910903931


json
+--------------------+-----+
|               Covid|count|
+--------------------+-----+
|                Chad|40892|
|            Anguilla|40878|
|            Paraguay|40814|
|               Macao|40950|
|Heard Island and ...|40972|
|               Y

In [19]:
# Function to execute query
def SQL_query(fmt, SQL_statement):
    sdf=read(f'{fmt}')
    sdf.createOrReplaceTempView("TempView")
    sqlDF = spark.sql(SQL_statement)
    sqlDF.show()
    


In [20]:
SQL_statement = "SELECT country, count(*) as count FROM TempView group by country"

for fmt in ['csv','json', 'parquet']:
    print(fmt)
    start = time.time()
    SQL_query(fmt, SQL_statement)
    print(time.time() - start)
    print('\n')

csv
+-------+-------+
|country|  count|
+-------+-------+
|      3|1668367|
|      0|1665694|
|      5|1666876|
|      1|1666173|
|      4|1668307|
|      2|1664583|
+-------+-------+

5.957247257232666


json
+-------+-------+
|country|  count|
+-------+-------+
|      3|1668367|
|      0|1665694|
|      5|1666876|
|      1|1666173|
|      4|1668307|
|      2|1664583|
+-------+-------+

17.537051916122437


parquet
+-------+-------+
|country|  count|
+-------+-------+
|      3|1668367|
|      0|1665694|
|      5|1666876|
|      1|1666173|
|      4|1668307|
|      2|1664583|
+-------+-------+

1.9546408653259277




In [21]:
SQL_statement = "SELECT count(*) FROM TempView where country = 'Macao'"
for fmt in ['csv','json', 'parquet']:
    print(fmt)
    start = time.time()
    SQL_query(fmt, SQL_statement)
    print(time.time() - start)
    print('\n')

csv
+--------+
|count(1)|
+--------+
|       0|
+--------+

3.8470730781555176


json
+--------+
|count(1)|
+--------+
|       0|
+--------+

12.041115045547485


parquet
+--------+
|count(1)|
+--------+
|       0|
+--------+

0.6845920085906982




In [22]:
SQL_statement = "SELECT * FROM TempView where id = 1"
for fmt in ['csv','json', 'parquet']:
    print(fmt)
    start = time.time()
    SQL_query(fmt, SQL_statement)
    print(time.time() - start)
    print('\n')

csv
+---+----+---------------------+-------+-----+
| id|name|number_family_members|country|Covid|
+---+----+---------------------+-------+-----+
|  1|   2|      Bradley Walters|      2| Mali|
+---+----+---------------------+-------+-----+

9.384071350097656


json
+-----+-------+---+----+---------------------+
|Covid|country| id|name|number_family_members|
+-----+-------+---+----+---------------------+
| Mali|      2|  1|   2|      Bradley Walters|
+-----+-------+---+----+---------------------+

16.175256490707397


parquet
+---+----+---------------------+-------+-----+
| id|name|number_family_members|country|Covid|
+---+----+---------------------+-------+-----+
|  1|   2|      Bradley Walters|      2| Mali|
+---+----+---------------------+-------+-----+

2.5878260135650635




## Min, max, count

In [23]:
fmt = 'parquet'
sdf=read(f'{fmt}')
field = 'number_family_members'
start = time.time()
sdf.agg({field: "max"}).show(), sdf.agg({field: "min"}).show(), sdf.agg({field: "count"}).show()
print(time.time() - start)

+--------------------------+
|max(number_family_members)|
+--------------------------+
|                Zoe Zuniga|
+--------------------------+

+--------------------------+
|min(number_family_members)|
+--------------------------+
|              Aaron Abbott|
+--------------------------+

+----------------------------+
|count(number_family_members)|
+----------------------------+
|                    10000000|
+----------------------------+

4.654228687286377


In [29]:
fmt = 'json'
sdf=read(f'{fmt}')
field = 'number_family_members'
start = time.time()
sdf.agg({field: "max"}).show(), sdf.agg({field: "min"}).show(), sdf.agg({field: "count"}).show()
print(time.time() - start)

+--------------------------+
|max(number_family_members)|
+--------------------------+
|                Zoe Zuniga|
+--------------------------+

+--------------------------+
|min(number_family_members)|
+--------------------------+
|              Aaron Abbott|
+--------------------------+

+----------------------------+
|count(number_family_members)|
+----------------------------+
|                    10000000|
+----------------------------+

13.4062659740448


In [30]:
fmt = 'csv'
sdf=read(f'{fmt}')
field = 'number_family_members'
start = time.time()
sdf.agg({field: "max"}).show(), sdf.agg({field: "min"}).show(), sdf.agg({field: "count"}).show()
print(time.time() - start)

+--------------------------+
|max(number_family_members)|
+--------------------------+
|                Zoe Zuniga|
+--------------------------+

+--------------------------+
|min(number_family_members)|
+--------------------------+
|              Aaron Abbott|
+--------------------------+

+----------------------------+
|count(number_family_members)|
+----------------------------+
|                    10000000|
+----------------------------+

9.704328536987305


# File sizes

In [31]:
import os
for r, d, f in os.walk('./output'):
    size = sum(os.path.getsize(os.path.join(r,n)) for n in f) / 1048576
    size = round(size,2)
    print("{} is {} MB".format(r, size))

./output is 0.0 MB
./output/proto.csv is 419.63 MB
./output/proto.json is 1053.98 MB
./output/proto.parquet is 185.52 MB
