In [1]:
import sys, os

# Enable importing pysparky
sys.path.append(os.pardir)

In [2]:
import pyspark
from pyspark.sql import SparkSession, DataFrame, Column
from pyspark.sql import functions as F, types as T

print(pyspark.__version__)

spark = SparkSession.builder.getOrCreate()

3.5.2


24/10/17 15:21:36 WARN Utils: Your hostname, codespaces-0aafae resolves to a loopback address: 127.0.0.1; using 10.0.10.27 instead (on interface eth0)
24/10/17 15:21:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/17 15:21:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/17 15:21:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
from pysparky import spark_ext
from pysparky import functions as F_
from pysparky import enabler

from pysparky import transformations as te

In [4]:
import pandas as pd

data_pdf = pd.read_csv(
    "https://people.sc.fsu.edu/~jburkardt/data/csv/addresses.csv",
    names=["first_name", "last_name", "address", "region", "code", "postcode"],
)
data_sdf = spark.createDataFrame(data_pdf)

In [5]:
data_sdf.show()

                                                                                

+--------------------+---------+--------------------+-----------+----+--------+
|          first_name|last_name|             address|     region|code|postcode|
+--------------------+---------+--------------------+-----------+----+--------+
|                John|      Doe|   120 jefferson st.|  Riverside|  NJ|    8075|
|                Jack| McGinnis|        220 hobo Av.|      Phila|  PA|    9119|
|       John "Da Man"|   Repici|   120 Jefferson St.|  Riverside|  NJ|    8075|
|             Stephen|    Tyler|7452 Terrace "At ...|   SomeTown|  SD|   91234|
|                 NaN| Blankman|                 NaN|   SomeTown|  SD|     298|
|Joan "the bone", ...|      Jet| 9th, at Terrace plc|Desert City|  CO|     123|
+--------------------+---------+--------------------+-----------+----+--------+



In [6]:
criteria = {
    "first_name": F_.printable_only("first_name"),
    "last_name": F_.printable_only("last_name"),
    "address": F_.printable_only("address"),
    "region": F_.printable_only("region"),
    "code": [F_.two_character_only("code")],
    "postcode": F_.printable_only("postcode"),
}

In [7]:
def criteria_to_query(criteria):
    return {
        f"{key}_check": F_.condition_and(*enabler.ensure_list(value))
        for key, value in criteria.items()
    }

In [8]:
data_sdf.withColumns(criteria_to_query(criteria)).show()

+--------------------+---------+--------------------+-----------+----+--------+----------------+---------------+-------------+------------+----------+--------------+
|          first_name|last_name|             address|     region|code|postcode|first_name_check|last_name_check|address_check|region_check|code_check|postcode_check|
+--------------------+---------+--------------------+-----------+----+--------+----------------+---------------+-------------+------------+----------+--------------+
|                John|      Doe|   120 jefferson st.|  Riverside|  NJ|    8075|            true|           true|         true|        true|     false|          true|
|                Jack| McGinnis|        220 hobo Av.|      Phila|  PA|    9119|            true|           true|         true|        true|     false|          true|
|       John "Da Man"|   Repici|   120 Jefferson St.|  Riverside|  NJ|    8075|            true|           true|         true|        true|     false|          true|
|   

24/10/17 15:21:49 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [9]:
[F.col(column_name) == False for column_name in criteria_to_query(criteria).keys()]

[Column<'(first_name_check = false)'>,
 Column<'(last_name_check = false)'>,
 Column<'(address_check = false)'>,
 Column<'(region_check = false)'>,
 Column<'(code_check = false)'>,
 Column<'(postcode_check = false)'>]

In [12]:
te.filters(
    data_sdf.withColumns(criteria_to_query(criteria)),
    [F.col(column_name) == False for column_name in criteria_to_query(criteria).keys()],
    operator_="or",
).show()

+-------------+---------+-----------------+---------+----+--------+----------------+---------------+-------------+------------+----------+--------------+
|   first_name|last_name|          address|   region|code|postcode|first_name_check|last_name_check|address_check|region_check|code_check|postcode_check|
+-------------+---------+-----------------+---------+----+--------+----------------+---------------+-------------+------------+----------+--------------+
|         John|      Doe|120 jefferson st.|Riverside|  NJ|    8075|            true|           true|         true|        true|     false|          true|
|         Jack| McGinnis|     220 hobo Av.|    Phila|  PA|    9119|            true|           true|         true|        true|     false|          true|
|John "Da Man"|   Repici|120 Jefferson St.|Riverside|  NJ|    8075|            true|           true|         true|        true|     false|          true|
|          NaN| Blankman|              NaN| SomeTown|  SD|     298|         

In [13]:
te.filters(
    data_sdf.withColumns(criteria_to_query(criteria)),
    [F.col(column_name) == True for column_name in criteria_to_query(criteria).keys()],
    operator_="and",
).show()

