In [0]:
#Using PySpark to Handle Hive Query

# 1. Import the pyspark and pyspark SQL modules and also specify the app name 
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
appName= "hive_pyspark"
master= "local"

In [0]:
# 2. Create a spark session and enable the Hive support to interact with the hive database
spark = SparkSession.builder \
	.master(master).appName(appName).enableHiveSupport().getOrCreate() 
spark.sql("CREATE DATABASE IF NOT EXISTS spark_example").show()
spark.sql("DESCRIBE DATABASE spark_example").show(truncate=False)
spark.sql("USE spark_example").show() 


++
||
++
++

+-------------------------+------------------------------------------+
|database_description_item|database_description_value                |
+-------------------------+------------------------------------------+
|Catalog Name             |spark_catalog                             |
|Namespace Name           |spark_example                             |
|Comment                  |                                          |
|Location                 |dbfs:/user/hive/warehouse/spark_example.db|
|Owner                    |root                                      |
+-------------------------+------------------------------------------+

++
||
++
++



In [0]:
#3. Verify the databases in hive using pyspark
df=spark.sql("show databases")
df.show()

+-------------+
| databaseName|
+-------------+
|      default|
|spark_example|
+-------------+



In [0]:
# 4. Read the CSV file from the local write to the table in hive using pyspark 
#datafile = spark.read.csv("dbfs:/FileStore/shared_uploads/pawansharma2045@gmail.com/Train.csv",header=True)
datafile = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/pawansharma2045@gmail.com/Train.csv")
datafile.show(5)
datafile.write.saveAsTable("Train")

+-----------+------+----+---------------+-------------------+-----------+--------------------+----------------------+---------------+--------------------+-----------+-----------+----------+---------+-------------------------+-----------------+----+-------+-------+----+----+----+----+--------------+
|Employee_ID|Gender| Age|Education_Level|Relationship_Status|   Hometown|                Unit|Decision_skill_possess|Time_of_service|Time_since_promotion|growth_rate|Travel_Rate|Post_Level|Pay_Scale|Compensation_and_Benefits|Work_Life_balance|VAR1|   VAR2|   VAR3|VAR4|VAR5|VAR6|VAR7|Attrition_rate|
+-----------+------+----+---------------+-------------------+-----------+--------------------+----------------------+---------------+--------------------+-----------+-----------+----------+---------+-------------------------+-----------------+----+-------+-------+----+----+----+----+--------------+
|  EID_23371|     F|42.0|              4|            Married|   Franklin|                  IT|      

In [0]:
# 5. Fetch rows from the table in hive using pyspark and store them in the dataframe
df=spark.sql("select * from Train limit 5")
df.show()

+-----------+------+----+---------------+-------------------+-----------+--------------------+----------------------+---------------+--------------------+-----------+-----------+----------+---------+-------------------------+-----------------+----+-------+-------+----+----+----+----+--------------+
|Employee_ID|Gender| Age|Education_Level|Relationship_Status|   Hometown|                Unit|Decision_skill_possess|Time_of_service|Time_since_promotion|growth_rate|Travel_Rate|Post_Level|Pay_Scale|Compensation_and_Benefits|Work_Life_balance|VAR1|   VAR2|   VAR3|VAR4|VAR5|VAR6|VAR7|Attrition_rate|
+-----------+------+----+---------------+-------------------+-----------+--------------------+----------------------+---------------+--------------------+-----------+-----------+----------+---------+-------------------------+-----------------+----+-------+-------+----+----+----+----+--------------+
|  EID_23371|     F|42.0|              4|            Married|   Franklin|                  IT|      

In [0]:
spark.sql("DESCRIBE FORMATTED Train").show(truncate=False)

+-------------------------+---------+-------+
|col_name                 |data_type|comment|
+-------------------------+---------+-------+
|Employee_ID              |string   |null   |
|Gender                   |string   |null   |
|Age                      |string   |null   |
|Education_Level          |string   |null   |
|Relationship_Status      |string   |null   |
|Hometown                 |string   |null   |
|Unit                     |string   |null   |
|Decision_skill_possess   |string   |null   |
|Time_of_service          |string   |null   |
|Time_since_promotion     |string   |null   |
|growth_rate              |string   |null   |
|Travel_Rate              |string   |null   |
|Post_Level               |string   |null   |
|Pay_Scale                |string   |null   |
|Compensation_and_Benefits|string   |null   |
|Work_Life_balance        |string   |null   |
|VAR1                     |string   |null   |
|VAR2                     |string   |null   |
|VAR3                     |string 

