##### Create Manage Table and access the catlog

creating managed table need persistent metastore. apache spark depends on hive metastore. so we need hive for this example.  

lets load sparksession and read parquet file

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

from lib.logger import Log4j

spark = SparkSession \
            .builder \
            .master("local[3]") \
            .appName("Read Formats using API") \
            .enableHiveSupport() \
            .getOrCreate()

logger = Log4j(spark)
logger.info("Starting HelloSparkSQL")

flightTimeParquetDF = spark.read \
        .format("parquet") \
        .load("data/flight*.parquet")

In [2]:
flightTimeParquetDF.show(5)

+----------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|ORIGIN_CITY_NAME|DEST|DEST_CITY_NAME|CRS_DEP_TIME|DEP_TIME|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|CANCELLED|DISTANCE|
+----------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
|2000-01-01|        DL|             1451|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1115|    1113|     1343|      5|        1400|    1348|        0|     946|
|2000-01-01|        DL|             1479|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1315|    1311|     1536|      7|        1559|    1543|        0|     946|
|2000-01-01|        DL|             1857|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1415|    1414|     1642|      9|        1721|    1651|        0|     946

Lets not do any data processing. as we need to learn mechanics of saving dataframe into managed tables.

why i do not save it as paruqet file? why we need to save it as managed table?
when you want to access datafile from source, You may read datafile using reader api. after processing yu may then save it as from dataframe as datafile in parquet format using writer api. what if you want to make this data available to sql compint tool? you require it to store as tables. so tools can access it over jdbc/odbc connectors.
table you can access through JDBC/ODBC connectors by various tools like tablue, talend, powerbi etc.
parquet, json, avro not accesible through JDBC/ODBC connectors.

use save AsTable method, it takes table name and creates managed table and store table to current database. but what is current database?
Apache Spark come with one default database, and database name itself is default. if you want to store it in different database then you have two options
1. prefix table name with database name
2. access catlog and set current database for this session

lets create database AIRLINE_DB

In [None]:
SHOW DATABASES

In [5]:
spark.sql("CREATE DATABASE IF NOT EXISTS AIRLINE_DB")

DataFrame[]

method 1 prefix

In [3]:
flightTimeParquetDF.write \
        .mode("overwrite") \
        .saveAsTable("AIRLINE_DB.flight_data_tbl")

method 2 set current database

In [6]:
spark.catalog.setCurrentDatabase("AIRLINE_DB")

In [7]:
flightTimeParquetDF.write \
        .mode("overwrite") \
        .saveAsTable("flight_data_tbl")

In [8]:
logger.info(spark.catalog.listTables("AIRLINE_DB"))
spark.catalog.listTables("AIRLINE_DB")

[Table(name='flight_data_tbl', database='airline_db', description=None, tableType='MANAGED', isTemporary=False)]

In [9]:
spark.sql("SELECT * FROM flight_data_tbl").show()

+----------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|ORIGIN_CITY_NAME|DEST|DEST_CITY_NAME|CRS_DEP_TIME|DEP_TIME|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|CANCELLED|DISTANCE|
+----------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
|2000-01-01|        DL|             1451|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1115|    1113|     1343|      5|        1400|    1348|        0|     946|
|2000-01-01|        DL|             1479|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1315|    1311|     1536|      7|        1559|    1543|        0|     946|
|2000-01-01|        DL|             1857|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1415|    1414|     1642|      9|        1721|    1651|        0|     946

In [10]:
spark.sql("SELECT count(*) FROM flight_data_tbl").show()

+--------+
|count(1)|
+--------+
|  470477|
+--------+



When you are working in local mode: these directories created in your local directory
When you working in cluster mode: it would configure by cluster admin and it would be in common location across your all spark applications.

However we created table. lets create partitioned table. add partitionedBy method. i am using two columns to partition my table.

In [12]:
flightTimeParquetDF.write \
        .mode("overwrite") \
        .partitionBy("OP_CARRIER", "ORIGIN") \
        .option("path", "dataSink/parquet/") \
        .saveAsTable("flight_data_tbl")

In [13]:
spark.sql("SELECT count(*) FROM flight_data_tbl").show()

+--------+
|count(1)|
+--------+
|  470477|
+--------+



Great i get more parttions say more than 100 or 200. however for large dataset still these 200+ parttions are fine but what if i have 10000 unique values, do you want to create 10k partitions?
absolutely not right?
so do not parttion your table that has got to many unique value.
Instead you can use bucketby.
The bucketBy allows you to restrict no of parttions
bucketBy(5, ) - limits to five partitions know as buckets. still i want to parttion on same two columns, add them
bucketBy(5, "OP_CARRIER", "ORIGIN" )
change to csv - as parquet is binary format, and to manually investigate data changing format purposefully

In [16]:
flightTimeParquetDF.write \
        .format("csv") \
        .mode("overwrite") \
        .bucketBy(5, "OP_CARRIER", "ORIGIN") \
        .option("path", "dataSink/csv/") \
        .saveAsTable("flight_data_tbl")

i got five parttions. backeting does not require lengthy directory structure. it is as simple as data files.
but how it happened?
I asked to create 5 buckets.
Spark created five files, each file is one bucket.
Now spark read one record, look at key column values, in our case it is op_carrier and origin, now spark will compute hash value, 
you got some hash number, divide it by 5 and take reminder, you are getting something 0-4, if number is 0 place record in first file.. and so on and soforth for each record.(there are various hashing method s are but basic principle is the same)
Each unique key going to produce same hash value and they are in one file.
these bucket improves join operation and if sorted then much more useful in certain operations. so bucketby also have sort by companion. 

In [17]:
flightTimeParquetDF.write \
        .format("csv") \
        .mode("overwrite") \
        .bucketBy(5, "OP_CARRIER", "ORIGIN") \
        .sortBy("OP_CARRIER", "ORIGIN") \
        .option("path", "dataSink/csv/") \
        .saveAsTable("flight_data_tbl")

In [18]:
spark.stop()