# Hive. Basics

In [1]:
!wget -q https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!rm spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
import os

# Environment variables naming the JDK and the Spark distribution unpacked in the first cell.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.3.1-bin-hadoop3",
    }
)

In [3]:
import os

import findspark

# Initialise findspark from the absolute SPARK_HOME set in the previous cell. The original
# relative path 'spark-3.3.1-bin-hadoop3' only resolved when the working directory was
# /content. If SPARK_HOME is unset, fall back to that relative path as before.
findspark.init(os.environ.get("SPARK_HOME", "spark-3.3.1-bin-hadoop3"))

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,ArrayType

In [5]:
# Local Spark session using all cores, with Hive support enabled so the databases and
# tables created below go through a Hive catalog.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("HiveTest")
    .enableHiveSupport()
    .getOrCreate()
)

In [6]:
import pandas as pd

In [7]:
# Peek at the raw CSV with pandas; Spark reads this same file later with an explicit schema.
TEST_DATA_CSV = "/content/drive/MyDrive/datasets/test_data.csv"
dataset1 = pd.read_csv(TEST_DATA_CSV)

In [8]:
# First five rows of the pandas preview.
dataset1.head(n=5)

Unnamed: 0,region,manager,product,amount
0,r2,m1,pr3,12
1,r2,m5,pr6,49
2,r2,m3,pr1,49
3,r5,m4,pr7,59
4,r5,m2,pr2,68


In [9]:
# Explicit schema: column names match the CSV header, and "amount" is read as an integer
# instead of relying on the reader's defaults.
schema = StructType(
    [
        StructField("region", StringType(), True),
        StructField("manager", StringType(), True),
        StructField("product", StringType(), True),
        StructField("amount", IntegerType(), True),
    ]
)

# The file has a header row and comma-separated fields.
df = (
    spark.read.format("csv")
    .option("header", "True")
    .option("delimiter", ",")
    .schema(schema)
    .load("/content/drive/MyDrive/datasets/test_data.csv")
)
df.printSchema()
df.show(7, truncate=False)

root
 |-- region: string (nullable = true)
 |-- manager: string (nullable = true)
 |-- product: string (nullable = true)
 |-- amount: integer (nullable = true)

+------+-------+-------+------+
|region|manager|product|amount|
+------+-------+-------+------+
|r2    |m1     |pr3    |12    |
|r2    |m5     |pr6    |49    |
|r2    |m3     |pr1    |49    |
|r5    |m4     |pr7    |59    |
|r5    |m2     |pr2    |68    |
|r1    |m4     |pr3    |50    |
|r2    |m5     |pr2    |21    |
+------+-------+-------+------+
only showing top 7 rows



### Create Database

In [10]:
# Create test_db at an explicit filesystem path given by the LOCATION clause.
create_test_db_sql = "CREATE DATABASE IF NOT EXISTS test_db LOCATION '/content/hive/warehouse'"
spark.sql(create_test_db_sql)

DataFrame[]

### Show Databases

In [11]:
# List the available databases (up to 10 rows shown).
databases_df = spark.sql("SHOW DATABASES;")
databases_df.show(10)

+---------+
|namespace|
+---------+
|  default|
|  test_db|
+---------+



### Use Database

In [12]:
# Make test_db the current database for unqualified table names.
use_test_db_sql = "USE test_db;"
spark.sql(use_test_db_sql)

DataFrame[]

### Describe Database

In [13]:
# EXTENDED also reports the database properties alongside name, comment, location and owner.
test_db_description = spark.sql("DESCRIBE DATABASE EXTENDED test_db;")
test_db_description.show(truncate=False)

+--------------+----------------------------+
|info_name     |info_value                  |
+--------------+----------------------------+
|Namespace Name|test_db                     |
|Comment       |                            |
|Location      |file:/content/hive/warehouse|
|Owner         |root                        |
|Properties    |                            |
+--------------+----------------------------+



In [14]:
# DESCRIBE SCHEMA is a synonym for DESCRIBE DATABASE; the output matches the previous cell.
test_db_schema_description = spark.sql("DESCRIBE SCHEMA EXTENDED test_db;")
test_db_schema_description.show(truncate=False)

+--------------+----------------------------+
|info_name     |info_value                  |
+--------------+----------------------------+
|Namespace Name|test_db                     |
|Comment       |                            |
|Location      |file:/content/hive/warehouse|
|Owner         |root                        |
|Properties    |                            |
+--------------+----------------------------+



### Drop Database

