# Init SparkContext

In [1]:
import os
from datetime import datetime
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from pyspark.storagelevel import StorageLevel

In [2]:
spark = (SparkSession.builder.appName("spark-101-{}".format(datetime.today()))
        .master("spark://spark-master:7077")      
        .getOrCreate())

sqlContext = SQLContext(spark)



In [3]:
sc = spark.sparkContext
sc

# Reading CSV file

In [4]:
base_path = "s3a://warehouse/tpch_data"

In [5]:
%%time
filename = "h_lineitem.dsv"

df_lineitem = (
    spark.read.option("delimiter", "|")
    .option("header", True)
    .option("inferSchema" , True)
    .csv(os.path.join(base_path, filename))
)

CPU times: user 16.1 ms, sys: 4.51 ms, total: 20.6 ms
Wall time: 49.9 s


In [6]:
df_lineitem.printSchema()

root
 |-- L_ORDERKEY: integer (nullable = true)
 |-- L_PARTKEY: integer (nullable = true)
 |-- L_SUPPKEY: integer (nullable = true)
 |-- L_LINENUMBER: integer (nullable = true)
 |-- L_QUANTITY: integer (nullable = true)
 |-- L_EXTENDEDPRICE: string (nullable = true)
 |-- L_DISCOUNT: string (nullable = true)
 |-- L_TAX: string (nullable = true)
 |-- L_RETURNFLAG: string (nullable = true)
 |-- L_LINESTATUS: string (nullable = true)
 |-- L_SHIPDATE: string (nullable = true)
 |-- L_COMMITDATE: string (nullable = true)
 |-- L_RECEIPTDATE: string (nullable = true)
 |-- L_SHIPINSTRUCT: string (nullable = true)
 |-- L_SHIPMODE: string (nullable = true)
 |-- L_COMMENT: string (nullable = true)



# Writing parquet files

In [7]:
df_lineitem.rdd.getNumPartitions()

14

In [8]:
table_name = "h_lineitem"
output_path = os.path.join(base_path, "parquet", table_name)

In [9]:
%%time
df_lineitem.write.parquet(output_path, mode="overwrite")

CPU times: user 24.2 ms, sys: 6.19 ms, total: 30.4 ms
Wall time: 1min 49s


In [10]:
df_lineitem.repartition(5).write.parquet(output_path, mode="overwrite")

In [11]:
df_lineitem_compressed = spark.read.parquet(output_path)
df_lineitem_compressed.rdd.getNumPartitions()

5

# Writing JSON files

In [12]:
df_semi_structure = (
    df_lineitem_compressed.groupby("L_ORDERKEY")
    .agg(
        F.collect_list("L_LINENUMBER").alias("items"),
        F.count("L_LINENUMBER").alias("num_items"),
    )
    .orderBy(F.col("num_items").desc())    
)

df_semi_structure.show()

