<a href="https://colab.research.google.com/github/ArtemBaron/baron_vergeles/blob/main/spark_when_expr_lit_split_concat_ws_substring_regexp_replace.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
! pip install pyspark
#https://sparkbyexamples.com/pyspark/pyspark-convert-array-column-to-string-column/?expand_article=1
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate()

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=ae11353ee0d28eea72e783e870af979359b7b2e5a2a706495bd532fc9fe45e46
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
from pyspark.sql import functions as f

In [3]:
data = [("James","M",60000),("Michael","M",70000),
        ("Robert",None,400000),("Maria","F",500000),
        ("Jen","",None)]

columns = ["name","gender","salary"]
df = spark.createDataFrame(data = data, schema = columns)

In [4]:
df.show()

+-------+------+------+
|   name|gender|salary|
+-------+------+------+
|  James|     M| 60000|
|Michael|     M| 70000|
| Robert|  null|400000|
|  Maria|     F|500000|
|    Jen|      |  null|
+-------+------+------+



In [5]:
df2 = df.withColumn("new_gender", f.when(df.gender == "M","Male")
                                 .when(df.gender == "F","Female")
                                 .when(df.gender.isNull() ,"")
                                 .otherwise(df.gender))
df2.show()

+-------+------+------+----------+
|   name|gender|salary|new_gender|
+-------+------+------+----------+
|  James|     M| 60000|      Male|
|Michael|     M| 70000|      Male|
| Robert|  null|400000|          |
|  Maria|     F|500000|    Female|
|    Jen|      |  null|          |
+-------+------+------+----------+



In [6]:
df2=df.select("*",f.when(df.gender == "M","Male")
                  .when(df.gender == "F","Female")
                  .when(df.gender.isNull() ,"")
                  .otherwise(df.gender).alias("new_gender"))
df2.show()

+-------+------+------+----------+
|   name|gender|salary|new_gender|
+-------+------+------+----------+
|  James|     M| 60000|      Male|
|Michael|     M| 70000|      Male|
| Robert|  null|400000|          |
|  Maria|     F|500000|    Female|
|    Jen|      |  null|          |
+-------+------+------+----------+



In [7]:
df4 = df.select("*", f.expr("CASE WHEN gender = 'M' THEN 'Male' " +
           "WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ''" +
           "ELSE gender END").alias("new_gender"))
df4.show()

+-------+------+------+----------+
|   name|gender|salary|new_gender|
+-------+------+------+----------+
|  James|     M| 60000|      Male|
|Michael|     M| 70000|      Male|
| Robert|  null|400000|          |
|  Maria|     F|500000|    Female|
|    Jen|      |  null|          |
+-------+------+------+----------+



In [8]:
data=[("James","Bond"),("Scott","Varsa")]
df=spark.createDataFrame(data).toDF("col1","col2")
df.withColumn("Name",f.expr(" col1 ||','|| col2")).show()

+-----+-----+-----------+
| col1| col2|       Name|
+-----+-----+-----------+
|James| Bond| James,Bond|
|Scott|Varsa|Scott,Varsa|
+-----+-----+-----------+



In [9]:
data = [("James","M"),("Michael","F"),("Jen","")]
columns = ["name","gender"]
df = spark.createDataFrame(data = data, schema = columns)
df2=df.withColumn("gender", f.expr("CASE WHEN gender = 'M' THEN 'Male' " +
           "WHEN gender = 'F' THEN 'Female' ELSE 'unknown' END"))
df2.show()

+-------+-------+
|   name| gender|
+-------+-------+
|  James|   Male|
|Michael| Female|
|    Jen|unknown|
+-------+-------+



In [10]:
data=[("2019-01-23",1),("2019-06-24",2),("2019-09-20",3)]
df=spark.createDataFrame(data).toDF("date","increment")

#Add Month value from another column
df.select(df.date,df.increment,
     f.expr("add_months(date,increment)")
    .alias("inc_date")).show()

