Skip to content

Commit

Permalink
Merge branch 'release/r115-sigiriya'
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet committed Jul 18, 2019
2 parents 6c294da + 8b1cd7e commit 07a4ef8
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 68 deletions.
2 changes: 1 addition & 1 deletion 3-enrich/emr-etl-runner/lib/snowplow-emr-etl-runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@
module Snowplow
module EmrEtlRunner
NAME = "snowplow-emr-etl-runner"
VERSION = "0.34.2"
VERSION = "0.34.3"
end
end
81 changes: 21 additions & 60 deletions 3-enrich/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -717,17 +717,31 @@ def get_elasticsearch_steps(config, assets, enrich, shred, failure_storages)
def run(config)

snowplow_tracking_enabled = ! config[:monitoring][:snowplow].nil?
if snowplow_tracking_enabled
Monitoring::Snowplow.parameterize(config)
end

@pending_jobflow_steps.each do |jobflow_step|
begin
retries ||= 0
# if the job flow is already running this triggers an HTTP call
@jobflow.add_step(jobflow_step)
rescue Elasticity::ThrottlingException, RestClient::RequestTimeout, RestClient::InternalServerError, RestClient::ServiceUnavailable, RestClient::SSLCertificateNotVerified
logger.warn "Got an error while trying to submit a jobflow step: #{jobflow_step.name}"
retries += 1
sleep(2 ** retries + 30)
retry if retries < 3
rescue Elasticity::ThrottlingException, RestClient::RequestTimeout, RestClient::InternalServerError, RestClient::ServiceUnavailable, RestClient::SSLCertificateNotVerified => e
if retries < 3
retries += 1
delay = 2 ** retries + 30
logger.warn "Got error [#{e.message}] while trying to submit jobflow step [#{jobflow_step.name}] to jobflow [#{@jobflow.jobflow_id}]. Retrying in #{delay} seconds"
sleep(delay)
retry
else
if snowplow_tracking_enabled
step_status = Elasticity::ClusterStepStatus.new
step_status.name = "Add step [#{jobflow_step.name}] to jobflow [#{@jobflow.jobflow_id}]. (Error: [#{e.message}])"
step_status.state = "FAILED"
Monitoring::Snowplow.instance.track_single_step(step_status)
end
raise EmrExecutionError, "Can't add step [#{jobflow_step.name}] to jobflow [#{@jobflow.jobflow_id}] (retried 3 times). Error: [#{e.message}]."
end
end
end

Expand All @@ -746,7 +760,6 @@ def run(config)
logger.debug "EMR jobflow #{jobflow_id} started, waiting for jobflow to complete..."

if snowplow_tracking_enabled
Monitoring::Snowplow.parameterize(config)
Monitoring::Snowplow.instance.track_job_started(jobflow_id, cluster_status(@jobflow), cluster_step_status_for_run(@jobflow))
end

Expand Down Expand Up @@ -1107,34 +1120,6 @@ def wait_for
JobResult.new(success, bootstrap_failure, rdb_loader_failure, rdb_loader_cancellation)
end

# Prettified string containing failure details
# for this job flow.
Contract String => String
def get_failure_details(jobflow_id, cluster_status, cluster_step_status_for_run)
[
"EMR jobflow #{jobflow_id} failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.",
"#{@jobflow.name}: #{cluster_status.state} [#{cluster_status.last_state_change_reason}] ~ #{self.class.get_elapsed_time(cluster_status.ready_at, cluster_status.ended_at)} #{self.class.get_timespan(cluster_status.ready_at, cluster_status.ended_at)}"
].concat(cluster_step_status_for_run
.sort { |a,b|
self.class.nilable_spaceship(a.started_at, b.started_at)
}
.each_with_index
.map { |s,i|
" - #{i + 1}. #{s.name}: #{s.state} ~ #{self.class.get_elapsed_time(s.started_at, s.ended_at)} #{self.class.get_timespan(s.started_at, s.ended_at)}"
})
.join("\n")
end

# Gets the time span.
#
# Parameters:
# +start+:: start time
# +_end+:: end time
Contract Maybe[Time], Maybe[Time] => String
def self.get_timespan(start, _end)
"[#{start} - #{_end}]"
end

# Spaceship operator supporting nils
#
# Parameters:
Expand All @@ -1154,32 +1139,6 @@ def self.nilable_spaceship(a, b)
end
end

# Gets the elapsed time in a
# human-readable format.
#
# Parameters:
# +start+:: start time
# +_end+:: end time
Contract Maybe[Time], Maybe[Time] => String
def self.get_elapsed_time(start, _end)
if start.nil? or _end.nil?
"elapsed time n/a"
else
# Adapted from http://stackoverflow.com/a/19596579/255627
seconds_diff = (start - _end).to_i.abs

