In [2]:
%load_ext autoreload
%autoreload 2

from spalah.dataframe import slice_dataframe
from pyspark.sql import SparkSession

In [3]:
spark_jars = (
    "org.apache.spark:spark-avro_2.12:3.2.1"
    ",io.delta:delta-core_2.12:2.1.0"
    ",com.databricks:spark-xml_2.12:0.14.0"
)
spark = (
    SparkSession.builder.master("local[*]")
    .appName("MyApp")
    .config("spark.jars.packages", spark_jars)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.sql.shuffle.partitions", "1")
    .config("spark.databricks.delta.snapshotPartitions", "2")
    .config("spark.ui.showConsoleProgress", "false")
    .config("spark.ui.enabled", "false")
    .config("spark.ui.dagGraph.retainedRootRDDs", "1")
    .config("spark.ui.retainedJobs", "1")
    .config("spark.ui.retainedStages", "1")
    .config("spark.ui.retainedTasks", "1")
    .config("spark.sql.ui.retainedExecutions", "1")
    .config("spark.worker.ui.retainedExecutors", "1")
    .config("spark.worker.ui.retainedDrivers", "1")
    .config("spark.driver.memory", "2g")
    .config("spark.driver.extraJavaOptions", "-Ddelta.log.cacheSize=3")
    .config(
        "spark.driver.extraJavaOptions",
        "-XX:+CMSClassUnloadingEnabled -XX:+UseCompressedOops",
    )
).getOrCreate()

23/05/20 18:18:17 WARN Utils: Your hostname, Alexandrs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.68.126 instead (on interface en0)
23/05/20 18:18:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/Users/alexandrvolok/repos/.envs/psdt/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/alexandrvolok/.ivy2/cache
The jars for the packages stored in: /Users/alexandrvolok/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
io.delta#delta-core_2.12 added as a dependency
com.databricks#spark-xml_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2e64133a-4349-4e11-aefc-a5c9b3a1fb29;1.0
	confs: [default]
	found org.apache.spark#spark-avro_2.12;3.2.1 in spark-list
	found org.tukaani#xz;1.8 in spark-list
	found org.spark-project.spark#unused;1.0.0 in spark-list
	found io.delta#delta-core_2.12;2.1.0 in central
	found io.delta#delta-storage;2.1.0 in central
	found org.antlr#antlr4-runtime;4.8 in spark-list
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
	found com.databricks#spark-xml_2.12;0.14.0 in central
	found commons-io#commons-io;2.8.0 in central
	found org.glassfish.jaxb#txw2;2.3.4 in central
	found org.apache.ws.xmlschema#xmlschema-core;2.2.5 in central
:: resolution 

23/05/20 18:18:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### SchemaComparer

In [4]:
df_source = spark.sql(
    'SELECT 1 as ID, "John" AS Name, struct("line1" AS Line1, "line2" AS Line2) AS Address'
)
df_source.printSchema()

df_target = spark.sql(
    'SELECT "a" as ID, "John" AS name, struct("line1" AS Line1) AS Address'
)
df_target.printSchema()




root
 |-- ID: integer (nullable = false)
 |-- Name: string (nullable = false)
 |-- Address: struct (nullable = false)
 |    |-- Line1: string (nullable = false)
 |    |-- Line2: string (nullable = false)

root
 |-- ID: string (nullable = false)
 |-- name: string (nullable = false)
 |-- Address: struct (nullable = false)
 |    |-- Line1: string (nullable = false)



In [12]:
schema_comparer = SchemaComparer(
    source_schema = df_source.schema,
    target_schema = df_target.schema
)

schema_comparer.compare()

schema_comparer.matched
schema_comparer.not_matched

[NotMatchedColumn(name='name', data_type='StringType', reason="The column exists in source and target schemas but it's name is case-mismatched"),
 NotMatchedColumn(name='ID', data_type='IntegerType <=> StringType', reason='The column exists in source and target schemas but it is not matched by a data type'),
 NotMatchedColumn(name='Address.Line2', data_type='StringType', reason='The column exists only in the source schema')]

### flatten_schema

In [9]:
from spalah.dataframe import flatten_schema

In [3]:
df_complex_schema = spark.sql(
    'SELECT 1 as ID, "John" AS Name, struct("line1" AS Line1, "line2" AS Line2) AS Address'
)
df_source.printSchema()

NameError: name 'df_source' is not defined

In [11]:
flatten_schema(
    schema=df_complex_schema.schema,
    include_datatype=True
)

[('ID', 'IntegerType'),
 ('Name', 'StringType'),
 ('Address.Line1', 'StringType'),
 ('Address.Line2', 'StringType')]

### script_dataframe

In [5]:
df = spark.sql(
    'SELECT 1 as ID, "John" AS Name, struct("line1" AS Line1, "line2" AS Line2) AS Address'
)
df.printSchema()

root
 |-- ID: integer (nullable = false)
 |-- Name: string (nullable = false)
 |-- Address: struct (nullable = false)
 |    |-- Line1: string (nullable = false)
 |    |-- Line2: string (nullable = false)



In [12]:
from spalah.dataframe import script_dataframe

script = script_dataframe(df)

print(script)

from pyspark.sql import Row
import datetime
from decimal import Decimal
from pyspark.sql.types import *

# Scripted data and schema:
__data = [Row(ID=1, Name='John', Address=Row(Line1='line1', Line2='line2'))]

__schema = {'type': 'struct', 'fields': [{'name': 'ID', 'type': 'integer', 'nullable': False, 'metadata': {}}, {'name': 'Name', 'type': 'string', 'nullable': False, 'metadata': {}}, {'name': 'Address', 'type': {'type': 'struct', 'fields': [{'name': 'Line1', 'type': 'string', 'nullable': False, 'metadata': {}}, {'name': 'Line2', 'type': 'string', 'nullable': False, 'metadata': {}}]}, 'nullable': False, 'metadata': {}}]}