+----------+---------+----------+
|      date|increment|  inc_date|
+----------+---------+----------+
|2019-01-23|        1|2019-02-23|
|2019-06-24|        2|2019-08-24|
|2019-09-20|        3|2019-12-20|
+----------+---------+----------+



In [11]:
# Providing alias using 'as'
from pyspark.sql.functions import expr
df.select(df.date,df.increment,
     f.expr("""add_months(date,increment) as inc_date""")
  ).show()
# This yields same output as above

+----------+---------+----------+
|      date|increment|  inc_date|
+----------+---------+----------+
|2019-01-23|        1|2019-02-23|
|2019-06-24|        2|2019-08-24|
|2019-09-20|        3|2019-12-20|
+----------+---------+----------+



In [12]:
# Using Cast() Function to change data type
df.select("increment",f.expr("cast(increment as string) as str_increment")) \
  .printSchema()

root
 |-- increment: long (nullable = true)
 |-- str_increment: string (nullable = true)



In [13]:
data=[(100,2),(200,3000),(500,500)]
df=spark.createDataFrame(data).toDF("col1","col2")
df.filter(f.expr("col1 == col2")).show()

+----+----+
|col1|col2|
+----+----+
| 500| 500|
+----+----+



In [14]:
data = [("111",50000),("222",60000),("333",40000)]
columns= ["EmpId","Salary"]
df = spark.createDataFrame(data = data, schema = columns)

df2 = df.select("EmpId","Salary",f.lit("1").alias("lit_value1"))
df2.show(truncate=False)

+-----+------+----------+
|EmpId|Salary|lit_value1|
+-----+------+----------+
|111  |50000 |1         |
|222  |60000 |1         |
|333  |40000 |1         |
+-----+------+----------+



In [15]:
df3 = df2.withColumn("lit_value2", f.when((f.col("Salary") >=40000) & (f.col("Salary") <=50000),f.lit("100")).otherwise(f.lit("200")))
df3.show(truncate=False)

+-----+------+----------+----------+
|EmpId|Salary|lit_value1|lit_value2|
+-----+------+----------+----------+
|111  |50000 |1         |100       |
|222  |60000 |1         |200       |
|333  |40000 |1         |100       |
+-----+------+----------+----------+



In [16]:
data = [("James, A, Smith","2018","M",3000),
            ("Michael, Rose, Jones","2010","M",4000),
            ("Robert,K,Williams","2010","M",4000),
            ("Maria,Anne,Jones","2005","F",4000),
            ("Jen,Mary,Brown","2010","",-1)
            ]

columns=["name","dob_year","gender","salary"]
df=spark.createDataFrame(data,columns)
df.printSchema()

df2 = df.select(f.split(f.col("name"),",").alias("NameArray")) \
    .drop("name")
df2.printSchema()
df2.show()

root
 |-- name: string (nullable = true)
 |-- dob_year: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

root
 |-- NameArray: array (nullable = true)
 |    |-- element: string (containsNull = false)

