This repository has been archived by the owner on Jun 30, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
spark_job.rb
117 lines (110 loc) · 4.61 KB
/
spark_job.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# frozen_string_literal: true
# Methods for Basic Spark Jobs.
# Runs the basic set of Spark (Archives Unleashed Toolkit) extraction jobs
# for one user's collection: domain frequency, full text, and domain graph.
# On success the GraphPass job is enqueued; on failure the user is notified
# by email and the job raises so the queue backend records the failure.
class SparkJob < ApplicationJob
  queue_as :spark

  # Stamp the Dashboard row for this job with its completion time.
  after_perform do
    update_dashboard = Dashboard.find_by(job_id: job_id)
    update_dashboard.end_time = DateTime.now.utc
    update_dashboard.save
  end

  # Runs the three AUT extractors over every matching collection.
  #
  # @param user_id [Integer] owner of the collection
  # @param collection_id [Integer] collection identifier
  # @raise [RuntimeError] when any expected derivative is missing or empty
  #   after the Spark runs complete
  def perform(user_id, collection_id)
    Dashboard.find_or_create_by!(
      job_id: job_id,
      user_id: user_id,
      collection_id: collection_id,
      queue: 'spark',
      start_time: DateTime.now.utc
    )
    Collection.where('user_id = ? AND collection_id = ?', user_id, collection_id).each do |c|
      collection_path = "#{ENV.fetch('DOWNLOAD_PATH')}/#{c.account}/#{c.collection_id}/"
      collection_warcs = "#{collection_path}warcs"
      collection_derivatives = "#{collection_path}#{c.user_id}/derivatives"
      collection_logs = "#{collection_path}#{c.user_id}/logs"
      FileUtils.mkdir_p collection_logs
      # Derivatives are always rebuilt from scratch; stale output would
      # otherwise satisfy the _SUCCESS checks below.
      FileUtils.rm_rf collection_derivatives
      FileUtils.mkdir_p collection_derivatives

      # The three commands are identical apart from extractor name, output
      # location, log tag, and (for gephi) an extra output-format flag.
      spark_jobs = [
        spark_command('DomainFrequencyExtractor',
                      "#{collection_derivatives}/all-domains/output",
                      collection_warcs, collection_logs, collection_id, 'domains'),
        spark_command('WebPagesExtractor',
                      "#{collection_derivatives}/all-text/output",
                      collection_warcs, collection_logs, collection_id, 'text'),
        spark_command('DomainGraphExtractor',
                      "#{collection_derivatives}/gephi",
                      collection_warcs, collection_logs, collection_id, 'gephi',
                      ' --output-format graphml')
      ]
      # Run all three extractors concurrently; success is judged afterwards
      # by the marker files Spark writes, not by the shell exit status.
      Parallel.map(spark_jobs, in_threads: 3) do |auk_job|
        logger.info "Executing: #{auk_job}"
        system(auk_job)
      end

      domain_success = "#{collection_derivatives}/all-domains/output/_SUCCESS"
      fulltext_success = "#{collection_derivatives}/all-text/output/_SUCCESS"
      graphml_success = "#{collection_derivatives}/gephi/GRAPHML.graphml"
      graphml = "#{collection_derivatives}/gephi/#{collection_id}-gephi.graphml"
      if File.exist?(domain_success) && File.exist?(fulltext_success) &&
         File.exist?(graphml_success) && !File.empty?(graphml_success)
        # Rename AUT's fixed GRAPHML.graphml to a collection-specific name
        # before handing off to GraphPass.
        FileUtils.mv(graphml_success, graphml)
        logger.info 'Executed: Domain Graph cleanup.'
        GraphpassJob.set(queue: :graphpass)
                    .perform_later(user_id, collection_id)
      else
        UserMailer.notify_collection_failed(c.user_id.to_s,
                                            c.collection_id.to_s).deliver_now
        raise 'Collections spark job failed.'
      end
    end
  end

  private

  # Builds one spark-submit command line for a single AUT extractor run.
  # stdout and stderr are tee'd to a timestamped log file so the run is
  # both visible in the worker output and preserved on disk.
  #
  # @param extractor [String] AUT extractor class short name
  # @param output [String] derivative output directory for this extractor
  # @param warcs [String] directory containing the collection's WARCs
  # @param logs [String] directory for per-run log files
  # @param collection_id [Integer] used in the log file name
  # @param tag [String] short label distinguishing the log file per extractor
  # @param extra [String] additional CLI flags (e.g. ' --output-format graphml')
  # @return [String] a shell command suitable for Kernel#system
  def spark_command(extractor, output, warcs, logs, collection_id, tag, extra = '')
    timestamp = DateTime.now.utc.strftime('%Y%m%d%H%M')
    "#{ENV.fetch('SPARK_HOME')}/bin/spark-submit " \
      "--master local[#{ENV.fetch('SPARK_THREADS')}] " \
      '--class io.archivesunleashed.app.CommandLineAppRunner ' \
      "#{ENV.fetch('AUT_PATH')}/aut-#{ENV.fetch('AUT_VERSION')}-fatjar.jar " \
      "--extractor #{extractor} --input #{warcs} --output #{output}#{extra} " \
      "2>&1 | tee #{logs}/#{collection_id}-#{tag}-#{timestamp}.log"
  end
end