In [11]:
import sys
import datetime, time
import pyspark
from pyspark.sql.functions import *
import logging
import boto3
from pyspark.sql.functions import col
import re
from pyspark.sql.functions import *
import logging
import pyspark
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark import SparkContext 
from pyspark.sql.types import DateType
import pandas as pd
from datetime import datetime, timedelta
import pyspark.sql.functions as F

In [12]:
spark = SparkSession.builder.master("local[1]") \
                    .appName('ChronologicalRepair') \
                    .getOrCreate()

spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")


DataFrame[key: string, value: string]

In [13]:
'''Example data of what the cleansed layer may look like on a smaller scale'''

filtered_output = spark.read.option("header",True).csv("test_cleansed_layer.csv")
filtered_output.show()

+-----------+-----------------+-----------------+---------------+-------------+
|   child_id|enrollment_status|status_start_date|status_end_date|pool_end_date|
+-----------+-----------------+-----------------+---------------+-------------+
|jakeeverson|         Enrolled|         8/1/2022|     12/31/2099|     1/1/2023|
|jakeeverson|         Enrolled|         8/1/2022|     12/31/2099|     2/1/2023|
|jakeeverson|         Enrolled|         8/1/2022|       1/1/2023|     3/1/2023|
|jakeeverson|         Enrolled|         2/2/2023|     12/31/2099|     4/1/2023|
|jakeeverson|         Enrolled|         2/2/2023|     12/31/2099|     5/1/2023|
|jakeeverson|         Enrolled|         2/2/2023|       6/1/2023|     6/1/2023|
|jakeeverson|         Enrolled|         8/1/2022|       2/1/2023|     7/1/2023|
|jakeeverson|         Enrolled|         2/2/2023|      7/20/2023|     8/1/2023|
| cathywoods|         Enrolled|        10/1/2019|     12/31/2099|     1/1/2023|
| cathywoods|         Enrolled|        1

In [14]:
'''
We make a new column chronoID which is a string created from concating svoc_id, status start date, end date, 
and pool end date. This plays a role when we filter the cleansed layer and we only want those unique events since 
each chrono id is unique for each individual event for every student as id,status start,status end, pool end date 
will never be the same for one student or the same student with different events
'''

filtered_output = filtered_output.withColumn("chrono_id", concat(filtered_output.child_id,
                                                                lit(" "), 
                                                                filtered_output.status_start_date,
                                                                lit(" "), 
                                                                filtered_output.status_end_date,
                                                                lit(" "), 
                                                                filtered_output.pool_end_date))
filtered_output.show()

+-----------+-----------------+-----------------+---------------+-------------+--------------------+
|   child_id|enrollment_status|status_start_date|status_end_date|pool_end_date|           chrono_id|
+-----------+-----------------+-----------------+---------------+-------------+--------------------+
|jakeeverson|         Enrolled|         8/1/2022|     12/31/2099|     1/1/2023|jakeeverson 8/1/2...|
|jakeeverson|         Enrolled|         8/1/2022|     12/31/2099|     2/1/2023|jakeeverson 8/1/2...|
|jakeeverson|         Enrolled|         8/1/2022|       1/1/2023|     3/1/2023|jakeeverson 8/1/2...|
|jakeeverson|         Enrolled|         2/2/2023|     12/31/2099|     4/1/2023|jakeeverson 2/2/2...|
|jakeeverson|         Enrolled|         2/2/2023|     12/31/2099|     5/1/2023|jakeeverson 2/2/2...|
|jakeeverson|         Enrolled|         2/2/2023|       6/1/2023|     6/1/2023|jakeeverson 2/2/2...|
|jakeeverson|         Enrolled|         8/1/2022|       2/1/2023|     7/1/2023|jakeeverson 

In [16]:
'''
We sort the dataframe by ordering by child_id, then status start_date ASC, then most recent pool date DESC
This comes in handy when we remove deplicates of status_start_date, keeping the first instance which would be the most recent 
pool based on our ordering style '''


filtered_output.createOrReplaceTempView("dummy_source")


filtered_output = spark.sql("""SELECT *
                        FROM dummy_source
                        ORDER BY child_id ASC, status_start_date ASC, pool_end_date DESC""")

filtered_output.show()

