/
SparkSession.cs
112 lines (97 loc) · 4.37 KB
/
SparkSession.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Sql.Streaming;
namespace Microsoft.Spark.Sql
{
/// <summary>
/// The entry point to programming Spark with the Dataset and DataFrame API.
/// </summary>
public sealed class SparkSession : IDisposable, IJvmObjectReferenceProvider
{
    // Reference to the JVM-side SparkSession; every member below forwards to it.
    private readonly JvmObjectReference _jvmObject;

    // Built on first access to SparkContext (via Lazy<T>), so constructing a
    // SparkSession does not itself invoke "sparkContext" on the JVM object.
    private readonly Lazy<SparkContext> _sparkContext;

    /// <summary>
    /// Constructor for SparkSession.
    /// </summary>
    /// <param name="jvmObject">Reference to the JVM SparkSession object</param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="jvmObject"/> is null.
    /// </exception>
    internal SparkSession(JvmObjectReference jvmObject)
    {
        // Fail fast: otherwise a null reference would only surface later, as a
        // NullReferenceException from whichever member first uses _jvmObject.
        if (jvmObject == null)
        {
            throw new ArgumentNullException(nameof(jvmObject));
        }

        _jvmObject = jvmObject;
        _sparkContext = new Lazy<SparkContext>(
            () => new SparkContext(
                (JvmObjectReference)_jvmObject.Invoke("sparkContext")));
    }

    JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;

    /// <summary>
    /// Returns the SparkContext object associated with this SparkSession.
    /// The wrapper is created on first access and reused afterwards.
    /// </summary>
    public SparkContext SparkContext => _sparkContext.Value;

    /// <summary>
    /// Creates a Builder object for SparkSession.
    /// </summary>
    /// <returns>Builder object</returns>
    public static Builder Builder() => new Builder();

    /// <summary>
    /// Synonym for <see cref="Stop"/>: stops the session, then calls
    /// <see cref="GC.SuppressFinalize(object)"/> on this instance.
    /// </summary>
    public void Dispose()
    {
        Stop();
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Starts a new session with its own isolated SQL configurations, temporary tables,
    /// and registered functions, while sharing the underlying SparkContext and cached data.
    /// </summary>
    /// <remarks>
    /// Other than the SparkContext, all shared state is initialized lazily.
    /// This method will force the initialization of the shared state to ensure that parent
    /// and child sessions are set up with the same shared state. If the underlying catalog
    /// implementation is Hive, this will initialize the metastore, which may take some time.
    /// </remarks>
    /// <returns>New SparkSession object</returns>
    public SparkSession NewSession() =>
        new SparkSession((JvmObjectReference)_jvmObject.Invoke("newSession"));

    /// <summary>
    /// Returns the specified table/view as a DataFrame.
    /// </summary>
    /// <param name="tableName">Name of a table or view</param>
    /// <returns>DataFrame object</returns>
    public DataFrame Table(string tableName)
        => new DataFrame((JvmObjectReference)_jvmObject.Invoke("table", tableName));

    /// <summary>
    /// Executes a SQL query using Spark, returning the result as a DataFrame.
    /// </summary>
    /// <param name="sqlText">SQL query text</param>
    /// <returns>DataFrame object</returns>
    public DataFrame Sql(string sqlText)
        => new DataFrame((JvmObjectReference)_jvmObject.Invoke("sql", sqlText));

    /// <summary>
    /// Returns a DataFrameReader that can be used to read non-streaming data in
    /// as a DataFrame.
    /// </summary>
    /// <returns>DataFrameReader object</returns>
    public DataFrameReader Read() =>
        new DataFrameReader((JvmObjectReference)_jvmObject.Invoke("read"));

    /// <summary>
    /// Returns a DataStreamReader that can be used to read streaming data in as a DataFrame.
    /// </summary>
    /// <returns>DataStreamReader object</returns>
    public DataStreamReader ReadStream() =>
        new DataStreamReader((JvmObjectReference)_jvmObject.Invoke("readStream"));

    /// <summary>
    /// Returns a UdfRegistration object with which user-defined functions (UDFs) can
    /// be registered.
    /// </summary>
    /// <returns>UdfRegistration object</returns>
    public UdfRegistration Udf() =>
        new UdfRegistration((JvmObjectReference)_jvmObject.Invoke("udf"));

    /// <summary>
    /// Stops the underlying SparkContext by invoking "stop" on the JVM SparkSession.
    /// </summary>
    public void Stop() => _jvmObject.Invoke("stop");
}
}