In [4]:
using JSON
import GZip
#Spark-Like Functions
function mapByKey(f,d)
    nd = Dict{keytype(d),Any}()
    for k in keys(d)
        nd[k] = f(d[k])
    end
    return nd
end


function reduceByKeyAndWindow(f,win,slid,dstr)
    #Reduce by window and Key
    #Takes in a collection of values of a key and a certain window size (win:Size of window in array slots)
    #with slide factor (slid), and reduce by a function, (f:binary reduce function).
    #takes in key-value dstream (dstr: Array{Dict,1})
    
    #initialize new dstr
    newDstrSize = round(Int64,ceil(length(dstr)/slid))
    newDstr = Array{Dict,1}(newDstrSize)
    #for loop with sliding factor
    for i in collect(1:newDstrSize)
        #for each sliding loop creates an array of keys,
        currI = (i-1)*slid+1
        currKeys = reduce(vcat,map(x -> collect(keys(x)), dstr[max(currI-win,1):(currI)]))
        #subsequently populate an dict for each keys
        currDict = Dict{keytype(dstr[1]),valtype(dstr[1])}()
        for k in currKeys
            #collect all value in the window
            currKeyCol = Array{valtype(dstr[1]),1}()
            for j in collect(max(currI-win,1):currI)
                if haskey(dstr[j],k)
                    push!(currKeyCol,dstr[j][k])
                end
            end
            #Then reduce.
            currDict[k] = reduce(f,currKeyCol)
        end
        newDstr[i] = currDict
    end
    return newDstr
end

function mapReduceByKey(mapF,reduceF, dict)
    return Dict(map(x -> Pair(x[1],mapreduce(mapF,reduceF,x[2])),dict))
end

function recurrentFixedWindow(t,dstr)
    #construct numeric recurrent vector from a fixed length sliding window
    #with default values of zero
    #assume all vector within any single slot is of same length.
    #input is an array of Dicts of string to (numeric array or number)
    
    #First let's determine n the number of array of a single slot.
    n = length(first(dstr[1])[2])
    #let's determine l, the number of windows the operation will result in.
    l = length(dstr) - t
    newdst = Array{Dict{keytype(dstr[1]),Array{Number,1}},1}(l)
    for i in collect(1:l)
        #TODO make sliding window operation to optimize performance
        #create an array of possible keys
        currKeys = reduce(vcat,map(x -> collect(keys(x)), dstr[i:(i+t)]))
        currDict = Dict{keytype(dstr[1]),Array{Number,1}}()
        #for each of the keys we'll populate a recurrent vector.
        for k in currKeys
            currArr = Array{Number,1}()
            for j in collect(i:(i+t))
                if haskey(dstr[j],k)
                    currArr = vcat(currArr,dstr[j][k])
                else
                    #Note that default value is filled if the key doesn't exist within a slot.
                    append!(currArr,zeros(n))
                end
            end
            currDict[k]=currArr
        end
        newdst[i] = currDict
    end
    return newdst
end


#File Read In
#x = [JSON.parse(readlines(GZip.open("../16-06-17/"*x))) for x in readdir("../16-06-17")]; #File Readin
#x = mapreduce(x->map(y -> JSON.parse(y),readlines(GZip.open("../16-06-17/"*x))),vcat,readdir("../16-06-17")[1:6])
#This is the old file read in.  works well for 1 file.  Thus we will have to load&parse each file independently.

recurrentFixedWindow (generic function with 1 method)

