In [30]:
using Pkg

# Pkg.add("CSV")
# Pkg.add("Tables")
# Pkg.add("DataFrames")
# Pkg.add("JSON")
# Pkg.add("Statistics")

using CSV
using Tables
using DataFrames
using JSON
using Statistics

# Large DataSet Staggered Load

In [18]:
function Master(file_path::String, chunk_size::Int)
    row_start = 2

    threads = []
    println("starting")

    while true
        # Load the current chunk of data
        data = CSV.File(file_path; header=true, limit=chunk_size, skipto=row_start)
        if Tables.isempty(data)
            break
        end
        t = (row_start + chunk_size-2)/chunk_size
        println("Launching worker $t")
        # Launch a worker for the current chunk
        push!(threads,Threads.@spawn Worker(DataFrame(data),movies))

        # Move to the next chunk
        row_start += chunk_size
    end

    # Collect the results from each worker
    local_metrics_with_time = [fetch(thread) for thread in threads]

    # Aggregate the local metrics from each worker
    
    # Initialize global metrics dictionary
    global_metrics = Dict{String, Dict{String, Float64}}()
    worker_times = []

    # Aggregate metrics across all chunks
    for (local_metric, elapsed_time) in local_metrics_with_time
        local_data = JSON.parse(local_metric)  # Parse JSON string to dictionary
        push!(worker_times, elapsed_time)

        for (genre, metrics) in local_data
            if !haskey(global_metrics, genre)
                # Initialize metrics if genre not in global metrics
                global_metrics[genre] = Dict("count" => 0.0, "total_rating" => 0.0, "avg_rating" => 0.0)
            end

            # Update global metrics for the genre
            global_metrics[genre]["count"] += metrics["count"]
            global_metrics[genre]["total_rating"] += metrics["total_rating"]
            global_metrics[genre]["avg_rating"] = global_metrics[genre]["total_rating"] / global_metrics[genre]["count"]
        end
    end

    return global_metrics, worker_times
end

Master (generic function with 1 method)

In [19]:
function Worker(ratings_chunk::DataFrame, movies::DataFrame)
    start_time = time_ns()  # Start time in nanoseconds

    # Join ratings and movies on `movieId`
    merged_data = innerjoin(ratings_chunk, movies, on = :movieId)

    # Initialize an empty dictionary to store metrics per genre
    genre_metrics = Dict{String, Dict{String, Float64}}()

    # Process each row in the merged data
    for row in eachrow(merged_data)
        rating = row[:rating]
        genres = lowercase.(strip.(split(row[:genres], '|'))) # Split genres and normalize

        for genre in genres
            # Initialize genre metrics if not already present
            if !haskey(genre_metrics, genre)
                genre_metrics[genre] = Dict("count" => 0.0, "total_rating" => 0.0)
            end
            
            # Update metrics for this genre
            genre_metrics[genre]["count"] += 1
            genre_metrics[genre]["total_rating"] += rating
        end
    end

    elapsed_time = (time_ns() - start_time) / 1e9  # Time in seconds
    return (JSON.json(genre_metrics), elapsed_time) 
end

Worker (generic function with 1 method)

In [24]:
function save_metrics_to_csv(global_metrics::Dict{String, Dict{String, Float64}}, file_path::String)
    # Convert global_metrics to a DataFrame
    metrics_data = DataFrame(
        genre = String[], 
        count = Float64[], 
        ratings_sum = Float64[], 
        ratings_avg = Float64[]
    )
    
    for (genre, metrics) in global_metrics
        # Append each genre's metrics as a new row
        push!(metrics_data, (
            genre, 
            metrics["count"], 
            metrics["total_rating"], 
            metrics["avg_rating"]
        ))
    end

    # Write the DataFrame to CSV
    CSV.write(file_path, metrics_data)
end

# save_metrics_to_csv(global_metrics, "global_metrics.csv")

save_metrics_to_csv (generic function with 1 method)

In [20]:
dyn_file_path = "ratings.csv"
stat_file_path = "movies.csv"

movies=CSV.read(stat_file_path,DataFrame)

# Total rows: 25_000_095
chunk_size=2500010
map_reduce_time = @elapsed global_metrics, worker_times = Master(file_path=dyn_file_path, chunk_size=chunk_size)

starting
Launching worker 1.0
Launching worker 2.0
Launching worker 3.0
Launching worker 4.0
Launching worker 5.0
Launching worker 6.0
Launching worker 7.0
Launching worker 8.0
Launching worker 9.0
Launching worker 10.0


420.480890181

### Map Reduce Time: 420.48 sec

In [25]:
save_metrics_to_csv(global_metrics, "global_metrics_staggered.csv")

"global_metrics_staggered.csv"

In [32]:
println("Avg worker time: $(mean(worker_times))")
worker_times

Avg worker time: 9.8461586344


10-element Vector{Any}:
  9.966648177
  9.857586486
  9.836530276
  9.832534818
  9.860832938
  9.582332391
  9.81158842
  9.718204708
 10.113913467
  9.881414663

# Large DataSet Full Load

In [42]:
function Master(; chunks::Vector{DataFrame})
    
    # Launch a worker for the current chunk
    threads = [Threads.@spawn Worker(DataFrame(chunk),movies) for chunk in chunks]

    # Collect the results from each worker
    local_metrics_with_time = [fetch(thread) for thread in threads]

    # Aggregate the local metrics from each worker
    
    # Initialize global metrics dictionary
    global_metrics = Dict{String, Dict{String, Float64}}()
    worker_times = []

    # Aggregate metrics across all chunks
    for (local_metric, elapsed_time) in local_metrics_with_time
        local_data = JSON.parse(local_metric)  # Parse JSON string to dictionary
        push!(worker_times, elapsed_time)

        for (genre, metrics) in local_data
            if !haskey(global_metrics, genre)
                # Initialize metrics if genre not in global metrics
                global_metrics[genre] = Dict("count" => 0.0, "total_rating" => 0.0, "avg_rating" => 0.0)
            end

            # Update global metrics for the genre
            global_metrics[genre]["count"] += metrics["count"]
            global_metrics[genre]["total_rating"] += metrics["total_rating"]
            global_metrics[genre]["avg_rating"] = global_metrics[genre]["total_rating"] / global_metrics[genre]["count"]
        end
    end

    return global_metrics, worker_times
end

Master (generic function with 3 methods)

In [35]:
function split_dataframe(df::DataFrame, n::Int)
    chunk_size = ceil(Int, nrow(df) / n)
    return [df[(i-1)*chunk_size+1:min(i*chunk_size, nrow(df)), :] for i in 1:n]
end

split_dataframe (generic function with 1 method)

In [46]:
movies=CSV.read(stat_file_path,DataFrame)
@elapsed ratings=CSV.read(dyn_file_path,DataFrame)

4.986152028

### Full DataSet Load Time: 4.98 sec

In [43]:
split_time = @elapsed rating_chunks = split_dataframe(ratings, 10)
map_reduce_time = @elapsed global_metrics, worker_times = Master(chunks=rating_chunks)

103.349820718

### Map Reduce Time: 103.35 sec

In [44]:
save_metrics_to_csv(global_metrics, "global_metrics_full_load.csv")

"global_metrics_full_load.csv"

In [45]:
println("Avg worker time: $(mean(worker_times))")
worker_times

Avg worker time: 10.307402001099998


10-element Vector{Any}:
 10.482768456
 10.585821978
 10.279204803
 10.372483014
 10.030660162
 10.124822824
 10.555043213
 10.429826119
 10.075788191
 10.137601251