outcome_dataframe = spark.createDataFrame(__data, StructType.fromJson(__schema))


### slice_dataframe

In [16]:
from spalah.dataframe import slice_dataframe

df = spark.sql(
    'SELECT 1 as ID, "John" AS Name, struct("line1" AS Line1, "line2" AS Line2) AS Address'
)
df.printSchema()

root
 |-- ID: integer (nullable = false)
 |-- Name: string (nullable = false)
 |-- Address: struct (nullable = false)
 |    |-- Line1: string (nullable = false)
 |    |-- Line2: string (nullable = false)



In [18]:
df_result = slice_dataframe(
    input_dataframe=df,
    columns_to_include=["Name", "Address"],
    columns_to_exclude=["Address.Line2"]
)

df_result.printSchema()

root
 |-- Name: string (nullable = false)
 |-- Address: struct (nullable = false)
 |    |-- Line1: string (nullable = false)



In [20]:
# alternatively, excluded columns can be nullified instead of removed
df_result = slice_dataframe(
    input_dataframe=df,
    columns_to_include=["Name", "Address"],
    columns_to_exclude=["Address.Line2"],
    nullify_only=True
)

df_result.show()

+----+----+-------------+
|  ID|Name|      Address|
+----+----+-------------+
|null|John|{line1, null}|
+----+----+-------------+



### DeltaProperty

In [24]:
from spalah.dataset import DeltaProperty

df = spark.sql(
    'SELECT 1 as ID, "John" AS Name, struct("line1" AS Line1, "line2" AS Line2) AS Address'
)

df.write.format("delta").mode("overwrite").save("/tmp/nested_schema_dataset")

In [25]:
dp = DeltaProperty(table_path="/tmp/nested_schema_dataset")
dp.properties

{}

In [26]:
dp.properties = {
    "delta.logRetentionDuration": "interval 10 days",
    "delta.deletedFileRetentionDuration": "interval 15 days"
}

2023-05-20 18:34:06,325 INFO      Applying table properties on 'delta.`/tmp/nested_schema_dataset`':
2023-05-20 18:34:06,326 INFO      Checking if 'delta.logRetentionDuration = interval 10 days' is set on delta.`/tmp/nested_schema_dataset`
2023-05-20 18:34:06,525 INFO      The property has been set
2023-05-20 18:34:06,525 INFO      Checking if 'delta.deletedFileRetentionDuration = interval 15 days' is set on delta.`/tmp/nested_schema_dataset`
2023-05-20 18:34:06,746 INFO      The property has been set


In [27]:
dp.check_constraints = {'id_is_not_null': 'id is not null'} 

2023-05-20 18:34:08,315 INFO      Applying check constraints on 'delta.`/tmp/nested_schema_dataset`':
2023-05-20 18:34:08,315 INFO      Checking if constraint 'id_is_not_null' was already set on delta.`/tmp/nested_schema_dataset`
2023-05-20 18:34:08,621 INFO      The constraint id_is_not_null has been successfully added to 'delta.`/tmp/nested_schema_dataset`'


In [28]:
from spalah.dataset import DeltaProperty

dp = DeltaProperty(table_path="/tmp/nested_schema_dataset")
dp.keep_existing_check_constraints = True

dp.check_constraints = {'Name_is_not_null': 'Name is not null'} 

2023-05-20 18:34:10,051 INFO      Applying check constraints on 'delta.`/tmp/nested_schema_dataset`':
2023-05-20 18:34:10,051 INFO      Checking if constraint 'name_is_not_null' was already set on delta.`/tmp/nested_schema_dataset`
2023-05-20 18:34:10,378 INFO      The constraint name_is_not_null has been successfully added to 'delta.`/tmp/nested_schema_dataset`'


In [29]:
dp.check_constraints

{'id_is_not_null': 'id is not null', 'name_is_not_null': 'Name is not null'}

In [31]:
spark.sql("insert into delta.`/tmp/nested_schema_dataset` (ID, Name, Address) VALUES (NULL, 'Alex', NULL) ")

23/05/20 18:34:55 ERROR Utils: Aborting task
org.apache.spark.sql.delta.schema.DeltaInvariantViolationException: CHECK constraint id_is_not_null (id IS NOT NULL) violated by row with values:
 - id : null
	at org.apache.spark.sql.delta.schema.DeltaInvariantViolationException$.apply(InvariantViolationException.scala:60)
	at org.apache.spark.sql.delta.schema.DeltaInvariantViolationException$.apply(InvariantViolationException.scala:70)
	at org.apache.spark.sql.delta.schema.DeltaInvariantViolationException.apply(InvariantViolationException.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.delta.constraints.DeltaInvariantCheckerExec.$anonfun$doExecute$3(DeltaInvariantCheckerExec.scala:85)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:92)
	at org.apache.spark.sql.execution.da

Py4JJavaError: An error occurred while calling o59.sql.
: org.apache.spark.sql.delta.schema.DeltaInvariantViolationException: CHECK constraint id_is_not_null (id IS NOT NULL) violated by row with values:
 - id : null
	at org.apache.spark.sql.delta.schema.DeltaInvariantViolationException$.apply(InvariantViolationException.scala:60)
	at org.apache.spark.sql.delta.schema.DeltaInvariantViolationException$.apply(InvariantViolationException.scala:70)
	at org.apache.spark.sql.delta.schema.DeltaInvariantViolationException.apply(InvariantViolationException.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.delta.constraints.DeltaInvariantCheckerExec.$anonfun$doExecute$3(DeltaInvariantCheckerExec.scala:85)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:92)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:331)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:338)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:256)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
