In [73]:
from datetime import datetime,timedelta
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql.functions import lag ,when,array,lit,udf
from pyspark.sql import Column
from pyspark.sql.types import ArrayType,StringType,NullType

## DataFrame UDFs (applied to array<string> columns)
@udf(returnType=ArrayType(StringType()))
def addValue(arr,elem):
    """Return the array with ``elem`` appended unless it is already present.

    Args:
        arr: list of strings taken from an array column. ``None`` (a null
            column value) is treated as an empty array.
        elem: value to add.

    Returns:
        A new list holding the items of ``arr``, followed by ``elem`` when
        ``elem`` was not already among them.
    """
    # Spark does not short-circuit Python UDFs wrapped in when()/otherwise(),
    # so this function is invoked for every row and has to tolerate nulls
    # itself instead of relying on the surrounding condition.
    result = [] if arr is None else list(arr)
    if elem not in result:
        result.append(elem)
    return result

@udf(returnType=ArrayType(StringType()))
def mergeArrays(arr1,arr2):
    """Concatenate two array columns into one list.

    Args:
        arr1: first list of strings, or ``None`` for a null column value.
        arr2: second list of strings, or ``None`` for a null column value.

    Returns:
        A new list with the items of ``arr1`` followed by the items of
        ``arr2``; a ``None`` argument contributes nothing. Duplicates are kept.
    """
    merged = []
    for part in (arr1, arr2):
        # Identity comparison is the idiomatic (PEP 8) way to test for None.
        if part is not None:
            merged.extend(part)
    return merged

@udf(returnType=ArrayType(StringType()))
def getCurrentEnts(added,removed,previousEnts):
    """Combine newly added entries with the surviving previous entries.

    Args:
        added: list of entries added in this row, or ``None``.
        removed: list of entries removed in this row, or ``None``.
        previousEnts: list of entries carried over from the preceding row,
            or ``None`` (e.g. the first row of a ``lag`` window).

    Returns:
        A new list: the items of ``added`` first, then the items of
        ``previousEnts`` that are not in ``removed``. Each entry appears at
        most once, in first-seen order.
    """
    if previousEnts is None:
        survivors = []
    elif removed is None:
        survivors = list(previousEnts)
    else:
        survivors = [ent for ent in previousEnts if ent not in removed]

    candidates = ([] if added is None else list(added)) + survivors

    # An entry that is re-added while still present in previousEnts would
    # otherwise be listed twice; keep only its first occurrence.
    current = []
    for ent in candidates:
        if ent not in current:
            current.append(ent)
    return current

@udf(returnType=ArrayType(StringType()))
def calculateRemovedEnts(added,removed):
    """Return the removed entries that were not also added.

    Args:
        added: list of added entries, or ``None`` for a null column value.
        removed: list of removed entries, or ``None`` for a null column value.

    Returns:
        A new list with the items of ``removed`` that do not occur in
        ``added``, in their original order. A ``None`` ``removed`` yields an
        empty list; a ``None`` ``added`` filters nothing out.
    """
    # Tolerate nulls the same way the other UDFs in this notebook do, rather
    # than failing the whole job with a TypeError on a single null row.
    if removed is None:
        return []
    if added is None:
        return list(removed)
    return [ent for ent in removed if ent not in added]

## Plain Python helpers applied to the DataFrame's underlying RDD
def flatten(arr):
    """Flatten one level of nesting.

    Args:
        arr: iterable of sized sequences (e.g. a list of lists).

    Returns:
        A new list with the items of every non-empty sequence in ``arr``,
        in order of appearance.
    """
    return [item for chunk in arr if len(chunk) != 0 for item in chunk]

def transform(row):
    """Convert one aggregated row into a Row with flattened entry lists.

    Args:
        row: a Row exposing ``last_modified_date`` plus the aggregate columns
            ``collect_set(added)``, ``collect_set(removed)``, ``max(reason)``
            and ``max(updatedBy)``.

    Returns:
        A Row with fields ``last_modified_date``, ``reason``, ``addedEnts``,
        ``removedEnts`` and ``updatedBy``; the two ``*Ents`` fields hold the
        nested collect_set arrays flattened into single lists.
    """
    added_ents = flatten(row['collect_set(added)'])
    removed_ents = flatten(row['collect_set(removed)'])
    return Row(
        last_modified_date=row.last_modified_date,
        reason=row['max(reason)'],
        addedEnts=added_ents,
        removedEnts=removed_ents,
        updatedBy=row['max(updatedBy)'],
    )

