In [58]:
using Pkg
using BenchmarkTools
using DataFrames
using CSV
using Base.Threads
using Parquet

In [66]:
"""
    readStuff(filename)

Load one ratings chunk into a `DataFrame`, picking the reader from the file
extension (compared case-insensitively).

- `.csv`: the file is read without a header row and the auto-generated columns
  `Column1`, `Column2`, `Column3` are renamed to `userId`, `movieId`, `rating`.
- `.parquet`: the file is read with `Parquet.read_parquet`.

For any other extension a message is printed and `nothing` is returned.
"""
function readStuff(filename)
    # `endswith` works for names shorter than the extension and for non-ASCII
    # names; slicing the reversed string by index throws in both cases.
    name = lowercase(filename)
    if endswith(name, ".csv")
        ratings = DataFrame(CSV.File(filename, header=false))
        ratings = rename(
            ratings, :Column1 => :userId, :Column2 => :movieId, :Column3 => :rating
        )
    elseif endswith(name, ".parquet")
        ratings = DataFrame(Parquet.read_parquet(filename))
    else
        # Both CSV and Parquet are accepted, so the message names both.
        println("pasame un csv o parquet plis")
        return nothing
    end
    return ratings
end

readStuff (generic function with 1 method)

In [67]:
"""
    readStuffOld(filename)

Load a CSV or Parquet file into a `DataFrame`, picking the reader from the file
extension (compared case-insensitively). CSV files are read with the default
`CSV.File` settings, so column names come from the file itself.

For any other extension a message is printed and `nothing` is returned.
"""
function readStuffOld(filename)
    # `endswith` works for names shorter than the extension and for non-ASCII
    # names; slicing the reversed string by index throws in both cases.
    name = lowercase(filename)
    if endswith(name, ".csv")
        ratings = DataFrame(CSV.File(filename))
    elseif endswith(name, ".parquet")
        ratings = DataFrame(Parquet.read_parquet(filename))
    else
        println("pasame un csv o parquet plis")
        return nothing
    end
    return ratings
end
readStuffOld (generic function with 1 method)

In [53]:
"""
    countGenres(ratingsIn, moviesIn)

Return one row per genre with the number of ratings (`count`) and the mean
rating (`rating`), sorted by genre.

`moviesIn.genres` is split on `"|"` so a movie with several genres contributes
to each of them; a `missing` genre string is kept as a single `missing` genre.
The expanded movies table is inner-joined with `ratingsIn` on `movieId`.

Neither input is modified, so the same movies table can be passed to several
concurrent calls.
"""
function countGenres(ratingsIn, moviesIn)
    ratings = select(ratingsIn, :movieId, :rating)
    # Non-mutating `select` instead of `transform!`: the caller's table must not
    # gain a helper column, and concurrent callers may share `moviesIn`.
    movies = select(
        moviesIn,
        :movieId,
        :genres => ByRow(x -> ismissing(x) ? [missing] : string.(split(x, "|"))) => :genres,
    )
    # One row per (movie, genre) pair.
    movies = flatten(movies, :genres)
    movies = innerjoin(movies, ratings, on = :movieId)
    movies = combine(groupby(movies, :genres), nrow => :count, :rating => mean => :rating)
    movies = sort(movies, :genres)
    return movies
end

countGenres (generic function with 1 method)

In [54]:
"""
    chunkPostProcessing(finres)

Merge per-chunk genre statistics into overall statistics.

`finres` holds stacked per-chunk rows with columns `genres`, `count` and
`rating` (the per-chunk mean). Each chunk mean is turned back into a rating
total (`count * rating`), totals and counts are summed per genre, and the
overall mean is recovered as `total / count`. The result has columns `genres`,
`count`, `rating` and is sorted by genre.
"""
function chunkPostProcessing(finres)
    # Element-wise helpers applied to whole columns.
    ratingtotal(cnt, rtng) = cnt .* rtng
    overallmean(cnt, umn) = umn ./ cnt

    # `:unweighted_mean` holds count × mean, i.e. the sum of ratings in a chunk.
    perchunk = select(
        finres, :genres, :count, [:count, :rating] => ratingtotal => :unweighted_mean
    )
    bygenre = groupby(perchunk, [:genres])
    totals = combine(
        bygenre, :count => sum => :count, :unweighted_mean => sum => :unweighted_mean
    )
    totals = sort(totals, :genres)
    return select(
        totals, :genres, :count, [:count, :unweighted_mean] => overallmean => :rating
    )
end

chunkPostProcessing (generic function with 1 method)