In [0]:
# 6. Print the schema of the table in hive using pyspark
df.printSchema()

root
 |-- Employee_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Education_Level: string (nullable = true)
 |-- Relationship_Status: string (nullable = true)
 |-- Hometown: string (nullable = true)
 |-- Unit: string (nullable = true)
 |-- Decision_skill_possess: string (nullable = true)
 |-- Time_of_service: string (nullable = true)
 |-- Time_since_promotion: string (nullable = true)
 |-- growth_rate: string (nullable = true)
 |-- Travel_Rate: string (nullable = true)
 |-- Post_Level: string (nullable = true)
 |-- Pay_Scale: string (nullable = true)
 |-- Compensation_and_Benefits: string (nullable = true)
 |-- Work_Life_balance: string (nullable = true)
 |-- VAR1: string (nullable = true)
 |-- VAR2: string (nullable = true)
 |-- VAR3: string (nullable = true)
 |-- VAR4: string (nullable = true)
 |-- VAR5: string (nullable = true)
 |-- VAR6: string (nullable = true)
 |-- VAR7: string (nullable = true)
 |-- Attrition_rate: stri

In [0]:

#Read and Load DATA into Employee table 

df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/pawansharma2045@gmail.com/employees.csv")
df1.write.saveAsTable("Employees")

df2 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/pawansharma2045@gmail.com/sales_info.csv")
df2.write.saveAsTable("Sales1")

df3 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/pawansharma2045@gmail.com/sales_info1.csv")
df3.write.saveAsTable("Sales2")

In [0]:
#Print the dataframe schema (column and data type)

df1.printSchema()
df2.printSchema()
df3.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- employee_name: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- department: string (nullable = true)

root
 |-- employee_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- sales_count: string (nullable = true)
 |-- sales_price: string (nullable = true)

root
 |-- employee_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- sales_count: string (nullable = true)
 |-- sales_price: string (nullable = true)



In [0]:
# Retrieve Information from table Employee, Sales1, Sales2

spark.sql("select * from Employees").show(truncate=False)
spark.sql("select * from Sales1").show(truncate=False)
spark.sql("select * from Sales2").show(truncate=False)



+-----------+-------------+---------+-------------------------+
|employee_id|employee_name|salary   |department               |
+-----------+-------------+---------+-------------------------+
|101        |ALAX         |500000.00|CONSUMER_SALES           |
|102        |TOM          |500000.00|CONSUMER_SALES           |
|103        |JIM          |600000.00|CONSUMER_SALES           |
|104        |MARK         |500000.00|CONSUMER_SALES           |
|105        |TOMI         |700000.00|ELETRONIC_SALES          |
|106        |AMALI        |800000.00|CONSUMER_SALES           |
|107        |ROY          |900000.00|CONSUMER_SALES           |
|108        |FILIP        |300000.00|AGRICULTURE_PORDUCT_SALES|
|109        |MONNA        |800000.00|CONSUMER_SALES           |
|110        |RONME        |500000.00|ELETRONIC_SALES          |
|111        |SAMULE       |500000.00|AGRICULTURE_PORDUCT_SALES|
|112        |TOD          |900000.00|AGRICULTURE_PORDUCT_SALES|
|113        |MILI         |500000.00|ELE

In [0]:
#Show 
spark.sql("DESCRIBE FORMATTED Employees").show(truncate=False)
spark.sql("DESCRIBE FORMATTED Sales1").show(truncate=False)
spark.sql("DESCRIBE FORMATTED Sales2").show(truncate=False)


+----------------------------+----------------------------------------------------+-------+
|col_name                    |data_type                                           |comment|
+----------------------------+----------------------------------------------------+-------+
|employee_id                 |string                                              |null   |
|employee_name               |string                                              |null   |
|salary                      |string                                              |null   |
|department                  |string                                              |null   |
|                            |                                                    |       |
|# Detailed Table Information|                                                    |       |
|Catalog                     |spark_catalog                                       |       |
|Database                    |spark_example                                     

In [0]:
#Import spark sql types 
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

#Create spark schema by StructType and StructField

