In [1]:
import os
# from os.path import expanduser, join
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext, Row

In [2]:
# Derive the lab user id from the working directory: the notebook is expected
# to run below a path component whose name starts with "edureka".
cwd = os.getcwd()
# Reset first so a stale value left in the kernel by an earlier run cannot be
# picked up silently when the current directory does not match.
user_id = None
for part in cwd.split('/'):
    if part.lower().startswith('edureka'):
        # Keep overwriting: the last (deepest) matching component wins.
        user_id = part.title()
if user_id is None:
    raise ValueError(
        "Cannot derive user_id: no component of {0!r} starts with 'edureka'".format(cwd)
    )
# Application name shown for this session (e.g. in the Spark UI).
app_name = '{0} : Hive Integration'.format(user_id)
app_name

'Edureka_121039 : Hive Integration'

In [3]:
# Enable Hive support explicitly: the cells below create, load and query Hive
# tables, which requires the session catalog to be backed by the Hive
# metastore rather than relying on whatever the cluster defaults happen to be.
spark = (
    SparkSession.builder
    .appName(app_name)
    .enableHiveSupport()
    .getOrCreate()
)
sparkContext = spark.sparkContext
# Bind the legacy SQLContext to the session built above (second positional
# argument is the SparkSession) so both entry points share the same catalog.
sqlContext = SQLContext(sparkContext, spark)

In [4]:
def get_hdfs_filepath(file_name, user=None):
    """Return the HDFS path of ``file_name`` inside a user's HDFS home directory.

    Args:
        file_name: Path relative to the home directory, e.g. 'sampleData.txt'.
        user: Account whose home directory ``/user/<user>`` is used; the name
            is lower-cased. Defaults to the notebook-level ``user_id``.

    Returns:
        The joined path, e.g. '/user/edureka_121039/sampleData.txt'.
    """
    # HDFS paths always use '/' as separator, so join with posixpath instead
    # of the platform-dependent os.path.
    import posixpath

    owner = user_id if user is None else user
    my_hdfs = '/user/{0}'.format(owner.lower())
    return posixpath.join(my_hdfs, file_name)

### Sample Dataset

In [5]:
# HDFS path of the sample dataset inside the current user's home directory.
SAMPLE_TXT = get_hdfs_filepath(file_name='sampleData.txt')

### Refer warehouse location

In [6]:
# List every (key, value) pair of the running Spark configuration.
# The warehouse location can be overridden when the session is built, e.g.
# SparkSession.builder.appName().config("spark.sql.warehouse.dir", "warehouse-location")
active_conf = spark.sparkContext.getConf()
active_conf.getAll()