+--------------------+
|           NameArray|
+--------------------+
| [James,  A,  Smith]|
|[Michael,  Rose, ...|
|[Robert, K, Willi...|
|[Maria, Anne, Jones]|
|  [Jen, Mary, Brown]|
+--------------------+



In [17]:
df.createOrReplaceTempView("PERSON")
spark.sql("select SPLIT(name,',') as NameArray from PERSON") \
    .show()

+--------------------+
|           NameArray|
+--------------------+
| [James,  A,  Smith]|
|[Michael,  Rose, ...|
|[Robert, K, Willi...|
|[Maria, Anne, Jones]|
|  [Jen, Mary, Brown]|
+--------------------+



In [18]:
columns = ["name","languagesAtSchool","currentState"]
data = [("James,,Smith",["Java","Scala","C++"],"CA"), \
    ("Michael,Rose,",["Spark","Java","C++"],"NJ"), \
    ("Robert,,Williams",["CSharp","VB"],"NV")]

df = spark.createDataFrame(data=data,schema=columns)
df.printSchema()
df.show(truncate=False)

#array with multiple strings into string using comma as delimeter
df2 = df.withColumn("languagesAtSchool",
   f.concat_ws(",",f.col("languagesAtSchool")))
df2.printSchema()
df2.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)

+----------------+------------------+------------+
|name            |languagesAtSchool |currentState|
+----------------+------------------+------------+
|James,,Smith    |[Java, Scala, C++]|CA          |
|Michael,Rose,   |[Spark, Java, C++]|NJ          |
|Robert,,Williams|[CSharp, VB]      |NV          |
+----------------+------------------+------------+

root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: string (nullable = false)
 |-- currentState: string (nullable = true)

+----------------+-----------------+------------+
|name            |languagesAtSchool|currentState|
+----------------+-----------------+------------+
|James,,Smith    |Java,Scala,C++   |CA          |
|Michael,Rose,   |Spark,Java,C++   |NJ          |
|Robert,,Williams|CSharp,VB        |NV          |
+----------------+------

In [19]:
df.createOrReplaceTempView("ARRAY_STRING")
spark.sql("select name, concat_ws(',',languagesAtSchool) as languagesAtSchool," + \
    " currentState from ARRAY_STRING") \
    .show(truncate=False)

+----------------+-----------------+------------+
|name            |languagesAtSchool|currentState|
+----------------+-----------------+------------+
|James,,Smith    |Java,Scala,C++   |CA          |
|Michael,Rose,   |Spark,Java,C++   |NJ          |
|Robert,,Williams|CSharp,VB        |NV          |
+----------------+-----------------+------------+



In [20]:
data = [(1,"20200828"),(2,"20180525")]
columns=["id","date"]
df=spark.createDataFrame(data,columns)
df1 = df.withColumn('year', f.substring('date', 1,4))\
    .withColumn('month', f.substring('date', 5,2))\
    .withColumn('day', f.substring('date', 7,2))
df1.printSchema()
df1.show(truncate=False)

root
 |-- id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)

+---+--------+----+-----+---+
|id |date    |year|month|day|
+---+--------+----+-----+---+
|1  |20200828|2020|08   |28 |
|2  |20180525|2018|05   |25 |
+---+--------+----+-----+---+



In [21]:
df.select('date', f.substring('date', 1,4).alias('year'), \
                  f.substring('date', 5,2).alias('month'), \
                  f.substring('date', 7,2).alias('day')).show(truncate=False)

+--------+----+-----+---+
|date    |year|month|day|
+--------+----+-----+---+
|20200828|2020|08   |28 |
|20180525|2018|05   |25 |
+--------+----+-----+---+



In [22]:
df3=df.withColumn('year', f.col('date').substr(1, 4))\
  .withColumn('month',f.col('date').substr(5, 2))\
  .withColumn('day', f.col('date').substr(7, 2))
df3.show()

+---+--------+----+-----+---+
| id|    date|year|month|day|
+---+--------+----+-----+---+
|  1|20200828|2020|   08| 28|
|  2|20180525|2018|   05| 25|
+---+--------+----+-----+---+



In [23]:
address = [(1,"14851 Jeffrey Rd","DE"),
    (2,"43421 Margarita St","NY"),
    (3,"13111 Siemon Ave","CA")]
df =spark.createDataFrame(address,["id","address","state"])
df.show()
df.withColumn('address', f.regexp_replace('address', 'Rd', 'Road')) \
  .show(truncate=False)

+---+------------------+-----+
| id|           address|state|
+---+------------------+-----+
|  1|  14851 Jeffrey Rd|   DE|
|  2|43421 Margarita St|   NY|
|  3|  13111 Siemon Ave|   CA|
+---+------------------+-----+

+---+------------------+-----+
|id |address           |state|
+---+------------------+-----+
|1  |14851 Jeffrey Road|DE   |
|2  |43421 Margarita St|NY   |
|3  |13111 Siemon Ave  |CA   |
+---+------------------+-----+



In [24]:
df.withColumn('address',
    f.when(df.address.endswith('Rd'),f.regexp_replace(df.address,'Rd','Road')) \
   .when(f.col("address").endswith('St'),f.regexp_replace(df.address,'St','Street')) \
   .when(df.address.endswith('Ave'),f.regexp_replace(df.address,'Ave','Avenue')) \
   .otherwise(df.address)) \
   .show(truncate=False)

+---+----------------------+-----+
|id |address               |state|
+---+----------------------+-----+
|1  |14851 Jeffrey Road    |DE   |
|2  |43421 Margarita Street|NY   |
|3  |13111 Siemon Avenue   |CA   |
+---+----------------------+-----+



In [25]:
address = [(1,"14851 Jeffrey Rd","DE"),
    (2,"43421 Margarita St","NY"),
    (3,"13111 Siemon Ave","CA")]
df =spark.createDataFrame(address,["id","address","state"])
df.show()

+---+------------------+-----+
| id|           address|state|
+---+------------------+-----+
|  1|  14851 Jeffrey Rd|   DE|
|  2|43421 Margarita St|   NY|
|  3|  13111 Siemon Ave|   CA|
+---+------------------+-----+



In [26]:
#Replace part of string with another string
df = spark.createDataFrame([("ABCDE_XYZ", "FGH")], ("col1", "col2"))
df.select(f.overlay("col1", "col2", 7).alias("overlayed")).show()

+---------+
|overlayed|
+---------+
|ABCDE_FGH|
+---------+



In [27]:
df=spark.createDataFrame(
        data = [ ("1","2019-06-24 12:01:19.000")],
        schema=["id","input_timestamp"])
df.printSchema()

#Timestamp String to DateType
df.withColumn("timestamp",f.to_timestamp("input_timestamp")) \
  .show(truncate=False)

root
 |-- id: string (nullable = true)
 |-- input_timestamp: string (nullable = true)

+---+-----------------------+-------------------+
|id |input_timestamp        |timestamp          |
+---+-----------------------+-------------------+
|1  |2019-06-24 12:01:19.000|2019-06-24 12:01:19|
+---+-----------------------+-------------------+



In [28]:
df.select(f.to_timestamp(f.lit('06-24-2019 12:01:19.000'),'MM-dd-yyyy HH:mm:ss.SSSS')) \
  .show()

+---------------------------------------------------------------+
|to_timestamp(06-24-2019 12:01:19.000, MM-dd-yyyy HH:mm:ss.SSSS)|
+---------------------------------------------------------------+
|                                            2019-06-24 12:01:19|
+---------------------------------------------------------------+



In [29]:
#SQL string to TimestampType
spark.sql("select to_timestamp('2019-06-24 12:01:19.000') as timestamp")
#SQL CAST timestamp string to TimestampType
spark.sql("select timestamp('2019-06-24 12:01:19.000') as timestamp")
#SQL Custom string to TimestampType
spark.sql("select to_timestamp('06-24-2019 12:01:19.000','MM-dd-yyyy HH:mm:ss.SSSS') as timestamp")

DataFrame[timestamp: timestamp]

In [33]:
# Using Cast to convert Timestamp String to DateType
df.withColumn('date_type', f.col('input_timestamp').cast('date')) \
       .show(truncate=False)

# Using Cast to convert TimestampType to DateType
df.withColumn('date_type', f.to_timestamp('input_timestamp').cast('date')) \
  .show(truncate=False)

df.select(f.to_date(f.lit('06-24-2019 12:01:19.000'),'MM-dd-yyyy HH:mm:ss.SSSS')) \
  .show()

#Timestamp String to DateType
df.withColumn("date_type",f.to_date("input_timestamp")) \
  .show(truncate=False)


df.withColumn("ts",f.to_timestamp(f.col("input_timestamp"))) \
  .withColumn("datetype",f.to_date(f.col("ts"))) \
  .show(truncate=False)

#SQL TimestampType to DateType
spark.sql("select to_date(current_timestamp) as date_type")
#SQL CAST TimestampType to DateType
spark.sql("select date(to_timestamp('2019-06-24 12:01:19.000')) as date_type")
#SQL CAST timestamp string to DateType
spark.sql("select date('2019-06-24 12:01:19.000') as date_type")
#SQL Timestamp String (default format) to DateType
spark.sql("select to_date('2019-06-24 12:01:19.000') as date_type")
#SQL Custom Timeformat to DateType
spark.sql("select to_date('06-24-2019 12:01:19.000','MM-dd-yyyy HH:mm:ss.SSSS') as date_type")

+---+-----------------------+----------+
|id |input_timestamp        |date_type |
+---+-----------------------+----------+
|1  |2019-06-24 12:01:19.000|2019-06-24|
+---+-----------------------+----------+

+---+-----------------------+----------+
|id |input_timestamp        |date_type |
+---+-----------------------+----------+
|1  |2019-06-24 12:01:19.000|2019-06-24|
+---+-----------------------+----------+

+----------------------------------------------------------+
|to_date(06-24-2019 12:01:19.000, MM-dd-yyyy HH:mm:ss.SSSS)|
+----------------------------------------------------------+
|                                                2019-06-24|
+----------------------------------------------------------+

+---+-----------------------+----------+
|id |input_timestamp        |date_type |
+---+-----------------------+----------+
|1  |2019-06-24 12:01:19.000|2019-06-24|
+---+-----------------------+----------+

+---+-----------------------+-------------------+----------+
|id |input_time

DataFrame[date_type: date]

In [34]:
df=spark.createDataFrame([["1"]],["id"])
df.select(f.current_date().alias("current_date"), \
      f.date_format(f.current_date(),"yyyy MM dd").alias("yyyy MM dd"), \
      f.date_format(f.current_timestamp(),"MM/dd/yyyy hh:mm").alias("MM/dd/yyyy"), \
      f.date_format(f.current_timestamp(),"yyyy MMM dd").alias("yyyy MMMM dd"), \
      f.date_format(f.current_timestamp(),"yyyy MMMM dd E").alias("yyyy MMMM dd E") \
   ).show()

#SQL

spark.sql("select current_date() as current_date, "+
      "date_format(current_timestamp(),'yyyy MM dd') as yyyy_MM_dd, "+
      "date_format(current_timestamp(),'MM/dd/yyyy hh:mm') as MM_dd_yyyy, "+
      "date_format(current_timestamp(),'yyyy MMM dd') as yyyy_MMMM_dd, "+
      "date_format(current_timestamp(),'yyyy MMMM dd E') as yyyy_MMMM_dd_E").show()


+------------+----------+----------------+------------+----------------+
|current_date|yyyy MM dd|      MM/dd/yyyy|yyyy MMMM dd|  yyyy MMMM dd E|
+------------+----------+----------------+------------+----------------+
|  2023-07-21|2023 07 21|07/21/2023 06:59| 2023 Jul 21|2023 July 21 Fri|
+------------+----------+----------------+------------+----------------+

+------------+----------+----------------+------------+----------------+
|current_date|yyyy_MM_dd|      MM_dd_yyyy|yyyy_MMMM_dd|  yyyy_MMMM_dd_E|
+------------+----------+----------------+------------+----------------+
|  2023-07-21|2023 07 21|07/21/2023 06:59| 2023 Jul 21|2023 July 21 Fri|
+------------+----------+----------------+------------+----------------+



In [35]:
data = [("1","2019-07-01"),("2","2019-06-24"),("3","2019-08-24")]

df=spark.createDataFrame(data=data,schema=["id","date"])

from pyspark.sql.functions import *

df.select(
      f.col("date"),
      f.current_date().alias("current_date"),
      f.datediff(current_date(),col("date")).alias("datediff")
    ).show()

df.withColumn("datesDiff", f.datediff(current_date(),col("date"))) \
      .withColumn("montsDiff", f.months_between(current_date(),col("date"))) \
      .withColumn("montsDiff_round",round(f.months_between(current_date(),col("date")),2)) \
      .withColumn("yearsDiff",f.months_between(current_date(),col("date"))/lit(12)) \
      .withColumn("yearsDiff_round",round(f.months_between(current_date(),col("date"))/lit(12),2)) \
      .show()

data2 = [("1","07-01-2019"),("2","06-24-2019"),("3","08-24-2019")]
df2=spark.createDataFrame(data=data2,schema=["id","date"])
df2.select(
    f.to_date(col("date"),"MM-dd-yyyy").alias("date"),
    f.current_date().alias("endDate")
    )

spark.sql("select round(months_between('2019-07-01',current_date())/12,2) as years_diff").show()

+----------+------------+--------+
|      date|current_date|datediff|
+----------+------------+--------+
|2019-07-01|  2023-07-21|    1481|
|2019-06-24|  2023-07-21|    1488|
|2019-08-24|  2023-07-21|    1427|
+----------+------------+--------+

+---+----------+---------+-----------+---------------+------------------+---------------+
| id|      date|datesDiff|  montsDiff|montsDiff_round|         yearsDiff|yearsDiff_round|
+---+----------+---------+-----------+---------------+------------------+---------------+
|  1|2019-07-01|     1481|48.64516129|          48.65| 4.053763440833333|           4.05|
|  2|2019-06-24|     1488|48.90322581|           48.9|4.0752688175000005|           4.08|
|  3|2019-08-24|     1427|46.90322581|           46.9|3.9086021508333335|           3.91|
+---+----------+---------+-----------+---------------+------------------+---------------+

+----------+
|years_diff|
+----------+
|     -4.05|
+----------+



In [36]:
arrayArrayData = [
  ("James",[["Java","Scala","C++"],["Spark","Java"]]),
  ("Michael",[["Spark","Java","C++"],["Spark","Java"]]),
  ("Robert",[["CSharp","VB"],["Spark","Python"]])
]

df = spark.createDataFrame(data=arrayArrayData, schema = ['name','subjects'])
df.printSchema()
df.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- subjects: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

+-------+-----------------------------------+
|name   |subjects                           |
+-------+-----------------------------------+
|James  |[[Java, Scala, C++], [Spark, Java]]|
|Michael|[[Spark, Java, C++], [Spark, Java]]|
|Robert |[[CSharp, VB], [Spark, Python]]    |
+-------+-----------------------------------+



In [37]:
df.select(df.name,f.explode(df.subjects)).show(truncate=False)

+-------+------------------+
|name   |col               |
+-------+------------------+
|James  |[Java, Scala, C++]|
|James  |[Spark, Java]     |
|Michael|[Spark, Java, C++]|
|Michael|[Spark, Java]     |
|Robert |[CSharp, VB]      |
|Robert |[Spark, Python]   |
+-------+------------------+



In [42]:
arrayCol = f.ArrayType(f.StringType(),False)
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")
]
from pyspark.sql.types import StringType, ArrayType,StructType,StructField

schema = StructType([
    StructField("name",StringType(),True),
    StructField("languagesAtSchool",ArrayType(StringType()),True),
    StructField("languagesAtWork",ArrayType(StringType()),True),
    StructField("currentState", StringType(), True),
    StructField("previousState", StringType(), True)
  ])

df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show()


root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- languagesAtWork: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)
 |-- previousState: string (nullable = true)