In [5]:
# Filter out Tweets from a mix of delete and tweets
#split into windows
function toDstream(inX,t) #t=120000
    println("Begin binning to 2 minutes slots")
    x= inX[findn(["created_at" in keys(y) for y in inX])];
    dt = join(split(x[1]["created_at"]," ")[[1,2,3,4,6]]," ")
    startTime = DateTime(dt,"e u d H:M:S y")
    dt = join(split(x[end]["created_at"]," ")[[1,2,3,4,6]]," ")
    endTime = DateTime(dt,"e u d H:M:S y")
    println(startTime)
    windowCount = 1+round(Int,ceil(Dates.value(endTime - startTime) /t ))#Divid into 2 minutes windows
    Tweets = Array{Array{Dict}}(windowCount)
    for i in collect(1:windowCount)
        Tweets[i] = Array{Dict,1}()
    end
    print("windowCount:")
    println(windowCount)
    for tweet in x
        dt = join(split(tweet["created_at"]," ")[[1,2,3,4,6]]," ")
        currTime = DateTime(dt,"e u d H:M:S y")
        windowIndex = round(Int,ceil(Dates.value(currTime - startTime) /t ))#Divid into 2 minutes windows 
        if (windowIndex >= 0) && (windowIndex < (windowCount))
            append!(Tweets[windowIndex+1],[tweet])
        else
            println(windowIndex)
        end
    end
    #Tweets[1][1] #First 10 sec Window's First tweet.
    return Tweets
end

toDstream (generic function with 1 method)

In [6]:
#Compile all tweets Per finds hashs within a single tweet
function findHash(x)
    words = split(x," ")
    return words[find(word -> (length(word) > 1)&& (word[1] == '#'),words)]
end
#findHash(x[20]["text"])
#x[20]["text"]
function extractHashMap(x) #calculates per window
    z = mapreduce(i-> [(y,i) for y in findHash(i["text"])],vcat,x);
    # z is the table of hash to a list of tweets
    c = Dict{AbstractString,Array}();
    for i in z
        if !haskey(c,i[1])
            c[i[1]] = []
        end
        append!(c[i[1]],[i[2]])
    end
    return c
end
function toHashMap(Tweetdstr)
    println("Begin sorting into Hashes")
    HashTweetMap = map(extractHashMap,Tweetdstr)#Dict{Hash, Tweets} per 10 secs window
end


toHashMap (generic function with 1 method)

In [13]:
#Compile ML input Vectors
function toVectorDstr(HashTweetMap)
    println("Conversion to Vectors")
    vectorSize = 1
    vectorIndMapping = Array{Tuple{Int64,AbstractString},1}()
    x_vector = Array{Array{Number,1},1}()
    
    ##Volume
    function calculateVolume(x) #calculates per window for all hash [tweets] key pairs
        map(v -> length(v[2]),x)
    end
    HashVolumeMap = map(x-> mapByKey(length,x),HashTweetMap)
    #HashVolumeMap = reduceByKeyAndWindow((+),10,1,HashVolumeMap) #This is sum into overlapping 100 secs windows
    
    ##Unique Users Count
    HashUserMap = map(z -> Dict(map(x-> Pair(x[1],unique(map(y-> y["user"]["name"],x[2]))),z)),HashTweetMap)
    #HashUserMap = reduceByKeyAndWindow(vcat,10,1,HashUserMap) #Organize into 100 secs windows
    HashUserMap = map(z -> Dict(map(x-> Pair(x[1],length(unique(x[2]))),z)),HashUserMap)
    
    
    ## 1st degree Connected Tweets
    HashDegreeMap = map(z -> Dict(map(x-> Pair(x[1],unique(map(y-> findHash(y["text"]),x[2]))),z)),HashTweetMap)
    #HashDegreeMap = reduceByKeyAndWindow(vcat,10,1,HashDegreeMap) #Organize into 100 secs windows
    HashDegreeMap = map(z -> Dict(map(x-> Pair(x[1],unique(x[2])),z)),HashDegreeMap)
    HashDegreeMap = map(z -> Dict(map(x-> Pair(x[1],mapreduce(y-> HashVolumeMap[z][y[1]],(+),vcat(x[2]))),HashDegreeMap[z])),1:length(HashDegreeMap))
    
    #Summarize into x_vector essentially flatmap into keyless
    HashVectorMap = Array{Dict{AbstractString,Any},1}(length(HashVolumeMap))
    
    for i in collect(1:length(HashVolumeMap))
        currDict = Dict{AbstractString,Any}()
        for k in keys(HashVolumeMap[i])
            currDict[k] = [HashUserMap[i][k],HashVolumeMap[i][k],HashDegreeMap[i][k]]
        end
        HashVectorMap[i] = currDict
    end
    
    return HashVectorMap
