In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Build the SparkSession shared by every cell below, or reuse one that
# already exists in this kernel (getOrCreate).
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("StringOps")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

In [3]:
# Load the comma-separated sample file: the first row supplies the column
# names and the column types are inferred from the data.
# NOTE(review): the path is absolute and machine-specific, so this cell only
# runs where that exact file exists.
simple_df = (
    spark.read
    .options(header="True", inferSchema="True", sep=",")
    .csv("C:\\Users\\Muhammed\\Desktop\\Spark\\8\\simple_dirty_data.txt")
)

In [4]:
# Preview the raw rows with show()'s defaults written out explicitly
# (at most 20 rows, long cell values truncated).
simple_df.show(n=20, truncate=True)

+------+---------+---+--------+-----------+-----------+-----------+--------------------+
|sirano|     isim|yas|cinsiyet|     meslek|      sehir|aylik_gelir|            mal_mulk|
+------+---------+---+--------+-----------+-----------+-----------+--------------------+
|     1|    Cemal| 35|       E|       Isci|     Ankara|     3500.0|               araba|
|     2|   ceyda | 42|       K|      Memur|    Kayseri|     4200.0|            araba|ev|
|     3|    Timur| 30|    null|   Müzüsyen|Istanbul   |     9000.0|     araba|ev|yazlık|
|     4|   Burcu | 29|       K|Pazarlamacı|     Ankara|     4200.0|               araba|
|     5|  Yasemin| 23|       K|Pazarlamaci|      Bursa|     4800.0|               araba|
|     6|      Ali| 33|       E|      Memur|     Ankara|     4250.0|                  ev|
|     7|    Dilek| 29|       K|Pazarlamaci|   Istanbul|     7300.0|        araba|yazlık|
|     8|    Murat| 31|       E|   Müzüsyen|   Istanbul|    12000.0|araba|ev|dükkan|y...|
|     9|    Ahmet| 33

In [5]:
from pyspark.sql.functions import *

# Concat

In [6]:
# Join profession (meslek) and city (sehir) into one string column,
# separated by " - ".
# NOTE(review): the new column is named "aylik_gelir_format" although it holds
# meslek/sehir text rather than a formatted income -- this looks like a name
# copied from the number-format cell; confirm before relying on it downstream.
df_concat = simple_df.withColumn(
    "aylik_gelir_format",
    concat(col("meslek"), lit(" - "), col("sehir")),
)
df_concat.show(5, truncate=False)

+------+-------+---+--------+-----------+-----------+-----------+---------------+------------------------+
|sirano|isim   |yas|cinsiyet|meslek     |sehir      |aylik_gelir|mal_mulk       |aylik_gelir_format      |
+------+-------+---+--------+-----------+-----------+-----------+---------------+------------------------+
|1     |Cemal  |35 |E       |Isci       |Ankara     |3500.0     |araba          |Isci - Ankara           |
|2     |ceyda  |42 |K       |Memur      |Kayseri    |4200.0     |araba|ev       |Memur - Kayseri         |
|3     |Timur  |30 |null    |Müzüsyen   |Istanbul   |9000.0     |araba|ev|yazlık|Müzüsyen - Istanbul     |
|4     |Burcu  |29 |K       |Pazarlamacı|    Ankara |4200.0     |araba          |Pazarlamacı -     Ankara|
|5     |Yasemin|23 |K       |Pazarlamaci|Bursa      |4800.0     |araba          |Pazarlamaci - Bursa     |
+------+-------+---+--------+-----------+-----------+-----------+---------------+------------------------+
only showing top 5 rows



# Number Format

In [7]:
# Render the monthly income as text with thousands separators and two
# decimal places (e.g. 3500.0 -> "3,500.00").
df_number_format = simple_df.withColumn(
    "aylik_gelir_format",
    format_number(col("aylik_gelir"), 2),
)
df_number_format.show(5, truncate=False)

+------+-------+---+--------+-----------+-----------+-----------+---------------+------------------+
|sirano|isim   |yas|cinsiyet|meslek     |sehir      |aylik_gelir|mal_mulk       |aylik_gelir_format|
+------+-------+---+--------+-----------+-----------+-----------+---------------+------------------+
|1     |Cemal  |35 |E       |Isci       |Ankara     |3500.0     |araba          |3,500.00          |
|2     |ceyda  |42 |K       |Memur      |Kayseri    |4200.0     |araba|ev       |4,200.00          |
|3     |Timur  |30 |null    |Müzüsyen   |Istanbul   |9000.0     |araba|ev|yazlık|9,000.00          |
|4     |Burcu  |29 |K       |Pazarlamacı|    Ankara |4200.0     |araba          |4,200.00          |
|5     |Yasemin|23 |K       |Pazarlamaci|Bursa      |4800.0     |araba          |4,800.00          |
+------+-------+---+--------+-----------+-----------+-----------+---------------+------------------+
only showing top 5 rows



# lower, initcap, length

In [8]:
# Add three derived columns:
#   meslek_lower  - profession in lower case
#   isim_initcap  - name with its first letter capitalised
#   sehir_length  - character count of the raw (untrimmed) city value
df_lower = (
    simple_df
    .withColumn("meslek_lower", lower(col("meslek")))
    .withColumn("isim_initcap", initcap(col("isim")))
    .withColumn("sehir_length", length(col("sehir")))
)

df_lower.show(5, truncate=False)

