In [1]:
# PySpark entry point
from pyspark.sql import SparkSession

# Build a new SparkSession, or reuse the active one, shared by every cell below.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/30 22:52:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/11/30 22:52:58 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [2]:
# Sample rows: each pairs a raw string with the single-character separator
# that should be used to split it.
records = [
    ("50000.0#0#0#", "#"),
    ("0@1000.0@", "@"),
    ("1$", "$"),
    ("1000.00^Test_string", "^"),
    ("dog$cat^mouse", "^"),
    ("@0@1000.0@", "@"),
]

# Column names, applied positionally to each tuple.
columns = ["VALUES", "separator"]

# Types are inferred from the data; names come from `columns`.
df_records = spark.createDataFrame(records, schema=columns)

df_records.show()

                                                                                

+-------------------+---------+
|             VALUES|separator|
+-------------------+---------+
|       50000.0#0#0#|        #|
|          0@1000.0@|        @|
|                 1$|        $|
|1000.00^Test_string|        ^|
|      dog$cat^mouse|        ^|
|         @0@1000.0@|        @|
+-------------------+---------+



In [3]:
from pyspark.sql import functions as F

# Register the DataFrame so it can be queried with Spark SQL.
df_records.createOrReplaceTempView("strings")

# SPLIT interprets its second argument as a Java regular expression, so a
# separator such as '$', '^', '.', '|' or '*' would act as a metacharacter.
# Wrapping the per-row separator in \Q...\E makes the regex match it literally,
# so every row is split on its own separator into *all* of its parts (the old
# CASE hard-coded two-element arrays for '$' and '^', truncating any value with
# more than one such separator). SPLIT keeps trailing empty strings (limit -1).
# NOTE: the raw Python string passes '\\Q' to the SQL parser, which unescapes it
# to \Q; this assumes spark.sql.parser.escapedStringLiterals is false (default).
result = spark.sql(r"""
    SELECT
        VALUES,
        separator,
        length(VALUES) AS string_length,
        split(VALUES, concat('\\Q', separator, '\\E')) AS VALUES_ARRAY
    FROM strings
""")

# Show df
result.show(truncate=False)

+-------------------+---------+-------------+----------------------+
|VALUES             |separator|string_length|VALUES_ARRAY          |
+-------------------+---------+-------------+----------------------+
|50000.0#0#0#       |#        |12           |[50000.0, 0, 0, ]     |
|0@1000.0@          |@        |9            |[0, 1000.0, ]         |
|1$                 |$        |2            |[1, ]                 |
|1000.00^Test_string|^        |19           |[1000.00, Test_string]|
|dog$cat^mouse      |^        |13           |[dog$cat, mouse]      |
|@0@1000.0@         |@        |10           |[, 0, 1000.0, ]       |
+-------------------+---------+-------------+----------------------+



In [4]:
# Drop the empty strings left by leading, trailing, or doubled separators.
# F.array_remove is a native Spark function: it keeps VALUES_FILTERED typed as
# array<string> (a bare F.udf defaults to returnType=StringType, turning the
# list into its string representation), returns NULL for a NULL array instead
# of failing the job, and avoids the Python UDF serialization round-trip.
extra = result.withColumn("VALUES_FILTERED", F.array_remove("VALUES_ARRAY", ""))

extra.show(truncate=False)

+-------------------+---------+-------------+----------------------+----------------------+
|VALUES             |separator|string_length|VALUES_ARRAY          |VALUES_FILTERED       |
+-------------------+---------+-------------+----------------------+----------------------+
|50000.0#0#0#       |#        |12           |[50000.0, 0, 0, ]     |[50000.0, 0, 0]       |
|0@1000.0@          |@        |9            |[0, 1000.0, ]         |[0, 1000.0]           |
|1$                 |$        |2            |[1, ]                 |[1]                   |
|1000.00^Test_string|^        |19           |[1000.00, Test_string]|[1000.00, Test_string]|
|dog$cat^mouse      |^        |13           |[dog$cat, mouse]      |[dog$cat, mouse]      |
|@0@1000.0@         |@        |10           |[, 0, 1000.0, ]       |[0, 1000.0]           |
+-------------------+---------+-------------+----------------------+----------------------+