In [15]:
# Give the throw-away database its own directory. The original shared test_db's location
# ('/content/hive/warehouse'), and because the Hive metastore deletes a database's location
# on DROP DATABASE, dropping test_db_tmp could also remove test_db's data directory.
spark.sql("""CREATE DATABASE IF NOT EXISTS test_db_tmp LOCATION '/content/hive/test_db_tmp.db'""")

DataFrame[]

In [16]:
# IF EXISTS keeps the cell re-runnable. Without it, running the cell a second time fails
# because test_db_tmp has already been dropped.
spark.sql("""DROP DATABASE IF EXISTS test_db_tmp;""")

DataFrame[]

### Create Table. Example 1

In [17]:
# Remove any sales_spark table left over from a previous run.
drop_sales_spark_sql = "DROP TABLE IF EXISTS test_db.sales_spark;"
spark.sql(drop_sales_spark_sql)

DataFrame[]

In [18]:
# Save df as test_db.sales_spark, partitioned by product; "overwrite" replaces any
# existing table contents.
sales_spark_writer = df.write.partitionBy("product").mode("overwrite")
sales_spark_writer.saveAsTable("test_db.sales_spark")

In [19]:
# Tables currently registered in test_db.
test_db_tables = spark.sql("SHOW TABLES IN test_db")
test_db_tables.show(100, truncate=False)

+---------+-----------+-----------+
|namespace|tableName  |isTemporary|
+---------+-----------+-----------+
|test_db  |sales_spark|false      |
+---------+-----------+-----------+



In [20]:
# FORMATTED adds the partition information (and further table details) after the column list.
sales_spark_details = spark.sql("DESCRIBE FORMATTED test_db.sales_spark")
sales_spark_details.show(100, truncate=False)

+----------------------------+--------------------------------------------------------------+-------+
|col_name                    |data_type                                                     |comment|
+----------------------------+--------------------------------------------------------------+-------+
|region                      |string                                                        |null   |
|manager                     |string                                                        |null   |
|amount                      |int                                                           |null   |
|product                     |string                                                        |null   |
|# Partition Information     |                                                              |       |
|# col_name                  |data_type                                                     |comment|
|product                     |string                                              

In [21]:
# The partition column (product) comes back as the last column.
sales_spark_rows = spark.sql("SELECT db.* FROM test_db.sales_spark as db")
sales_spark_rows.show(10)

+------+-------+------+-------+
|region|manager|amount|product|
+------+-------+------+-------+
|    r5|     m2|    68|    pr2|
|    r2|     m5|    21|    pr2|
|    r6|     m3|    22|    pr2|
|    r5|     m3|    46|    pr2|
|    r9|     m4|    82|    pr2|
|   r10|     m2|    74|    pr2|
|    r2|     m1|    12|    pr3|
|    r1|     m4|    50|    pr3|
|    r1|     m5|    59|    pr3|
|    r6|     m5|    28|    pr3|
+------+-------+------+-------+
only showing top 10 rows



### Show All Partitions of a Hive Table

In [22]:
# One row per product value written by partitionBy("product").
sales_spark_partitions = spark.sql("SHOW PARTITIONS test_db.sales_spark;")
sales_spark_partitions.show(100, truncate=False)

+-----------+
|partition  |
+-----------+
|product=pr1|
|product=pr2|
|product=pr3|
|product=pr4|
|product=pr5|
|product=pr6|
|product=pr7|
+-----------+



### Create Table. Example 2

In [23]:
# Remove any sales_hive table left over from a previous run.
drop_sales_hive_sql = "DROP TABLE IF EXISTS test_db.sales_hive;"
spark.sql(drop_sales_hive_sql)

DataFrame[]

In [24]:
# Hive-format table partitioned by region, stored as delimited rows with ',' between fields.
# The partition column is declared only in PARTITIONED BY, not in the column list;
# DESCRIBE further down shows it appended after manager, product and amount.
spark.sql("""CREATE TABLE IF NOT EXISTS test_db.sales_hive (                                       
                                        manager string,
                                        product string,
                                        amount int )
                                        PARTITIONED BY(region string)
                                        COMMENT 'Sales Table. Hive'
                                        ROW FORMAT DELIMITED
                                        FIELDS TERMINATED BY ',';""")

DataFrame[]

In [25]:
# sales_hive should now be listed next to sales_spark.
test_db_tables_after_create = spark.sql("SHOW TABLES IN test_db")
test_db_tables_after_create.show(100, truncate=False)

+---------+-----------+-----------+
|namespace|tableName  |isTemporary|
+---------+-----------+-----------+
|test_db  |sales_hive |false      |
|test_db  |sales_spark|false      |
+---------+-----------+-----------+



