In [1]:
using Distributed, LibPQ, DataStreams, DataFrames, SharedArrays, Suppressor, Distributions, SparseArrays, IJulia, Plots

### Create workers

In [2]:
# Start four local worker processes, but only when the session has no workers yet, so
# that re-running this cell does not keep stacking four more workers onto the pool.
nprocs() == 1 && addprocs(4)

# Load, on every process (the master included), the packages needed by code that is
# defined with `@everywhere` and executed on the workers.
@everywhere using LinearAlgebra, LibPQ, DataFrames, SparseArrays, SharedArrays

### Define utility functions

In [None]:
"""
    predict(i, u, μ, P, Q)

Return the predicted rating for the item stored in row `i` of `P` and the user stored in
row `u` of `Q`.

The value is `μ + P[i, 2] + Q[u, 2] + dot(P[i, 3:end], Q[u, 3:end])`: column 2 of each
matrix is read as an additive term, columns 3 onward are combined with a dot product, and
column 1 is not read here.

# Arguments
- `i::Integer`, `u::Integer`: row indices into `P` and `Q` respectively.
- `μ::Real`: offset added to every prediction.
- `P`, `Q`: real-valued matrices with the same number of columns (at least 2). Any
  `AbstractMatrix` is accepted, so `SharedMatrix{Float64}` keeps working and plain
  matrices or views can be used as well.

# Returns
The prediction converted to `Float64`.
"""
@inline function predict(i::Integer, u::Integer, μ::Real,
        P::AbstractMatrix{<:Real}, Q::AbstractMatrix{<:Real})::Float64
    # `@views` keeps the two row slices from being copied before the dot product.
    return @views μ + P[i, 2] + Q[u, 2] + dot(P[i, 3:end], Q[u, 3:end])
end

In [None]:
"""
    cost(items_map, users_map, P, Q, μ; dbstr, table, limit)

Return the sum of squared differences between `predict(i, u, μ, P, Q)` and the stored
rating, accumulated over every row of `table`.

The table is read in pages of `limit` rows over a connection opened from `dbstr`. Each row's
"movieId" and "userId" are translated to matrix row indices through `items_map` and
`users_map` before calling `predict`.

# Arguments
- `items_map`, `users_map`: sparse vectors indexed by raw item / user id whose stored value
  is the corresponding row of `P` / `Q`.
- `P`, `Q`: matrices handed to `predict`.
- `μ`: offset handed to `predict`.

# Keywords
- `dbstr`: LibPQ connection string (default: the previously hard-coded local database).
- `table`: name of the ratings table (default: `"ml_small"`, as before).
- `limit`: page size used for `LIMIT`/`OFFSET` paging (default: `10000`, as before).

# Throws
- `ArgumentError` if `limit` is not positive (paging could never terminate).
"""
function cost(items_map::SparseVector{Int64,Int64}, users_map::SparseVector{Int64,Int64},
        P::SharedMatrix{Float64}, Q::SharedMatrix{Float64},
        μ::Float64;
        dbstr::AbstractString="host=localhost dbname=postgres",
        table::Union{AbstractString,Symbol}="ml_small",
        limit::Integer=10000)::Float64

    limit > 0 || throw(ArgumentError("limit must be positive, got $limit"))

    LibPQ.Connection(dbstr; type_map=Dict(:int4=>Int64, :float4=>Float64)) do conn
        offset::Int64 = 0
        total::Float64 = 0.0

        while true
            # LIMIT/OFFSET paging is only well defined over a fixed row order: without the
            # ORDER BY the server may return rows in a different order for each page, so
            # some ratings could be skipped and others counted twice.
            rt = fetch!(Data.RowTable, LibPQ.execute(conn, """
                SELECT
                    "movieId" AS "itemId",
                    "userId", "rating"
                FROM $table
                ORDER BY "userId", "movieId"
                LIMIT $limit
                OFFSET $offset;
            """));

            for row in rt
                i = items_map[row.itemId]
                u = users_map[row.userId]
                total += abs2(predict(i, u, μ, P, Q) - row.rating)
            end

            # A short page means the table has been read to the end.
            length(rt) < limit && break

            offset += limit
        end

        total
    end
end

