In [71]:
from cerebralcortex import Kernel
import pyspark.sql.functions as F
import warnings
warnings.filterwarnings("ignore")

# Participants who registered twice (two e-mail ids -> two user ids). Each duplicate id
# (key) is folded into that participant's first id (value).
DUPLICATE_USER_IDS = {
    "4e3c01a1-4f61-3329-b843-edd72eaece63": "62c32dbd-a32d-3ecd-a9f1-fb9bc40fff66",
    "37686d8b-d47b-33e9-99b1-496196e7ada2": "ff54abe7-a4dd-3c10-a00d-e9bcc68ee92b",
    "68d89413-5a7f-3a3b-9e6c-b22d2699307a": "07c8b674-2c13-3a33-a973-cf1cab70d9f9",
    "7e0aa5f7-96cd-3a95-a28d-e3ef3684e0e1": "71e88740-be67-382a-a21f-78fba469cb13",
    "559c51b0-672e-32d9-a1d7-f7b36b5190af": "f7400971-a4b7-326a-8120-2e1db7f91cca",
    "b9b13911-dda5-38de-bb0d-becd0c9a4c7d": "835c291b-ecf4-32da-9a58-67bfe5ecfb7f",
    "fd1574fe-3093-3519-949e-15c98b4bc73a": "d8404d54-51d1-35fc-9bf7-32aa5942c575",
    "336482c0-71cf-39c8-96bb-8aeb7ffa4d3f": "b0a58353-edf9-3213-9bd5-808e7ad42bd5",
}

# Likelihood rows from app versions below DST_FIX_MIN_APP_VERSION did not reflect the end of
# DST; their local times at/after DST_END_LOCALTIME are moved back by DST_SHIFT_SECS.
DST_END_LOCALTIME = "2021-11-07 02:00:00"
DST_FIX_MIN_APP_VERSION = 32
DST_SHIFT_SECS = 3600

# Slack (seconds) added on each side of an episode window when matching likelihood samples.
EPISODE_MATCH_SLACK_SECS = 60


def episodes_with_last_updated_in_localtime(config_dir="/home/jupyter/cc3_moods_conf/",
                                            study_name="moods"):
    """
    Load stress episodes and add ``last_updated_lt``: ``last_updated`` shifted by the
    participant's local-minus-UTC offset.

    The offset for each episode is taken from the earliest stress-likelihood sample of the same
    user that falls inside the episode window (see ``localtime_of_annotations``).

    Args:
        config_dir (str) : CerebralCortex configuration directory passed to ``Kernel``
        study_name (str) : CerebralCortex study name passed to ``Kernel``
    Returns:
        pyspark.sql.DataFrame : episode rows that have at least one matching likelihood sample,
            plus a ``last_updated_lt`` column
    """
    def tweak_sdf(sdf, *duplicate_cols):
        """
        Some participants mistakenly registered twice using two different email ids, producing two unique
        user ids for the same user. To ensure data integrity, every duplicate id is replaced by the
        participant's first id (DUPLICATE_USER_IDS). Duplicate datapoints are also removed.

        Args:
            sdf : A stress likelihood/episode stream
            *duplicate_cols : Column names that together identify a datapoint; rows equal on all
                of them are duplicates (e.g. 'user', 'endtime')
        Returns:
            sdf : Modified stream
        Raises:
            ValueError : if no duplicate_cols are given
        """
        if not duplicate_cols:
            raise ValueError("tweak_sdf needs at least one column to de-duplicate on")

        # Ids are distinct and no target id is itself a duplicate, so nesting order is irrelevant.
        canonical_user = F.col('user')
        for duplicate_id, first_id in DUPLICATE_USER_IDS.items():
            canonical_user = F.when(F.col('user') == duplicate_id, first_id).otherwise(canonical_user)

        # Rows of a duplicate group are equal on the de-dup keys, so sorting by those keys cannot
        # decide which row survives; no orderBy is applied before dropDuplicates.
        return (sdf.withColumn("user_id", canonical_user)
                   .drop('user')
                   .withColumnRenamed('user_id', 'user')
                   .dropDuplicates(list(duplicate_cols)))

    def get_stress_likelihoods(stream_name, CC):
        """
        Load a likelihood stream with start/end local times, correcting the end of DST.

        End of DST was not reflected in the timestamps of the likelihood datastream for app
        versions < DST_FIX_MIN_APP_VERSION. For those rows, local times at/after
        DST_END_LOCALTIME are moved back by DST_SHIFT_SECS.

        Args:
            stream_name : name of the stream
            CC : CerebralCortex Kernel object
        Returns:
            sdf : stream where ``timestamp`` is renamed to ``UTC_st`` and ``localtime_st`` /
                ``localtime_et`` are the (DST-corrected) local start/end times; ``endtime`` is dropped
        """
        dst_end = F.to_timestamp(F.lit(DST_END_LOCALTIME))

        def dst_corrected(col_name):
            # Old-app rows at/after the DST end are shifted back; all other rows are unchanged.
            needs_fix = (F.col(col_name) >= dst_end) & (F.col('version') < DST_FIX_MIN_APP_VERSION)
            shifted = F.to_timestamp(F.col(col_name).cast('long') - DST_SHIFT_SECS)
            return F.when(needs_fix, shifted).otherwise(F.col(col_name))

        return (CC.get_stream(stream_name=stream_name, version='all')
                # endtime is divided by 1e6 to compare with timestamp in seconds
                # (NOTE(review): presumably epoch microseconds — confirm)
                .withColumn("diff_in_seconds", F.col('endtime') / 1000000
                            - F.col('timestamp').cast('long'))
                .withColumn("localtime_et", F.to_timestamp(F.col('localtime').cast('long')
                                                           + F.col("diff_in_seconds")))
                .withColumnRenamed("localtime", "localtime_st")
                .withColumnRenamed("timestamp", "UTC_st")
                .withColumn("localtime_st_dl", dst_corrected('localtime_st'))
                .withColumn("localtime_et_dl", dst_corrected('localtime_et'))
                .drop('localtime_st', 'localtime_et', 'endtime', "diff_in_seconds")
                .withColumnRenamed("localtime_st_dl", "localtime_st")
                .withColumnRenamed("localtime_et_dl", "localtime_et"))

    def localtime_of_annotations(episode_sdf, likelihood_sdf):
        """
        Attach ``last_updated_lt`` to every episode that has matching likelihood samples.

        A likelihood sample matches an episode when it belongs to the same user and its local
        start/end lie within [starttime - slack, endtime + slack], with slack =
        EPISODE_MATCH_SLACK_SECS. The earliest matching sample (by UTC_st) gives the signed
        local-minus-UTC offset, which is added to ``last_updated``.

        Args:
            episode_sdf : episode stream with user, stress_id, starttime, endtime, last_updated
            likelihood_sdf : stream returned by get_stress_likelihoods (after tweak_sdf)
        Returns:
            pyspark.sql.DataFrame : episode rows inner-joined with ``last_updated_lt`` on stress_id
        """
        episodes = (episode_sdf.select(['user', 'stress_id', 'starttime', 'endtime', 'last_updated'])
                    ._data.alias('ep'))
        likelihoods = likelihood_sdf._data.alias('lk')

        # Passed to join() directly so Spark never materialises a cartesian product; aliases keep
        # the 'user' column (present on both sides) unambiguous.
        in_episode_window = (
            (F.col('ep.user') == F.col('lk.user'))
            & (F.col('lk.localtime_st')
               >= F.to_timestamp(F.col('ep.starttime').cast('long') - EPISODE_MATCH_SLACK_SECS))
            & (F.col('lk.localtime_et')
               <= F.to_timestamp(F.col('ep.endtime').cast('long') + EPISODE_MATCH_SLACK_SECS))
        )

        stress_id_last_updated_lt = (
            episodes.join(likelihoods, on=in_episode_window, how='inner')
            .groupBy(F.col('ep.stress_id'))
            # min over a struct orders by UTC_st first, so the UTC and local time come from the
            # same (earliest) sample; independent mins could pair different samples across a
            # DST fall-back, when local times repeat.
            .agg(F.min(F.struct(F.col('lk.UTC_st').alias('UTC_st'),
                                F.col('lk.localtime_st').alias('localtime_st'))).alias('first_sample'),
                 F.min(F.col('ep.last_updated')).alias('last_updated'))
            # Signed offset: negative west of UTC, positive east of UTC.
            .withColumn('offset_secs', F.col('first_sample.localtime_st').cast('long')
                        - F.col('first_sample.UTC_st').cast('long'))
            .withColumn("last_updated_lt", F.to_timestamp(F.col('last_updated').cast('long')
                                                          + F.col('offset_secs')))
            .select('stress_id', "last_updated_lt"))

        return episode_sdf._data.join(stress_id_last_updated_lt, on='stress_id', how='inner')

    CC = Kernel(config_dir, study_name=study_name)
    episodes_raw = CC.get_stream('org.md2k.moods.episodes', version="all")
    episodes_clean = tweak_sdf(episodes_raw, 'user', 'endtime')
    likelihoods_raw = get_stress_likelihoods('probability--org.md2k.watch--fossil_watch_sport', CC)
    likelihoods_clean = tweak_sdf(likelihoods_raw, 'user', 'localtime_st')
    return localtime_of_annotations(episodes_clean, likelihoods_clean)

