### Class for Managing Spark and Hive Sessions

In [None]:
from dataclasses import dataclass
from pyspark.sql import SparkSession
from os.path import abspath

@dataclass
class BigDataProcessing():
    """
        Holds one shared Spark session (Hive support enabled) and exposes
        class-level helpers for loading CSV files and working with Hive tables.

        All helpers are classmethods operating on the class-level ``spark``
        attribute, so the class is used without being instantiated.
        The attributes below carry no type annotations, so ``@dataclass``
        does not turn them into instance fields.
    """

    # Raw string: '\o' is not a valid escape sequence, and newer Python versions
    # warn about it in a normal literal. The resulting value is unchanged.
    warehouse_location = abspath(r'spark-warehouse\one')

    # Class variable: the session is created (or reused) when the class body runs
    spark = SparkSession.builder \
                        .appName("DataProcess") \
                        .enableHiveSupport() \
                        .getOrCreate()

    # NOTE: warehouse_location is currently NOT applied to the session; the
    # builder option that would use it is disabled:
    #     .config("spark.sql.warehouse.dir", warehouse_location) \

    @classmethod
    def get_data_frame(cls, path: str):
        """
            Read the CSV at ``path`` into Spark and return the Data Frame.

            The first line is treated as the header and column types are inferred.

            :param path: location of the CSV file to load
            :return: the Data Frame produced by the Spark CSV reader
        """
        return cls.spark.read.csv(path = path, header = True, inferSchema = True)

    @classmethod
    def create_hive_tables(cls) -> None:
        """
            Create the ``sparks`` database if it is missing and make it the
            current database for the following SQL statements and table writes.

            Despite the name, no table is created here.
        """
        # Create a new Hive database
        cls.spark.sql("CREATE DATABASE IF NOT EXISTS sparks")

        # Use the database
        cls.spark.sql("USE sparks")

    @classmethod
    def retrive_hive_tables(cls, table_name: str) -> None:
        """
            Print the row count of ``table_name`` followed by the rows whose
            ``cust_address`` compares greater than "chennai".

            :param table_name: table to query; it is interpolated directly into
                               the SQL text, so it must be a trusted identifier
        """
        print("From Hive : \n")
        cls.spark.sql(f"SELECT COUNT(*) FROM {table_name}").show()
        cls.spark.sql(f"SELECT * FROM {table_name} WHERE cust_address > \"chennai\"").show()

    @classmethod
    def show_hive_tables(cls, table_name: str) -> None:
        """
            Print the available databases, the tables of the current database
            and the column description of ``table_name``.

            :param table_name: table to describe; it is interpolated directly
                               into the SQL text, so it must be a trusted identifier
        """
        cls.spark.sql("SHOW DATABASES").show()
        cls.spark.sql("SHOW TABLES").show()
        cls.spark.sql(f"DESCRIBE {table_name}").show()

    @classmethod
    def drop_hive_tables(cls, table_name: str) -> None:
        """
            Drop ``table_name``.

            The statement has no IF EXISTS clause.

            :param table_name: table to drop; it is interpolated directly into
                               the SQL text, so it must be a trusted identifier
        """
        cls.spark.sql(f"DROP TABLE {table_name}")

    @classmethod
    def stop_spark_session(cls) -> None:
        """
            Stops the spark session
        """
        if cls.spark is not None and cls.spark.sparkContext is not None:
            cls.spark.stop()

### Entry Point

In [None]:
import zipfile
import os
import sys
from pyspark.sql.functions import regexp_replace, col

def unzip_file(file: str, dest_path: str, password: str = None) -> None:

    """
    Unzips the specified .zip file into the given directory.

    The extraction is best effort: a failure is reported on stdout and the
    function returns normally, so callers have to verify the extracted files
    themselves.

    :param file: file we need to extract
    :param dest_path: destination path where extracted files will be stored
                      (created when it does not exist yet)
    :param password: password if the zip is password protected, else None;
                     a str is encoded to UTF-8 because zipfile expects bytes
    """
    # zipfile only accepts the password as bytes
    pwd = password.encode("utf-8") if isinstance(password, str) else password

    try:
        # Create the directory if it doesn't exist for storing the extracted files
        os.makedirs(dest_path, exist_ok=True)

        with zipfile.ZipFile(file, 'r') as zip_ref:
            # Extract all contents into the specified directory
            # (pwd=None is the default and means "no password")
            zip_ref.extractall(dest_path, pwd=pwd)
    except Exception as exc:
        # Still best effort, but no longer swallows KeyboardInterrupt/SystemExit
        # and reports the cause instead of hiding it.
        print("Exception occurred in unzip_file:", exc)

