# DATA EXTRACTION FROM APACHE HIVE AND 
# ANALYSIS USING APACHE SPARK- SPARK SQL

In [None]:
# LIBRARIES:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp,month,expr

### Connecting Spark and Hive

In [2]:
spark = SparkSession.builder.appName("Data_Extraction_and_Analysis") \
        .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
        .enableHiveSupport() \
        .getOrCreate()

# appName = Provide a name for your Spark application
# config  = Set Hive warehouse directory
# enableHiveSupport = Enable Hive support

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/10/06 14:31:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Uploading Data from Apache Hive

In [None]:
data = spark.sql("SELECT * FROM sales_database.sales_data_table")

### Calculating Statistics

In [3]:
statistics = data.describe(["no_of_units", "price", "amount"])
statistics .show()

# Save the analyzed data 
statistics.write.csv("/home/rizwan/Data_Engineering_Project/Analysed_Data/Sales_Statistics")

Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
                                                                                

+-------+------------------+-----------------+------------------+
|summary|       no_of_units|            price|            amount|
+-------+------------------+-----------------+------------------+
|  count|              1560|             1560|              1560|
|   mean|12.547435897435898|972.6666666666666| 5426.029487179487|
| stddev| 15.46123231746398|671.4694648673942|2686.7067453959307|
|    min|                 1|             75.0|             630.0|
|    max|                90|           2001.0|           12006.0|
+-------+------------------+-----------------+------------------+



Count : There are 1560 data points in each of the columns (no_of_units, price, and amount).

Mean  : The average number of units sold is approximately 12.55, the average price is 
        approximately 972.67,and the average amount is approximately 5426.03.

Standard Deviation : The standard deviation for the number of units sold is approximately 15.46,  
                     for the price  is approximately 671.47, and for the amount is approximately 2686.71.

Minimum : The minimum number of units sold is 1, the minimum price is 75.0, and the minimum amount is 630.0.
    
Maximum : The maximum number of units sold is 90, the maximum price is 2001.0, and the maximum amount is 12006.0.

These statistics provide a basic overview of the distribution and central tendencies of your numeric columns.

---

### GROUPING AND AGGREGATION :

#### Total Sales by Product  [ Top Selling Product ]

In [8]:
product_sales = spark.sql("SELECT product,SUM(no_of_units) AS total_units,\
                           SUM(price) AS total_price,SUM(amount) AS product_sales \
                           FROM sales_database.sales_data_table \
                           GROUP BY product ORDER BY product_sales DESC")
product_sales.show()

# Save the analyzed data 
product_sales.write.csv("/home/rizwan/Data_Engineering_Project/Analysed_Data/product_sales")

+-----------------+-----------+-----------+-------------+
|          product|total_units|total_price|product_sales|
+-----------------+-----------+-----------+-------------+
|      Dell XPS 13|       1181|   520260.0|    2363181.0|
|    iPhone 11 Pro|       1057|   409500.0|    1664775.0|
|    OnePlus 8 Pro|       2056|   205400.0|    1624240.0|
|     HP Envy x360|       1058|   280800.0|    1142640.0|
|     OnePlus Buds|      11709|    19500.0|     878175.0|
|Apple AirPods Pro|       2513|    81900.0|     791595.0|
+-----------------+-----------+-----------+-------------+



"Dell XPS 13" is the top-selling product with a total sales amount of 2,363,181.

---

#### Total Sales by Product Category

In [14]:
category_sales = spark.sql("SELECT Category,SUM(amount) AS total_sales \
                            FROM sales_database.sales_data_table \
                            GROUP BY Category ORDER BY total_sales DESC")
category_sales.show()

# Save the analyzed data 
category_sales.write.csv("/home/rizwan/Data_Engineering_Project/Analysed_Data/category_sales")

+---------+-----------+
| Category|total_sales|
+---------+-----------+
|   Laptop|  3505821.0|
|    Phone|  3289015.0|
|Headphone|  1669770.0|
+---------+-----------+



The category that recorded the highest total sales is "Laptop" with a sales figure of 3,505,821.

---

#### Total Sales by Sales Representative   [ Top Performing Sales Representative ]