In [75]:
# Build the episode DataFrame with its extra `last_updated_lt` column.
# NOTE(review): execution counts show the spot-check cell (In [74]) ran before this one (In [75]);
# this cell must run first on a fresh kernel.
episode_sdf = episodes_with_last_updated_in_localtime()

In [74]:
# Spot-check: one participant's episodes starting between 2022-05-05 12:00:00 and
# 2022-05-06 12:00:00 (both bounds inclusive), showing last_updated next to last_updated_lt.
(episode_sdf
 .where((F.col('user') == 'af5f95f0-09cc-3a0a-b283-1a0ae724e43c')
        & F.col('starttime').between(F.lit("2022-05-05 12:00:00"), F.lit("2022-05-06 12:00:00")))
 .select('user', 'stress_id', 'starttime', 'endtime', 'last_updated', 'last_updated_lt')
 .show(5))

+--------------------+---------+-------------------+-------------------+-------------------+-------------------+
|                user|stress_id|          starttime|            endtime|       last_updated|    last_updated_lt|
+--------------------+---------+-------------------+-------------------+-------------------+-------------------+
|af5f95f0-09cc-3a0...|    98166|2022-05-05 12:15:17|2022-05-05 12:33:17|2022-05-07 01:38:42|2022-05-06 20:38:42|
|af5f95f0-09cc-3a0...|    98171|2022-05-05 14:01:17|2022-05-05 14:04:17|2022-05-05 22:45:18|2022-05-05 17:45:18|
|af5f95f0-09cc-3a0...|    98173|2022-05-05 14:16:17|2022-05-05 14:23:18|2022-05-05 22:45:18|2022-05-05 17:45:18|
|af5f95f0-09cc-3a0...|    98182|2022-05-05 15:46:18|2022-05-05 15:51:42|2022-05-05 22:45:18|2022-05-05 17:45:18|
|af5f95f0-09cc-3a0...|    98169|2022-05-05 12:53:17|2022-05-05 13:35:17|2022-05-05 22:45:18|2022-05-05 17:45:18|
+--------------------+---------+-------------------+-------------------+-------------------+----