# Task #1: use implemented map-reduce framework for aggregation

Given the data/clicks dataset, which has a "date" column, count how many clicks occurred on each date and write the results to the data/total_clicks dataset with "date" and "count" columns.

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Import the required libraries.

In [13]:
import csv
import glob
from collections import defaultdict
import os
import concurrent.futures


In [14]:
# Function to read multiple CSV files and store their data in a list.
def read_csv(files):
    """Read every CSV file in *files* and return all of their rows in one list.

    Each row is a dict keyed by the file's header row (``csv.DictReader``).
    Rows are returned in file order, then in row order within each file.

    Args:
        files: Iterable of paths to CSV files that start with a header row.

    Returns:
        list[dict[str, str]]: All data rows from all files.
    """
    data = []
    for file in files:
        # newline='' is required by the csv module so that quoted fields
        # containing line breaks are parsed verbatim; utf-8 matches the
        # encoding write_output uses for the output file.
        with open(file, 'r', newline='', encoding='utf-8') as csvfile:
            data.extend(csv.DictReader(csvfile))
    return data

# Ensure the output folder exists before any result files are written into it
# (no error if it is already there).
os.makedirs(name='data/total_clicks', exist_ok=True)

# Map step: turn each click row into a key/value pair keyed by its date.
def mapper(clicks):
    """Emit one key/value pair per click, keyed by the click's ``'date'``.

    Args:
        clicks: Iterable of row dicts, each containing a ``'date'`` entry.

    Returns:
        list[dict]: ``{'key': <date>, 'value': <the original row>}`` for each
        click, in input order.
    """
    return [{'key': row['date'], 'value': row} for row in clicks]

# Shuffle step: gather the mapped values into one list per key (date).
def group_by_key(mapped_data):
    """Group the values of ``{'key', 'value'}`` pairs by their key.

    Args:
        mapped_data: Iterable of ``{'key': ..., 'value': ...}`` dicts, as
            produced by ``mapper``.

    Returns:
        defaultdict(list): key -> list of values. Keys appear in first-seen
        order, and values keep their input order within each key.
    """
    groups = defaultdict(list)
    for pair in mapped_data:
        key, value = pair['key'], pair['value']
        groups[key].append(value)
    return groups

# Reduce step: summarise one date group as its number of clicks.
def reducer(key, values):
    """Build the ``{'date', 'count'}`` record for one group.

    Args:
        key: The group key (the date).
        values: Sized collection of the items that share *key*.

    Returns:
        dict: ``{'date': key, 'count': len(values)}``.
    """
    count = len(values)
    return dict(date=key, count=count)

# Sequential MapReduce: read -> map -> group -> reduce.
def map_reduce(files):
    """Count rows per date across *files* with a single-threaded map-reduce.

    Pipeline: ``read_csv`` -> ``mapper`` -> ``group_by_key`` -> ``reducer``
    (one reducer call per distinct key).

    Args:
        files: Iterable of CSV paths whose rows contain a ``'date'`` column.

    Returns:
        list[dict]: One ``{'date': ..., 'count': ...}`` record per distinct
        date, in first-seen order of the dates.
    """
    grouped = group_by_key(mapper(read_csv(files)))
    return [reducer(date, items) for date, items in grouped.items()]

def write_output(output_file, data, fieldnames=None, *, sort_key='date'):
    """Write *data* to *output_file* as CSV, with rows sorted by *sort_key*.

    If *data* is empty, a message is printed and no file is written.

    Args:
        output_file: Destination path. Its parent directory is created if it
            does not already exist.
        data: Sequence of row dicts. Every row must contain *sort_key*, and
            ``csv.DictWriter`` raises ``ValueError`` for keys that are not in
            *fieldnames*.
        fieldnames: Column order for the header and rows. Defaults to the raw
            clicks schema ``['date', 'user_id', 'click_target']``; pass
            ``['date', 'count']`` for aggregated results.
        sort_key: Row key used to order the output (default ``'date'``).

    Returns:
        None
    """
    if not data:
        print("No data to write.")
        return

    if fieldnames is None:
        fieldnames = ['date', 'user_id', 'click_target']

    sorted_data = sorted(data, key=lambda row: row[sort_key])

    # Don't depend on the caller (or a module-level makedirs) having created
    # the destination directory; opening the file would otherwise fail.
    parent = os.path.dirname(output_file)
    if parent:
        os.makedirs(parent, exist_ok=True)

    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(sorted_data)

Finally, we write the results to a new .csv file.

In [15]:
# Set the input and output file paths.
# glob's result order depends on the file system; the per-date counts do not
# depend on the order in which the files are read.
input_files = glob.glob(os.path.join('data/clicks', '*.csv'))
output_file = 'data/total_clicks/clicks_per_day.csv'

# Call the "map_reduce" function on the input files to get the total clicks per day
result = map_reduce(input_files)

# Lastly, write the results to the output file. fieldnames must be given
# explicitly: write_output's default column list does not include 'count'.
write_output(output_file, result, fieldnames=['date', 'count'])

---------------------------------------------------------------------------------------------------------------------------------------------------------

# Data has to be processed in parallel, e.g. by using threads.

We start by adding a `map_reduce_parallel` function that performs the MapReduce operation with parallel processing:

In [16]:
def map_reduce_parallel(files, chunk_size=10, max_workers=None):
    """Count rows per date across *files*, running map and reduce in threads.

    The rows are split into chunks of *chunk_size*. Each chunk is mapped in a
    worker thread. The mapped pairs are grouped by key in the calling thread,
    and then each group is reduced in a worker thread.

    NOTE: mapper/reducer are pure-Python, CPU-bound work, so under CPython's
    GIL the threads show the parallel structure rather than giving a speedup.

    Args:
        files: Iterable of CSV paths whose rows contain a ``'date'`` column.
        chunk_size: Number of rows handed to each mapper task (default 10).
        max_workers: Thread-pool size, passed to ``ThreadPoolExecutor``
            (``None`` lets the executor choose).

    Returns:
        list[dict]: One ``{'date': ..., 'count': ...}`` record per distinct
        date, in first-seen order of the dates. The order is deterministic.

    Raises:
        ValueError: If *chunk_size* is less than 1. A negative step would
            otherwise silently produce no chunks, so every row would be
            dropped.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size!r}")

    data = read_csv(files)  # Read the input CSV files into one list of rows
    data_chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

    mapped_data = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map phase: Executor.map yields results in submission order (unlike
        # as_completed), so the mapped pairs, and therefore the key order of
        # the result, are deterministic.
        for mapped_chunk in executor.map(mapper, data_chunks):
            mapped_data.extend(mapped_chunk)

        # Group phase runs in this thread: a single pass building a dict.
        grouped_data = group_by_key(mapped_data)

        # Reduce phase: one reducer task per distinct key, using the same pool.
        result = list(executor.map(reducer, grouped_data.keys(), grouped_data.values()))

    return result


Let's test it

In [17]:
input_folder = 'data/clicks'
# Full paths of every entry in the input folder whose name ends in ".csv".
files = [
    os.path.join(input_folder, name)
    for name in os.listdir(input_folder)
    if name.endswith('.csv')
]

In [18]:
# Run the threaded MapReduce over the CSV paths collected in `files`.
result_data = map_reduce_parallel(files)


In [19]:
output_file = 'data/total_clicks/output.csv'
# Write the parallel result. `result` holds the earlier sequential run, so
# passing it here would silently skip the parallel output.
write_output(output_file, result_data, fieldnames=['date', 'count'])

The results are now available in data/total_clicks/output.csv.

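The code first reads the CSV files and divides the rows into smaller chunks. It then applies the mapper function to each chunk in parallel using a ThreadPoolExecutor. Next, it groups the mapped data by key (date) in the main thread and applies the reducer function to each group, again in parallel. Finally, the reduced data is written to an output file. Note that in CPython the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time. For pure-Python, CPU-bound work like this, the threads therefore illustrate the parallel structure rather than speed it up; a real speedup would require processes (e.g. a ProcessPoolExecutor).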