In [9]:
sale_rep_totalsales = spark.sql("SELECT sales_rep, SUM(amount) AS sale_rep_totalsales \
                                 FROM sales_database.sales_data_table \
                                 GROUP BY sales_rep ORDER BY sale_rep_totalsales DESC")
sale_rep_totalsales.show()

# Save the analyzed data 
sale_rep_totalsales.write.csv("/home/rizwan/Data_Engineering_Project/Analysed_Data/sale_rep_totalsales")

+---------+-------------------+
|sales_rep|sale_rep_totalsales|
+---------+-------------------+
|     Amar|          1141490.0|
|     Kate|          1022509.0|
|     Tara|           995525.0|
|    Aryan|           870300.0|
|    Leila|           846425.0|
|     Asif|           804762.0|
|    Giana|           742570.0|
|    Krish|           725580.0|
|    Bruce|           717555.0|
|    Laxmi|           597890.0|
+---------+-------------------+



The results are sorted in descending order of the "sale_rep_totalsales" column, showing the sales representatives with the highest total sales at the top.  
"Amar" is the top-performing sales representative with a total sales amount of 1,141,490.

---

#### Total Sales by city

In [10]:
sales_by_city = spark.sql("SELECT city, SUM(amount) AS totalsales_by_city \
                           FROM sales_database.sales_data_table \
                           GROUP BY city ORDER BY totalsales_by_city DESC")
sales_by_city.show()

# Save the analyzed data 
sales_by_city.write.csv("/home/rizwan/Data_Engineering_Project/Analysed_Data/sales_by_city")

+---------+------------------+
|     city|totalsales_by_city|
+---------+------------------+
|   Mumbai|         1748089.0|
|Hyderabad|         1739380.0|
|    Delhi|         1716725.0|
|Bangalore|         1713080.0|
|   Cochin|         1547332.0|
+---------+------------------+



"Mumbai" is the city with the highest total sales, with a total sales amount of 1,748,089.

---

#### Monthly Sales Analysis

In [11]:
df = data.withColumn('dte', to_timestamp(data['dte'], 'dd-MM-yyyy'))
# df.printSchema()

df = df.withColumn('month', month(df['dte']))  # Extract month
# df.show()

# Create a new column using 'expr' and 'if-elif-else' conditions
df = df.withColumn(
    'month_name',
    expr(
        "CASE "
        "WHEN month = 1 THEN 'January' "
        "WHEN month = 2 THEN 'February' "
        "WHEN month = 3 THEN 'March' "
        "WHEN month = 4 THEN 'April' "
        "WHEN month = 5 THEN 'May' "
        "WHEN month = 6 THEN 'June' "
        "WHEN month = 7 THEN 'July' "
        "WHEN month = 8 THEN 'August' "
        "WHEN month = 9 THEN 'September' "
        "WHEN month = 10 THEN 'October' "
        "WHEN month = 11 THEN 'November' "
        "WHEN month = 12 THEN 'December' "
        "ELSE NULL END"
    )
)
# df.show()

# Save the analyzed data 
df.write.csv("/home/rizwan/Data_Engineering_Project/Analysed_Data/Processed_Sales_Data")

                                                                                

In [12]:
# Group by month and calculate total sales for each month
monthly_sales = df.groupBy('month','month_name').agg({'amount':'sum'})
# monthly_sales = monthly_sales.sort(monthly_sales['month'])
# monthly_sales.show()

# Sort by total sales in descending order to find the month with the highest sales
monthly_sales = monthly_sales.sort(monthly_sales['sum(amount)'].desc())
monthly_sales.show()

# Save the analyzed data in ORC format
monthly_sales.write.csv("/home/rizwan/Data_Engineering_Project/Analysed_Data/monthly_sales")

+-----+----------+-----------+
|month|month_name|sum(amount)|
+-----+----------+-----------+
|   12|  December|   847373.0|
|    7|      July|   833267.0|
|   10|   October|   832298.0|
|    4|     April|   805897.0|
|    5|       May|   679908.0|
|   11|  November|   660067.0|
|    6|      June|   655666.0|
|    2|  February|   648716.0|
|    1|   January|   640162.0|
|    9| September|   632641.0|
|    8|    August|   618466.0|
|    3|     March|   610145.0|
+-----+----------+-----------+



"December" is the month with highest total sales with 847,373.

---
---