In [0]:
class POCOne:
    """Proof-of-concept Spark pipeline over a bank-transactions CSV.

    Every method relies on a notebook-global SparkSession named ``spark`` that is
    not created here (presumably supplied by the Databricks runtime, given the
    ``/FileStore`` input path -- TODO confirm). Most steps communicate through
    Spark temp views (``bank``, ``bank_range``, ``bank_range_wo_dup``,
    ``bank_final``), so they must run in the order used by ``mainFunc``.

    Error handling is best-effort: a failing step prints its message together
    with the underlying exception and returns ``None`` instead of raising.
    """

    # Raw input CSV.
    BANK_CSV_LOCATION = "/FileStore/tables/bankcsv.csv"
    # Step-1 output: written by mainFunc and read back by getChequeTransactions,
    # so both sides must use this same path.
    STEP1_LOCATION = "/bank_csv_after_step1"
    # Inclusive date window used for the date-range preview and step 5.
    DEFAULT_FROM_DATE = '2017-06-29'
    DEFAULT_TO_DATE = '2017-08-16'

    @staticmethod
    def _report(message, exc):
        """Print a step's failure message together with the exception that caused it."""
        print('{0}: {1!r}'.format(message, exc))

    def readFile(self, file_location, file_type):
        """Load a headered file into a DataFrame with schema inference.

        Args:
            file_location: path passed to ``spark.read...load``.
            file_type: Spark data-source format name, e.g. ``"csv"``.

        Returns:
            The DataFrame, or ``None`` if loading failed (a message is printed).
        """
        try:
            return (spark.read.format(file_type)
                    .option("inferSchema", "true")
                    .option("header", "true")
                    .load(file_location))
        except Exception as exc:
            self._report('Invalid file type or file location', exc)

    def writeCsv(self, dataframe, file_location):
        """Write ``dataframe`` as CSV with a header row, overwriting ``file_location``."""
        try:
            dataframe.write.mode("overwrite").option("header", "true").csv(file_location)
        except Exception as exc:
            self._report('Invalid dataframe or file location', exc)

    def writeParquet(self, dataframe, file_location):
        """Write ``dataframe`` as Parquet, overwriting ``file_location``.

        Parquet stores column names in its own schema, so no ``header`` option
        is needed (that option only applies to CSV).
        """
        try:
            dataframe.write.mode("overwrite").parquet(file_location)
        except Exception as exc:
            self._report('Invalid dataframe or file location', exc)

    def transformColumnNames(self, file_location=BANK_CSV_LOCATION):
        """Load the raw bank CSV and normalise its column names.

        Spaces in column names become underscores, and ``CHQ.NO.`` is renamed to
        ``CHQ_NO`` because dots in column names need backtick quoting in Spark SQL.

        NOTE(review): the captured preview shows account numbers with a trailing
        apostrophe (e.g. ``409000611074'``); they are not cleaned here.

        Args:
            file_location: CSV to load; defaults to ``BANK_CSV_LOCATION``.

        Returns:
            The renamed DataFrame, or ``None`` on failure (a message is printed).
        """
        try:
            df_bank = self.readFile(file_location, "csv")
            if df_bank is None:
                # readFile already reported the failure.
                return None
            new_column_name_list = [name.replace(" ", "_") for name in df_bank.columns]
            df_bank = df_bank.toDF(*new_column_name_list)
            return df_bank.withColumnRenamed("CHQ.NO.", "CHQ_NO")
        except Exception as exc:
            self._report('Error transforming column names', exc)

    def dropDotColumn(self):
        """Select the named columns from the ``bank`` view, dropping the trailing ``.`` column."""
        try:
            return spark.sql("""
            SELECT Account_No, DATE, TRANSACTION_DETAILS, CHQ_NO, VALUE_DATE, WITHDRAWAL_AMT, DEPOSIT_AMT, BALANCE_AMT FROM bank
            """)
        except Exception as exc:
            self._report('Error dropping the dot column', exc)

    def addTransactionAmountColumn(self):
        """Add TRANSACTION_AMOUNT from the ``bank`` view.

        TRANSACTION_AMOUNT is DEPOSIT_AMT when WITHDRAWAL_AMT is null, WITHDRAWAL_AMT
        when DEPOSIT_AMT is null, and null otherwise.
        """
        try:
            return spark.sql("""
                SELECT Account_No, DATE, TRANSACTION_DETAILS, CHQ_NO, VALUE_DATE, WITHDRAWAL_AMT, DEPOSIT_AMT, BALANCE_AMT,
                CASE WHEN WITHDRAWAL_AMT IS NULL THEN DEPOSIT_AMT 
                WHEN DEPOSIT_AMT Is NULL THEN WITHDRAWAL_AMT
                ELSE NULL
                END AS TRANSACTION_AMOUNT
                FROM bank
            """)
        except Exception as exc:
            self._report('Error adding the TRANSACTION_AMOUNT column', exc)

    def addTransactionTypeColumn(self):
        """Add TRANSACTION_TYPE from the ``bank`` view.

        The type is ``CR`` when WITHDRAWAL_AMT is null, ``DR`` when DEPOSIT_AMT is
        null, and null otherwise.
        """
        try:
            return spark.sql("""
                SELECT Account_No, DATE, TRANSACTION_DETAILS, CHQ_NO, VALUE_DATE, WITHDRAWAL_AMT, DEPOSIT_AMT, BALANCE_AMT, TRANSACTION_AMOUNT,
                CASE WHEN WITHDRAWAL_AMT IS NULL THEN "CR" 
                WHEN DEPOSIT_AMT IS NULL THEN "DR"
                ELSE NULL
                END AS TRANSACTION_TYPE
                FROM bank
                """)
        except Exception as exc:
            self._report('Error adding the TRANSACTION_TYPE column', exc)

    def getChequeTransactions(self):
        """Reload the step-1 CSV as the ``bank`` view and return rows with a cheque number.

        Side effect: replaces the ``bank`` temp view with the re-read step-1 data,
        which the later CR/DR/range/duplicate queries read from.
        """
        try:
            df_bank = self.readFile(self.STEP1_LOCATION, "csv")
            df_bank.createOrReplaceTempView("bank")
            return spark.sql("""
            SELECT * FROM bank
            WHERE CHQ_NO IS NOT NULL
            """)
        except Exception as exc:
            self._report('Error filtering out the cheque transactions', exc)

    def getCRTransactions(self):
        """Return rows of the ``bank`` view whose TRANSACTION_TYPE is ``CR``."""
        try:
            return spark.sql("""
            SELECT * FROM bank
            WHERE TRANSACTION_TYPE='CR'
            """)
        except Exception as exc:
            self._report('Error filtering out the cr transactions', exc)

    def getDRTransactions(self):
        """Return rows of the ``bank`` view whose TRANSACTION_TYPE is ``DR``."""
        try:
            return spark.sql("""
            SELECT * FROM bank
            WHERE TRANSACTION_TYPE='DR'
            """)
        except Exception as exc:
            self._report('Error filtering out the dr transactions', exc)

    def getDuplicateTransactions(self):
        """Return (account, date, amount, type) groups that occur more than once in ``bank``."""
        try:
            return spark.sql("""
            SELECT Account_No, DATE, TRANSACTION_AMOUNT, TRANSACTION_TYPE, COUNT(*)
            FROM bank
            GROUP BY Account_No, DATE, TRANSACTION_TYPE, TRANSACTION_AMOUNT
            HAVING COUNT(*)>1
            """)
        except Exception as exc:
            self._report('Error getting the duplicate transactions', exc)

    def getTransactionsInRange(self, fromdate, todate):
        """Return rows of the ``bank`` view whose DATE lies in ``[fromdate, todate]`` (inclusive).

        The bounds are passed to Spark as literal values rather than formatted
        into SQL text, so quotes in them cannot alter the query.
        """
        try:
            bank = spark.table("bank")
            return bank.filter(bank["DATE"].between(fromdate, todate))
        except Exception as exc:
            self._report('Error getting the transactions in a date range', exc)

    def getUniqueTransactions(self):
        """Keep one row per (account, date, type, amount) group from the ``bank_range`` view.

        NOTE(review): the window is ordered by its own partition keys, so which
        row of a duplicate group survives is not deterministic.
        """
        try:
            return spark.sql("""
            WITH BankCTE AS
            (
            SELECT *, ROW_NUMBER() OVER (PARTITION BY Account_No, DATE, TRANSACTION_TYPE, TRANSACTION_AMOUNT ORDER BY Account_No, DATE, TRANSACTION_TYPE, TRANSACTION_AMOUNT) AS RN
            FROM bank_range
            )

            SELECT * FROM BankCTE
            WHERE RN = 1
            """)
        except Exception as exc:
            self._report('Error getting the unique transactions', exc)

    def getTotalDepositWithdrawal(self):
        """Sum DEPOSIT_AMT and WITHDRAWAL_AMT per account from the ``bank_final`` view."""
        try:
            return spark.sql("""
            SELECT Account_No, SUM(DEPOSIT_AMT) AS TOTAL_DEPOSIT, SUM(WITHDRAWAL_AMT) AS TOTAL_WITHDRAW
            FROM bank_final
            GROUP BY Account_No
            """)
        except Exception as exc:
            self._report('Error calculating the total deposit and total withdrawal', exc)

    def doStepFive(self, fromdate=DEFAULT_FROM_DATE, todate=DEFAULT_TO_DATE):
        """Compute per-account deposit/withdrawal totals over de-duplicated transactions in a date window.

        Registers the temp views ``bank_range``, ``bank_range_wo_dup`` and
        ``bank_final`` along the way.

        Args:
            fromdate: inclusive lower DATE bound (default ``DEFAULT_FROM_DATE``).
            todate: inclusive upper DATE bound (default ``DEFAULT_TO_DATE``).

        Returns:
            The totals DataFrame, or ``None`` on failure (a message is printed).
        """
        try:
            df_bank_daterange = self.getTransactionsInRange(fromdate, todate)
            df_bank_daterange.createOrReplaceTempView("bank_range")
            df_bank_daterange_wo_dup = self.getUniqueTransactions()
            df_bank_daterange_wo_dup.createOrReplaceTempView("bank_range_wo_dup")

            # Missing amounts count as 0, so an account with no deposits (or no
            # withdrawals) gets a total of 0 rather than NULL.
            df_bank_step5 = spark.sql("""
            SELECT Account_No,
            COALESCE(DEPOSIT_AMT, 0) AS DEPOSIT_AMT,
            COALESCE(WITHDRAWAL_AMT, 0) AS WITHDRAWAL_AMT
            FROM bank_range_wo_dup
            """)
            df_bank_step5.createOrReplaceTempView("bank_final")
            return self.getTotalDepositWithdrawal()
        except Exception as exc:
            self._report('Error doing the final step', exc)

    def mainFunc(self):
        """Run the whole pipeline, previewing each intermediate result with ``show``.

        Writes the step-1 CSV to ``STEP1_LOCATION``, the cheque/CR/DR subsets to
        Parquet (``bank_csv_chq``, ``bank_csv_cr``, ``bank_csv_dr``) and the
        duplicate groups to CSV (``bank_csv_dup``). The final per-account totals
        are only displayed.
        """
        try:
            df_bank = self.transformColumnNames()
            df_bank.show(5)
            df_bank.createOrReplaceTempView("bank")
            df_bank = self.dropDotColumn()
            df_bank.show(5)
            df_bank.createOrReplaceTempView("bank")
            df_bank = self.addTransactionAmountColumn()
            df_bank.show(5)
            df_bank.createOrReplaceTempView("bank")
            df_bank = self.addTransactionTypeColumn()
            df_bank.show(5)
            self.writeCsv(df_bank, self.STEP1_LOCATION)

            df_bank_chq = self.getChequeTransactions()
            df_bank_chq.show(5)
            self.writeParquet(df_bank_chq, "bank_csv_chq")
            df_bank_cr = self.getCRTransactions()
            df_bank_cr.show(5)
            self.writeParquet(df_bank_cr, "bank_csv_cr")
            df_bank_dr = self.getDRTransactions()
            df_bank_dr.show(5)
            self.writeParquet(df_bank_dr, "bank_csv_dr")

            df_bank_daterange = self.getTransactionsInRange(self.DEFAULT_FROM_DATE, self.DEFAULT_TO_DATE)
            df_bank_daterange.show(5)
            df_bank_dup = self.getDuplicateTransactions()
            df_bank_dup.show(5)
            self.writeCsv(df_bank_dup, "bank_csv_dup")

            df_bank_final = self.doStepFive(self.DEFAULT_FROM_DATE, self.DEFAULT_TO_DATE)
            df_bank_final.show()
        except Exception as exc:
            self._report('Something wrong with the main function', exc)
    
