-
Notifications
You must be signed in to change notification settings - Fork 28k
/
broadcast.R
84 lines (76 loc) · 2.88 KB
/
broadcast.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Broadcast variable support for SparkR.
#
# Hidden environments that hold broadcast-variable state on the R side.
# They will not be serialized / shipped by default.
#   .broadcastValues:   broadcast id -> R value
#   .broadcastNames:    object name  -> backing Java broadcast reference
#   .broadcastIdToName: broadcast id -> object name
.broadcastValues <- new.env()
.broadcastNames <- new.env()
.broadcastIdToName <- new.env()
# S4 class that represents a Broadcast variable
#
# Objects of this class are created through the broadcast function of a
# \code{SparkContext}. The object itself only carries the id; the value
# is looked up by that id in the hidden .broadcastValues environment.
#
# @rdname broadcast-class
# @seealso broadcast
#
# @param id Id of the backing Spark broadcast variable
setClass("Broadcast", slots = c(id = "character"))
# @rdname broadcast-class
# @param id Id of the backing Spark broadcast variable. Coerced with
#   as.character so that numeric ids are accepted as well.
# @param value Value of the broadcast variable
# @param jBroadcastRef reference to the backing Java broadcast object
# @param objName name of broadcasted object
# @return A \code{Broadcast} object whose \code{id} slot is \code{id} as a
#   character string. As a side effect, \code{value} is stored under the id
#   in .broadcastValues, \code{jBroadcastRef} under the object name in
#   .broadcastNames, and the object name under the id in .broadcastIdToName.
Broadcast <- function(id, value, jBroadcastRef, objName) {
  # Environment keys must be character strings. Coerce once, mirroring
  # setBroadcastValue, so a numeric id does not fail on subassignment.
  id <- as.character(id)
  objName <- as.character(objName)
  .broadcastValues[[id]] <- value
  .broadcastNames[[objName]] <- jBroadcastRef
  .broadcastIdToName[[id]] <- objName
  new("Broadcast", id = id)
}
# @description
# \code{value} can be used to get the value of a broadcast variable inside
# a distributed function.
#
# @param bcast The broadcast variable to get
# @return The value stored under \code{bcast@id} in .broadcastValues, or
#   \code{NULL} when no value has been registered for that id.
# @rdname broadcast
setMethod("value",
          signature(bcast = "Broadcast"),
          function(bcast) {
            # inherits = FALSE: search only .broadcastValues itself, never its
            # enclosing environments. Otherwise an unrelated object whose name
            # equals the id could be returned.
            if (exists(bcast@id, envir = .broadcastValues, inherits = FALSE)) {
              get(bcast@id, envir = .broadcastValues, inherits = FALSE)
            } else {
              NULL
            }
          })
# Internal function to set values of a broadcast variable.
#
# Used internally by Spark to populate broadcast values on workers; it is
# not meant to be called from outside the package.
#
# @rdname broadcast-internal
# @seealso broadcast, value
# @param bcastId The id of broadcast variable to set. It is converted with
#   as.character before being used as the key in .broadcastValues.
# @param value The value to be set
# @return \code{value}, invisibly (the result of the assignment).
setBroadcastValue <- function(bcastId, value) {
  key <- as.character(bcastId)
  .broadcastValues[[key]] <- value
}
# Helper function to clear the list of broadcast variables we know about.
# Should be called when the SparkR JVM backend is shut down.
#
# Removes every entry of .broadcastNames (object name -> Java broadcast
# reference), including names that start with a dot.
# NOTE(review): .broadcastValues and .broadcastIdToName are not touched here;
# confirm that is intended.
# @return NULL, invisibly.
clearBroadcastVariables <- function() {
  # all.names = TRUE: ls() otherwise skips names starting with "." (e.g. a
  # broadcast of `.lookup`), and those entries would survive the clear.
  bcasts <- ls(.broadcastNames, all.names = TRUE)
  rm(list = bcasts, envir = .broadcastNames)
}