In [80]:
"""
    bufferRatings(numberOfChunks, start_w_zero, format)

Compute per-genre rating counts and means from a ratings file that was split
into `numberOfChunks` chunk files, processing the chunks in parallel threads.

# Arguments
- `numberOfChunks`: number of chunk files to read.
- `start_w_zero`: when `true` the chunks are numbered from 0, otherwise from 1.
- `format`: file extension including the dot. For `".parquet"` the files are
  named `ratings_NN.parquet` (two-digit, zero-padded); for any other value they
  are named `ratings<N><format>`.

The movies table is read from `movies.csv` in the current directory.

# Returns
The `DataFrame` produced by `chunkPostProcessing` on the stacked chunk results.
"""
function bufferRatings(numberOfChunks, start_w_zero, format)
    movieso = DataFrame(CSV.File("movies.csv"))
    firstChunk = start_w_zero == true ? 0 : 1
    # One result slot per chunk; a fixed size of 10 overflowed for larger runs.
    res = [DataFrame() for _ in 1:numberOfChunks]

    @threads for slot in 1:numberOfChunks
        i = firstChunk + slot - 1
        filenameCounter = format == ".parquet" ? "_" * lpad(i, 2, '0') : string(i)
        chunkFilename = string("ratings", filenameCounter, format)
        # Each task works on its own copy of the movies table so that no
        # DataFrame is shared mutably between threads.
        res[slot] = countGenres(readStuff(chunkFilename), copy(movieso))
    end

    # Stack all chunk results in one pass instead of repeated pairwise vcat.
    return chunkPostProcessing(reduce(vcat, res))
end

bufferRatings (generic function with 2 methods)

In [69]:
# Benchmark the chunked pipeline on 10 CSV chunks numbered from 0
# (ratings0.csv through ratings9.csv).
@btime a = bufferRatings(10,true,".csv")

  6.690 s (6863927 allocations: 8.64 GiB)


Row,genres,count,rating
Unnamed: 0_level_1,String,Int64,Float64
1,(no genres listed),26627,3.32638
2,Action,7446918,3.46659
3,Adventure,5832424,3.51744
4,Animation,1630987,3.61495
5,Children,2124258,3.43251
6,Comedy,8926230,3.42399
7,Crime,4190259,3.68504
8,Documentary,322449,3.70528
9,Drama,10962833,3.67718
10,Fantasy,2831585,3.51159


In [81]:
# Benchmark the chunked pipeline on 10 Parquet chunks numbered from 1
# (ratings_01.parquet through ratings_10.parquet).
@btime a = bufferRatings(10,false,".parquet")

  4.249 s (6869312 allocations: 8.84 GiB)


Row,genres,count,rating
Unnamed: 0_level_1,String,Int64,Float64
1,(no genres listed),26627,3.32638
2,Action,7446918,3.46659
3,Adventure,5832424,3.51744
4,Animation,1630987,3.61495
5,Children,2124258,3.43251
6,Comedy,8926230,3.42399
7,Crime,4190259,3.68504
8,Documentary,322449,3.70528
9,Drama,10962833,3.67718
10,Fantasy,2831585,3.51159


In [57]:
# Baseline: run countGenres once on the unsplit ratings.csv and movies.csv,
# both loaded with readStuffOld; the timing includes reading both files.
@btime res = countGenres(readStuffOld("ratings.csv"),readStuffOld("movies.csv"))

  6.579 s (799186 allocations: 7.07 GiB)


Row,genres,count,rating
Unnamed: 0_level_1,String,Int64,Float64
1,(no genres listed),26627,3.32638
2,Action,7446918,3.46659
3,Adventure,5832424,3.51744
4,Animation,1630987,3.61495
5,Children,2124258,3.43251
6,Comedy,8926230,3.42399
7,Crime,4190259,3.68504
8,Documentary,322449,3.70528
9,Drama,10962833,3.67718
10,Fantasy,2831585,3.51159


In [None]:
# Working code that buffers the results directly from the original CSV file, without having to split it into multiple files.
# It works but is extremely slow because of Julia's CSV library; a better alternative is to parse the file directly.

#
# function bufferRatings(filename)
#     rows = 250000
#     res = DataFrame()
#     buffer = DataFrame()
#     movieso = DataFrame(CSV.File("movies.csv"))
#     for row in CSV.Rows("ratings.csv")
#         push!(buffer, row)
#         println(nrow(buffer))
#         if nrow(buffer) > rows
#             println("processing chunk")
#             res = [res;countGenres(buffer,movieso)]
#             buffer = DataFrame()
#         end
#     res = [res;countGenres(buffer,movieso)]
#     res = chunkPostProcessing(res)
#     end
# end