### Basic import & Helper function

In [1]:
import pickle
import re
import pandas as pd
import numpy as np
import random
from utils import * 

In [2]:
class RatingProcessor:
    def __init__(self, file_path):
        self.file_path = file_path
        self.df = None
        self.df_num = None
        self.df_diff = None
        self.final_ratings = {}

    def load_data(self):
        with open(self.file_path, 'rb') as f:
            data = pickle.load(f)
        self.df = pd.DataFrame(data).T

    def rating_to_numeric(self, rating):
        rating_order = [
            'F3', 'D', 'RD', 'C', 'CC', 'CCC-', 'CCC', 'CCC+',  
            'B-', 'B', 'B+', 'BB-', 'BB', 'BB+', 
            'BBB-', 'BBB', 'BBB+', 'A-', 'A', 'A+', 
            'AA-', 'AA', 'AA+', 'AAA'
        ]
        rating_to_num = {rating: i for i, rating in enumerate(rating_order)}
        if pd.isna(rating):
            return None
        return rating_to_num.get(rating, None)

    def convert_ratings(self):
        self.df_num = self.df.applymap(self.rating_to_numeric)

    def calculate_diff(self):
        self.df_diff = self.df_num.diff(axis=1)
        first_quarter = self.df.iloc[:, 0]
        self.df_diff.iloc[:, 0] = first_quarter.apply(lambda x: 0 if pd.notna(x) else None)

    def count_changes(self):
        no_change = (self.df_diff == 0).sum().sum()
        increase = (self.df_diff > 0).sum().sum()
        decrease = (self.df_diff < 0).sum().sum()
        return no_change, increase, decrease

    def extract_ratings_by_change_type(self):
        increase = []
        decrease = []
        no_change = []
        for company in self.df.index:
            for quarter in self.df.columns:
                if self.df_diff.loc[company, quarter] > 0:
                    increase.append((company, quarter))
                elif self.df_diff.loc[company, quarter] < 0:
                    decrease.append((company, quarter))
                elif self.df_diff.loc[company, quarter] == 0:
                    no_change.append((company, quarter))
        
        return increase, decrease, no_change

    def select_samples(self, increase, decrease, no_change):
        num_samples = min(len(increase), len(decrease), len(no_change))
        selected_increase = random.sample(increase, num_samples)
        selected_decrease = random.sample(decrease, num_samples)
        selected_no_change = random.sample(no_change, num_samples)
        return selected_increase + selected_decrease + selected_no_change

    def create_final_ratings(self, selected_data):
        for company, quarter in selected_data:
            if company not in self.final_ratings:
                self.final_ratings[company] = {}
            self.final_ratings[company][quarter] = self.df.loc[company, quarter]

    def save_final_ratings(self, output_path):
        pd.to_pickle(self.final_ratings, output_path)

    def process_ratings(self, output_path):
        self.load_data()
        self.convert_ratings()
        self.calculate_diff()
        no_change, increase, decrease = self.count_changes()
        print(f"No Change: {no_change}")
        print(f"Increase: {increase}")
        print(f"Decrease: {decrease}")

        increase, decrease, no_change = self.extract_ratings_by_change_type()
        selected_data = self.select_samples(increase, decrease, no_change)
        self.create_final_ratings(selected_data)
        self.save_final_ratings(output_path)
        return self.final_ratings


In [3]:
#Example
processor = RatingProcessor('data/ratings_US.pkl')
final_ratings = processor.process_ratings('output/ratings_US_Processed.pkl')
print("Final Ratings:", final_ratings)

feature = pd.read_pickle('data/features_US.pkl')
all_rating = pd.read_pickle('data/ratings_US.pkl')

merged_data = merge_input_output_dicts_k(feature, final_ratings, all_rating, 4, verbose=True)
save_pickle(merged_data, 'output/data_US.pkl')

No Change: 25586
Increase: 851
Decrease: 619
Final Ratings: {'WBS': {'2016Q3': 'BBB', '2012Q1': 'BBB-'}, 'DAL': {'2015Q4': 'BB+', '2015Q1': 'BB', '2013Q3': 'B+', '2014Q3': 'BB-'}, 'DG': {'2011Q4': 'BB+', '2010Q3': 'BB', '2016Q1': 'BBB', '2016Q3': 'BBB', '2015Q4': 'BBB-'}, 'TILE': {'2011Q4': 'BB', '2015Q1': 'BB+', '2015Q3': 'BB+'}, 'LYG': {'2015Q3': 'BBB+', '2015Q2': 'BBB', '2012Q1': 'A-', '2016Q4': 'BBB+', '2010Q4': 'A'}, 'VHI': {'2011Q3': 'B+', '2012Q3': 'BB-', '2010Q3': 'CCC+', '2011Q1': 'B'}, 'BATL': {'2013Q1': 'B', '2016Q4': 'B-', '2015Q3': 'B-'}, 'UGP': {'2013Q1': 'BBB', '2016Q2': 'BB+', '2015Q4': 'BBB-'}, 'PPC': {'2015Q3': 'BB+', '2016Q4': 'BB', '2011Q4': 'B', '2012Q2': 'B'}, 'SNV': {'2013Q4': 'BB-', '2016Q4': 'BBB-', '2012Q1': 'B'}, 'LRCX': {'2012Q3': 'BBB-', '2014Q1': 'BBB', '2011Q3': 'BB+'}, 'WIT': {'2012Q3': 'BBB+', '2014Q1': 'A-', '2016Q4': 'A-'}, 'GLDD': {'2015Q1': 'B', '2013Q4': 'B-', '2016Q1': 'B-'}, 'HLT': {'2015Q2': 'BB', '2016Q3': 'BB+'}, 'HST': {'2014Q2': 'BB+'}, 'MPW

KeyError: 'SD'

In [6]:
def get_unique_ratings(all_rating):
    unique_ratings = set()
    for company_ratings in all_rating.values():
        for rating in company_ratings.values():
            unique_ratings.add(rating)
    return unique_ratings

In [7]:
unique_ratings = get_unique_ratings(all_rating)

In [8]:
unique_ratings

{'A',
 'A+',
 'A-',
 'AA',
 'AA+',
 'AA-',
 'AAA',
 'B',
 'B+',
 'B-',
 'BB',
 'BB+',
 'BB-',
 'BBB',
 'BBB+',
 'BBB-',
 'CC',
 'CCC',
 'CCC+',
 'CCC-',
 'D',
 'SD'}