end

function toXVectors(VectorDStr)
    recurrentCount = 5
    #vectorIndMapping = Array{Tuple{Int64,AbstractString},1}()
    x_vector = Array{Array{Number,1},1}()
    HashVectorMap = recurrentFixedWindow(recurrentCount,VectorDStr)
    newdict = Array{Dict{keytype(VectorDStr[1]),Integer},1}(length(HashVectorMap))
    for i in collect(1:length(HashVectorMap))
        newdict[i] = Dict{keytype(VectorDStr[1]),Integer}() 
    end
    for i in collect(1:length(HashVectorMap))
        for k in keys(HashVectorMap[i])
            push!(x_vector, HashVectorMap[i][k])
            newdict[i][k] = length(x_vector)
        end
    end
    return (x_vector, newdict)
end

toXVectors (generic function with 1 method)

In [8]:
#Actually running each file one by one.  #First two hours
x_store = mapreduce(x->toVectorDstr(toHashMap(toDstream(map(y -> JSON.parse(y),readlines(GZip.open("/home/ac/TwitterData/16-06-17/"*x))),120000))),vcat,readdir("/home/ac/TwitterData/16-06-17")[1:2])
1

Begin binning to 2 minutes slots
2016-06-17T03:49:59
windowCount:6
Begin sorting into Hashes
Conversion to Vectors
Begin binning to 2 minutes slots
2016-06-17T03:59:46
windowCount:7
Begin sorting into Hashes
Conversion to Vectors


1

In [25]:
#getting training Truths
(x_vector,vectorIndMapping) = toXVectors(x_store)
using JSON
import GZip
trendx = mapreduce(x->map(y -> JSON.parse(y),readlines(GZip.open("/home/ac/TwitterData/trend16-06-17/"*x))),vcat,readdir("/home/ac/TwitterData/trend16-06-17")[1:6]) #File Readin
trendx = mapreduce(i -> mapreduce(j-> Pair(i["timestamp_ms"],j["Name"]),vcat,i["Trend"]),vcat,trendx)
trendingHashes = unique(trendx[find(i -> i[2][1] == '#',trendx)])
recurrentCount = 5
#dt = Dates.datetime2unix(startTime)*1000
dt = first(trendingHashes)[1]
trendingHashes = map(x -> Pair(round(Int64, (x[1] - dt)/120000),x[2]), trendingHashes)
y_vector = -ones(length(x_vector))
for x in trendingHashes
    recurrentIndex = x[1] - recurrentCount
    if (recurrentIndex > 0)
        for i in collect(0:10)#Window to declare subsignal a trend from declaration.
            if (x[1]-i > 0) && (x[1]-i < length(vectorIndMapping)) && haskey(vectorIndMapping[x[1]-i],x[2])
                y_vector[vectorIndMapping[x[1]-i][x[2]]] = 1
            end
        end
    end
end
1
#print(find(x-> x== (2,"#reddit"), vectorIndMapping))
#sum([!haskey(c,i) for i in trendingHashes]) #TODO

1

In [92]:
##### Initilzie ML

###First Logistical Regression Methods
using Regression
println("Attempt#1: Logistical Regression")
X = Array{Float64,2}(reduce(hcat,x_vector));
y = sign(y_vector);
(d,n) = size(X)
#Uniform training data
#X_train = X[:,1:round(Int64,n/2)]
#y_train = y[1:round(Int64,n/2)]
#Sampled Biased training data.
    trainN = length(find(x -> x == 1, y_train))
    trainSample = vcat(find(x -> x == -1, y_train)[1:trainN],find(x -> x == 1, y_train))
    y_train = y_train[trainSample]
    X_train = X_train[:,trainSample]