In [71]:
# Sample policy-change events; a real run would gather these from a database
# or any other data source.
dt = datetime.now()
_event_fields = ('last_modified_date', 'policy_id', 'active', 'reason', 'updatedBy')
_events = [
    (dt,                        'CR_1', 'N', 'removed', 'system'),
    (dt + timedelta(minutes=5), 'CR_2', 'Y', 'applied', 'kapil'),
    (dt + timedelta(minutes=6), 'CR_1', 'Y', 'applied', 'amit'),
    (dt + timedelta(minutes=6), 'CR_1', 'N', 'removed', 'amit'),
    (dt + timedelta(minutes=6), 'CR_1', 'N', 'removed', 'kapil'),
]
# Rows are still built from keyword arguments so the resulting schema matches
# what Row(field=value, ...) literals would produce.
rdd = sc.parallelize([Row(**dict(zip(_event_fields, event))) for event in _events])
df = rdd.toDF()
df.persist()

DataFrame[active: string, last_modified_date: timestamp, policy_id: string, reason: string, updatedBy: string]

In [72]:
# Window with no partition keys, ordered chronologically; it is used with
# lag() further down in the notebook.
windowSpec = Window.partitionBy().orderBy('last_modified_date')

# Tag every event with the policy it adds (active == 'Y') or removes
# (active == 'N'); the other column gets an empty array. The built-in
# array(policy_id) yields the same single-element array that addValue()
# produced on a fresh empty array, without a round trip through a Python UDF.
df = df.select(
    when(df.active == 'Y', array(df.policy_id)).otherwise(array()).alias('added'),
    when(df.active == 'N', array(df.policy_id)).otherwise(array()).alias('removed'),
    df.last_modified_date, df.policy_id, df.active, df.reason, df.updatedBy)

# One row per timestamp: collect the distinct added/removed arrays and keep
# the max() of reason and updatedBy. The sort column is named by string so it
# resolves against the aggregated frame rather than the pre-aggregation one.
df = (df.groupBy("last_modified_date")
        .agg({"added": "collect_set", "removed": "collect_set", "reason": "max", "updatedBy": "max"})
        .orderBy("last_modified_date"))
df.persist()

DataFrame[last_modified_date: timestamp, max(reason): string, max(updatedBy): string, collect_set(removed): array<array<string>>, collect_set(added): array<array<string>>]

In [75]:
## Flatten the collected added/removed arrays row by row through the RDD,
## then rebuild a DataFrame from the resulting Rows.
dffinal=df.rdd.map(transform).toDF()
# Keep in removedEnts only the entries that were not also added at the same
# timestamp. Both columns are cast to array<string> before reaching the UDF.
dffinal=dffinal.withColumn('removedEnts',calculateRemovedEnts(dffinal.addedEnts.astype(ArrayType(StringType())),dffinal.removedEnts.astype(ArrayType(StringType()))))
# Seed currentEnts with this row's additions so that lag('currentEnts') on the
# next line has a column to read.
dffinal=dffinal.withColumn('currentEnts',dffinal.addedEnts)
# NOTE(review): lag('currentEnts') reads the column seeded on the previous
# line, i.e. the preceding row's addedEnts, not its computed currentEnts.
# The result therefore combines only this row with the one before it (ordered
# by windowSpec); confirm whether a running total over all rows was intended.
dffinal=dffinal.withColumn('currentEnts',getCurrentEnts(dffinal.addedEnts.astype(ArrayType(StringType())),dffinal.removedEnts.astype(ArrayType(StringType())),lag('currentEnts').over(windowSpec)))
dffinal.show()

+---------+--------------------+-------+-----------+---------+------------+
|addedEnts|  last_modified_date| reason|removedEnts|updatedBy| currentEnts|
+---------+--------------------+-------+-----------+---------+------------+
|       []|2018-07-31 21:47:...|removed|     [CR_1]|   system|          []|
|   [CR_2]|2018-07-31 21:52:...|applied|         []|    kapil|      [CR_2]|
|   [CR_1]|2018-07-31 21:53:...|removed|         []|    kapil|[CR_1, CR_2]|
+---------+--------------------+-------+-----------+---------+------------+