+-----------+-----------------+-----------------+---------------+-------------+--------------------+
|   child_id|enrollment_status|status_start_date|status_end_date|pool_end_date|           chrono_id|
+-----------+-----------------+-----------------+---------------+-------------+--------------------+
| cathywoods|         Enrolled|        10/1/2019|      3/30/2023|     4/1/2023|cathywoods 10/1/2...|
| cathywoods|         Enrolled|        10/1/2019|     12/31/2099|     3/1/2023|cathywoods 10/1/2...|
| cathywoods|         Enrolled|        10/1/2019|       2/1/2023|     2/1/2023|cathywoods 10/1/2...|
| cathywoods|         Enrolled|        10/1/2019|     12/31/2099|     1/1/2023|cathywoods 10/1/2...|
|jakeeverson|         Enrolled|         2/2/2023|      7/20/2023|     8/1/2023|jakeeverson 2/2/2...|
|jakeeverson|         Enrolled|         2/2/2023|       6/1/2023|     6/1/2023|jakeeverson 2/2/2...|
|jakeeverson|         Enrolled|         2/2/2023|     12/31/2099|     5/1/2023|jakeeverson 

In [18]:
'''
This chunk of code starts of by making a list of unique student_ids (child_id_list) using collect() method
then uses a for loop to iterate through every student id and makes a unqiue data frame for every student in the loop to 
manipulate using the filter function. After removing duplicates based on subset = status_start_date then reordering
it collects the unique chronoIDs we generated at the beginning unless it detects error in chronological order of a student
'''


child_id_list = list(set(child_id[0] for child_id in filtered_output.select('child_id').collect()))
error_ids = []
chrono_ids = []

for id in child_id_list:
    
    #create unique dataframe for student based on id in for loop
    child_profile = (filtered_output.filter(filtered_output.child_id.isin(id)))\
        .dropDuplicates(subset=["status_start_date"]).orderBy(col("status_start_date").asc())
    
    chrono_ids.extend([chrono[0] for chrono in child_profile.select('chrono_id').collect()])
    
    child_profile.show()
    
    #if student has more than one entry in cleansed, check for chronological order
    if child_profile.count() > 1:
        counter = 0
        start_date = [start[0] for start in child_profile.select('status_start_date').collect()]
        end_date = [end[0] for end in child_profile.select('status_end_date').collect()]

        while counter < child_profile.count() - 1:
            if start_date[counter + 1] <=  end_date[counter]:
                if id not in error_ids:
                    error_ids.append(id)

            counter +=1

+----------+-----------------+-----------------+---------------+-------------+--------------------+
|  child_id|enrollment_status|status_start_date|status_end_date|pool_end_date|           chrono_id|
+----------+-----------------+-----------------+---------------+-------------+--------------------+
|cathywoods|         Enrolled|        10/1/2019|      3/30/2023|     4/1/2023|cathywoods 10/1/2...|
+----------+-----------------+-----------------+---------------+-------------+--------------------+

+-----------+-----------------+-----------------+---------------+-------------+--------------------+
|   child_id|enrollment_status|status_start_date|status_end_date|pool_end_date|           chrono_id|
+-----------+-----------------+-----------------+---------------+-------------+--------------------+
|jakeeverson|         Enrolled|         2/2/2023|      7/20/2023|     8/1/2023|jakeeverson 2/2/2...|
|jakeeverson|         Enrolled|         8/1/2022|       2/1/2023|     7/1/2023|jakeeverson 8/1/

In [19]:
''' The chronoIDs that were saved to a list as a result of the for loop '''

chrono_ids

['cathywoods 10/1/2019 3/30/2023 4/1/2023',
 'jakeeverson 2/2/2023 7/20/2023 8/1/2023',
 'jakeeverson 8/1/2022 2/1/2023 7/1/2023']

In [21]:
'''
Apply the same rule, now we filter the "cleansed" dataframe by our chronoids list that we generated that we can use in our
aws write method. Clean up columns commented out for visual purposes.
'''

correct_dataframe = filtered_output.filter(filtered_output.chrono_id.isin(chrono_ids))\
                   #.drop("pool_end_date").drop("chronoID")
correct_dataframe.show()

+-----------+-----------------+-----------------+---------------+-------------+--------------------+
|   child_id|enrollment_status|status_start_date|status_end_date|pool_end_date|           chrono_id|
+-----------+-----------------+-----------------+---------------+-------------+--------------------+
| cathywoods|         Enrolled|        10/1/2019|      3/30/2023|     4/1/2023|cathywoods 10/1/2...|
|jakeeverson|         Enrolled|         2/2/2023|      7/20/2023|     8/1/2023|jakeeverson 2/2/2...|
|jakeeverson|         Enrolled|         8/1/2022|       2/1/2023|     7/1/2023|jakeeverson 8/1/2...|
+-----------+-----------------+-----------------+---------------+-------------+--------------------+

