In [1]:
! hdfs dfs -rm -R -skipTrash /user/train/output_data/*

rm: `/user/train/output_data/*': No such file or directory


In [2]:
! hdfs dfs -ls  /user/train/output_data

In [1]:
import findspark

In [2]:
# /opt/manual/spark: this is SPARK_HOME path
findspark.init("/opt/manual/spark")

In [3]:
from pyspark.sql import SparkSession, functions as F

# Adding external libs to application

In [4]:
spark = SparkSession.builder \
.appName("Dataframe Reader") \
.master("yarn") \
.enableHiveSupport() \
.config("spark.jars.packages","org.apache.spark:spark-avro_2.12:3.0.0") \
.getOrCreate()

In [5]:
# config("spark.jars.packages","org.apache.spark:spark-avro_2.12:3.0.0") \
# is for avro. Avro is not bulit-in data format
# <groupId>:<artifactId>:<version>

In [6]:
# ! hdfs dfs -put /home/train/datasets/flight-data  /user/train/datasets

In [7]:
! hdfs dfs -ls /user/train/datasets

Found 8 items
-rw-r--r--   1 train supergroup       4556 2020-11-15 10:29 /user/train/datasets/Advertising.csv
-rw-r--r--   1 train supergroup   46401315 2020-11-14 16:17 /user/train/datasets/Hotel_Reviews.csv.gz
drwxr-xr-x   - train supergroup          0 2020-11-10 22:03 /user/train/datasets/cat_images
drwxr-xr-x   - train supergroup          0 2020-11-15 09:55 /user/train/datasets/flight-data
drwxr-xr-x   - train supergroup          0 2020-11-02 21:45 /user/train/datasets/hiveExternal
-rw-r--r--   1 train supergroup       4611 2020-11-01 12:21 /user/train/datasets/iris.csv
drwxr-xr-x   - train supergroup          0 2020-11-15 08:34 /user/train/datasets/market1mil_snappyparquet
drwxr-xr-x   - train supergroup          0 2020-11-15 08:30 /user/train/datasets/market5mil_snappyparquet


# DataFrameReader methods, arguments, and options

<img src="../images/spark_dataframe_reader_table.png"/>

<p>Source: Learning Spark, O'Reilly, 2020</p>

# Read csv

In [8]:
df = spark.read \
.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.option("sep", ",") \
.load("hdfs://localhost:9000/user/train/datasets/flight-data/csv/")

In [9]:
# You don't have to point a single file with .csv extension.
# You can read from a folder.
# But all csv files have to be same format

In [10]:
df.limit(5).toPandas()

Unnamed: 0,DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
0,United States,Romania,1
1,United States,Ireland,264
2,United States,India,69
3,Egypt,United States,24
4,Equatorial Guinea,United States,1


In [13]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



# Define a schema

In [11]:
flight_schema = "DEST_COUNTRY_NAME string, ORIGIN_COUNTRY_NAME string, count integer"

# Read with pre-defined schema

In [12]:
df_csv = spark.read \
.format("csv") \
.option("header", True) \
.schema(flight_schema) \
.option("sep", ",") \
.load("hdfs://localhost:9000/user/train/datasets/flight-data/csv/")

In [13]:
df_csv.limit(5).toPandas()

Unnamed: 0,DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
0,United States,Romania,1
1,United States,Ireland,264
2,United States,India,69
3,Egypt,United States,24
4,Equatorial Guinea,United States,1


In [14]:
df_csv.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



# Read parquet

    Parquet is the default and preferred data source for Spark because
    it’s efficient, uses columnar storage, and employs a fast compression
    algorithm.

In [15]:
df_parquet = spark.read \
.format("parquet") \
.load("hdfs://localhost:9000/user/train/datasets/flight-data/parquet/2010-summary.parquet")

In [16]:
df_parquet.limit(5).toPandas()

Unnamed: 0,DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
0,United States,Romania,1
1,United States,Ireland,264
2,United States,India,69
3,Egypt,United States,24
4,Equatorial Guinea,United States,1


In [17]:
df_parquet.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



# Read Images

In [18]:
from pyspark.ml import image

In [19]:
# ! hdfs dfs -put /home/train/datasets/cat_images/ /user/train/datasets

In [20]:
images_df = spark.read.format("image") \
.load("hdfs://localhost:9000/user/train/datasets/cat_images")

In [21]:
images_df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)



In [22]:
images_df.select("image.origin", "image.height", "image.width", "image.nChannels","image.mode").show(5, truncate=False)

+---------------------------------------------------------------+------+-----+---------+----+
|origin                                                         |height|width|nChannels|mode|
+---------------------------------------------------------------+------+-----+---------+----+
|hdfs://localhost:9000/user/train/datasets/cat_images/cat.3.jpg |414   |500  |3        |16  |
|hdfs://localhost:9000/user/train/datasets/cat_images/cat.7.jpg |499   |495  |3        |16  |
|hdfs://localhost:9000/user/train/datasets/cat_images/cat.10.jpg|499   |489  |3        |16  |
|hdfs://localhost:9000/user/train/datasets/cat_images/cat.17.jpg|375   |499  |3        |16  |
|hdfs://localhost:9000/user/train/datasets/cat_images/cat.18.jpg|374   |500  |3        |16  |
+---------------------------------------------------------------+------+-----+---------+----+
only showing top 5 rows



# Binary Files

    Spark 3.0 adds support for binary files as a data source. The DataFrameReader converts
    each binary file into a single DataFrame row (record) that contains the raw content
    and metadata of the file.

In [23]:
binary_files_df = (spark.read
.format("binaryFile")
.option("pathGlobFilter", "*.jpg")
.option("recursiveFile", True)
.load("hdfs://localhost:9000/user/train/datasets/cat_images"))


In [24]:
binary_files_df.show(5)

+--------------------+--------------------+------+--------------------+
|                path|    modificationTime|length|             content|
+--------------------+--------------------+------+--------------------+
|hdfs://localhost:...|2020-11-10 22:03:...| 37971|[FF D8 FF E0 00 1...|
|hdfs://localhost:...|2020-11-10 22:03:...| 36934|[FF D8 FF E0 00 1...|
|hdfs://localhost:...|2020-11-10 22:03:...| 34315|[FF D8 FF E0 00 1...|
|hdfs://localhost:...|2020-11-10 22:03:...| 32072|[FF D8 FF E0 00 1...|
|hdfs://localhost:...|2020-11-10 22:03:...| 30119|[FF D8 FF E0 00 1...|
+--------------------+--------------------+------+--------------------+
only showing top 5 rows



# DataframeWriter

    DataFrameWriter.format(args)
    .option(args)
    .bucketBy(args)
    .partitionBy(args)
    .save(path)

<img src="../images/spark_dataframe_writer_table.png"/>

<p>Source: Learning Spark, O'Reilly, 2020</p>

In [26]:
import time

# Parquet

In [31]:
# Parquet is the default and recommended format

In [28]:
start_time = time.time()

df_csv.write \
.format("parquet") \
.mode("overwrite") \
.save("hdfs://localhost:9000/user/train/output_data/flight-data/parquet")

print("--- %s seconds ---" % (time.time() - start_time))

--- 1.5220863819122314 seconds ---


In [29]:
! hdfs dfs -du -h hdfs://localhost:9000/user/train/output_data/flight-data/

13.4 K   13.4 K   hdfs://localhost:9000/user/train/output_data/flight-data/avro
40.6 K   40.6 K   hdfs://localhost:9000/user/train/output_data/flight-data/csv
122.8 K  122.8 K  hdfs://localhost:9000/user/train/output_data/flight-data/json
11.2 K   11.2 K   hdfs://localhost:9000/user/train/output_data/flight-data/orc
14.7 K   14.7 K   hdfs://localhost:9000/user/train/output_data/flight-data/parquet


In [34]:
# This will create a folder and inside will be
"""
_SUCCESS
_committed_1799640464332036264
_started_1799640464332036264
part-00000-tid-1799640464332036264-91273258-d7ef-4dc7-<...>-c000.snappy.parquet
"""
# part-xxxx...  can be more than one.

'\n_SUCCESS\n_committed_1799640464332036264\n_started_1799640464332036264\npart-00000-tid-1799640464332036264-91273258-d7ef-4dc7-<...>-c000.snappy.parquet\n'

## With compression

In [30]:

start_time = time.time()

df_csv.write \
.format("parquet") \
.option("compression", "snappy") \
.mode("overwrite") \
.save("hdfs://localhost:9000/user/train/output_data/flight-data/parquet")

print("--- %s seconds ---" % (time.time() - start_time))

--- 1.413262128829956 seconds ---


In [36]:
! hdfs dfs -du -h hdfs://localhost:9000/user/train/output_data/flight-data/

14.7 K  14.7 K  hdfs://localhost:9000/user/train/output_data/flight-data/parquet


In [31]:
market5 = spark.read.format("csv") \
.option("compression","gzip") \
.option("header", True) \
.option("inferSchema", True) \
.option("sep", "\t") \
.load("file:///home/train/datasets/market5mil.csv.gz")

In [32]:
market5.count()

5387992

In [33]:
market5.limit(5).toPandas()

Unnamed: 0,LOGICALREF,COUNT_,ITEMCODE,ITEMNAME,FICHENO,DATE_,AMOUNT,PRICE,LINENETTOTAL,LINENET,...,CLIENTNAME,BRANDCODE,BRAND,CATEGORY_NAME1,CATEGORY_NAME2,CATEGORY_NAME3,STARTDATE,ENDDATE,SPECODE,CAPIBLOCK_CREADEDDATE
0,1,1,8,TOZ SEKER,15560,2017-01-02 00:00:00,45,26499999999999999,52999999999999998,49000000000000004,...,Hanım CANBULAT,,,İÇECEK,ÇAY KAHVE,SEKER TATLANDIRICI,2017-01-03 09:25:03,2017-01-03 09:25:43,K,2018-07-14 01:50:39
1,2,1,20868,KIRMIZI MERCIMEK,15560,2017-01-02 00:00:00,1006,27999999999999998,28199999999999998,279,...,Hanım CANBULAT,167.0,BAKLİYAT,GIDA,BAKLİYAT,AÇIK BAKLİYAT,2017-01-03 09:25:03,2017-01-03 09:25:43,K,2018-07-14 01:50:39
2,3,1,8583,"TEST MATIK 1,5 KG NORMAL",15560,2017-01-02 00:00:00,1,49500000000000002,49500000000000002,41900000000000004,...,Hanım CANBULAT,229.0,TEST,DETERJAN TEMİZLİK,ÇAMAŞIR YIKAMA,TOZ DETERJAN,2017-01-03 09:25:03,2017-01-03 09:25:43,K,2018-07-14 01:50:39
3,4,1,1454,BIZIM MAKARNA BONCUK,15560,2017-01-02 00:00:00,1,11000000000000001,11000000000000001,102,...,Hanım CANBULAT,146.0,ÜLKER,GIDA,MAKARNA,MAKARNA,2017-01-03 09:25:03,2017-01-03 09:25:43,K,2018-07-14 01:50:39
4,5,1,13519,FILIZ MAKARNA KISA KESME 500 GR,15560,2017-01-02 00:00:00,1,11000000000000001,11000000000000001,102,...,Hanım CANBULAT,52.0,FİLİZ,GIDA,MAKARNA,MAKARNA,2017-01-03 09:25:03,2017-01-03 09:25:43,K,2018-07-14 01:50:39


<h1 style="color:red;">!!!!!!   Caution: If you use existing folder, the data inside folder will be deleted !!!</h1>

In [34]:
market5.write.mode("overwrite") \
.option("compression", "snappy") \
.parquet("hdfs://localhost:9000/user/train/datasets/market5mil_snappyparquet")

# Available codecs are brotli, uncompressed, lz4, gzip, lzo, snappy, none, zstd.

KeyboardInterrupt: 

In [41]:
! hdfs dfs -ls /user/train/datasets | grep market

drwxr-xr-x   - train supergroup          0 2020-11-15 08:30 /user/train/datasets/market5mil_snappyparquet


In [42]:
market1 = spark.read.format("csv") \
.option("compression","gzip") \
.option("header", True) \
.option("inferSchema", True) \
.option("sep", ";") \
.load("file:///home/train/datasets/market1mil.csv.gz")

In [43]:
market1.limit(5).toPandas()

Unnamed: 0,LOGICALREF,COUNT_,ITEMCODE,ITEMNAME,FICHENO,DATE_,AMOUNT,PRICE,LINENETTOTAL,LINENET,...,CLIENTNAME,BRANDCODE,BRAND,CATEGORY_NAME1,CATEGORY_NAME2,CATEGORY_NAME3,STARTDATE,ENDDATE,SPECODE,CAPIBLOCK_CREADEDDATE
0,1,1,8,TOZ SEKER,15560,2.01.2017 00:00,45,265,53,49,...,Hanım CANBULAT,,,İÇECEK,ÇAY KAHVE,SEKER TATLANDIRICI,3.01.2017 09:25,3.01.2017 09:25,K,14.07.2018 01:50
1,2,1,20868,KIRMIZI MERCIMEK,15560,2.01.2017 00:00,1006,28,282,279,...,Hanım CANBULAT,167.0,BAKLİYAT,GIDA,BAKLİYAT,AÇIK BAKLİYAT,3.01.2017 09:25,3.01.2017 09:25,K,14.07.2018 01:50
2,3,1,8583,"TEST MATIK 1,5 KG NORMAL",15560,2.01.2017 00:00,1,495,495,419,...,Hanım CANBULAT,229.0,TEST,DETERJAN TEMİZLİK,ÇAMAŞIR YIKAMA,TOZ DETERJAN,3.01.2017 09:25,3.01.2017 09:25,K,14.07.2018 01:50
3,4,1,1454,BIZIM MAKARNA BONCUK,15560,2.01.2017 00:00,1,11,11,102,...,Hanım CANBULAT,146.0,ÜLKER,GIDA,MAKARNA,MAKARNA,3.01.2017 09:25,3.01.2017 09:25,K,14.07.2018 01:50
4,5,1,13519,FILIZ MAKARNA KISA KESME 500 GR,15560,2.01.2017 00:00,1,11,11,102,...,Hanım CANBULAT,52.0,FİLİZ,GIDA,MAKARNA,MAKARNA,3.01.2017 09:25,3.01.2017 09:25,K,14.07.2018 01:50


In [44]:
market1.write.mode("overwrite") \
.option("compression", "snappy") \
.parquet("hdfs://localhost:9000/user/train/datasets/market1mil_snappyparquet")

In [46]:
! hdfs dfs -ls /user/train/datasets | grep market

drwxr-xr-x   - train supergroup          0 2020-11-15 08:34 /user/train/datasets/market1mil_snappyparquet
drwxr-xr-x   - train supergroup          0 2020-11-15 08:30 /user/train/datasets/market5mil_snappyparquet


# ORC

In [47]:
start_time = time.time()

df_csv.write \
.format("orc") \
.mode("overwrite") \
.save("hdfs://localhost:9000/user/train/output_data/flight-data/orc")

print("--- %s seconds ---" % (time.time() - start_time))

--- 2.2660014629364014 seconds ---


# JSON

In [48]:
start_time = time.time()

df_csv.write \
.format("json") \
.mode("overwrite") \
.save("hdfs://localhost:9000/user/train/output_data/flight-data/json")

print("--- %s seconds ---" % (time.time() - start_time))

--- 0.9012176990509033 seconds ---


# CSV

In [49]:
start_time = time.time()

df_csv.write \
.format("csv") \
.mode("overwrite") \
.save("hdfs://localhost:9000/user/train/output_data/flight-data/csv")

print("--- %s seconds ---" % (time.time() - start_time))

--- 1.4392178058624268 seconds ---


# Avro

    Avro format is used, for example,
    by Apache Kafka for message serializing and deserializing. It offers many benefits,
    including direct mapping to JSON, speed and efficiency, and bindings available
    for many programming languages.

In [50]:
start_time = time.time()

df_csv.write \
.format("avro") \
.mode("overwrite") \
.save("hdfs://localhost:9000/user/train/output_data/flight-data/avro")

print("--- %s seconds ---" % (time.time() - start_time))

--- 1.7004945278167725 seconds ---


# Examine output sizes for each format

In [51]:
! hdfs dfs -du -h hdfs://localhost:9000/user/train/output_data/flight-data/

13.4 K   13.4 K   hdfs://localhost:9000/user/train/output_data/flight-data/avro
40.6 K   40.6 K   hdfs://localhost:9000/user/train/output_data/flight-data/csv
122.8 K  122.8 K  hdfs://localhost:9000/user/train/output_data/flight-data/json
11.2 K   11.2 K   hdfs://localhost:9000/user/train/output_data/flight-data/orc
14.7 K   14.7 K   hdfs://localhost:9000/user/train/output_data/flight-data/parquet


# Hive Table

In [52]:
# If we don't use enableHiveSupport() during the SparkSession
# following code will list spark catalog

In [53]:
spark.sql("show databases").show()

+----------+
| namespace|
+----------+
| bookstore|
|   default|
|homecredit|
|    retail|
|     test1|
|     test2|
|     train|
+----------+



In [54]:
spark.sql("create database if not exists train")

DataFrame[]

In [37]:
df_csv.show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
+-----------------+-------------------+-----+
only showing top 3 rows



In [38]:
df_csv.write \
.format("orc") \
.mode("overwrite") \
.saveAsTable("train.flights")

In [39]:
spark.sql("use train")
spark.sql("show tables").show()

+--------+--------------+-----------+
|database|     tableName|isTemporary|
+--------+--------------+-----------+
|   train|       flights|      false|
|   train|sum_of_flights|      false|
+--------+--------------+-----------+



In [40]:
spark.sql("select * from train.flights limit 5").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+



In [41]:
spark.stop()