+----------------+------------------+---------------+------------+-------------+
|            name| languagesAtSchool|languagesAtWork|currentState|previousState|
+----------------+------------------+---------------+------------+-------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|          OH|           CA|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|          NY|           NJ|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|          UT|           NV|
+----------------+------------------+---------------+------------+-------------+



In [43]:
df.select(f.split(df.name,",").alias("nameAsArray")).show()

+--------------------+
|         nameAsArray|
+--------------------+
|    [James, , Smith]|
|   [Michael, Rose, ]|
|[Robert, , Williams]|
+--------------------+



In [44]:
df.select(df.name,f.array(df.currentState,df.previousState).alias("States")).show()


+----------------+--------+
|            name|  States|
+----------------+--------+
|    James,,Smith|[OH, CA]|
|   Michael,Rose,|[NY, NJ]|
|Robert,,Williams|[UT, NV]|
+----------------+--------+



In [45]:
df.select(df.name,f.array_contains(df.languagesAtSchool,"Java")
    .alias("array_contains")).show()

+----------------+--------------+
|            name|array_contains|
+----------------+--------------+
|    James,,Smith|          true|
|   Michael,Rose,|          true|
|Robert,,Williams|         false|
+----------------+--------------+



In [46]:
simpleData = [("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  ]
schema = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [47]:
#//approx_count_distinct()
print("approx_count_distinct: " + \
      str(df.select(approx_count_distinct("salary")).collect()[0][0]))

#//Prints approx_count_distinct: 6

approx_count_distinct: 6


In [50]:
print("avg: " + str(df.select(avg("salary")).collect()[0][0]))

avg: 3400.0


In [51]:
#//collect_list
df.select(collect_list("salary")).show(truncate=False)

+------------------------------------------------------------+
|collect_list(salary)                                        |
+------------------------------------------------------------+
|[3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]|
+------------------------------------------------------------+



In [53]:
#//countDistinct
df2 = df.select(f.countDistinct("department", "salary"))
df2.show(truncate=False)
print("Distinct Count of Department & Salary: "+str(df2.collect()[0][0]))

+----------------------------------+
|count(DISTINCT department, salary)|
+----------------------------------+
|8                                 |
+----------------------------------+

Distinct Count of Department & Salary: 8


In [54]:
print("count: "+str(df.select(count("salary")).collect()[0]))
df.select(first("salary")).show(truncate=False)
df.select(last("salary")).show(truncate=False)
df.select(kurtosis("salary")).show(truncate=False)
df.select(max("salary")).show(truncate=False)
df.select(min("salary")).show(truncate=False)
df.select(mean("salary")).show(truncate=False)
df.select(skewness("salary")).show(truncate=False)
df.select(stddev("salary"), stddev_samp("salary"), \
    stddev_pop("salary")).show(truncate=False)
df.select(sum("salary")).show(truncate=False)
df.select(sumDistinct("salary")).show(truncate=False)
df.select(variance("salary"),var_samp("salary"),var_pop("salary")) \
  .show(truncate=False)

count: Row(count(salary)=10)
+-------------+
|first(salary)|
+-------------+
|3000         |
+-------------+

+------------+
|last(salary)|
+------------+
|4100        |
+------------+

+-------------------+
|kurtosis(salary)   |
+-------------------+
|-0.6467803030303032|
+-------------------+

+-----------+
|max(salary)|
+-----------+
|4600       |
+-----------+

+-----------+
|min(salary)|
+-----------+
|2000       |
+-----------+

+-----------+
|avg(salary)|
+-----------+
|3400.0     |
+-----------+

+--------------------+
|skewness(salary)    |
+--------------------+
|-0.12041791181069571|
+--------------------+

+-------------------+-------------------+------------------+
|stddev_samp(salary)|stddev_samp(salary)|stddev_pop(salary)|
+-------------------+-------------------+------------------+
|765.9416862050705  |765.9416862050705  |726.636084983398  |
+-------------------+-------------------+------------------+

+-----------+
|sum(salary)|
+-----------+
|34000      |
+-----------



+--------------------+
|sum(DISTINCT salary)|
+--------------------+
|20900               |
+--------------------+

+-----------------+-----------------+---------------+
|var_samp(salary) |var_samp(salary) |var_pop(salary)|
+-----------------+-----------------+---------------+
|586666.6666666666|586666.6666666666|528000.0       |
+-----------------+-----------------+---------------+