def file_check(file_path: str, file: str) -> int:
    """
        Return the size in bytes of ``file`` located inside ``file_path``.

        :param file_path: directory that should contain the file
        :param file: name of the file to look up
        :return: the file size, or 0 when the size cannot be determined
                 (missing or unreadable file, or a name that is not a valid path)
    """
    try:
        return os.path.getsize(file_path + '/' + file)
    except (OSError, TypeError, ValueError):
        # OSError: missing/inaccessible file; TypeError: non-string name such as None;
        # ValueError: path the OS layer rejects (e.g. embedded NUL byte).
        return 0
        
# Execution Starts Here
if __name__ == "__main__":

    zip_file: str = "/home/neon/Essentials/Jupyter/Hands-on-Project/a.zip"
    dest_file_path: str = "/home/neon/Essentials/Jupyter/Hands-on-Project/a/"
    ack_filename: str = "a.ack"
    hive_table_name: str = "transaction_master"

    # The Spark session already exists at this point (it is created as a class
    # attribute of BigDataProcessing), so every exit path below - including the
    # early sys.exit - has to go through the finally clause that stops it.
    try:
        # Unzip the specified file and store in destination path
        unzip_file(zip_file, dest_file_path)

        # Check that the acknowledgement file is present and not empty
        if file_check(dest_file_path, ack_filename) <= 0:
            sys.exit(1)    # Exit with failure status code

        try:
            # Convert the ack file to a Data Frame and strip every non-digit
            # character from the "filesize" column
            ack_df = BigDataProcessing.get_data_frame(dest_file_path+'/'+ack_filename)
            ack_df = ack_df.withColumn("filesize", regexp_replace(col("filesize"), r"\D+", ""))

            # False until the first file has been written, i.e. until the table exists
            table_created: bool = False

            for row in ack_df.collect():
                # NOTE(review): the code relies on the ack columns being ordered as
                # (file name, file size, row count) - confirm against the ack layout
                file_props = list(row.asDict().values())

                # Check the sanity of extracted files: size on disk must match the ack
                if file_check(dest_file_path, file_props[0]) != int(file_props[1]):
                    continue

                # Check whether the number of rows matches the ack as well
                df = BigDataProcessing.get_data_frame(dest_file_path+'/'+file_props[0])
                if df.count() != file_props[2]:
                    continue

                if not table_created:
                    # First valid file: create/select the database and (re)create the table
                    BigDataProcessing.create_hive_tables()
                    df.write.mode("overwrite").saveAsTable(hive_table_name)
                    table_created = True
                else:
                    # Following files are appended to the existing table
                    df.write.mode("append").saveAsTable(hive_table_name)

            # Show hive table details
            BigDataProcessing.show_hive_tables(hive_table_name)

            # Retrieve tables from hive and validate it
            BigDataProcessing.retrive_hive_tables(hive_table_name)

        except Exception as e:
            print("Some exception occurred in main!", e)

    finally:
        # Stop the shared spark session on every path (success, failure, early exit)
        BigDataProcessing.stop_spark_session()
    

### Others

In [None]:
# df.write.mode("append").csv(dest_file_path+"/Output") if count != 0 else df.write.mode("overwrite").csv(dest_file_path+"/Output")
# count += 1

In [None]:
        # # Create table
        # cls.spark.sql("""
        #     CREATE TABLE IF NOT EXISTS sparks.transaction(cust_id INT,
        #                                     cust_name STRING,
        #                                     cust_address STRING,
        #                                     Trans_amt STRING,
        #                                     Trans_id INT,
        #                                     Trans_date STRING)""")
        
        # Describe the table to check its schema
        # cls.spark.sql("DESCRIBE transaction_master").show()