Permalink
Browse files

Download conversation history for all correspondents

  • Loading branch information...
1 parent abb067d commit bf360ae1b1b2d60011b2e586f7c4c6074e51970d @ept committed Feb 6, 2012
Showing with 69 additions and 2 deletions.
  1. +1 −0 .gitignore
  2. +1 −0 Gemfile
  3. +2 −0 Gemfile.lock
  4. +2 −1 lib/cotweet-export.rb
  5. +4 −0 lib/cotweet/connection.rb
  6. +46 −0 lib/cotweet/download_queue.rb
  7. +13 −1 lib/cotweet/export.rb
View
@@ -1,3 +1,4 @@
closed.json
messages.json
sent.json
+conversations
View
@@ -3,6 +3,7 @@ source :rubygems
gem 'em-http-request'
gem 'highline'
gem 'json'
+gem 'andand'
# TODO: Update this when mainline DG has the features we want.
gem 'deferrable_gratification', :git => 'git://github.com/ConradIrwin/deferrable_gratification.git'
View
@@ -9,6 +9,7 @@ GEM
remote: http://rubygems.org/
specs:
addressable (2.2.6)
+ andand (1.3.1)
cookiejar (0.3.0)
em-http-request (1.0.1)
addressable (>= 2.2.3)
@@ -27,6 +28,7 @@ PLATFORMS
ruby
DEPENDENCIES
+ andand
deferrable_gratification!
em-http-request
highline
@@ -4,13 +4,14 @@
require 'eventmachine'
require 'deferrable_gratification'
require 'em-http'
+require 'set'
DG.enhance! EM::HttpClient
$stdout.sync = true
%w{
- connection export
+ connection download_queue export
}.each do |filename|
require File.join(File.dirname(__FILE__), 'cotweet', filename)
end
@@ -82,5 +82,9 @@ def each_message(timeline_name, &block)
end
end
end
+
+ def get_conversation(user_id)
+ get_json "/api/1/twitterers/#{user_id}/conversations.json", :limit => 20
+ end
end
end
@@ -0,0 +1,46 @@
+module CoTweet
+ class DownloadQueue
+ MAX_CONCURRENCY = 10
+
+ def initialize(&block)
+ @operation = block
+ @items_seen = Set.new
+ @active = {}
+ @queued = []
+ end
+
+ def <<(item)
+ return if @items_seen.include? item
+ @items_seen << item
+
+ if has_capacity?
+ start(item)
+ else
+ @queued << item
+ end
+ end
+
+ def finish
+ return DG.success if idle?
+ @finishing ||= DG.blank
+ end
+
+ private
+
+ def idle?
+ @active.empty?
+ end
+
+ def has_capacity?
+ @active.size < MAX_CONCURRENCY
+ end
+
+ def start(item)
+ @active[item] = @operation.call(item).bothback do
+ @active.delete item
+ start(@queued.shift) if has_capacity? && !@queued.empty?
+ @finishing.succeed if @finishing && idle?
+ end
+ end
+ end
+end
View
@@ -16,14 +16,26 @@ def export_timeline(timeline_name)
file.puts ',' unless first_message
file.write JSON.pretty_generate(msg)
first_message = false
+ msg['twitterer'].andand['id'].andand {|user_id| conversation_downloads << user_id }
end.bothback do
file.puts ']}'
file.close
end
end
+ def conversation_downloads
+ @conversation_downloads ||= DownloadQueue.new do |user_id|
+ connection.get_conversation(user_id).safe_callback do |json|
+ File.open("conversations/#{user_id}.json", 'w') do |file|
+ file.write JSON.pretty_generate(json)
+ end
+ end
+ end
+ end
+
def run
- JoinCarefully.setup! *%w(messages sent closed).map(&method(:export_timeline))
+ JoinCarefully.setup!(*%w(messages sent closed).map(&method(:export_timeline))).
+ bind! { conversation_downloads.finish }
end
def self.run!

0 comments on commit bf360ae

Please sign in to comment.