### Credit Card Skippers/Defaulters

1. **Assigning Points for Short Payments**:
    - Assign 1 point to a customer for a short payment if the customer fails to clear at least 70% of their monthly spend.

2. **Assigning Points for Max Limit Utilization**:
    - Assign 1 point to a customer if they have spent 100% of their max limit but did not clear the full amount.

3. **Additional Points for Both Conditions**:
    - If a customer meets both of the above conditions in any given month, assign 1 additional point.

4. **Summarizing Points**:
    - Sum the points for each customer and write the results to a file.
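
As a worked illustration of the three rules above, the following minimal sketch scores a few made-up months in plain Python. The helper name `monthly_points` and the sample figures are hypothetical and are not taken from the data files.

```python
def monthly_points(spent, cleared, max_limit):
    """Return the defaulter points earned in one month (hypothetical helper)."""
    short_payment = cleared < 0.7 * spent                # rule 1
    maxed_out = spent == max_limit and cleared < spent   # rule 2
    points = int(short_payment) + int(maxed_out)
    if short_payment and maxed_out:                      # rule 3: both in one month
        points += 1
    return points


# Made-up months as (spent, cleared, max_limit).
months = [
    (1000, 800, 5000),   # 80% cleared, limit not reached
    (1000, 600, 5000),   # only 60% cleared
    (5000, 4000, 5000),  # limit reached, 80% cleared
    (5000, 3000, 5000),  # limit reached, 60% cleared
]
scores = [monthly_points(*m) for m in months]
total = sum(scores)
print(scores, total)  # [0, 1, 1, 3] 5
```

A month that triggers both rules is worth 3 points in total, which is why the last sample month dominates the sum.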

### Loan File Key Points

1. **Personal Loan**:
    - The bank does not accept short or late payments.
    - If a person has not paid the monthly installment, that month's entry will not be present in the file.

2. **Medical Loan**:
    - The bank accepts late payments, but the full amount must be paid.
    - The file is assumed to contain a record for every month of a Medical Loan.
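
Because a missed Personal Loan installment leaves no row in the file, the missed months have to be inferred from the months that are present. A minimal sketch, assuming the data covers months 1 to 12 of a single year; `missed_months` is a hypothetical helper name:

```python
def missed_months(paid_months, months_in_year=12):
    """Return the sorted months that have no payment record (hypothetical helper)."""
    expected = set(range(1, months_in_year + 1))
    return sorted(expected - set(paid_months))


# Made-up example: payments exist for every month except April, July and August.
print(missed_months([1, 2, 3, 5, 6, 9, 10, 11, 12]))  # [4, 7, 8]
```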

### Loan Defaulters

1. **Medical Loan Defaulters**:
    - If a customer has made a total of 3 or more late payments, they are classified as a defaulter.

2. **Personal Loan Defaulters**:
    - If a customer has missed a total of 4 or more installments OR missed 2 consecutive installments, they are classified as a defaulter.
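
The two classification rules above can be sketched in plain Python as follows. This is an illustration under stated assumptions, not the pipeline code itself: the helper names are hypothetical, the sample inputs are made up, and the data is assumed to cover months 1 to 12 of one year.

```python
def longest_missed_run(paid_months, months_in_year=12):
    """Length of the longest streak of consecutive months without a payment."""
    paid = set(paid_months)
    longest = current = 0
    for month in range(1, months_in_year + 1):
        current = 0 if month in paid else current + 1
        longest = max(longest, current)
    return longest


def is_personal_loan_defaulter(paid_months, months_in_year=12):
    """4 or more missed installments in total, or 2 or more in a row."""
    total_missed = months_in_year - len(set(paid_months))
    return total_missed >= 4 or longest_missed_run(paid_months, months_in_year) >= 2


def is_medical_loan_defaulter(late_flags):
    """3 or more late payments; each flag is 1 for late and 0 for on time."""
    return sum(late_flags) >= 3


# Misses months 4 and 8 only: 2 missed in total, never 2 in a row.
print(is_personal_loan_defaulter([1, 2, 3, 5, 6, 7, 9, 10, 11, 12]))  # False
# Misses months 4 and 5: 2 consecutive missed installments.
print(is_personal_loan_defaulter([1, 2, 3, 6, 7, 8, 9, 10, 11, 12]))  # True
# Three late payments out of five.
print(is_medical_loan_defaulter([0, 1, 0, 1, 1]))                     # True
```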


In [1]:
from datetime import datetime
import apache_beam as beam

pipeline = beam.Pipeline()


In [2]:
def adding_defualter_point_to_customers(element):
    # Each row of cards.txt is unpacked into ten comma-separated fields.
    customer_id, first_name, last_name, relationship_no, card_type, max_credit_limit, total_spent, cash_withdrawn, cleared_amount, last_date = element.split(',')

    spent = int(total_spent)
    cleared_amount = int(cleared_amount)
    max_limit = int(max_credit_limit)

    key_name = customer_id + ", " + first_name + ", " + last_name

    default_points = 0

    # Short payment: less than 70% of the monthly spend was cleared.
    if cleared_amount < (spent * 0.7):
        default_points += 1

    # Full limit used without clearing the full amount.
    if (spent == max_limit) and (cleared_amount < spent):
        default_points += 1

    # Both conditions in the same month: one additional point.
    if (spent == max_limit) and (cleared_amount < (spent * 0.7)):
        default_points += 1

    return key_name, default_points

def format_result(sum_pair):
  key_name, points = sum_pair
  return str(key_name) + ', ' + str(points) + ' fraud_points' 

def defaulters_filtering(element):
    # Keep only customers who collected at least one point.
    return element[1] > 0
    
def calculate_late_payment(elements):
  # Fields 6 and 8 are parsed as the due date and the payment date (dd-mm-yyyy).
  due_date = datetime.strptime(elements[6].strip(), '%d-%m-%Y')
  payment_date = datetime.strptime(elements[8].strip(), '%d-%m-%Y')

  # Append a flag: '1' for a late payment, '0' otherwise.
  if payment_date <= due_date:
    elements.append('0')
  else:
    elements.append('1')

  return elements


def format_output(sum_pair):
  key_name, miss_months = sum_pair
  return str(key_name) + ', ' + str(miss_months) + ' missed'

def calculate_month(input_list):
  # Append the month of the payment date (field 8, dd-mm-yyyy) as a new column.
  payment_date = datetime.strptime(input_list[8].strip(), '%d-%m-%Y')
  input_list.append(str(payment_date.month))

  return input_list

def calculate_personal_loan_defaulter(element):
    # A customer defaults with 4 or more missed installments in total,
    # or with 2 or more consecutive missed installments.
    max_allowed_missed_months = 4
    max_allowed_consecutive_missing = 2

    name, months_list = element

    # GroupByKey may hand over a generic iterable, so build a sorted list.
    sorted_months = sorted(months_list)
    total_payments = len(sorted_months)

    # The data is assumed to cover the 12 months of one year.
    missed_payments = 12 - total_payments

    if missed_payments >= max_allowed_missed_months:
        return name, missed_payments

    consecutive_missed_months = 0

    # Months missed before the first payment of the year.
    temp = sorted_months[0] - 1
    if temp > consecutive_missed_months:
        consecutive_missed_months = temp

    # Months missed after the last payment of the year.
    temp = 12 - sorted_months[total_payments-1]
    if temp > consecutive_missed_months:
        consecutive_missed_months = temp

    # Gaps between successive payments.
    for i in range(1, len(sorted_months)):
        temp = sorted_months[i] - sorted_months[i-1] - 1
        if temp > consecutive_missed_months:
            consecutive_missed_months = temp

    if consecutive_missed_months >= max_allowed_consecutive_missing:
        return name, consecutive_missed_months

    return name, 0

def return_tuple(element):
  thisTuple=element.split(',')
  return (thisTuple[0],thisTuple[1:]) 

In [3]:
card_defaulter = (
                  pipeline
                  | 'Read credit card data' >> beam.io.ReadFromText(r'L:\Learning\Apache Beam\Data\cards.txt', skip_header_lines=1)
                  | 'Calculate defaulter points' >> beam.Map(adding_defualter_point_to_customers)                            
                  | 'Combine points for defaulters' >> beam.CombinePerKey(sum)                           
                  | 'Filter card defaulters' >> beam.Filter(defaulters_filtering)
                  | 'Format output' >> beam.Map(format_result)     
                  | 'tuple ' >> beam.Map(return_tuple)
                  )

medical_loan_defaulter = (
                            pipeline
                            |  beam.io.ReadFromText(r'L:\Learning\Apache Beam\Data\loan.txt',skip_header_lines=1)
                            | 'Split Row' >> beam.Map(lambda row : row.split(','))
                            | 'Filter medical loan' >> beam.Filter(lambda element : (element[5]).rstrip().lstrip() == 'Medical Loan')
                            | 'Calculate late payment' >> beam.Map(calculate_late_payment)
                            | 'Make key value pairs' >> beam.Map(lambda elements: (elements[0] + ', ' + elements[1]+' '+elements[2], int(elements[9])) ) 
                            | 'Sum late payments per customer' >> beam.CombinePerKey(sum)
                            | 'Check for medical loan defaulter' >> beam.Filter(lambda element: element[1] >= 3)
                            | 'Format medical loan output' >> beam.Map(format_output)       
                         )     

personal_loan_defaulter = (
                            pipeline
                            | 'Read' >> beam.io.ReadFromText(r'L:\Learning\Apache Beam\Data\loan.txt',skip_header_lines=1)
                            | 'Split' >> beam.Map(lambda row : row.split(','))
                            | 'Filter personal loan' >> beam.Filter(lambda element : (element[5]).rstrip().lstrip() == 'Personal Loan')
                            | 'Split and Append New Month Column' >> beam.Map(calculate_month)   
                            | 'Make key value pairs loan' >> beam.Map(lambda elements: (elements[0] + ', ' + elements[1]+' '+elements[2], int(elements[9])) ) 
                            | 'Group personal loan based on month' >> beam.GroupByKey()                                  
                            | 'Check for personal loan defaulter' >> beam.Map(calculate_personal_loan_defaulter)           
                            | 'Filter only personal loan defaulters' >> beam.Filter(lambda element: element[1] > 0)
                            | 'Format personal loan output' >> beam.Map(format_output)     
                          )   
                          
final_loan_defaulters = (
                          ( personal_loan_defaulter, medical_loan_defaulter )
                          | 'Combine all defaulters' >> beam.Flatten()
                          | 'tuple for loan' >> beam.Map(return_tuple)
                        )  
                        
both_defaulters =  (
                    {'card_defaulter': card_defaulter, 'loan_defaulter': final_loan_defaulters}
                    | beam.CoGroupByKey()
                    |'Write p3 results' >> beam.io.WriteToText('Result/defaulter_card_and_loan')
                   )  

pipeline.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x11f93181c40>
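
To make the shape of the final output easier to picture, the sketch below imitates in plain Python what `beam.CoGroupByKey()` does with the two tagged collections. It is an emulation for illustration only, not Beam code: `co_group_by_key` is a hypothetical helper, and the customer IDs and values are made up.

```python
from collections import defaultdict


def co_group_by_key(tagged):
    """Group (key, value) pairs from several tagged inputs by key (emulation)."""
    grouped = defaultdict(lambda: {tag: [] for tag in tagged})
    for tag, pairs in tagged.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)


# Made-up inputs in the (customer_id, remaining_fields) shape produced by return_tuple.
result = co_group_by_key({
    'card_defaulter': [('C001', ['Ann', '3 fraud_points'])],
    'loan_defaulter': [('C001', ['Ann', '5 missed']), ('C002', ['Bob', '4 missed'])],
})
for customer_id, groups in result.items():
    print(customer_id, groups)
```

A key that appears in only one input is still emitted, with an empty list under the other tag. Restricting the result to customers who default on both cards and loans would therefore need an additional filter on the grouped output.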