+------+-------+---+--------+-----------+-----------+-----------+---------------+------------+------------+------------+
|sirano|isim   |yas|cinsiyet|meslek     |sehir      |aylik_gelir|mal_mulk       |meslek_lower|isim_initcap|sehir_length|
+------+-------+---+--------+-----------+-----------+-----------+---------------+------------+------------+------------+
|1     |Cemal  |35 |E       |Isci       |Ankara     |3500.0     |araba          |isci        |Cemal       |6           |
|2     |ceyda  |42 |K       |Memur      |Kayseri    |4200.0     |araba|ev       |memur       |Ceyda       |7           |
|3     |Timur  |30 |null    |Müzüsyen   |Istanbul   |9000.0     |araba|ev|yazlık|müzüsyen    |Timur       |11          |
|4     |Burcu  |29 |K       |Pazarlamacı|    Ankara |4200.0     |araba          |pazarlamacı |Burcu       |10          |
|5     |Yasemin|23 |K       |Pazarlamaci|Bursa      |4800.0     |araba          |pazarlamaci |Yasemin     |5           |
+------+-------+---+--------+---

# trim

In [12]:
# Strip padding from the city column in three ways (right side only, left side
# only, both sides), then measure the fully trimmed value so the length no
# longer counts the surrounding spaces.
df_trim = (
    simple_df
    .withColumn("sehir_rtrim", rtrim(col("sehir")))
    .withColumn("sehir_ltrim", ltrim(col("sehir")))
    .withColumn("sehir_trim", trim(col("sehir")))
    # Must come after sehir_trim is added: it reads that new column.
    .withColumn("sehir_length", length(col("sehir_trim")))
)

df_trim.show(5, truncate=False)

+------+-------+---+--------+-----------+-----------+-----------+---------------+-----------+-----------+----------+------------+
|sirano|isim   |yas|cinsiyet|meslek     |sehir      |aylik_gelir|mal_mulk       |sehir_rtrim|sehir_ltrim|sehir_trim|sehir_length|
+------+-------+---+--------+-----------+-----------+-----------+---------------+-----------+-----------+----------+------------+
|1     |Cemal  |35 |E       |Isci       |Ankara     |3500.0     |araba          |Ankara     |Ankara     |Ankara    |6           |
|2     |ceyda  |42 |K       |Memur      |Kayseri    |4200.0     |araba|ev       |Kayseri    |Kayseri    |Kayseri   |7           |
|3     |Timur  |30 |null    |Müzüsyen   |Istanbul   |9000.0     |araba|ev|yazlık|Istanbul   |Istanbul   |Istanbul  |8           |
|4     |Burcu  |29 |K       |Pazarlamacı|    Ankara |4200.0     |araba          |    Ankara |Ankara     |Ankara    |6           |
|5     |Yasemin|23 |K       |Pazarlamaci|Bursa      |4800.0     |araba          |Bursa    

# replace, split

In [13]:

# sehir_ist           - city with every match of the pattern "Ist" replaced by "İST"
# mal_mulk_split      - possessions split into an array on the "|" delimiter
#                       (escaped, because split() takes a regular expression)
# mal_mulk_ilk_eleman - first entry of that array
df_replace = (
    simple_df
    .withColumn("sehir_ist", regexp_replace(col("sehir"), "Ist", "İST"))
    .withColumn("mal_mulk_split", split(col("mal_mulk"), "\\|"))
    .withColumn("mal_mulk_ilk_eleman", col("mal_mulk_split").getItem(0))
)

df_replace.show(5, truncate=False)

+------+-------+---+--------+-----------+-----------+-----------+---------------+-----------+-------------------+-------------------+
|sirano|isim   |yas|cinsiyet|meslek     |sehir      |aylik_gelir|mal_mulk       |sehir_ist  |mal_mulk_split     |mal_mulk_ilk_eleman|
+------+-------+---+--------+-----------+-----------+-----------+---------------+-----------+-------------------+-------------------+
|1     |Cemal  |35 |E       |Isci       |Ankara     |3500.0     |araba          |Ankara     |[araba]            |araba              |
|2     |ceyda  |42 |K       |Memur      |Kayseri    |4200.0     |araba|ev       |Kayseri    |[araba, ev]        |araba              |
|3     |Timur  |30 |null    |Müzüsyen   |Istanbul   |9000.0     |araba|ev|yazlık|İSTanbul   |[araba, ev, yazlık]|araba              |
|4     |Burcu  |29 |K       |Pazarlamacı|    Ankara |4200.0     |araba          |    Ankara |[araba]            |araba              |
|5     |Yasemin|23 |K       |Pazarlamaci|Bursa      |4800.0   

In [14]:
# Print the schema to confirm the types of the derived columns:
# mal_mulk_split is an array of strings, while sehir_ist and
# mal_mulk_ilk_eleman are plain strings.
df_replace.printSchema()

root
 |-- sirano: integer (nullable = true)
 |-- isim: string (nullable = true)
 |-- yas: integer (nullable = true)
 |-- cinsiyet: string (nullable = true)
 |-- meslek: string (nullable = true)
 |-- sehir: string (nullable = true)
 |-- aylik_gelir: double (nullable = true)
 |-- mal_mulk: string (nullable = true)
 |-- sehir_ist: string (nullable = true)
 |-- mal_mulk_split: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- mal_mulk_ilk_eleman: string (nullable = true)