In [None]:
# lstq(conn, table, item, src_column, dst_column, src_map, dst_map, S, D, μ, reg)
#
# Regularised least-squares update of one row of `D`, defined on every process.
#
# Over `conn`, reads the `src_column` and "rating" values of all rows of `table` whose
# `dst_column` equals `item`, ordered by `src_column`. The fetched ids are translated to
# rows of `S` through `src_map`. With A = columns 1 and 3:end of those rows and
# b = rating - μ - column 2 of those rows, it solves (A'A + reg*I) x = A'b by Cholesky
# factorisation and writes x into D[dst_map[item], 2:end].
# The value of that assignment (the solution vector) is the function's result.
@everywhere function lstq(conn::LibPQ.Connection, table::Symbol, item::Int64,
        src_column::Symbol, dst_column::Symbol,
        src_map::SparseVector{Int64,Int64}, dst_map::SparseVector{Int64,Int64},
        S::SharedMatrix{Float64,}, D::SharedMatrix{Float64},
        μ::Float64, reg::Float64)

    query = """
        SELECT
            "$src_column",
            "rating"
        FROM $table
        WHERE "$dst_column" = $item
        ORDER BY "$src_column";
    """
    columns = fetch!(NamedTuple, LibPQ.execute(conn, query))

    # Rows of S that take part in this solve.
    rows = src_map[columns[src_column]]
    observed = columns[:rating]

    # Column 2 of S is left out of the design matrix and moved to the right-hand side.
    design = @view S[rows, [1; 3:end]]
    target = observed .- μ .- @view S[rows, 2]

    normal = design' * design + reg * I
    D[dst_map[item], 2:end] = cholesky(normal) \ (design' * target)
end

### Define ALS function

In [None]:
"""
    als(dbstr, table, k=10; nepochs=10, reg=0.0, cb=nothing)

Fit item and user matrices `P` and `Q` to the ratings stored in `table` by alternating
per-row least-squares solves distributed over the worker processes.

Steps, as implemented below:
1. On a connection opened from `dbstr`, read the average rating `μ`, the sorted distinct
   "userId" values and the sorted distinct "movieId" values.
2. Build sparse lookup vectors mapping each raw id to its row in `Q` / `P`.
3. Open one connection per worker, stored in the worker-global `conn`.
4. Initialise `P` (items) and `Q` (users) as shared matrices whose columns are
   `[ones, zeros, k columns of Normal(0, 1e-4) noise]`.
5. For each epoch, call `lstq` for every user (updating `Q`), then for every item
   (updating `P`), then invoke `cb(epoch, cost(...))` if a callback was given.
6. Close the worker connections, also when an epoch throws.

# Arguments
- `dbstr::String`: LibPQ connection string, used by the master and by every worker.
- `table::Symbol`: ratings table with "userId", "movieId" and "rating" columns.
- `k::Int64=10`: number of random factor columns.

# Keywords
- `nepochs::Int64=10`: number of user/item sweeps.
- `reg::Float64=0.0`: value added to the diagonal in `lstq`. With `0.0` the Cholesky
  factorisation there may fail when a row has too few ratings.
- `cb=nothing`: optional callback invoked as `cb(epoch, cost_value)` after each epoch.

# Returns
The tuple `(P, Q)`.
"""
function als(dbstr::String, table::Symbol, k::Int64=10;
        nepochs::Int64=10,
        reg::Float64=0.0,
        cb::Union{Nothing, Function}=nothing)

    # The do-block uses its own local names so that it does not reassign (and thereby box)
    # the outer variables that the distributed loops capture later on.
    μ::Float64, rating_users::Array, rated_items = LibPQ.Connection(dbstr; type_map=Dict(:int4=>Int64, :float4=>Float64)) do conn
        LibPQ.execute(conn, """
            SET work_mem TO '1 GB';
        """);

        avg = fetch!(NamedTuple, LibPQ.execute(conn, """
            SELECT
                AVG("rating")
            FROM $table;
        """))[:avg][1];

        users = collect(skipmissing(fetch!(NamedTuple, LibPQ.execute(conn, """
            SELECT DISTINCT
                "userId"
            FROM $table
            ORDER BY "userId";
        """))[:userId]));

        items = collect(skipmissing(fetch!(NamedTuple, LibPQ.execute(conn, """
            SELECT DISTINCT
                "movieId" AS "itemId"
            FROM $table
            ORDER BY "movieId";
        """))[:itemId]));

        avg, users, items;
    end;

    m::Int64, n::Int64 = length(rated_items), length(rating_users)

    # Raw id -> row index of P (items) / Q (users).
    items_map::SparseVector{Int64,Int64} = sparsevec(rated_items, 1:m);
    users_map::SparseVector{Int64,Int64} = sparsevec(rating_users, 1:n);

    # `@everywhere` does not capture locals; `$dbstr` interpolates the caller's connection
    # string so that workers connect to the same database as the master.
    @everywhere workers() begin
        conn = LibPQ.Connection($dbstr; type_map=Dict(:int4=>Int64, :float4=>Float64))

        LibPQ.execute(conn, """
            SET work_mem TO '1GB';
        """);
    end

    P::SharedMatrix{Float64} = SharedMatrix{Float64}([ones(m) zeros(m) rand(Normal(0.0, 1e-4), m, k)])
    Q::SharedMatrix{Float64} = SharedMatrix{Float64}([ones(n) zeros(n) rand(Normal(0.0, 1e-4), n, k)])

    try
        for epoch::Int64 in 1:nepochs
            # User sweep: rows of Q are solved against the current P.
            @sync @distributed for u::Int64 in rating_users
                global conn
                lstq(conn, table, u,
                    :movieId, :userId,
                    items_map, users_map,
                    P, Q,
                    μ, reg)
            end

            # Item sweep: rows of P are solved against the freshly updated Q.
            @sync @distributed for i::Int64 in rated_items
                global conn
                lstq(conn, table, i,
                    :userId, :movieId,
                    users_map, items_map,
                    Q, P,
                    μ, reg)
            end

            if cb !== nothing
                # NOTE: cost() is called without connection arguments, so it evaluates
                # against its own default database and table, not `dbstr`/`table`.
                cb(epoch, cost(items_map, users_map, P, Q, μ))
            end
        end
    finally
        # Release the per-worker connections even if an epoch failed.
        @everywhere workers() close(conn)
    end

    return P, Q
end

### Compute Latent Factors

In [None]:
# Per-epoch cost values reported by the callback; typed so the container is not Vector{Any}.
costs = Float64[]

# Run ALS with 100 factors; after every epoch the callback replaces the cell output with
# the latest cost and records it for plotting.
@time P, Q = als("host=localhost dbname=postgres", :ml_small, 100;
    nepochs=10,
    reg=0.001,
    # The second parameter is not called `cost`, to avoid shadowing the `cost` function.
    cb=(epoch, epoch_cost)->begin
        IJulia.clear_output(true)
        println("epoch: $(epoch), cost: $(epoch_cost)")
        push!(costs, epoch_cost)
        end)

plot(costs)

# Hand-unrolled version of the ALS loop, run at top level.
# NOTE(review): this cell reads the globals `rated_items`, `rating_users`, `items_map`,
# `users_map`, `μ` and a per-worker global `conn`, none of which is defined at top level in
# the visible part of the notebook — confirm they exist (and that `conn` is still open on
# the workers) before running it.
k = 100
nepochs = 5
reg = 0.0001

m, n = length(rated_items), length(rating_users)
P = SharedMatrix{Float64}([ones(m) zeros(m) rand(Normal(0.0, 1e-4), m, k)])
Q = SharedMatrix{Float64}([ones(n) zeros(n) rand(Normal(0.0, 1e-4), n, k)])

@time for epoch in 1:nepochs
    println("Processing users in epoch #$epoch...")
    @sync @distributed for u in rating_users
        global conn
        table = "ml_small"
        df = dropmissing(fetch!(DataFrame, LibPQ.execute(conn, """
            SELECT "movieId" AS "itemId", "rating" FROM $table WHERE "userId" = $u ORDER BY "movieId";
        """)));
        # Property access replaces `df[:col]`, which newer DataFrames versions reject.
        items_rated_by_user = items_map[df.itemId]
        ratings_given_by_user = df.rating
        # Solve for the user's row of Q against the item rows of P.
        A = @view P[items_rated_by_user,[1; 3:end]]
        b = ratings_given_by_user .- μ .- @view P[items_rated_by_user, 2]
        Q[users_map[u], 2:end] = cholesky(A'A + reg*I) \ (A'b)
    end
    
    println("Processing items in epoch #$epoch...")
    @sync @distributed for i in rated_items
        global conn
        table = "ml_small"
        # A stray `c` token used to follow this statement; it was evaluated before `c` had
        # ever been assigned and has been removed.
        df = dropmissing(fetch!(DataFrame, LibPQ.execute(conn, """
            SELECT "userId", "rating" FROM $table WHERE "movieId" = $i ORDER BY "userId";
        """)));
        users_who_rated_item = users_map[df.userId]
        ratings_given_to_item = df.rating
        # Solve for the item's row of P against the user rows of Q.
        A = @view Q[users_who_rated_item,[1; 3:end]]
        b = ratings_given_to_item .- μ .- @view Q[users_who_rated_item, 2]
        P[items_map[i], 2:end] = cholesky(A'A + reg*I) \ (A'b)
    end

    println("Processing cost in epoch #$epoch...")
    c = cost(items_map, users_map, P, Q, μ)
    IJulia.clear_output(true)
    println("epoch #$(epoch): $(c)")
end

### Destroy workers

In [None]:
# Remove every process currently listed by `workers()`; anything the call prints to
# stderr is silenced.
@suppress_err begin
    rmprocs(workers())
end

### Tests