hours = seconds_diff / 3600
seconds_diff -= hours * 3600

minutes = seconds_diff / 60
seconds_diff -= minutes * 60

seconds = seconds_diff

"#{hours.to_s.rjust(2, '0')}:#{minutes.to_s.rjust(2, '0')}:#{seconds.to_s.rjust(2, '0')}"
end
end

# Recursively change the keys of a YAML from symbols to strings
def recursive_stringify_keys(h)
if h.class == [].class
Expand All @@ -1200,6 +1159,7 @@ def deep_copy(o)
#
# Parameters:
# +jobflow+:: The jobflow to extract steps from
Contract Elasticity::JobFlow => ArrayOf[Elasticity::ClusterStepStatus]
def cluster_step_status_for_run(jobflow)
begin
retries ||= 0
Expand All @@ -1213,6 +1173,7 @@ def cluster_step_status_for_run(jobflow)
end
end

Contract Elasticity::JobFlow => Elasticity::ClusterStatus
def cluster_status(jobflow)
begin
retries ||= 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,21 @@ def track_job_failed(jobflow_id, jobflow_status, jobflow_steps)
)
end

# Track a single step
Contract Elasticity::ClusterStepStatus => SnowplowTracker::Tracker
def track_single_step(step_status)
@tracker.track_unstruct_event(
SnowplowTracker::SelfDescribingJson.new(
STEP_STATUS_SCHEMA,
{
:name => step_status.name,
:state => step_status.state,
:created_at => Time.now.to_i
}
)
)
end

end
end
end
Expand Down
54 changes: 54 additions & 0 deletions 3-enrich/emr-etl-runner/lib/snowplow-emr-etl-runner/utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# License:: Apache License Version 2.0

require 'contracts'
require 'elasticity'

# Module with diverse utilities dealing with a few quirks in EmrEtlRunner
module Snowplow
Expand Down Expand Up @@ -224,6 +225,59 @@ def parse_duration(input)
time
end

# Prettified string containing failure details
# for this job flow.
Contract String, Elasticity::ClusterStatus, ArrayOf[Elasticity::ClusterStepStatus] => String
def get_failure_details(jobflow_id, cluster_status, cluster_step_status_for_run)
[
"EMR jobflow #{jobflow_id} failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.",
"#{jobflow_id}: #{cluster_status.state} [#{cluster_status.last_state_change_reason}] ~ #{get_elapsed_time(cluster_status.ready_at, cluster_status.ended_at)} #{get_timespan(cluster_status.ready_at, cluster_status.ended_at)}"
].concat(cluster_step_status_for_run
.sort { |a,b|
self.class.nilable_spaceship(a.started_at, b.started_at)
}
.each_with_index
.map { |s,i|
" - #{i + 1}. #{s.name}: #{s.state} ~ #{get_elapsed_time(s.started_at, s.ended_at)} #{get_timespan(s.started_at, s.ended_at)}"
})
.join("\n")
end

# Gets the elapsed time in a
# human-readable format.
#
# Parameters:
# +start+:: start time
# +_end+:: end time
Contract Maybe[Time], Maybe[Time] => String
def get_elapsed_time(start, _end)
if start.nil? or _end.nil?
"elapsed time n/a"
else
# Adapted from http://stackoverflow.com/a/19596579/255627
seconds_diff = (start - _end).to_i.abs

hours = seconds_diff / 3600
seconds_diff -= hours * 3600

minutes = seconds_diff / 60
seconds_diff -= minutes * 60

seconds = seconds_diff

"#{hours.to_s.rjust(2, '0')}:#{minutes.to_s.rjust(2, '0')}:#{seconds.to_s.rjust(2, '0')}"
end
end

# Gets the time span.
#
# Parameters:
# +start+:: start time
# +_end+:: end time
Contract Maybe[Time], Maybe[Time] => String
def get_timespan(start, _end)
"[#{start} - #{_end}]"
end
end
end
end
112 changes: 112 additions & 0 deletions 3-enrich/emr-etl-runner/spec/snowplow-emr-etl-runner/utils_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,116 @@
expect(subject.parse_duration("1w 5d 3h 13m")).to eq(1048380)
end
end

