In [3]:
from collections import defaultdict
from pprint import pformat

class FriendshipAnalyzer:
    def __init__(self, debug=True):
        self.debug = debug
        self.iteration = 0

    def log_step(self, step_name, data):
        """Helper to format and print debug information"""
        if self.debug:
            print(f"\n{'='*20} {step_name} {'='*20}")
            print(pformat(data, width=100, indent=2))

    def map_index(self, person, friendships, current_recommendations):
        """Mapper function that emits friendship data for analysis"""
        emitted_data = []

        # Emit current direct friendships
        direct_friends_data = {
            'type': 'direct_friends',
            'friends': friendships
        }
        emitted_data.append((person, direct_friends_data))

        if self.debug:
            print(f"\nMapper processing {person}:")
            print(f"  Emitting direct friends data: {direct_friends_data}")

        # Emit friendship information to friends-of-friends
        for friend, strength in friendships.items():
            indirect_data = {
                'type': 'indirect_connection',
                'through': person,
                'strength': strength,
                'original_person': person
            }
            emitted_data.append((friend, indirect_data))

            if self.debug:
                print(f"  Emitting indirect connection to {friend}: {indirect_data}")

        return emitted_data

    def reduce_index(self, person, values):
        """Reducer function that processes friendship data and makes recommendations"""
        if self.debug:
            print(f"\nReducer processing {person}:")
            print("  Input values:")
            print(pformat(values, indent=4))

        direct_friends = {}
        indirect_connections = []

        # First pass: separate direct friends and indirect connections
        for value in values:
            if value['type'] == 'direct_friends':
                direct_friends = value['friends']
                if self.debug:
                    print(f"  Found direct friends: {direct_friends}")
            elif value['type'] == 'indirect_connection':
                indirect_connections.append(value)
                if self.debug:
                    print(f"  Found indirect connection: {value}")

        # Process recommendations
        recommendations = defaultdict(lambda: {'strength': 0, 'common_friends': []})

        for connection in indirect_connections:
            if connection['original_person'] not in direct_friends and connection['original_person'] != person:
                rec = recommendations[connection['original_person']]
                rec['strength'] += connection['strength']
                rec['common_friends'].append(connection['through'])

                if self.debug:
                    print(f"  Adding recommendation for {connection['original_person']}:")
                    print(f"    Through: {connection['through']}")
                    print(f"    Added strength: {connection['strength']}")

        # Find best friend
        best_friend = max(direct_friends.items(), key=lambda x: x[1], default=(None, 0))

        result = {
            'direct_friends': direct_friends,
            'recommendations': dict(recommendations),
            'best_friend': best_friend[0]
        }

        if self.debug:
            print("\n  Reducer output:")
            print(pformat(result, indent=4))

        return person, result

    def single_mapreduce_pass(self, social_graph, current_recommendations=None):
        """Performs a single MapReduce pass over the social graph"""
        self.iteration += 1

        if current_recommendations is None:
            current_recommendations = {}

        self.log_step(f"Starting MapReduce Iteration {self.iteration}", {
            "Input Graph": social_graph,
            "Current Recommendations": current_recommendations
        })

        # MAP phase
        mapped_data = []
        for person, friendships in social_graph.items():
            self.log_step(f"Mapping {person}", friendships)
            person_mapped_data = self.map_index(person, friendships, current_recommendations)
            mapped_data.extend(person_mapped_data)

        self.log_step("All Mapped Data", mapped_data)

        # Group data by key for reduce phase
        grouped_data = defaultdict(list)
        for k, v in mapped_data:
            grouped_data[k].append(v)

        self.log_step("Grouped Data", dict(grouped_data))

        # REDUCE phase
        new_recommendations = {}
        for person, values in grouped_data.items():
            self.log_step(f"Reducing {person}", values)
            person, results = self.reduce_index(person, values)
            new_recommendations[person] = results

        self.log_step("Final Recommendations", new_recommendations)

        return new_recommendations

    def analyze_friendships(self, social_graph):
        """Main function to analyze friendships and generate recommendations"""

        # You might want to add further processing or analysis here
        # based on the recommendations from the MapReduce pass.
        # For now, we'll just return the raw recommendations.

        return self.single_mapreduce_pass(social_graph)

In [4]:
def main():
    # Create a smaller example social graph for clearer visualization
    social_graph = {
        'Alice': {'Bob': 90, 'Charlie': 70},
        'Bob': {'Alice': 90, 'David': 80},
        'Charlie': {'Alice': 70, 'David': 60},
        'David': {'Bob': 80, 'Charlie': 60}
    }

    print("\nAnalyzing social graph:")
    print(pformat(social_graph, indent=2))

    analyzer = FriendshipAnalyzer(debug=True)
    results = analyzer.analyze_friendships(social_graph)

    print("\nFinal Results Summary:")
    for person, data in sorted(results.items()):
        print(f"\n{person}:")
        print(f"  Best friend: {data['best_friend']}")
        print(f"  Best friend's best friend: {data.get('best_friend_of_best_friend')}")
        print("  Recommendations:")
        for recommended, rec_data in sorted(data['recommendations'].items()):
            print(f"    - {recommended} (strength: {rec_data['strength']}, common: {rec_data['common_friends']})")

if __name__ == "__main__":
    main()


Analyzing social graph:
{ 'Alice': {'Bob': 90, 'Charlie': 70},
  'Bob': {'Alice': 90, 'David': 80},
  'Charlie': {'Alice': 70, 'David': 60},
  'David': {'Bob': 80, 'Charlie': 60}}

{ 'Current Recommendations': {},
  'Input Graph': { 'Alice': {'Bob': 90, 'Charlie': 70},
                   'Bob': {'Alice': 90, 'David': 80},
                   'Charlie': {'Alice': 70, 'David': 60},
                   'David': {'Bob': 80, 'Charlie': 60}}}

{'Bob': 90, 'Charlie': 70}

Mapper processing Alice:
  Emitting direct friends data: {'type': 'direct_friends', 'friends': {'Bob': 90, 'Charlie': 70}}
  Emitting indirect connection to Bob: {'type': 'indirect_connection', 'through': 'Alice', 'strength': 90, 'original_person': 'Alice'}
  Emitting indirect connection to Charlie: {'type': 'indirect_connection', 'through': 'Alice', 'strength': 70, 'original_person': 'Alice'}

{'Alice': 90, 'David': 80}

Mapper processing Bob:
  Emitting direct friends data: {'type': 'direct_friends', 'friends': {'Alice': 90