Merge pull request #98 from tubular/OPS-5046-support-spark-3.4.0-on-SPOK
OPS-5046 support spark 3.4.0 on SPOK
xuzeng012 committed Jun 26, 2023
2 parents 2c5d50e + f5a8595 commit 393d134
Showing 5 changed files with 19 additions and 72 deletions.
1 change: 1 addition & 0 deletions Dockerfile
@@ -21,6 +21,7 @@ LABEL maintainer="dev@tubularlabs.com"
# Install Java 8
RUN apt-get update && apt-get install -y software-properties-common
RUN apt-add-repository 'deb http://security.debian.org/debian-security stretch/updates main'
+RUN apt-add-repository 'deb http://deb.debian.org/debian/ sid main'
RUN apt-get update && apt-get install -y openjdk-8-jdk

# Python env
71 changes: 4 additions & 67 deletions sparkly/utils.py
@@ -175,73 +175,10 @@ def parse_schema(schema):
...
sparkly.exceptions.UnsupportedDataType: Cannot parse type from string: "unsupported"
"""
-    field_type, args_string = re.match('(\w+)<?(.*)>?$', schema).groups()
-    args = _parse_args(args_string) if args_string else []
-
-    if field_type in ATOMIC_TYPES:
-        return ATOMIC_TYPES[field_type]()
-    elif field_type in COMPLEX_TYPES:
-        return COMPLEX_TYPES[field_type](*args)
-    else:
-        message = 'Cannot parse type from string: "{}"'.format(field_type)
-        raise UnsupportedDataType(message)
-
-
-def _parse_args(args_string):
-    args = []
-    balance = 0
-    pos = 0
-    for i, ch in enumerate(args_string):
-        if ch == '<':
-            balance += 1
-        elif ch == '>':
-            balance -= 1
-        elif ch == ',' and balance == 0:
-            args.append(args_string[pos:i])
-            pos = i + 1
-
-    args.append(args_string[pos:])
-
-    return args
-
-
-def _is_atomic_type(obj):
-    return inspect.isclass(obj) and issubclass(obj, T.AtomicType) and obj is not T.DecimalType
-
-
-ATOMIC_TYPES = {
-    _type[1]().simpleString(): _type[1]
-    for _type in inspect.getmembers(T, _is_atomic_type)
-}
-
-
-def _init_map(*args):
-    return T.MapType(
-        keyType=parse_schema(args[0]),
-        valueType=parse_schema(args[1]),
-    )
-
-
-def _init_struct(*args):
-    struct = T.StructType()
-    for item in args:
-        field_name, field_type = item.split(':', 1)
-        field_type = parse_schema(field_type)
-        struct.add(field_name, field_type)
-
-    return struct
-
-
-def _init_array(*args):
-    return T.ArrayType(parse_schema(args[0]))
-
-
-COMPLEX_TYPES = {
-    'map': _init_map,
-    'struct': _init_struct,
-    'array': _init_array,
-}
-
+    try:
+        return T._parse_datatype_string(schema)
+    except Exception as e:
+        raise UnsupportedDataType(f'Cannot parse schema: {schema}: {e}')

def schema_has(t, required_fields):
"""Check whether a complex dataType has specific fields.
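
Note on the sparkly/utils.py change: the hand-rolled parser (ATOMIC_TYPES, COMPLEX_TYPES and their helpers) is replaced by a call to PySpark's own type-string parser. A rough sketch of what that delegation looks like in isolation is below; T._parse_datatype_string is an internal PySpark helper that goes through the JVM parser, so it needs an active SparkSession/SparkContext, and the session setup and example strings here are illustrative, not from the repo.

from pyspark.sql import SparkSession
from pyspark.sql import types as T

spark = SparkSession.builder.master('local[1]').getOrCreate()

# The same strings the removed parser accepted are understood natively by Spark
# (reprs abbreviated; they vary slightly across PySpark versions):
T._parse_datatype_string('int')                             # IntegerType()
T._parse_datatype_string('array<string>')                   # ArrayType(StringType(), True)
T._parse_datatype_string('map<string,int>')                 # MapType(StringType(), IntegerType(), True)
T._parse_datatype_string('struct<a:int,b:array<string>>')   # StructType with fields a, b

# Strings Spark cannot parse raise an exception, which parse_schema now wraps
# into sparkly's UnsupportedDataType.
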
3 changes: 2 additions & 1 deletion tests/integration/base.py
@@ -18,6 +18,7 @@

from pyspark.sql.types import StringType

+import pyspark
from sparkly import SparklySession
from sparkly.utils import absolute_path

@@ -26,7 +27,7 @@ class SparklyTestSession(SparklySession):
    packages = [
        'com.datastax.spark:spark-cassandra-connector_2.12:3.2.0',
        'org.elasticsearch:elasticsearch-spark-30_2.12:7.17.8',
-        'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1',
+        'org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(pyspark.__version__),
        'mysql:mysql-connector-java:8.0.31',
        'io.confluent:kafka-avro-serializer:3.0.1',
    ]
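
A minimal sketch (hypothetical, not from the repo) of the pattern introduced above: pinning the spark-sql-kafka connector to whatever PySpark version is installed, so the same session definition works under 3.2.x, 3.3.x and 3.4.0 without editing the coordinate by hand. The class name KafkaTestSession is made up; SparklySession and the packages attribute are the ones used in the diff.

import pyspark
from sparkly import SparklySession


class KafkaTestSession(SparklySession):
    packages = [
        # Keep the Kafka connector version in lockstep with the installed PySpark.
        'org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(pyspark.__version__),
    ]
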
6 changes: 3 additions & 3 deletions tests/integration/test_catalog.py
@@ -108,11 +108,11 @@ def test_rename_table_non_default_db(self):
        self.assertTrue(self.spark.catalog_ext.has_table('test_db.test_table'))
        self.assertFalse(self.spark.catalog_ext.has_table('test_db.new_test_table'))

-        self.spark.catalog_ext.rename_table('test_db.test_table', 'new_test_table')
+        self.spark.catalog_ext.rename_table('test_db.test_table', 'test_db.new_test_table')

        self.assertFalse(self.spark.catalog_ext.has_table('test_db.test_table'))
-        self.assertTrue(self.spark.catalog_ext.has_table('default.new_test_table'))
-        self.assertEqual(self.spark.table('default.new_test_table').count(), 2)
+        self.assertTrue(self.spark.catalog_ext.has_table('test_db.new_test_table'))
+        self.assertEqual(self.spark.table('test_db.new_test_table').count(), 2)

    def test_get_table_properties(self):
        properties = self.spark.catalog_ext.get_table_properties('test_table')
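
The test update above reflects that the rename target is now passed fully qualified, so the renamed table stays in its original database instead of being looked up under default. A short usage sketch, assuming a SparklySession instance named spark with the catalog_ext extension as in the tests:

spark.catalog_ext.rename_table('test_db.test_table', 'test_db.new_test_table')

assert not spark.catalog_ext.has_table('test_db.test_table')
assert spark.catalog_ext.has_table('test_db.new_test_table')
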
10 changes: 9 additions & 1 deletion tox.ini
@@ -15,7 +15,7 @@
#

[tox]
-envlist = spark32,spark33,no_extras,docs
+envlist = spark32,spark33,spark34,no_extras,docs

[testenv:spark32]
commands = py.test --cov=sparkly --cov-report term-missing tests/integration tests/unit
@@ -33,6 +33,14 @@ deps =
    -rrequirements_extras.txt
    pyspark==3.3.1

+[testenv:spark34]
+commands = py.test --cov=sparkly --cov-report term-missing tests/integration tests/unit
+deps =
+    -rrequirements.txt
+    -rrequirements_dev.txt
+    -rrequirements_extras.txt
+    pyspark==3.4.0
+
[testenv:no_extras]
commands = py.test tests/no_extras
deps =