describe '#get_failure_details' do
let(:jobflow_id) { "JOB ID" }
let(:cluster_state) { 'TERMINATED' }
let(:timeline) do
<<-JSON
{
"CreationDateTime": 1436788464.415,
"EndDateTime": 1436791032.097,
"ReadyDateTime": 1436788842.195
}
JSON
end
let(:aws_cluster_status) do
<<-JSON
{
"Cluster": {
"Applications": [
{
"Name": "hadoop",
"Version": "1.0.3"
}
],
"AutoTerminate": true,
"Configurations": [
],
"Ec2InstanceAttributes": {
"Ec2AvailabilityZone": "us-east-1a",
"EmrManagedMasterSecurityGroup": "sg-b7de0adf",
"EmrManagedSlaveSecurityGroup": "sg-89de0ae1"
},
"Id": "j-3T0PHNUXCY7SX",
"MasterPublicDnsName": "ec2-54-81-173-103.compute-1.amazonaws.com",
"Name": "Elasticity Job Flow",
"NormalizedInstanceHours": 2,
"RequestedAmiVersion": "latest",
"RunningAmiVersion": "2.4.2",
"Status": {
"State": "#{cluster_state}",
"StateChangeReason": {
"Code": "ALL_STEPS_COMPLETED",
"Message": "Steps completed"
},
"Timeline": #{timeline}
},
"Tags": [
{
"Key": "key",
"Value": "value"
}
],
"TerminationProtected": false,
"VisibleToAllUsers": false
}
}
JSON
end

let (:cluster_status) { Elasticity::ClusterStatus.from_aws_data(JSON.parse(aws_cluster_status)) }

let(:aws_cluster_steps) do
<<-JSON
{
"Steps": [
{
"ActionOnFailure": "TERMINATE_CLUSTER",
"Config": {
"Args": [
"36",
"3",
"0"
],
"Jar": "s3n://elasticmapreduce/samples/cloudburst/cloudburst.jar",
"MainClass" : "MAIN_CLASS",
"Properties": {
"Key1" : "Value1",
"Key2" : "Value2"
}
},
"Id": "s-OYPPAC4XPPUC",
"Name": "Elasticity Custom Jar Step",
"Status": {
"State": "COMPLETED",
"StateChangeReason": {
"Code": "ALL_STEPS_COMPLETED",
"Message": "Steps completed"
},
"Timeline": #{timeline}
}
}
]
}
JSON
end

let(:cluster_step_statuses) { Elasticity::ClusterStepStatus.from_aws_list_data(JSON.parse(aws_cluster_steps)) }

let(:expected_output) {
[
"EMR jobflow JOB ID failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.",
"JOB ID: TERMINATED [ALL_STEPS_COMPLETED] ~ 00:36:29 #{subject.get_timespan(cluster_status.ready_at, cluster_status.ended_at)}",
" - 1. Elasticity Custom Jar Step: COMPLETED ~ elapsed time n/a [ - #{cluster_status.ended_at}]"
]
.join("\n")
}

it { should respond_to(:get_failure_details).with(3).argument }

it 'should create a string containing a summary of the failure' do
expect(subject.get_failure_details(jobflow_id, cluster_status, cluster_step_statuses)).to eq(expected_output)
end
end
end
5 changes: 1 addition & 4 deletions 5-data-modeling/event-manifest-populator/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
lazy val root = project.in(file("."))
.settings(
name := "event-manifest-populator",
version := "0.1.1",
version := "0.1.2",
scalaVersion := "2.11.8"
)
.settings(BuildSettings.buildSettings)
Expand All @@ -40,6 +40,3 @@ lazy val root = project.in(file("."))
Dependencies.scalaCheck
)
)



Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ object BackpopulateJob {
val runs = getRuns(jobConfig)
runs.foreach { runId =>
println(s"Processing $runId")
val events = sc.textFile(s"s3a://${jobConfig.enrichedInBucket}run=$runId/part-*")
val events = sc.textFile(s"s3a://${jobConfig.enrichedInBucket}run=$runId/*")
events.map(lineToTriple).foreach { triple => store(triple, storage) }
}
}
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
Release 115 Sigiriya (2019-07-17)
---------------------------------
EmrEtlRunner: update Contracts for get_failure_details (#4088)
EmrEtlRunner: make sure all steps are successfully submitted in case of a transient cluster (#4092)
EmrEtlRunner: bump to 0.34.3 (#4089)
Event Manifest Populator: bump to 0.1.2 (#4082)
Event Manifest Populator: remove part-* pattern (#4081)

Release 114 Polonnaruwa (2019-05-17)
------------------------------------
Beam Enrich: bump to 0.3.0 (#4061)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ limitations under the License.
[travis-image]: https://travis-ci.org/snowplow/snowplow.png?branch=master
[travis]: http://travis-ci.org/snowplow/snowplow

[release-image]: https://img.shields.io/badge/release-114_Polonnaruwa-orange.svg?style=flat
[release-image]: https://img.shields.io/badge/release-115_Sigiriya-orange.svg?style=flat
[releases]: https://github.com/snowplow/snowplow/releases

[license-image]: http://img.shields.io/badge/license-Apache--2-blue.svg?style=flat
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
r114-polonnaruwa
r115-sigiriya

0 comments on commit 07a4ef8

Please sign in to comment.