# Pipeline instance used by the notebook cell below.
pocOne = POCOne()

In [0]:
# Run the full pipeline; each step's preview is printed via DataFrame.show().
pocOne.mainFunc()

+-------------+----------+--------------------+------+----------+--------------+-----------+-----------+---+
|   Account_No|      DATE| TRANSACTION_DETAILS|CHQ_NO|VALUE_DATE|WITHDRAWAL_AMT|DEPOSIT_AMT|BALANCE_AMT|  .|
+-------------+----------+--------------------+------+----------+--------------+-----------+-----------+---+
|409000611074'|2017-06-29|TRF FROM  Indiafo...|  null|2017-06-29|          null|  1000000.0|  1000000.0|  .|
|409000611074'|2017-07-05|TRF FROM  Indiafo...|  null|2017-07-05|          null|  1000000.0|  2000000.0|  .|
|409000611074'|2017-07-18|FDRL/INTERNAL FUN...|  null|2017-07-18|          null|   500000.0|  2500000.0|  .|
|409000611074'|2017-08-01|TRF FRM  Indiafor...|  null|2017-08-01|          null|  3000000.0|  5500000.0|  .|
|409000611074'|2017-08-16|FDRL/INTERNAL FUN...|  null|2017-08-16|          null|   500000.0|  6000000.0|  .|
+-------------+----------+--------------------+------+----------+--------------+-----------+-----------+---+
only showing top 5 