In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window as W

# Obtain the Spark session for this notebook under the application name
# 'Puzzle'; getOrCreate() hands back an already-running session if one exists.
spark = (
    SparkSession.builder
    .appName('Puzzle')
    .getOrCreate()
)

In [3]:
# Explicit schema for the puzzle CSV: four nullable columns, declared in the
# order they are read from the file.
schema = (
    T.StructType()
    .add('ID', T.IntegerType(), True)
    .add('Name', T.StringType(), True)
    .add('Salary', T.IntegerType(), True)
    .add('DOJ', T.DateType(), True)
)

In [4]:
# Load the puzzle input with the explicit schema. The first line of the file
# is a header row, and DOJ values are parsed with the 'dd-MMM-yy' pattern.
df = (
    spark.read
    .options(header=True, dateFormat='dd-MMM-yy')
    .schema(schema)
    .csv('data/puzzle_038.csv')
)

# Preview the parsed rows.
df.show()

+---+----+------+----------+
| ID|Name|Salary|       DOJ|
+---+----+------+----------+
|  1|   A|   100|2014-10-02|
|  2|   B|   200|2013-03-16|
|  3|   C|   300|2014-01-02|
|  4|   D|   400|2012-02-17|
|  5|   E|   500|2012-02-08|
+---+----+------+----------+



In [5]:
# Step 1 - month-end anchor (NDOJ): a joining date before the 15th uses the
# last day of its own month; the 15th or later uses the last day of the
# following month.
joined_before_mid_month = F.dayofmonth('DOJ') < 15
month_end = (
    F.when(joined_before_mid_month, F.last_day('DOJ'))
    .otherwise(F.last_day(F.add_months('DOJ', 1)))
)

# Step 2 - offset (DaysTOSub) that moves the month-end back onto a Friday.
# Spark's dayofweek numbers days 1 = Sunday ... 7 = Saturday, so 6 is Friday.
weekday = F.dayofweek('NDOJ')
days_to_sub = (
    F.when(weekday < 6, -1 * (weekday + 1))  # Sunday..Thursday: back to the previous Friday
    .when(weekday > 6, -1)                   # Saturday: one day back
    .otherwise(0)                            # already a Friday
)

# Step 3 - apply the (non-positive) offset and publish the result as DOJ.
(
    df
    .withColumn('NDOJ', month_end)
    .withColumn('DaysTOSub', days_to_sub)
    .select(
        'ID',
        'Name',
        'Salary',
        F.date_add('NDOJ', F.col('DaysTOSub')).alias('DOJ'),
    )
    .show()
)

+---+----+------+----------+
| ID|Name|Salary|       DOJ|
+---+----+------+----------+
|  1|   A|   100|2014-10-31|
|  2|   B|   200|2013-04-26|
|  3|   C|   300|2014-01-31|
|  4|   D|   400|2012-03-30|
|  5|   E|   500|2012-02-24|
+---+----+------+----------+

