In [6]:
# Spark Session: local mode using all available cores ("local[*]").
# getOrCreate() hands back an already-active session if one exists in this
# kernel, so re-running this cell does not start a second session.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Reading Complex Data Formats").master("local[*]").getOrCreate()

# Bare last expression so Jupyter renders the session summary as cell output.
spark

In [7]:
# Load every *.parquet file under the sales_total_parquet directory.
df_parquet = spark.read.parquet("data/input/sales_total_parquet/*.parquet")

In [9]:
# Print the schema Spark resolved for the Parquet data; no schema or
# inferSchema option was passed when loading it.
df_parquet.printSchema()

root
 |-- transacted_at: timestamp (nullable = true)
 |-- trx_id: integer (nullable = true)
 |-- retailer_id: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- city_id: integer (nullable = true)



In [10]:
# Preview the first 20 rows; string values longer than 20 characters are cut.
df_parquet.show(n=20, truncate=True)

+-------------------+----------+-----------+--------------------+-------+----------+
|      transacted_at|    trx_id|retailer_id|         description| amount|   city_id|
+-------------------+----------+-----------+--------------------+-------+----------+
|2017-12-20 20:00:00| 561803551| 2077350195|Walgreen  ccd id:...|  69.66| 350411713|
|2017-12-10 22:00:00| 966498100| 1232435973|Toys R Us   ccd i...|  16.44|1554564545|
|2017-12-19 17:00:00|  40380012| 1898522855|Target   arc id: ...|2854.84| 920189167|
|2017-12-19 19:00:00|1735489522|  847200066|unkn     ccd id: ...| 280.31|2011632272|
|2017-01-11 14:00:00|1513345631| 1953761884|Home Depot   arc ...|  20.51|1528300441|
|2017-12-24 23:00:00| 884145953| 2001148981|Costco  ccd id: 4...|  11.75|2116046074|
|2017-05-14 19:00:00|1003554030| 1903529855|                unkn|  61.76|1710668653|
|2017-12-24 19:00:00| 921164309|  847200066|unkn   ppd id: 95...|  27.83|2074005445|
|2017-12-16 17:00:00| 549217139| 1070485878|          Amazon.com|

In [12]:
# Load every *.orc file under the sales_total_orc directory, then show the
# schema Spark resolved for it.
df_orc = spark.read.orc("data/input/sales_total_orc/*.orc")
df_orc.printSchema()

root
 |-- transacted_at: timestamp (nullable = true)
 |-- trx_id: integer (nullable = true)
 |-- retailer_id: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- city_id: integer (nullable = true)



In [13]:
# Preview the first 20 rows; string values longer than 20 characters are cut.
df_orc.show(n=20, truncate=True)

+-------------------+----------+-----------+--------------------+-------+----------+
|      transacted_at|    trx_id|retailer_id|         description| amount|   city_id|
+-------------------+----------+-----------+--------------------+-------+----------+
|2017-12-20 20:00:00| 561803551| 2077350195|Walgreen  ccd id:...|  69.66| 350411713|
|2017-12-10 22:00:00| 966498100| 1232435973|Toys R Us   ccd i...|  16.44|1554564545|
|2017-12-19 17:00:00|  40380012| 1898522855|Target   arc id: ...|2854.84| 920189167|
|2017-12-19 19:00:00|1735489522|  847200066|unkn     ccd id: ...| 280.31|2011632272|
|2017-01-11 14:00:00|1513345631| 1953761884|Home Depot   arc ...|  20.51|1528300441|
|2017-12-24 23:00:00| 884145953| 2001148981|Costco  ccd id: 4...|  11.75|2116046074|
|2017-05-14 19:00:00|1003554030| 1903529855|                unkn|  61.76|1710668653|
|2017-12-24 19:00:00| 921164309|  847200066|unkn   ppd id: 95...|  27.83|2074005445|
|2017-12-16 17:00:00| 549217139| 1070485878|          Amazon.com|

In [14]:
import time

def get_time(func):
    """Run ``func`` immediately and print how long it took.

    Intended as a decorator on a zero-argument function inside a notebook
    cell: decorating the function executes it right away (at definition
    time) and prints a line of the form ``Execution time: <ms> ms``.

    Args:
        func: A callable taking no arguments. Its return value is ignored.

    Returns:
        ``func`` itself, unchanged, so the decorated name stays bound to a
        callable rather than being silently replaced by ``None``.

    Raises:
        Whatever ``func`` raises; in that case no timing line is printed.
    """
    # perf_counter() is monotonic and high-resolution; time.time() follows
    # the wall clock and can jump if the system clock is adjusted mid-run.
    start_time = time.perf_counter()
    func()
    elapsed_ms = (time.perf_counter() - start_time) * 1000
    print(f"Execution time: {elapsed_ms} ms")
    return func

In [15]:
@get_time
def x():
    # Timed: load the Parquet file and run a full-row count action.
    sales_df = spark.read.parquet("data/input/sales_data.parquet")
    sales_df.count()

Execution time: 583.8000774383545 ms


In [16]:
@get_time
def x():
    # Timed: same file as the previous cell, but count after projecting to a
    # single column.
    # NOTE(review): this run follows the previous one in the same session, so
    # part of any speed-up may come from warm-up rather than the projection —
    # consider repeating each measurement before drawing conclusions.
    sales_df = spark.read.parquet("data/input/sales_data.parquet")
    sales_df.select("trx_id").count()

Execution time: 220.10350227355957 ms


In [25]:
@get_time
def x():
    # Timed: the same single-column count against the CSV source. With
    # inferSchema enabled Spark must read the data to determine column types,
    # and that work is included in the measured time.
    sales_df = spark.read.csv("data/input/sales.csv", header=True, inferSchema=True)
    sales_df.select("trx_id").count()

Execution time: 1225.189447402954 ms
