Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
03b7939
Adding section for UDF serialization
Niharikadutta Apr 20, 2020
4ef693d
removing guides from master
Niharikadutta Apr 20, 2020
81145ca
Merge latest from master
Niharikadutta May 6, 2020
e4b81af
merging latest from master
Niharikadutta May 7, 2020
4c32173
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 2, 2020
4987a09
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 14, 2020
ca9612e
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 16, 2020
20a3313
Removing ThreadStatic for JvmBroadcastRegistry
Niharikadutta Jun 18, 2020
af0ba4d
Making member threadlocal
Niharikadutta Jun 18, 2020
f581c86
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 20, 2020
5442bf0
Merge branch 'master' into BroadcastBugFix
Niharikadutta Jun 20, 2020
e5becc8
Fixing UDF null reference issue
Niharikadutta Jun 20, 2020
bff1cdb
Adding test to define udfs in separate threads
Niharikadutta Jun 21, 2020
4596776
Joining threads
Niharikadutta Jun 21, 2020
bfb6553
Trying to catch exception thrown at UDF definition
Niharikadutta Jun 22, 2020
9f69bbf
Merge branch 'master' into BroadcastBugFix
imback82 Jun 22, 2020
330ce7f
Final test
Niharikadutta Jun 22, 2020
3235da7
Merge branch 'BroadcastBugFix' of github.com:Niharikadutta/spark into…
Niharikadutta Jun 22, 2020
f7830db
missed changes
Niharikadutta Jun 22, 2020
4a3841f
PR comment changes
Niharikadutta Jun 22, 2020
1b82cdc
PR review changes
Niharikadutta Jun 22, 2020
4923e74
PR review changes
Niharikadutta Jun 22, 2020
7c9194e
PR review comment
Niharikadutta Jun 22, 2020
0ca1642
Update src/csharp/Microsoft.Spark/Broadcast.cs
Niharikadutta Jun 22, 2020
6242ac3
PR comment
Niharikadutta Jun 22, 2020
fa88376
Merge branch 'BroadcastBugFix' of github.com:Niharikadutta/spark into…
Niharikadutta Jun 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfSimpleTypesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using Xunit;
Expand Down Expand Up @@ -166,5 +167,29 @@ public void TestUdfWithReturnAsTimestampType()
}
}
}

/// <summary>
/// Test to validate UDFs defined in separate threads work.
/// </summary>
[Fact]
public void TestUdfWithMultipleThreads()
{
try
{
void DefineUdf() => Udf<string, string>(str => str);

// Define a UDF in the main thread.
Udf<string, string>(str => str);

// Verify a UDF can be defined in a separate thread.
Thread t = new Thread(DefineUdf);
t.Start();
t.Join();
}
catch (Exception)
{
Assert.True(false);
}
}
}
}
13 changes: 6 additions & 7 deletions src/csharp/Microsoft.Spark/Broadcast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Services;
Expand Down Expand Up @@ -261,28 +262,26 @@ internal static void Remove(long bid)
/// </summary>
internal static class JvmBroadcastRegistry
{
[ThreadStatic]
private static readonly List<JvmObjectReference> s_jvmBroadcastVariables =
new List<JvmObjectReference>();
private static ThreadLocal<List<JvmObjectReference>> s_jvmBroadcastVariables =
new ThreadLocal<List<JvmObjectReference>>(() => new List<JvmObjectReference>());

/// <summary>
/// Adds a JVMObjectReference object of type <see cref="Broadcast{T}"/> to the list.
/// </summary>
/// <param name="broadcastJvmObject">JVMObjectReference of the Broadcast variable</param>
internal static void Add(JvmObjectReference broadcastJvmObject) =>
s_jvmBroadcastVariables.Add(broadcastJvmObject);
s_jvmBroadcastVariables.Value.Add(broadcastJvmObject);

/// <summary>
/// Clears s_jvmBroadcastVariables of all the JVMObjectReference objects of type
/// <see cref="Broadcast{T}"/>.
/// </summary>
internal static void Clear() => s_jvmBroadcastVariables.Clear();
internal static void Clear() => s_jvmBroadcastVariables.Value.Clear();

/// <summary>
/// Returns the static member s_jvmBroadcastVariables.
/// </summary>
/// <returns>A list of all broadcast objects of type <see cref="JvmObjectReference"/></returns>
internal static List<JvmObjectReference> GetAll() => s_jvmBroadcastVariables;
internal static List<JvmObjectReference> GetAll() => s_jvmBroadcastVariables.Value;
}
}