# perform estimation #Taken from Regression.jl FrontPage
ret = Regression.solve(
logisticreg(X_train, y_train; bias=1.0),   # construct a logistic regression problem
    #solver=GD(),
    reg=SqrL2Reg(1.0e-2),          # apply squared L2 regularization
options=Regression.Options(maxiter =200,verbosity=:final, grtol=1.0e-6 * n))  # set options
#Test Solution
X_test = X[:,round(Int64,n/2):end]
y_test = y[round(Int64,n/2):end]
w_hat = ret.sol
y_hat = sign(X_test'w_hat[1:d] +w_hat[d+1] )
errorVec = y_test-y_hat
errorRate = sum(abs(errorVec))*0.5 / (length(errorVec)) 
print("Logistical Regression Grand Error Rate Against Test = ")
println(errorRate)
errorVec = errorVec[find(x -> x == 1, y_test)]
errorRate = sum(abs(errorVec))*0.5 / (length(errorVec)) 
print("Logistical Regression False Positive Rate Against Test = ")
println(errorRate)

###Neural Net  Mocha.jl




Attempt#1: Logistical Regression


LoadError: LoadError: MethodError: `size` has no method matching size(::Pair{Int64,ASCIIString})
Closest candidates are:
  size(::Any, !Matched::Integer, !Matched::Integer, !Matched::Integer...)
while loading In[92], in expression starting on line 8

In [33]:
#ProtoType Shop
using Mocha
y_vector = (y_vector +1) ./2
trainingSet = vcat(find(y-> y==1,y_vector)[1:25],collect(1:75))
testingSet = vcat(find(y-> y==1,y_vector)[1:100],collect(1:200))
testingSetPos = vcat(find(y-> y==1,y_vector))
testingSetNeg = vcat(find(y-> y==0,y_vector))
data  =  MemoryDataLayer(name="train-data",tops=[:data,:label],data=Array[mapreduce(i -> Array{Float64}(i),hcat,x_vector[trainingSet]), 
y_vector[trainingSet]],batch_size=25)
#conv  = ConvolutionLayer(name="conv1",n_filter=20,kernel=(5,5),bottoms=[:data],tops=[:conv])
#pool  = PoolingLayer(name="pool1",kernel=(2,2),stride=(2,2),bottoms=[:conv],tops=[:pool])
#conv2 = ConvolutionLayer(name="conv2",n_filter=50,kernel=(5,5),bottoms=[:pool],tops=[:conv2])
#pool2 = PoolingLayer(name="pool2",kernel=(2,2),s  tride=(2,2),bottoms=[:conv2],tops=[:pool2])
fc1   = InnerProductLayer(name="ip1",output_dim=500,neuron=Neurons.ReLU(),bottoms=[:data],
                          tops=[:ip1])
fc3   = InnerProductLayer(name="ip3",output_dim=700,neuron=Neurons.ReLU(),bottoms=[:ip1],
                          tops=[:ip3])
fc2   = InnerProductLayer(name="ip2",output_dim=2,bottoms=[:ip3],tops=[:ip2])
loss  = SoftmaxLossLayer(name="loss",bottoms=[:ip2,:label])

backend = DefaultBackend()
init(backend)

common_layers = [fc1,fc3, fc2]
net = Net("TweetDetect-train", backend, [data, common_layers..., loss])

exp_dir = "snapshots"
solver_method = SGD()
params = make_solver_parameters(solver_method, max_iter=10000, regu_coef=0.0005,
    mom_policy=MomPolicy.Fixed(0.9),
    lr_policy=LRPolicy.Inv(0.01, 0.0001, 0.75))
solver = Solver(solver_method, params)

#setup_coffee_lounge(solver, save_into="$exp_dir/statistics.jld", every_n_iter=1000)

#report training progress every 100 iterations
add_coffee_break(solver, TrainingSummary(), every_n_iter=1000)

# save snapshots every 5000 iterations
#add_coffee_break(solver, Snapshot(exp_dir), every_n_iter=5000)

# show performance on test data every 1000 iterations
#data_test = HDF5DataLayer(name="test-data",source="test-data-list.txt",batch_size=100)
data_test =  MemoryDataLayer(name="test-data",data=Array[mapreduce(i -> Array{Float64}(i),hcat,x_vector[testingSet]), 
    y_vector[testingSet]],batch_size=50)
accuracy = AccuracyLayer(name="test-accuracy",bottoms=[:ip2, :label])
test_net = Net("TweetDetect-test", backend, [data_test, common_layers..., accuracy])
add_coffee_break(solver, ValidationPerformance(test_net), every_n_iter=1000)

data_test =  MemoryDataLayer(name="testPos-data",data=Array[mapreduce(i -> Array{Float64}(i),hcat,x_vector[testingSetPos]), 
    y_vector[testingSetPos]],batch_size=50)
accuracy = AccuracyLayer(name="testPos-accuracy",bottoms=[:ip2, :label])
test_net = Net("TweetDetect-testPos", backend, [data_test, common_layers..., accuracy])
add_coffee_break(solver, ValidationPerformance(test_net), every_n_iter=1000)



data_test =  MemoryDataLayer(name="testNeg-data",data=Array[mapreduce(i -> Array{Float64}(i),hcat,x_vector[testingSetNeg]), 
    y_vector[testingSetNeg]],batch_size=50)
accuracy = AccuracyLayer(name="testNeg-accuracy",bottoms=[:ip2, :label])
test_net = Net("TweetDetect-testNeg", backend, [data_test, common_layers..., accuracy])
add_coffee_break(solver, ValidationPerformance(test_net), every_n_iter=1000)



solve(solver, net)

destroy(net)
destroy(test_net)
shutdown(backend)

19-Jul 14:42:07:INFO:root:Constructing net TweetDetect-train on Mocha.CPUBackend...
19-Jul 14:42:07:INFO:root:Topological sorting 5 layers...
19-Jul 14:42:07:INFO:root:Setup layers...
19-Jul 14:42:07:INFO:root:Network constructed!
19-Jul 14:42:07:INFO:root:Constructing net TweetDetect-test on Mocha.CPUBackend...
19-Jul 14:42:07:INFO:root:Topological sorting 5 layers...
19-Jul 14:42:07:INFO:root:Setup layers...
19-Jul 14:42:07:DEBUG:root:InnerProductLayer(ip1): sharing weights and bias
19-Jul 14:42:07:DEBUG:root:InnerProductLayer(ip3): sharing weights and bias
19-Jul 14:42:07:DEBUG:root:InnerProductLayer(ip2): sharing weights and bias
19-Jul 14:42:07:INFO:root:Network constructed!
19-Jul 14:42:07:INFO:root:Constructing net TweetDetect-testPos on Mocha.CPUBackend...
19-Jul 14:42:07:INFO:root:Topological sorting 5 layers...
19-Jul 14:42:07:INFO:root:Setup layers...
19-Jul 14:42:07:DEBUG:root:InnerProductLayer(ip1): sharing weights and bias
19-Jul 14:42:07:DEBUG:root:InnerProductLayer(ip3)

LoadError: LoadError: InterruptException:
while loading In[33], in expression starting on line 66

In [38]:
# Hash Tweet Freq plot
function tohashFreq(HashTweetMap)
    #return map(j->map(z -> Pair(z[1],map(i -> i["timestamp_ms"],z[2])),j),HashTweetMap)
    return  map(i -> mapByKey(z->map(j -> j["timestamp_ms"],z),i),HashTweetMap)
end
x_source = mapreduce(x->tohashFreq(toHashMap(toDstream(map(y -> JSON.parse(y),readlines(GZip.open("../16-06-17/"*x))),120000))),vcat,readdir("../16-06-17")[1:2])
1




Begin binning to 2 minutes slots
2016-06-17T03:49:59
Begin sorting into Hashes
Begin binning to 2 minutes slots
2016-06-17T03:59:46
Begin sorting into Hashes


1

In [51]:
x = toHashMap(toDstream(map(y -> JSON.parse(y),readlines(GZip.open("../16-06-17/00-00.json.gz"))[1:50]),120000));
1

Begin binning to 2 minutes slots


1

2016-06-17T03:49:59
windowCount:1
Begin sorting into Hashes


In [None]:
## x_pos_data[:,find(x_pos_data[1,:])]


In [18]:
println(size(unique(map(x->x[2],trendingHashes))))
println(size(map(x->x[2],trendingHashes)))
println(size(map(x->x[2],trendingHashes))[1]/size(unique(map(x->x[2],trendingHashes)))[1]/30)

(303,)
(12863,)
1.415071507150715


In [9]:
x = mapreduce(x->map(y -> JSON.parse(y),readlines(GZip.open("../16-06-17/"*x))),vcat,readdir("../16-06-17"))
y = toHashMap(toDstream(x,1200000))
map(j -> map(z -> Pair(z[1],map(i -> i["timestamp_ms"],z[2])),j), y)
1

Begin binning to 2 minutes slots
2016-06-17T03:49:59


1

Begin sorting into Hashes


In [28]:
z = y[2]["#NBAFinals"]
map(j -> map(z -> Pair(z[1],map(i -> i["timestamp_ms"],z[2])),j), y)

2-element Array{Array{Any,1},1}:
 Any["#KD2DC"=>Any["1466135399659"],"#anarchy"=>Any["1466135399657"],"#creature"=>Any["1466135399657"],"#메이저놀이터추천\n#메이저놀이터추천\n사-설-토-토-추-천-사-이-트\n👔\n👔\n👔\n👔\n👔\n🔮🐌🔮"=>Any["1466135399661"],"#creatureart"=>Any["1466135399657"],"#विलंबित_न्याय_है_अन्याय"=>Any["1466135399664"],"#art"=>Any["1466135399657"],"#PS4live"=>Any["1466135399657"],"#alien"=>Any["1466135399657"],"#MTVAWARDSSTAR"=>Any["1466135399659","1466135399657"],"#ViajoConAlexTienda"=>Any["1466135399658"],"#OrangeIsTheNewBlack?"=>Any["1466135399659"]]
 Any["#ドッカンバトル"=>Any["1466135400664"],"#Game7"=>Any["1466135400661"],"#11yearsWithTVXQ"=>Any["1466135400665"],"#NBAFinals"=>Any["1466135400662"],"#ArjonaEnDibujos"=>Any["1466135400661"],"#1/14\n\nPor"=>Any["1466135400661"],"#KLiteVote\""=>Any["1466135400660"],"#ALLin216"=>Any["1466135400660"],"#ポートレート\n#青森県内の被写体さんと繋がりたい\n#被写体さんと繋がりたい\n#ファインダー越しの私の世界\n#雰囲気好きな人RT"=>Any["1466135400660"],"#DesafioWarcraft"=>Any["1466135400666"],"#AllInCavs"=>Any["14661354

In [1]:
using JSON
import GZip
trendx = mapreduce(x->map(y -> JSON.parse(y),readlines(GZip.open("../trend16-06-17/"*x))),vcat,readdir("../trend16-06-17")[1:6]) #File Readin
trendx = mapreduce(i -> mapreduce(j-> Pair(i["timestamp_ms"],j["Name"]),vcat,i["Trend"]),vcat,trendx)
trendingHashes = unique(trendx[find(i -> i[2][1] == '#',trendx)])
recurrentCount = 5
#dt = Dates.datetime2unix(startTime)*1000
dt = first(trendingHashes)[1]

1466135608963

In [9]:
trendingHashes

LoadError: LoadError: type Array has no field val
while loading In[9], in expression starting on line 1

In [36]:
[x_store[i]["\#OceanMovies"] for i in collect(2:6)]

5-element Array{Any,1}:
 [5,5,5]   
 [9,9,9]   
 [10,10,10]
 [15,15,15]
 [20,20,20]

In [39]:
x_source[1]

Dict{AbstractString,Any} with 12 entries:
  "#KD2DC"                 => Any["1466135399659"]
  "#anarchy"               => Any["1466135399657"]
  "#creature"              => Any["1466135399657"]
  "#메이저놀이터추천\n#메… => Any["1466135399661"]
  "#creatureart"           => Any["1466135399657"]
  "#विलंबित_न्याय_है_अ… => Any["1466135399664"]
  "#art"                   => Any["1466135399657"]
  "#PS4live"               => Any["1466135399657"]
  "#alien"                 => Any["1466135399657"]
  "#MTVAWARDSSTAR"         => Any["1466135399659","1466135399657"]
  "#ViajoConAlexTienda"    => Any["1466135399658"]
  "#OrangeIsTheNewBlack?"  => Any["1466135399659"]

In [46]:
#Hash Freq analysis
function freqAnalaysis(dateString)
    println("Begin analysis")
    trendx = mapreduce(x->map(y -> JSON.parse(y),readlines(GZip.open("Step0_Raw/Json/trend"*dateString*"/"*x))),vcat,readdir("Step0_Raw/Json/trend"*dateString));
    trendx = mapreduce(i -> mapreduce(j-> Pair(i["timestamp_ms"],j["Name"]),vcat,i["Trend"]),vcat,trendx)
    trendingHashes = unique(trendx[find(i -> i[2][1] == '#',trendx)]);
    trendingHashes = reduce(vcat, map(x-> hcat(x[2],x[1]),trendingHashes))
    println("found all trends")
    trendingHashtags = unique(trendingHashes[:,1]) #Important A list of trending hashes during this 24 hour window.
    trendingHashtagsFirstTimestamp = hcat(trendingHashes[map(i->findfirst(trendingHashes[:,1],i),trendingHashtags),:],collect(1:length(trendingHashtags)))
    writecsv("trendingHashtagsFirstTimestamp.csv",trendingHashtagsFirstTimestamp)
    println("Saved trending Hashes and their first time stamp")
    trendingHashtags = vcat(trendingHashtags,["#love";"#Pokemon";"#happy"]) #Adding non trending hashtags
    x_source = mapreduce(x->tohashFreq(toHashMap(toDstream(map(y -> JSON.parse(y),readlines(GZip.open("Step0_Raw/Json/"*dateString*"/"*x))),120000))),vcat,readdir("Step0_Raw/Json/"*dateString)[10:15]);
    println("Begin construct Freq Analysis for the chosen hashtags")
    z = map(k -> Pair(k,map(y-> y[2],mapreduce(x-> x[find(i -> i[1] == k,x)],vcat,x_source))),trendingHashtags);
    print(length(find(i -> ! isempty(i[2]),z))) print("/")
    print(length(z))
    println(" Empty")
    z = z[find(i -> ! isempty(i[2]),z)];
    z = map(k -> Pair(k[1],reduce(vcat,k[2])),z);
    sol = mapreduce(z -> hcat(z[1],z[2]),vcat,mapreduce(x->map(i->(i,find(j -> j==x[1],trendingHashtags)[1]),x[2]),vcat ,z));
    writecsv("HashFreqAnalaysis.csv",sol)
    #Sol is set to chart hash freq 
end

true

In [15]:
(x_vector,vectorIndMapping) = toXVectors(x_store);
1

1

In [31]:
sum(x_vector[find(x -> x == 1,y_vector)])

18-element Array{Number,1}:
 117.0
 117.0
 186.0
 137.0
 137.0
 215.0
 129.0
 129.0
 193.0
 109.0
 109.0
 158.0
 110.0
 111.0
 160.0
 108.0
 109.0
 157.0