In [26]:
# Column list followed by the "# Partition Information" section for region.
sales_hive_columns = spark.sql("DESCRIBE test_db.sales_hive")
sales_hive_columns.show(100, truncate=False)

+-----------------------+---------+-------+
|col_name               |data_type|comment|
+-----------------------+---------+-------+
|manager                |string   |null   |
|product                |string   |null   |
|amount                 |int      |null   |
|region                 |string   |null   |
|# Partition Information|         |       |
|# col_name             |data_type|comment|
|region                 |string   |null   |
+-----------------------+---------+-------+



### Hive Load CSV File into Table

Note: the partition column must be the last column in the file so that the data is loaded into the correct partition column of the table.

If your partition column is not the last column, do the following:

* Create another table without partition.
* Load the data into that table (assuming "region" is the first column).
* Insert into the partitioned table by selecting columns from the non-partitioned table (make sure you select "region" at the end).

In [27]:
# Drop the staging table first so the LOAD DATA below does not append to old rows on re-runs.
drop_sales_hive_tmp_sql = "DROP TABLE IF EXISTS test_db.sales_hive_tmp;"
spark.sql(drop_sales_hive_tmp_sql)

DataFrame[]

In [28]:
# Unpartitioned staging table whose column order (region first) matches the headerless CSV,
# so the file can be loaded with LOAD DATA without reordering columns.
spark.sql("""CREATE TABLE IF NOT EXISTS test_db.sales_hive_tmp (
                                                  region string,                                       
                                                  manager string,
                                                  product string,
                                                  amount int)
                                                  COMMENT 'Sales Table. TMP'
                                                  ROW FORMAT DELIMITED
                                                  FIELDS TERMINATED BY ',';""")

DataFrame[]

In [29]:
# This CSV has no header row: header=None makes pandas label the columns 0..3
# instead of treating the first data row as column names.
TEST_DATA_NO_HEADER_CSV = "/content/drive/MyDrive/datasets/test_data_without_header.csv"
dataset2 = pd.read_csv(TEST_DATA_NO_HEADER_CSV, header=None)

In [30]:
# First five rows of the headerless file.
dataset2.head(n=5)

Unnamed: 0,0,1,2,3
0,r2,m1,pr3,12
1,r2,m5,pr6,49
2,r2,m3,pr1,49
3,r5,m4,pr7,59
4,r5,m2,pr2,68


In [31]:
# LOCAL INPATH reads the file from the local filesystem. Without OVERWRITE, LOAD DATA adds to
# the table's existing rows, which is why the table is dropped and recreated in the cells above.
spark.sql("""LOAD DATA LOCAL INPATH '/content/drive/MyDrive/datasets/test_data_without_header.csv' 
             INTO TABLE test_db.sales_hive_tmp;""")

DataFrame[]

In [32]:
# Preview the staged rows; LIMIT without ORDER BY returns an arbitrary 10 rows.
spark.sql("""SELECT *
            FROM test_db.sales_hive_tmp 
            LIMIT 10;""").show()

+------+-------+-------+------+
|region|manager|product|amount|
+------+-------+-------+------+
|    r2|     m1|    pr3|    12|
|    r2|     m5|    pr6|    49|
|    r2|     m3|    pr1|    49|
|    r5|     m4|    pr7|    59|
|    r5|     m2|    pr2|    68|
|    r1|     m4|    pr3|    50|
|    r2|     m5|    pr2|    21|
|    r2|     m5|    pr1|    21|
|    r6|     m4|    pr4|    68|
|    r6|     m3|    pr2|    22|
+------+-------+-------+------+



Use `INSERT OVERWRITE TABLE` to replace the existing data in a partition with new data.

In [33]:
# Dynamic-partition insert: PARTITION(region) gives no value, so each row's region is taken
# from the last column of the SELECT. Hive's strict mode expects at least one static partition
# value, so it is relaxed to nonstrict here (see the Hive dynamic partition docs).
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
# OVERWRITE replaces the existing data in every partition this insert writes to.
spark.sql("""INSERT OVERWRITE TABLE test_db.sales_hive PARTITION(region) 
             SELECT manager,product,amount,region FROM test_db.sales_hive_tmp;""")

DataFrame[]

In [34]:
# Check the partitioned table after the overwrite; region, the partition column, comes last.
spark.sql("""SELECT *
            FROM test_db.sales_hive 
            LIMIT 10;""").show()