employees_info = StructType([
    StructField('employee_id', IntegerType(), True),
    StructField('employee_name', StringType(), True),
    StructField('salary', DoubleType(), True),
    StructField('department', StringType(), True)
])

#Load data from CSV to Spark Dataframe
employees_info = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/pawansharma2045@gmail.com/employees.csv")

#Check and Show dataframe data by .show() method
employees_info.show(20, truncate=False)

#Print The schema of dataframe by .printSchema()
employees_info.printSchema()


+-----------+-------------+---------+-------------------------+
|employee_id|employee_name|salary   |department               |
+-----------+-------------+---------+-------------------------+
|101        |ALAX         |500000.00|CONSUMER_SALES           |
|102        |TOM          |500000.00|CONSUMER_SALES           |
|103        |JIM          |600000.00|CONSUMER_SALES           |
|104        |MARK         |500000.00|CONSUMER_SALES           |
|105        |TOMI         |700000.00|ELETRONIC_SALES          |
|106        |AMALI        |800000.00|CONSUMER_SALES           |
|107        |ROY          |900000.00|CONSUMER_SALES           |
|108        |FILIP        |300000.00|AGRICULTURE_PORDUCT_SALES|
|109        |MONNA        |800000.00|CONSUMER_SALES           |
|110        |RONME        |500000.00|ELETRONIC_SALES          |
|111        |SAMULE       |500000.00|AGRICULTURE_PORDUCT_SALES|
|112        |TOD          |900000.00|AGRICULTURE_PORDUCT_SALES|
|113        |MILI         |500000.00|ELE

In [0]:
sales1_info = StructType([\
    StructField('employee_id', IntegerType(), True),\
    StructField('product_name', StringType(), True),\
    StructField('product_type', StringType(), True),\
    StructField('sales_count', IntegerType(), True),\
    StructField('sales_price', DoubleType(), True)\
  ])
  
sales1_info = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/pawansharma2045@gmail.com/sales_info.csv")
sales1_info.show(20, truncate=False)
sales1_info.printSchema()

+-----------+-------------------+------------+-----------+-----------+
|employee_id|product_name       |product_type|sales_count|sales_price|
+-----------+-------------------+------------+-----------+-----------+
|101        |MANGO              |Food        |300        |400.50     |
|101        |BANANA             |Food        |300        |200.60     |
|102        |MANGO              |Food        |350        |400.50     |
|103        |ORANGE             |Food        |500        |350.67     |
|104        |GRAPE              |Food        |700        |800.45     |
|103        |MANGO              |Food        |500        |400.50     |
|105        |SMART TV           |ELETRONIC   |5          |50000      |
|105        |MOBILE             |ELETRONIC   |8          |60000      |
|113        |RICE COOKER        |ELETRONIC   |5          |4000       |
|112        |RICE SEED 10 KG BAG|AGRICULTURE |30         |2000       |
+-----------+-------------------+------------+-----------+-----------+

root


In [0]:
sales2_info = StructType([\
    StructField('employee_id', IntegerType(), True),\
    StructField('product_name', StringType(), True),\
    StructField('product_type', StringType(), True),\
    StructField('sales_count', IntegerType(), True),\
    StructField('sales_price', DoubleType(), True)\
  ])
sales2_info = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/pawansharma2045@gmail.com/sales_info1.csv")
sales2_info.show(20, truncate=False)
sales2_info.printSchema()


+-----------+-------------------+------------+-----------+-----------+
|employee_id|product_name       |product_type|sales_count|sales_price|
+-----------+-------------------+------------+-----------+-----------+
|101        |MANGO              |Food        |300        |400.50     |
|101        |BANANA             |Food        |300        |200.60     |
|102        |MANGO              |Food        |350        |400.50     |
|103        |ORANGE             |Food        |500        |350.67     |
|104        |GRAPE              |Food        |700        |800.45     |
|103        |MANGO              |Food        |500        |400.50     |
|105        |SMART TV           |ELETRONIC   |5          |50000      |
|105        |MOBILE             |ELETRONIC   |8          |60000      |
|113        |RICE COOKER        |ELETRONIC   |5          |4000       |
|112        |RICE SEED 10 KG BAG|AGRICULTURE |30         |2000       |
+-----------+-------------------+------------+-----------+-----------+

root