+----------+--------------------+---------+
|L_ORDERKEY|               items|num_items|
+----------+--------------------+---------+
|   5999976|[8, 6, 5, 1, 7, 4...|        8|
|     10180|[5, 1, 3, 7, 2, 6...|        7|
|     10183|[5, 1, 3, 4, 7, 6...|        7|
|     10342|[3, 7, 5, 2, 4, 1...|        7|
|     10211|[3, 2, 6, 7, 1, 4...|        7|
|     10184|[3, 5, 7, 2, 1, 4...|        7|
|     10209|[7, 5, 4, 6, 1, 2...|        7|
|     10210|[6, 7, 1, 3, 5, 2...|        7|
|     10306|[7, 6, 3, 4, 1, 5...|        7|
|     10215|[6, 4, 7, 2, 3, 1...|        7|
|     10216|[1, 3, 5, 2, 4, 7...|        7|
|     10241|[3, 7, 5, 4, 1, 2...|        7|
|     10243|[3, 2, 6, 7, 5, 4...|        7|
|     10247|[3, 2, 7, 5, 1, 4...|        7|
|     10248|[3, 6, 5, 1, 4, 2...|        7|
|     10276|[4, 6, 5, 1, 2, 3...|        7|
|     10309|[2, 3, 1, 6, 5, 7...|        7|
|     10310|[2, 4, 3, 6, 5, 1...|        7|
|     10311|[2, 3, 1, 4, 7, 5...|        7|
|     10338|[3, 7, 6, 1, 4, 2...

In [13]:
df_semi_structure.printSchema()

root
 |-- L_ORDERKEY: integer (nullable = true)
 |-- items: array (nullable = false)
 |    |-- element: integer (containsNull = false)
 |-- num_items: long (nullable = false)



In [14]:
%%time
output_json = os.path.join(base_path, "json", table_name)
df_semi_structure.write.json(output_json, mode="overwrite")

CPU times: user 3.97 ms, sys: 1.81 ms, total: 5.78 ms
Wall time: 16.8 s


# Compare uncompressed/compressed files

In [15]:
%%time
df_lineitem.count()

CPU times: user 30.7 ms, sys: 5.96 ms, total: 36.7 ms
Wall time: 37.7 s


11996782

In [16]:
%%time
df_lineitem_compressed.count()

CPU times: user 2.21 ms, sys: 1.12 ms, total: 3.33 ms
Wall time: 1.2 s


11996782

# Cached vs non-cached

In [17]:
%%time
df_lineitem.cache()
df_lineitem.count()

CPU times: user 46.1 ms, sys: 6.25 ms, total: 52.3 ms
Wall time: 1min 14s


11996782

In [18]:
%%time
df_lineitem.count()

CPU times: user 1.64 ms, sys: 112 µs, total: 1.75 ms
Wall time: 275 ms


11996782

In [19]:
%%time
df_lineitem.unpersist()
df_lineitem.count()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
%%time
df_lineitem.persist(StorageLevel.MEMORY_AND_DISK_2)
df_lineitem.count()

In [None]:
%%time
df_lineitem.unpersist()
df_lineitem.persist(StorageLevel.DISK_ONLY)
df_lineitem.count()

In [None]:
%%time
df_lineitem.count()

# Hive Metastore

In [None]:
df_lineitem_compressed.limit(5).toPandas()

In [None]:
# register temporary view
table_name = "h_lineitem"
df_lineitem_compressed.createOrReplaceTempView(table_name)

In [None]:
spark.sql("SHOW TABLES").show()

In [None]:
spark.sql("SELECT * FROM h_lineitem LIMIT 5").show()

# Dataframe transformation

## SELECT

In [None]:
df_lineitem_compressed.select("L_ORDERKEY", "L_LINENUMBER", "L_QUANTITY").show(5)

In [None]:
ls_selected_cols = ["L_ORDERKEY", "L_LINENUMBER", "L_QUANTITY"]
df_lineitem_compressed.select(ls_selected_cols).show(5)

In [None]:
df_lineitem_compressed.selectExpr("L_ORDERKEY AS order_no", "L_LINENUMBER AS line_no", "L_QUANTITY AS quantity").show(5)

## WHERE

In [None]:
df_lineitem_compressed.filter("L_ORDERKEY = 1090753").show()

In [None]:
df_lineitem_compressed.where("L_ORDERKEY = 1090753 AND L_LINENUMBER < 4").show()

In [None]:
df_lineitem_compressed.where((F.col("L_ORDERKEY") == 1090753) & (F.col("L_LINENUMBER") < F.lit(4))).show()

## ORDER BY

In [None]:
df_filtered = df_lineitem_compressed.where("L_ORDERKEY = 1090753 AND L_LINENUMBER < 4")
df_filtered.orderBy("L_LINENUMBER").show()

In [None]:
df_filtered.orderBy("L_LINENUMBER", ascending=False).show()

## New columns

In [None]:
df_filtered = df_lineitem_compressed.where("L_ORDERKEY = 1090753")
df_filtered = df_filtered.selectExpr("L_ORDERKEY AS order_no", "L_LINENUMBER AS line_no", "L_EXTENDEDPRICE", "CAST(L_QUANTITY AS INT) AS quantity")
df_filtered.show()

In [None]:
df_filtered = df_filtered.withColumn("amount", F.regexp_replace("L_EXTENDEDPRICE", ",", ".").cast(FloatType()))
df_filtered.show()

In [None]:
df_filtered = df_filtered.withColumn("total_price", F.col("amount") * F.col("quantity"))
df_filtered.show()

## GROUP BY, AGG: sum(), min(), max(), avg()

In [None]:
df_agg = (
    df_filtered.groupBy("order_no")
    .agg(
        F.min("total_price").alias("min_price"),
        F.max("total_price").alias("max_price"),
        F.avg("total_price").alias("avg_price"),
        F.sum("total_price").alias("total_price"),
        F.count("line_no").alias("num_items")
    )
)

df_agg = df_agg.selectExpr(["*"] + ["(total_price / num_items) AS values_per_items"])
df_agg.show()