+--------------------+---------+--------------------+-----------+----+--------+----------------+---------------+-------------+------------+----------+--------------+
|          first_name|last_name|             address|     region|code|postcode|first_name_check|last_name_check|address_check|region_check|code_check|postcode_check|
+--------------------+---------+--------------------+-----------+----+--------+----------------+---------------+-------------+------------+----------+--------------+
|             Stephen|    Tyler|7452 Terrace "At ...|   SomeTown|  SD|   91234|            true|           true|         true|        true|      true|          true|
|Joan "the bone", ...|      Jet| 9th, at Terrace plc|Desert City|  CO|     123|            true|           true|         true|        true|      true|          true|
+--------------------+---------+--------------------+-----------+----+--------+----------------+---------------+-------------+------------+----------+--------------+



In [61]:
list(criteria_to_query(criteria).keys())

['first_name_check',
 'last_name_check',
 'address_check',
 'region_check',
 'code_check',
 'postcode_check']

In [60]:
data_sdf.withColumns(criteria_to_query(criteria)).select(
    F.create_map(list(criteria_to_query(criteria).keys())).alias("map")
).show()

24/10/17 15:06:28 ERROR Executor: Exception in task 0.0 in stage 27.0 (TID 28)
org.apache.spark.SparkRuntimeException: [DUPLICATED_MAP_KEY] Duplicate map key true was found, please check the input data. If you want to remove the duplicated keys, you can set "spark.sql.mapKeyDedupPolicy" to "LAST_WIN" so that the key inserted at last takes precedence.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.duplicateMapKeyFoundError(QueryExecutionErrors.scala:1273)
	at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.put(ArrayBasedMapBuilder.scala:69)
	at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.putAll(ArrayBasedMapBuilder.scala:94)
	at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.from(ArrayBasedMapBuilder.scala:122)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.exec

Py4JJavaError: An error occurred while calling o574.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most recent failure: Lost task 0.0 in stage 27.0 (TID 28) (72b7081a-1653-4521-8022-c71449b0bb6e.internal.cloudapp.net executor driver): org.apache.spark.SparkRuntimeException: [DUPLICATED_MAP_KEY] Duplicate map key true was found, please check the input data. If you want to remove the duplicated keys, you can set "spark.sql.mapKeyDedupPolicy" to "LAST_WIN" so that the key inserted at last takes precedence.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.duplicateMapKeyFoundError(QueryExecutionErrors.scala:1273)
	at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.put(ArrayBasedMapBuilder.scala:69)
	at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.putAll(ArrayBasedMapBuilder.scala:94)
	at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.from(ArrayBasedMapBuilder.scala:122)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4334)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4324)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4322)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4322)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.spark.SparkRuntimeException: [DUPLICATED_MAP_KEY] Duplicate map key true was found, please check the input data. If you want to remove the duplicated keys, you can set "spark.sql.mapKeyDedupPolicy" to "LAST_WIN" so that the key inserted at last takes precedence.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.duplicateMapKeyFoundError(QueryExecutionErrors.scala:1273)
	at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.put(ArrayBasedMapBuilder.scala:69)
	at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.putAll(ArrayBasedMapBuilder.scala:94)
	at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.from(ArrayBasedMapBuilder.scala:122)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more


In [49]:
data_sdf.withColumns(
    {
        key: F_.condition_and(*enabler.ensure_list(value))
        for key, value in criteria.items()
    }
).show()

+----------+---------+-------+------+-----+--------+
|first_name|last_name|address|region| code|postcode|
+----------+---------+-------+------+-----+--------+
|      true|     true|   true|  true|false|    true|
|      true|     true|   true|  true|false|    true|
|      true|     true|   true|  true|false|    true|
|      true|     true|   true|  true| true|    true|
|      true|     true|   true|  true|false|    true|
|      true|     true|   true|  true| true|    true|
+----------+---------+-------+------+-----+--------+



In [51]:
data_sdf.select("code").collect()

[Row(code=' NJ'),
 Row(code=' PA'),
 Row(code=' NJ'),
 Row(code='SD'),
 Row(code=' SD'),
 Row(code='CO')]

In [22]:
import functools
from pysparky.transformations import transforms, apply_cols
from pysparky.schema_ext import filter_columns_by_datatype

upper_cols_partial = functools.partial(apply_cols, col_func=F.upper)

In [23]:
pipeline = (
    (
        upper_cols_partial,
        {"cols": data_sdf.schema.filter_columns_by_datatype(T.StringType)},
    ),
    (lambda sdf: sdf.withColumn("postcode_plus_1", F.col("postcode") + 1), {}),
)

In [24]:
data_sdf.transforms(pipeline).show()

+--------------------+---------+--------------------+-----------+----+--------+---------------+
|          first_name|last_name|             address|     region|code|postcode|postcode_plus_1|
+--------------------+---------+--------------------+-----------+----+--------+---------------+
|                John|      Doe|   120 jefferson st.|  Riverside|  NJ|    8075|           8076|
|                Jack| McGinnis|        220 hobo Av.|      Phila|  PA|    9119|           9120|
|       John "Da Man"|   Repici|   120 Jefferson St.|  Riverside|  NJ|    8075|           8076|
|             Stephen|    Tyler|7452 Terrace "At ...|   SomeTown|  SD|   91234|          91235|
|                 NaN| Blankman|                 NaN|   SomeTown|  SD|     298|            299|
|Joan "the bone", ...|      Jet| 9th, at Terrace plc|Desert City|  CO|     123|            124|
+--------------------+---------+--------------------+-----------+----+--------+---------------+

