Commit

WIP
tomvanbussel committed May 2, 2024
1 parent fae8f70 commit 6ea0c20
Showing 30 changed files with 10,034 additions and 0 deletions.
58 changes: 58 additions & 0 deletions build.sbt
@@ -20,6 +20,8 @@ import java.nio.file.Files
import Mima._
import Unidoc._

import sbtprotoc.ProtocPlugin.autoImport._

// Scala versions
val scala212 = "2.12.18"
val scala213 = "2.13.13"
@@ -38,6 +40,8 @@ val LATEST_RELEASED_SPARK_VERSION = "3.5.0"
val SPARK_MASTER_VERSION = "4.0.0-SNAPSHOT"
val sparkVersion = settingKey[String]("Spark version")
spark / sparkVersion := getSparkVersion()
connectClient / sparkVersion := getSparkVersion()
connectServer / sparkVersion := getSparkVersion()

// Dependent library versions
val defaultSparkVersion = LATEST_RELEASED_SPARK_VERSION
@@ -57,6 +61,9 @@ val hadoopVersionForHive2 = "2.7.2"
val hive2Version = "2.3.3"
val tezVersionForHive2 = "0.8.4"

val protoVersion = "3.25.1"
val grpcVersion = "1.62.2"

scalaVersion := default_scala_version.value

// crossScalaVersions must be set to Nil on the root project
@@ -180,6 +187,57 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
)
}

lazy val connectCommon = (project in file("connect/common"))
.settings(
name := "delta-connect-common",
commonSettings,
releaseSettings,
libraryDependencies ++= Seq(
"io.grpc" % "protoc-gen-grpc-java" % grpcVersion asProtocPlugin(),
"io.grpc" % "grpc-protobuf" % grpcVersion,
"io.grpc" % "grpc-stub" % grpcVersion,
"com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf",
"com.google.protobuf" % "protobuf-java" % protoVersion,
"javax.annotation" % "javax.annotation-api" % "1.3.2",
),
PB.protocVersion := protoVersion,
Compile / PB.targets := Seq(
PB.gens.java -> (Compile / sourceManaged).value,
PB.gens.plugin("grpc-java") -> (Compile / sourceManaged).value
),
)
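
For context, the PB.* keys above come from the sbt-protoc plugin; below is a minimal sketch of the plugin wiring this build assumes (project/plugins.sbt is not shown in this commit, and the version here is illustrative):

    // project/plugins.sbt -- hypothetical; enables the PB.* keys used above.
    // sbt-protoc reads .proto files from src/main/protobuf by default and
    // emits the generated Java sources into sourceManaged, so they compile
    // together with the regular sources.
    addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.7")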

lazy val connectClient = (project in file("connect/client"))
.dependsOn(connectCommon)
// .dependsOn(spark % "compile->compile;test->test;provided->provided")
.settings(
name := "delta-connect-client",
commonSettings,
releaseSettings,
crossSparkSettings(),
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-connect-client-jvm" % sparkVersion.value % "provided",

// Test deps
// "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.apache.spark" %% "spark-connect-client-jvm" % sparkVersion.value % "test" classifier "tests",
),
)

lazy val connectServer = (project in file("connect/server"))
.dependsOn(connectCommon)
.dependsOn(spark % "compile->compile;test->test;provided->provided")
.settings(
name := "delta-connect-server",
commonSettings,
releaseSettings,
crossSparkSettings(),
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-connect" % sparkVersion.value % "provided",
),
)
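
Taken together: connectCommon carries the shared protobuf/gRPC definitions, connectClient builds against the Spark Connect JVM client, and connectServer plugs into the Spark Connect server alongside delta-spark. A hypothetical convenience alias (not part of this commit) for building all three:

    // Hypothetical sbt alias; sbt already orders the modules through the
    // dependsOn graph above, this just compiles them in one command.
    addCommandAlias(
      "compileConnect",
      ";connectCommon/compile;connectClient/compile;connectServer/compile")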

lazy val spark = (project in file("spark"))
.dependsOn(storage)
.enablePlugins(Antlr4Plugin)
136 changes: 136 additions & 0 deletions connect/client/src/main/scala/io/delta/tables/DeltaColumnBuilder.scala
@@ -0,0 +1,136 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.tables

import org.apache.spark.annotation.Evolving
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StructField}

/**
* :: Evolving ::
*
* Builder to specify a table column.
*
* See [[DeltaTableBuilder]] for examples.
*
* @since 2.5.0
*/
@Evolving
class DeltaColumnBuilder private[tables](private val colName: String) {
private var dataType: DataType = _
private var nullable: Boolean = true
private var generationExpr: Option[String] = None
private var comment: Option[String] = None

/**
* :: Evolving ::
*
* Specify the column data type.
*
* @param dataType string column data type
* @since 2.5.0
*/
@Evolving
def dataType(dataType: String): DeltaColumnBuilder = {
this.dataType = DataTypeParser.parseDataType(dataType)
this
}

/**
* :: Evolving ::
*
* Specify the column data type.
*
* @param dataType DataType column data type
* @since 2.5.0
*/
@Evolving
def dataType(dataType: DataType): DeltaColumnBuilder = {
this.dataType = dataType
this
}

/**
* :: Evolving ::
*
* Specify whether the column can be null.
*
* @param nullable boolean whether the column can be null or not.
* @since 2.5.0
*/
@Evolving
def nullable(nullable: Boolean): DeltaColumnBuilder = {
this.nullable = nullable
this
}

/**
* :: Evolving ::
*
* Specify an expression if the column is always generated as a function of other columns.
*
* @param expr string the generation expression
* @since 2.5.0
*/
@Evolving
def generatedAlwaysAs(expr: String): DeltaColumnBuilder = {
this.generationExpr = Option(expr)
this
}

/**
* :: Evolving ::
*
* Specify a column comment.
*
* @param comment string column description
* @since 2.5.0
*/
@Evolving
def comment(comment: String): DeltaColumnBuilder = {
this.comment = Option(comment)
this
}

/**
* :: Evolving ::
*
* Build the column as a [[StructField]].
*
* @since 2.5.0
*/
@Evolving
def build(): StructField = {
val metadataBuilder = new MetadataBuilder()
if (generationExpr.nonEmpty) {
metadataBuilder.putString("delta.generationExpression", generationExpr.get)
}
if (comment.nonEmpty) {
metadataBuilder.putString("comment", comment.get)
}
val fieldMetadata = metadataBuilder.build()
if (dataType == null) {
      throw new AnalysisException(s"The data type of the column $colName is not provided")
    }
StructField(
colName,
dataType,
nullable = nullable,
metadata = fieldMetadata)
}
}
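
A usage sketch for the builder, assuming the Connect client mirrors the existing io.delta.tables.DeltaTable.columnBuilder entry point (the constructor is private[tables], so callers reach the builder through DeltaTable):

    import io.delta.tables.DeltaTable
    import org.apache.spark.sql.types.{StructField, TimestampType}

    // A non-nullable timestamp column with a comment.
    val eventTime: StructField = DeltaTable.columnBuilder("eventTime")
      .dataType(TimestampType)
      .nullable(false)
      .comment("ingestion timestamp")
      .build()

    // A generated column: build() stores the expression in the field
    // metadata under "delta.generationExpression".
    val eventYear: StructField = DeltaTable.columnBuilder("eventYear")
      .dataType("INT")
      .generatedAlwaysAs("YEAR(eventTime)")
      .build()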