+-------+-------+------+------+
|manager|product|amount|region|
+-------+-------+------+------+
|     m2|    pr2|    74|   r10|
|     m3|    pr6|    66|    r4|
|     m5|    pr5|    79|    r4|
|     m2|    pr5|    51|    r4|
|     m2|    pr3|    84|    r4|
|     m4|    pr4|    68|    r6|
|     m3|    pr2|    22|    r6|
|     m2|    pr1|    64|    r6|
|     m5|    pr3|    28|    r6|
|     m4|    pr3|    50|    r1|
+-------+-------+------+------+



### Truncate Table

In [35]:
# Remove all rows from sales_hive while keeping the table definition.
truncate_sales_hive_sql = "TRUNCATE TABLE test_db.sales_hive"
spark.sql(truncate_sales_hive_sql)

DataFrame[]

### Insert Data into a Partitioned Table

In [36]:
# After TRUNCATE the result should be empty (header only).
spark.sql("""SELECT *
            FROM test_db.sales_hive 
            LIMIT 10;""").show()

+-------+-------+------+------+
|manager|product|amount|region|
+-------+-------+------+------+
+-------+-------+------+------+



In [37]:
# Static-partition insert: region comes from PARTITION(...), so VALUES lists only
# manager, product and amount.
insert_r1_sql = "INSERT INTO test_db.sales_hive PARTITION(region='r1') VALUES ('m1','pr1',1);"
spark.sql(insert_r1_sql)

DataFrame[]

In [38]:
# Same static-partition insert, this time into the region='r2' partition.
insert_r2_sql = "INSERT INTO test_db.sales_hive PARTITION(region='r2') VALUES ('m2','pr2',2);"
spark.sql(insert_r2_sql)

DataFrame[]

In [39]:
# Only the two rows inserted in the previous cells should be present.
spark.sql("""SELECT *
            FROM test_db.sales_hive 
            LIMIT 10;""").show()

+-------+-------+------+------+
|manager|product|amount|region|
+-------+-------+------+------+
|     m1|    pr1|     1|    r1|
|     m2|    pr2|     2|    r2|
+-------+-------+------+------+



### String Functions

In [40]:
# String functions on sales_spark:
#   concat_ws   - joins region, manager and product with " - "
#   printf      - formats amount with the pattern '%d amount'
#   replace     - swaps "r" for "region" in the region value (r1 -> region1)
#   reverse     - reverses the manager string (m1 -> 1m)
#   split       - splits product on the pattern "r" (pr5 -> [p, 5])
#   upper       - upper-cases manager
# ORDER BY compares region as a string, so r10 sorts between r1 and r2.
spark.sql("""SELECT tbl.region,
                    tbl.manager,
                    tbl.product,
                    tbl.amount,
                    concat_ws(" - ", tbl.region, tbl.manager, tbl.product) as col_concat,
                    printf('%d amount', tbl.amount) as col_printf,
                    replace(tbl.region, "r", "region") as col_replace,
                    reverse(tbl.manager) as col_reverse,
                    split(tbl.product,"r") as col_split,
                    upper(tbl.manager) as col_upper
             FROM test_db.sales_spark as tbl
             ORDER BY tbl.region, tbl.manager
             LIMIT 10""").show()

+------+-------+-------+------+--------------+----------+-----------+-----------+---------+---------+
|region|manager|product|amount|    col_concat|col_printf|col_replace|col_reverse|col_split|col_upper|
+------+-------+-------+------+--------------+----------+-----------+-----------+---------+---------+
|    r1|     m1|    pr5|    46| r1 - m1 - pr5| 46 amount|    region1|         1m|   [p, 5]|       M1|
|    r1|     m4|    pr3|    50| r1 - m4 - pr3| 50 amount|    region1|         4m|   [p, 3]|       M4|
|    r1|     m5|    pr3|    59| r1 - m5 - pr3| 59 amount|    region1|         5m|   [p, 3]|       M5|
|   r10|     m2|    pr2|    74|r10 - m2 - pr2| 74 amount|   region10|         2m|   [p, 2]|       M2|
|    r2|     m1|    pr3|    12| r2 - m1 - pr3| 12 amount|    region2|         1m|   [p, 3]|       M1|
|    r2|     m2|    pr3|    85| r2 - m2 - pr3| 85 amount|    region2|         2m|   [p, 3]|       M2|
|    r2|     m3|    pr1|    49| r2 - m3 - pr1| 49 amount|    region2|         3m| 

### Conditional Functions

In [41]:
# Conditional functions:
#   if(cond, a, b) - 'yes' when amount > 40, otherwise 'no'
#   simple CASE    - maps r1 -> 1, r2 -> 2, and every other region (e.g. r10) -> 0 via ELSE
spark.sql("""SELECT tbl.region,
                    tbl.manager,
                    tbl.product,
                    tbl.amount,
                    if(tbl.amount>40,'yes','no') as col_if,
                    case tbl.region WHEN 'r1' THEN 1
                                    WHEN 'r2' THEN 2
                                    ELSE 0
                                    END as col_case
             FROM test_db.sales_spark as tbl
             ORDER BY tbl.region, tbl.manager
             LIMIT 10""").show()