[(u'spark.dynamicAllocation.enabled', u'false'),
 (u'spark.eventLog.enabled', u'true'),
 (u'spark.port.maxRetries', u'1000'),
 (u'spark.yarn.jars',
  u'local:/opt/cloudera/parcels/SPARK2-2.1.0.cloudera2-1.cdh5.7.0.p0.171658/lib/spark2/jars/*'),
 (u'spark.executorEnv.PYTHONPATH',
  u'/opt/cloudera/parcels/SPARK2-2.1.0.cloudera2-1.cdh5.7.0.p0.171658/lib/spark2/python/lib/py4j-0.10.4-src.zip<CPS>/opt/cloudera/parcels/SPARK2-2.1.0.cloudera2-1.cdh5.7.0.p0.171658/lib/spark2/python/lib/pyspark.zip'),
 (u'spark.executor.extraLibraryPath',
  u'/opt/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hadoop/lib/native'),
 (u'spark.ui.killEnabled', u'true'),
 (u'spark.eventLog.dir', u'hdfs://nameservice1/user/spark/applicationHistory'),
 (u'spark.dynamicAllocation.executorIdleTimeout', u'60'),
 (u'spark.serializer', u'org.apache.spark.serializer.KryoSerializer'),
 (u'spark.authenticate', u'false'),
 (u'spark.sql.hive.metastore.jars',
  u'${env:HADOOP_COMMON_HOME}/../hive/lib/*:${env:HADOOP_COMMON_HO

### Hive Metastore
Spark SQL uses a Hive metastore to manage the metadata of persistent relational entities (e.g. databases, tables, columns, partitions) in a relational database (for fast access).

A Hive metastore warehouse (aka spark-warehouse) is the directory where Spark SQL persists tables whereas a Hive metastore (aka metastore_db) is a relational database to manage the metadata of the persistent relational entities, e.g. databases, tables, columns, partitions.

By default, Spark SQL uses the embedded deployment mode of a Hive metastore with an Apache Derby database.

When a SparkSession is created with Hive support, the external catalog (aka metastore) is HiveExternalCatalog. HiveExternalCatalog uses the spark.sql.warehouse.dir directory as the location of the databases.

The benefits of using an external Hive metastore:
* Allow multiple Spark applications (sessions) to access it concurrently
* Allow a single Spark application to use table statistics without running "ANALYZE TABLE" on every execution

<font color=blue>*Spark SQL uses the Hive-specific configuration properties that further fine-tune the Hive integration, e.g. spark.sql.hive.metastore.version or spark.sql.hive.metastore.jars.*</font>

**spark.sql.warehouse.dir** is a static configuration property that sets Hive’s hive.metastore.warehouse.dir property, i.e. the location of the Hive warehouse directory where table data is persisted (as opposed to the metastore database itself, which is Derby in the local/embedded mode).

### Create table in Hive

In [15]:
# DDL template for the per-user table; the placeholder receives the
# upper-cased user id. The table has two columns (age, name) and is stored
# as comma-delimited text, one record per line.
create_tbl = (
    "CREATE TABLE IF NOT EXISTS SPARKHIVE_{}(age INT, name STRING) "
    "ROW FORMAT DELIMITED "
    "FIELDS TERMINATED BY ',' "
    "LINES TERMINATED BY '\n'"
)
create_stmt = create_tbl.format(user_id.upper())
sqlContext.sql(create_stmt)

DataFrame[]

### Load data in Hive table

In [16]:
# Alternative LOAD DATA templates; placeholders are (lower-cased user id for
# the path, upper-cased user id for the table name). Only `overwrite_tbl` is
# executed below.
# LOCAL variants read the file from the local filesystem (/mnt/home/<user>).
overwrite_local_tbl = "LOAD DATA LOCAL INPATH '/mnt/home/{}/sampleData.txt' OVERWRITE INTO TABLE SPARKHIVE_{}"
insert_to_tbl = "LOAD DATA LOCAL INPATH '/mnt/home/{}/sampleData.txt' INTO TABLE SPARKHIVE_{}"
# Non-LOCAL variant reads from HDFS (/user/<user>). Per the Hive documentation
# this MOVES the source file into the table location, so the file has to be
# uploaded to HDFS again before this cell can be re-run.
overwrite_tbl = "LOAD DATA INPATH '/user/{}/sampleData.txt' OVERWRITE INTO TABLE SPARKHIVE_{}"

final_stmt = overwrite_tbl.format(user_id.lower(), user_id.upper())
sqlContext.sql(final_stmt)

DataFrame[]

### Run queries on Hive table

In [17]:
# Print every row currently stored in this user's table.
table_suffix = user_id.upper()
select_sql = "select * from SPARKHIVE_{}".format(table_suffix)
selected_rows = sqlContext.sql(select_sql)
selected_rows.show()

+---+------+
|age|  name|
+---+------+
| 30| Brian|
| 35|  Alex|
| 45| Shyam|
| 70| Trump|
+---+------+



### Truncate Hive table

In [10]:
# Issue TRUNCATE TABLE against this user's table.
table_suffix = user_id.upper()
truncate_sql = "TRUNCATE TABLE SPARKHIVE_{}".format(table_suffix)
sqlContext.sql(truncate_sql)

DataFrame[]

### Existing tables

In [11]:
# List the catalog's tables and keep only those whose name contains "sparkhive".
tables = sqlContext.sql("SHOW TABLES")
sparkhive_tables = tables.filter("tableName like '%sparkhive%'")
sparkhive_tables.show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|20181121_sparkhiv...|      false|
| default|           sparkhive|      false|
| default|    sparkhive_431591|      false|
| default|sparkhive_edureka...|      false|
+--------+--------------------+-----------+



### Drop Hive tables

In [13]:
# Drop this user's table; IF EXISTS makes the statement safe to repeat.
table_suffix = user_id.upper()
drop_sql = "DROP TABLE IF EXISTS SPARKHIVE_{}".format(table_suffix)
sqlContext.sql(drop_sql)

DataFrame[]