+------+-------+-------+------+------+--------+
|region|manager|product|amount|col_if|col_case|
+------+-------+-------+------+------+--------+
|    r1|     m1|    pr5|    46|   yes|       1|
|    r1|     m4|    pr3|    50|   yes|       1|
|    r1|     m5|    pr3|    59|   yes|       1|
|   r10|     m2|    pr2|    74|   yes|       0|
|    r2|     m1|    pr3|    12|    no|       2|
|    r2|     m2|    pr3|    85|   yes|       2|
|    r2|     m3|    pr1|    49|   yes|       2|
|    r2|     m5|    pr2|    21|    no|       2|
|    r2|     m5|    pr1|    57|   yes|       2|
|    r2|     m5|    pr1|    21|    no|       2|
+------+-------+-------+------+------+--------+



### Collection Functions

In [42]:
# Collection functions on the array produced by split(product, "r"), e.g. pr3 -> [p, 3]:
#   size           - number of array elements (2 for every product here)
#   array_contains - true when the array holds the string '3'
spark.sql("""SELECT tbl.region,
                    tbl.manager,
                    tbl.product,
                    tbl.amount,
                    split(tbl.product,"r") as col_split,
                    size(split(tbl.product,"r")) as col_size,
                    array_contains(split(tbl.product,"r"),'3') as col_array_contains
             FROM test_db.sales_spark as tbl
             ORDER BY tbl.region, tbl.manager
             LIMIT 10""").show()

+------+-------+-------+------+---------+--------+------------------+
|region|manager|product|amount|col_split|col_size|col_array_contains|
+------+-------+-------+------+---------+--------+------------------+
|    r1|     m1|    pr5|    46|   [p, 5]|       2|             false|
|    r1|     m4|    pr3|    50|   [p, 3]|       2|              true|
|    r1|     m5|    pr3|    59|   [p, 3]|       2|              true|
|   r10|     m2|    pr2|    74|   [p, 2]|       2|             false|
|    r2|     m1|    pr3|    12|   [p, 3]|       2|              true|
|    r2|     m2|    pr3|    85|   [p, 3]|       2|              true|
|    r2|     m3|    pr1|    49|   [p, 1]|       2|             false|
|    r2|     m5|    pr2|    21|   [p, 2]|       2|             false|
|    r2|     m5|    pr1|    57|   [p, 1]|       2|             false|
|    r2|     m5|    pr1|    21|   [p, 1]|       2|             false|
+------+-------+-------+------+---------+--------+------------------+



### Aggregate Functions

In [43]:
# Aggregates of amount per (region, manager) group: sum, count, avg, min and max, plus
# collect_set (distinct values) and collect_list (all values, duplicates kept).
# ORDER BY compares region as a string, so r10 sorts between r1 and r2.
spark.sql("""SELECT tbl.region,
                    tbl.manager,
                    sum(tbl.amount) as col_sum,
                    count(tbl.amount) as col_count,
                    avg(tbl.amount) as col_avg,
                    min(tbl.amount) as col_min,
                    max(tbl.amount) as col_max,
                    collect_set(tbl.amount) as col_collect_set,
                    collect_list(tbl.amount) as col_collect_list
             FROM test_db.sales_spark as tbl
             GROUP BY tbl.region, tbl.manager
             ORDER BY tbl.region, tbl.manager
             LIMIT 10""").show()

+------+-------+-------+---------+-------+-------+-------+----------------+--------------------+
|region|manager|col_sum|col_count|col_avg|col_min|col_max| col_collect_set|    col_collect_list|
+------+-------+-------+---------+-------+-------+-------+----------------+--------------------+
|    r1|     m1|     46|        1|   46.0|     46|     46|            [46]|                [46]|
|    r1|     m4|     50|        1|   50.0|     50|     50|            [50]|                [50]|
|    r1|     m5|     59|        1|   59.0|     59|     59|            [59]|                [59]|
|   r10|     m2|     74|        1|   74.0|     74|     74|            [74]|                [74]|
|    r2|     m1|     12|        1|   12.0|     12|     12|            [12]|                [12]|
|    r2|     m2|     85|        1|   85.0|     85|     85|            [85]|                [85]|
|    r2|     m3|     49|        1|   49.0|     49|     49|            [49]|                [49]|
|    r2|     m5|    163|      