# Rollic Data Engineer Role - Case Study 2022

There are 3 types of data given in csv format: Installs, Sessions, Events.

>Sessions data contains the session records of users' playtime. The data consists of 6 fields: USER_ID, PLATFORM, APP_NAME, CREATE_TS, INSTALL_TS, SESSION_ID.
- USER_ID: Unique identifier for each user
- PLATFORM: Platform info (ios/android)
- APP_NAME: The name of the game the user plays
- CREATE_TS: Created timestamp for the session
- INSTALL_TS: Install timestamp for the user
- SESSION_ID: Unique identifier for each session record


>Installs data has one record for each install. The data consists of 5 fields: USER_ID, PLATFORM, APP_NAME, INSTALL_TS, SESSION_ID.
- USER_ID: Unique identifier for each user
- PLATFORM: Platform info (ios/android)
- APP_NAME: The name of the game the user plays
- INSTALL_TS: Install timestamp for the user
- SESSION_ID: SESSION_ID of the user's first session


>Events data consists of the action logs of the users. The data consists of 7 fields: USER_ID, PLATFORM, APP_NAME, CREATE_TS, INSTALL_TS, EVENT_NAME, SESSION_ID.
- USER_ID: Unique identifier for each user
- PLATFORM: Platform info (ios/android)
- APP_NAME: The name of the game the user plays
- CREATE_TS: Created timestamp for the event (action)
- INSTALL_TS: Install timestamp for the user
- EVENT_NAME: Name of the logged action (for example app_started, level_started, level_completed)
- SESSION_ID: Identifier of the session in which the event occurred

>For this case study, we have raw data files in csv format as explained above. You are expected to complete the necessary data cleaning (it must follow the explanations above) and create aggregations for the business needs. They are explained below:

1. __DAU:__ For the first task, you need to find daily active users (the number of unique users who installed or played on each day). The result should consist of these fields: *app_name, create_date, user_count*. You are expected to use the sessions data. Results must be exported in the following partitions: app_name, platform.

2. __Retention:__ Here, you need to find the user retention rate. Write code that computes how many users returned to the game on each day after their installation date for that game. The result should consist of these fields: *app_name, install_date, returned_user_count, day (create date - install date)*. You are expected to use the installs and sessions data. Results must be exported in the following partitions: retention, app_name, platform.

3. __Level Completion Time:__ Level completion time is the duration in seconds between the create timestamps of the level_started and level_completed events. It must be calculated for each user within the same session, and the values must then be averaged. The result should consist of these fields: *app_name, level, average_completion_time*. You are expected to use the events data. Results must be exported in the following partitions: results, level, app_name, platform.

4. __Sessions Data:__ For the last task, we need to improve the sessions data. We will check whether each session is the user's first session. We can check the 'app_started' events and store that value as a boolean. Also, the install_ts column must be dropped. The result should consist of these fields: *user_id, platform, app_name, create_ts, session_id, is_first_session*. Events data must be used. Results must be exported in the following partition: create_date. Results have to be idempotent in terms of create date (the data shouldn't change if the notebook is run again).
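
To make the definitions in tasks 1 and 2 concrete, here is a minimal plain-Python sketch on a few hypothetical rows. It is not the PySpark solution the brief asks for; it only pins down the arithmetic. Two assumptions are made here and are not stated in the brief: users are counted per platform as well as per app (because the output is partitioned by platform), and `day` is the difference between calendar dates, with day 0 (the install day) not counted as a return.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical toy rows: (user_id, app_name, platform, create_ts, install_ts)
toy_sessions = [
    (1, "app_1", "ios",     "2022-01-01 10:00:00", "2022-01-01 09:00:00"),
    (1, "app_1", "ios",     "2022-01-01 18:00:00", "2022-01-01 09:00:00"),
    (1, "app_1", "ios",     "2022-01-03 08:00:00", "2022-01-01 09:00:00"),
    (2, "app_1", "android", "2022-01-01 23:30:00", "2022-01-01 23:00:00"),
    (2, "app_1", "android", "2022-01-02 00:10:00", "2022-01-01 23:00:00"),
    (3, "app_1", "ios",     "2022-01-01 12:00:00", "2022-01-01 12:00:00"),
]


def to_date(ts):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string and keep only the calendar date."""
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").date()


def daily_active_users(rows):
    """Count distinct users per (app_name, platform, create_date)."""
    users = defaultdict(set)
    for user_id, app, platform, create_ts, _install_ts in rows:
        users[(app, platform, to_date(create_ts))].add(user_id)
    return {key: len(ids) for key, ids in users.items()}


def retention(rows):
    """Count distinct returning users per (app_name, platform, install_date, day)."""
    users = defaultdict(set)
    for user_id, app, platform, create_ts, install_ts in rows:
        install_date = to_date(install_ts)
        day = (to_date(create_ts) - install_date).days
        if day >= 1:  # assumption: day 0 is the install day, not a return
            users[(app, platform, install_date, day)].add(user_id)
    return {key: len(ids) for key, ids in users.items()}


dau = daily_active_users(toy_sessions)
ret = retention(toy_sessions)
```

Note that user 2 comes back only 40 minutes after the first session, but on the next calendar date, so the calendar-date assumption counts that as a day-1 return. A definition based on elapsed 24-hour windows would give a different result.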

> Answers must be given in this notebook. PySpark must be used for data operations (e.g., Pandas is not allowed). The results folder and the notebook are expected to be sent in one zip file.
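
Although the deliverable has to be written in PySpark, the pairing rule of task 3 is easiest to check on toy data first. The sketch below is plain Python on hypothetical events: it pairs each level_started with the next level_completed of the same user and session and averages the durations. How the `level` field is derived is not described in this section, so the sketch leaves it out.

```python
from datetime import datetime

# Hypothetical toy events: (user_id, session_id, event_name, create_ts)
toy_events = [
    (1, 100, "app_started",     "2022-01-01 10:00:00"),
    (1, 100, "level_started",   "2022-01-01 10:00:05"),
    (1, 100, "level_completed", "2022-01-01 10:00:35"),
    (1, 100, "level_started",   "2022-01-01 10:01:00"),  # never completed
    (2, 200, "level_started",   "2022-01-01 11:00:00"),
    (2, 200, "level_completed", "2022-01-01 11:00:50"),
]


def parse(ts):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a datetime."""
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")


def completion_times(events):
    """Seconds from each level_started to the next level_completed
    within the same (user_id, session_id)."""
    durations = []
    pending = {}  # (user_id, session_id) -> start time waiting for a completion
    ordered = sorted(events, key=lambda e: (e[0], e[1], parse(e[3])))
    for user_id, session_id, name, ts in ordered:
        key = (user_id, session_id)
        if name == "level_started":
            pending[key] = parse(ts)
        elif name == "level_completed" and key in pending:
            durations.append((parse(ts) - pending.pop(key)).total_seconds())
    return durations


times = completion_times(toy_events)
average_completion_time = sum(times) / len(times)
```

A level that is started but never completed contributes nothing to the average, which is one possible reading of the task; the brief does not say how such cases should be treated.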

****Deadline is 5 days. Good luck!****

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 46 kB/s 
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 33.5 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=1a4a3fce54b70a1494700e6160cc0b5518d9a599b287b7d6452ddbed4ad99a13
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [3]:
import csv
#import os
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window
#from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

In [4]:
#spark = SparkSession.builder.getOrCreate()

spark = SparkSession.builder \
        .config("spark.driver.memory", "15g") \
        .config("spark.executor.memory", "4g") \
        .config("spark.driver.maxResultSize", "10g") \
        .getOrCreate()

In [5]:
def check_df(df):
  print('Schema: ', df.schema)
  print('Schema: ') 
  df.printSchema()
  print('First 5 rows: ') 
  df.show(n=5, truncate=False) 

def check_dist_count(df):
  for c in df.columns:
    temp = df.select(F.col(c)).distinct()
    print('distinct', c, ':', temp.count())

def check_dist(df):
  for c in df.columns:
    temp = df.select(F.col(c)).distinct()
    temp.orderBy(F.col(c)).show(n=5)

def check_null(df):
  for c in df.columns:
    temp = df.filter(F.isnull(c))             #isnull
    if temp.count() > 0:
      print("   ------>   isnull")
      temp.show()

    temp = df.filter(F.col(c) == 'null')      # == 'null'
    if temp.count() > 0:
      print('   ------>   == null')
      temp.show()

    temp = df.filter(F.col(c) == 'NULL')      # == 'NULL'
    if temp.count() > 0:
      print('   ------>    == NULL')
      temp.show()

    temp = df.filter(F.col(c) == 'NaN')      # == 'NaN'
    if temp.count() > 0:
      print('   ------>   == NaN')
      temp.show()
      
    temp = df.filter(F.col(c) == 'nan')      # == 'nan'
    if temp.count() > 0:
      print('   ------>    == nan')
      temp.show()

    temp = df.filter(F.col(c) == '')         # == ''
    if temp.count() > 0:
      print('   ------>   == ....')
      temp.show()

  for c, t in df.dtypes:
    if t != "timestamp":
      temp = df.filter(F.isnan(c))
      if temp.count() > 0:
        print('   ------>   isnan')
        temp.show()
  
def change_null(df):
  for c in df.columns:
    temp = df.filter(F.isnull(c))
    if temp.count() > 0:
      print("   ------>   isnull ----> changed to 'null'")
      df = df.fillna(value='null')
  return df

  

def check_difference(df1, df2):
  temp1 = df1.select('USER_ID', 'PLATFORM', 'APP_NAME', 'INSTALL_TS').distinct()
  temp2 = df2.select('USER_ID', 'PLATFORM', 'APP_NAME', 'INSTALL_TS').distinct()

  print('df1 difference from df2:')
  difference = temp1.subtract(temp2)
  difference.orderBy('USER_ID').show()

  print('df2 difference from df1:')
  difference = temp2.subtract(temp1)
  difference.orderBy('USER_ID').show()


def update_platform(df1, df2):
  
  temp1 = df1.select('USER_ID', 'PLATFORM', 'APP_NAME').distinct()
  temp2 = df2.select('USER_ID', 'PLATFORM', 'APP_NAME').distinct()

  dif = temp1.subtract(temp2)
  dif = dif.orderBy('USER_ID')
  c = dif.count()

  print('df1_dif')
  dif.show()
  if c > 0:
    for y in dif.collect():   # collect the differing rows once, not once per iteration
      df2_dif = temp2.filter((temp2.USER_ID == y.USER_ID) 
                          & (temp2.APP_NAME == y.APP_NAME))
    

      print('df2_dif')
      df2_dif.show()
      c2 = df2_dif.count()
      if c2 > 0:
        y = df2_dif.collect()[0]
        df1 = df1.withColumn('PLATFORM', 
                            F.when((df1.USER_ID == y.USER_ID)
                            & (df1.APP_NAME == y.APP_NAME),
                            y.PLATFORM).otherwise(F.col('PLATFORM')))
      else:
        print("No matching USER_ID/APP_NAME found in df2.")
  else:
    print('No different platforms between dataframes.')
  
  return df1



def check_dup(df):
    print("not distinct: ", df.count())
    distinctDF = df.distinct()
    print("distinct: ", distinctDF.count())

    df_duplicates = df.groupBy(df.columns).count().filter(F.col('count')>1)
    df_duplicates.orderBy(F.col('count').desc()).show(n=50)

def remove_dup(df):
    df = df.distinct()
    return df
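
The `check_null` helper above probes every column for several spellings of a missing value: a real null, the strings 'null', 'NULL', 'NaN', 'nan', the empty string, and a floating-point NaN. As a compact statement of that rule, here is a plain-Python sketch; `MISSING_TOKENS` and `is_missing` are hypothetical names introduced only for illustration.

```python
import math

# The literal strings that check_null compares against.
MISSING_TOKENS = {"", "null", "NULL", "nan", "NaN"}


def is_missing(value):
    """Return True if value is one of the spellings of 'missing' checked above."""
    if value is None:
        return True
    if isinstance(value, float) and math.isnan(value):
        return True
    return isinstance(value, str) and value in MISSING_TOKENS
```

Keeping the token list in one place makes it easier to extend the check if the raw files turn out to use another placeholder.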


In [6]:
path = ['/content/drive/MyDrive/Colab Notebooks/RollicDataEngineeringCase/sessions.csv',
        '/content/drive/MyDrive/Colab Notebooks/RollicDataEngineeringCase/installs.csv', 
        '/content/drive/MyDrive/Colab Notebooks/RollicDataEngineeringCase/events.csv']


sessions = spark.read.format('csv').options(header='true', inferSchema='true').load(path[0])
installs = spark.read.format('csv').options(header='true', inferSchema='true').load(path[1])
events = spark.read.format('csv').options(header='true', inferSchema='true').load(path[2])

sessions = sessions.distinct().orderBy('USER_ID')
installs = installs.distinct().orderBy('USER_ID')
events = events.distinct().orderBy('USER_ID')

In [7]:
# check null values
print('---------installs---------')
check_null(installs)
print('---------sessions---------')
check_null(sessions)
print('---------events---------')
check_null(events)

---------installs---------
   ------>   isnull
+-------+--------+--------+-------------------+-------------+
|USER_ID|PLATFORM|APP_NAME|         INSTALL_TS|   SESSION_ID|
+-------+--------+--------+-------------------+-------------+
|     38|    null|   app_1|2022-01-01 12:59:13|1641041957099|
|     89|    null|   app_1|2022-01-03 17:03:28|1641229407771|
+-------+--------+--------+-------------------+-------------+

---------sessions---------
   ------>   == null
+-------+--------+--------+-------------------+-------------------+-------------+
|USER_ID|PLATFORM|APP_NAME|          CREATE_TS|         INSTALL_TS|   SESSION_ID|
+-------+--------+--------+-------------------+-------------------+-------------+
|      8|    null|   app_1|2022-01-02 20:47:37|2022-01-01 14:51:28|1641156457490|
+-------+--------+--------+-------------------+-------------------+-------------+

---------events---------


In [7]:
# update null platforms
#print('---------before update-------')
#check_difference(sessions, events)
print('---------updating-------')
sessions = update_platform(sessions, events)
#print('---------after update-------')
#check_difference(sessions, events)

#print('---------before update-------')
#check_difference(installs, events)
print('---------updating-------')
installs = update_platform(installs, events)
#print('---------after update-------')
#check_difference(installs, events)

#temp2 = installs.filter(F.isnull(installs.PLATFORM))
#temp2.show()
#temp2 =  installs.filter((installs.USER_ID == '38') | (installs.USER_ID == '89'))
#temp2.show()

---------updating-------
df1_dif
+-------+--------+--------+
|USER_ID|PLATFORM|APP_NAME|
+-------+--------+--------+
|      8|    null|   app_1|
+-------+--------+--------+

df2_dif
+-------+--------+--------+
|USER_ID|PLATFORM|APP_NAME|
+-------+--------+--------+
|      8| android|   app_1|
+-------+--------+--------+

---------updating-------
df1_dif
+-------+--------+--------+
|USER_ID|PLATFORM|APP_NAME|
+-------+--------+--------+
|     38|    null|   app_1|
|     89|    null|   app_1|
+-------+--------+--------+

df2_dif
+-------+--------+--------+
|USER_ID|PLATFORM|APP_NAME|
+-------+--------+--------+
|     38| android|   app_1|
+-------+--------+--------+

df2_dif
+-------+--------+--------+
|USER_ID|PLATFORM|APP_NAME|
+-------+--------+--------+
|     89|     ios|   app_1|
+-------+--------+--------+
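
The output above shows the idea behind `update_platform`: a row whose PLATFORM is null is repaired by looking up the same USER_ID and APP_NAME in the events data. A minimal plain-Python sketch of that lookup follows. It is a simplification (it only fills nulls, whereas the notebook function overwrites any platform that disagrees with events), and the row dictionaries are hypothetical stand-ins for DataFrame rows. In PySpark the same effect can usually be achieved with a left join and `F.coalesce` instead of a Python loop.

```python
def fill_missing_platform(rows, reference):
    """Fill PLATFORM = None using the platform seen for the same
    (USER_ID, APP_NAME) in the reference rows."""
    lookup = {
        (r["USER_ID"], r["APP_NAME"]): r["PLATFORM"]
        for r in reference
        if r["PLATFORM"] is not None
    }
    fixed = []
    for row in rows:
        platform = row["PLATFORM"]
        if platform is None:
            platform = lookup.get((row["USER_ID"], row["APP_NAME"]))
        fixed.append({**row, "PLATFORM": platform})
    return fixed


# Values taken from the notebook output: users 38 and 89 have a null platform.
toy_installs = [
    {"USER_ID": 38, "APP_NAME": "app_1", "PLATFORM": None},
    {"USER_ID": 89, "APP_NAME": "app_1", "PLATFORM": None},
    {"USER_ID": 1, "APP_NAME": "app_1", "PLATFORM": "ios"},
]
toy_events = [
    {"USER_ID": 38, "APP_NAME": "app_1", "PLATFORM": "android"},
    {"USER_ID": 89, "APP_NAME": "app_1", "PLATFORM": "ios"},
    {"USER_ID": 1, "APP_NAME": "app_1", "PLATFORM": "ios"},
]
repaired = fill_missing_platform(toy_installs, toy_events)
```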



In [8]:
# check null values
print('---------installs---------')
check_null(installs)
print('---------sessions---------')
check_null(sessions)
print('---------events---------')
check_null(events)

---------installs---------
---------sessions---------
---------events---------


In [9]:
# Fixes the INSTALL_TS values in events and sessions using the installs file.
def update_install_ts(df1, df2):
  temp1 = df1.select('USER_ID', 'PLATFORM', 'APP_NAME', 'INSTALL_TS').distinct()
  temp2 = df2.select('USER_ID', 'PLATFORM', 'APP_NAME', 'INSTALL_TS').distinct()
  
  dif = temp2.subtract(temp1)
  dif = dif.orderBy('USER_ID')
  c = dif.count()
  print(c)
  if c > 0:
    print('Updating....')
    for y in dif.collect():  # change invalid INSTALL_TS; collect once, not once per iteration
      df1 = df1.withColumn('INSTALL_TS', \
                           F.when((df1.USER_ID == y.USER_ID) \
                                  & (df1.PLATFORM == y.PLATFORM) \
                                  & (df1.APP_NAME == y.APP_NAME) \
                                  & (df1.INSTALL_TS != y.INSTALL_TS), \
                                  y.INSTALL_TS).otherwise(F.col('INSTALL_TS')))
  

  else:
     print("No wrong install_ts in the dataframe.")
  
  #df1.filter((df1.USER_ID =='56')).show()

  return df1


# update install timestamps in sessions and events
#print('events---------before update-------')
#check_difference(events, installs)
print('events---------updating-------')
events = update_install_ts(events, installs)
#print('events---------after update-------')
#check_difference(events, installs)

#print('sessions---------before update-------')
#check_difference(sessions, installs)
print('sessions---------updating-------')
sessions = update_install_ts(sessions, installs)
#print('sessions---------after update-------')
#check_difference(sessions, installs)

events---------updating-------
3
Updating....
sessions---------updating-------
6
Updating....


In [17]:
check_difference(installs, sessions)
check_difference(installs, events)
check_difference(sessions, events)

df1 difference from df2:
+-------+--------+--------+-------------------+
|USER_ID|PLATFORM|APP_NAME|         INSTALL_TS|
+-------+--------+--------+-------------------+
|      5| android|   app_1|2022-01-01 10:38:27|
|     12| android|   app_1|2022-01-01 17:44:12|
|     30| android|   app_1|2022-01-01 13:27:55|
|     82| android|   app_1|2022-01-03 08:08:16|
+-------+--------+--------+-------------------+

df2 difference from df1:
+-------+--------+--------+----------+
|USER_ID|PLATFORM|APP_NAME|INSTALL_TS|
+-------+--------+--------+----------+
+-------+--------+--------+----------+

df1 difference from df2:
+-------+--------+--------+----------+
|USER_ID|PLATFORM|APP_NAME|INSTALL_TS|
+-------+--------+--------+----------+
+-------+--------+--------+----------+

df2 difference from df1:
+-------+--------+--------+----------+
|USER_ID|PLATFORM|APP_NAME|INSTALL_TS|
+-------+--------+--------+----------+
+-------+--------+--------+----------+

df1 difference from df2:
+-------+--------+-

In [10]:
# Adds the install SESSION_IDs from installs to sessions and events.
def add_install_sessions(df1, df2, df3):

  # sessions
  temp_sessions = df1.join(df2, on=['USER_ID', 'SESSION_ID'], how='leftanti')

  # print(temp_sessions.count())
  c = temp_sessions.count()
  if c > 0:
    print('There are', c, 'install SESSION_ID missing in df2')
    print('Adding', c, 'new sessions.....')
    temp_sessions = temp_sessions.select(F.col('USER_ID'), \
                                             F.col('PLATFORM'), \
                                             F.col('APP_NAME'), \
                                             F.col('INSTALL_TS').alias('CREATE_TS'), \
                                             F.col('INSTALL_TS'), \
                                             F.col('SESSION_ID'))

    df2 = df2.union(temp_sessions)
    df2 = df2.orderBy('USER_ID')
    #df2.show()
    #df2.printSchema()
  else:
    print('Every install SESSION_ID is in the df2.')

  # events
  temp_events = df1.join(df3, on=['USER_ID', 'SESSION_ID'], how='leftanti')
  # print(temp_events.count())
  c = temp_events.count()
  if c > 0:
    print('There are', c, 'install SESSION_ID missing in df3.')
    print('Adding', c, 'new sessions.....')
    #df.select("firstname","salary", lit(0.3).alias("bonus")).show()

    temp_events = temp_events.select(F.col('USER_ID'), \
                                         F.col('PLATFORM'), \
                                         F.col('APP_NAME'), \
                                         F.col('INSTALL_TS').alias('CREATE_TS'), \
                                         F.col('INSTALL_TS'), \
                                         F.lit('app_started').alias("EVENT_NAME"), \
                                         F.col('SESSION_ID'))
    temp_events.orderBy('USER_ID', 'CREATE_TS').show()
    temp_events.show(n=50)
    df3 = df3.union(temp_events)
    df3 = df3.orderBy('USER_ID')

  else:
    print('Every install SESSION_ID is in the df3.')

  return df2, df3

sessions, events = add_install_sessions(installs, sessions, events)


There are 41 install SESSION_ID missing in df2
Adding 41 new sessions.....
Every install SESSION_ID is in the df3.
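
The step above is an anti-join followed by a union: install records whose (USER_ID, SESSION_ID) pair does not occur in sessions are turned into session rows, with CREATE_TS set to INSTALL_TS. The plain-Python sketch below shows the same logic on two install rows taken from the notebook output; the single session row and its CREATE_TS are hypothetical.

```python
def missing_install_sessions(installs, sessions):
    """Build a session row for every install whose (USER_ID, SESSION_ID)
    is absent from sessions; the new row starts at the install time."""
    known = {(s["USER_ID"], s["SESSION_ID"]) for s in sessions}
    new_rows = []
    for i in installs:
        if (i["USER_ID"], i["SESSION_ID"]) not in known:
            new_rows.append({
                "USER_ID": i["USER_ID"],
                "PLATFORM": i["PLATFORM"],
                "APP_NAME": i["APP_NAME"],
                "CREATE_TS": i["INSTALL_TS"],  # first session begins at install
                "INSTALL_TS": i["INSTALL_TS"],
                "SESSION_ID": i["SESSION_ID"],
            })
    return new_rows


toy_installs = [
    {"USER_ID": 1, "PLATFORM": "ios", "APP_NAME": "app_1",
     "INSTALL_TS": "2022-01-01 01:09:32", "SESSION_ID": 1640999371742},
    {"USER_ID": 2, "PLATFORM": "ios", "APP_NAME": "app_1",
     "INSTALL_TS": "2022-01-01 18:55:17", "SESSION_ID": 1641063316272},
]
toy_sessions = [  # hypothetical: only user 1's install session is present
    {"USER_ID": 1, "PLATFORM": "ios", "APP_NAME": "app_1",
     "CREATE_TS": "2022-01-01 01:09:32", "INSTALL_TS": "2022-01-01 01:09:32",
     "SESSION_ID": 1640999371742},
]
added = missing_install_sessions(toy_installs, toy_sessions)
```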


In [19]:
check_difference(installs, sessions)
check_difference(installs, events)
check_difference(sessions, events)

df1 difference from df2:
+-------+--------+--------+----------+
|USER_ID|PLATFORM|APP_NAME|INSTALL_TS|
+-------+--------+--------+----------+
+-------+--------+--------+----------+

df2 difference from df1:
+-------+--------+--------+----------+
|USER_ID|PLATFORM|APP_NAME|INSTALL_TS|
+-------+--------+--------+----------+
+-------+--------+--------+----------+

df1 difference from df2:
+-------+--------+--------+----------+
|USER_ID|PLATFORM|APP_NAME|INSTALL_TS|
+-------+--------+--------+----------+
+-------+--------+--------+----------+

df2 difference from df1:
+-------+--------+--------+----------+
|USER_ID|PLATFORM|APP_NAME|INSTALL_TS|
+-------+--------+--------+----------+
+-------+--------+--------+----------+

df1 difference from df2:
+-------+--------+--------+----------+
|USER_ID|PLATFORM|APP_NAME|INSTALL_TS|
+-------+--------+--------+----------+
+-------+--------+--------+----------+

df2 difference from df1:
+-------+--------+--------+----------+
|USER_ID|PLATFORM|APP_NAME

In [11]:
# update create_ts in sessions and events based on install_ts

def update_create_ts(df1, df2):

  if 'CREATE_TS' in df2.columns:
    if 'EVENT_NAME' in df2.columns:
      temp = df2.filter(df2.EVENT_NAME == 'app_started')
      w3 = Window.partitionBy('SESSION_ID').orderBy('CREATE_TS')
      temp = temp.withColumn('row', F.row_number().over(w3)).filter(F.col('row') == 1).drop('row')
      temp = temp.filter(temp.CREATE_TS != temp.INSTALL_TS)
      temp = temp.join(df1, on=['USER_ID', 'SESSION_ID'], how='leftsemi')   # find data on events based on install SESSION_IDs
      
      c = temp.count()
      print('There are', c, 'different create_ts then install_ts.')

      if c > 0:
        print('Updating.....')
        # c is already an int, and the rows to fix live in `temp` (no `temp2` exists here)
        for y in temp.collect():
          df2 = df2.withColumn('CREATE_TS',
                            F.when((df2.USER_ID == y.USER_ID)
                            & (df2.EVENT_NAME == y.EVENT_NAME)
                            & (df2.SESSION_ID == y.SESSION_ID),
                            y.INSTALL_TS).otherwise(F.col('CREATE_TS')))
  
    else:
      # ----sessions -----
      temp = df2.filter(df2.CREATE_TS != df2.INSTALL_TS)
      temp = temp.join(df1, on=['USER_ID', 'SESSION_ID'], how='leftsemi')
      c = temp.count()
      print('There are', c, 'different create_ts then install_ts.')
      #temp.show(n = 100)

      if c > 0:
        print('Updating.....')
        for y in temp.collect():  # collect once, not once per iteration
          df2 = df2.withColumn('CREATE_TS',
                            F.when((df2.USER_ID == y.USER_ID)
                            & (df2.SESSION_ID == y.SESSION_ID),
                            y.INSTALL_TS).otherwise(F.col('CREATE_TS')))
          #y.orderBy('USER_ID', 'CREATE_TS').show()
          #sessions_new.filter(sessions_new.USER_ID =='10').show()

  else:
    print('CREATE_TS is not in df2.')

  return df2

events = update_create_ts(installs, events)
sessions = update_create_ts(installs, sessions)
#y.orderBy('USER_ID', 'CREATE_TS').show()
#sessions.filter(sessions.USER_ID =='10').show()

There are 0 different create_ts then install_ts.
There are 53 different create_ts then install_ts.
Updating.....


In [12]:
# update create_ts in sessions based on events
def upd_create_ts(df1, df2):
  temp1 = df1
  temp2 = df2
  #temp2 = temp2.cache()

  w3 = Window.partitionBy('SESSION_ID').orderBy('CREATE_TS')
  temp2 = temp2.withColumn('row', F.row_number().over(w3)).filter(F.col('row') == 1).drop('row')
  #print(temp1.count())
  #temp1.show()
  #print(temp2_2.count())
  temp2 = temp2.orderBy('USER_ID', 'CREATE_TS')
  temp2 = temp2.select('USER_ID', 'PLATFORM', 'APP_NAME', 'CREATE_TS', 'INSTALL_TS', 'SESSION_ID')
  temp2 = temp2.subtract(temp1)
  temp2 = temp2.join(temp1, on=['USER_ID', 'SESSION_ID'], how='leftsemi')
  temp2 = temp2.select('USER_ID', 'PLATFORM', 'APP_NAME', 'CREATE_TS', 'INSTALL_TS', 'SESSION_ID')
  temp1 = temp1.join(temp2, on=['USER_ID', 'SESSION_ID'], how='leftanti')
  temp1 = temp1.select('USER_ID', 'PLATFORM', 'APP_NAME', 'CREATE_TS', 'INSTALL_TS', 'SESSION_ID')
  print('There are', temp2.count(), 'wrong create_ts in sessions.')
  #print(temp1.count())
  #print(df1.count())
  if temp2.count() > 0:
    print('Updating......')
    df1 = temp1.union(temp2)
    #for i in range(temp2.count()):
      #y = temp2.collect()[i]
      #df1 = df1.withColumn('CREATE_TS', F.when((df1.USER_ID == y.USER_ID)
                        #    & (df1.SESSION_ID == y.SESSION_ID),
                        #    y.CREATE_TS).otherwise(F.col('CREATE_TS')))

  #temp1 = df1.filter(df1.CREATE_TS < df1.INSTALL_TS)
  #print(temp1)

  #temp1 = df1.filter(df1.CREATE_TS < df1.INSTALL_TS)
  print('Done!')
  return df1

#sessions.filter(sessions.USER_ID == 4).show()
sessions = upd_create_ts(sessions, events)
#sessions.filter(sessions.USER_ID == 4).show()


There are 918 wrong create_ts in sessions.
Updating......
Done!
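
`upd_create_ts` keeps the earliest event of each session (via `row_number` over a window ordered by CREATE_TS) and uses it to correct the session's start time. A simplified plain-Python sketch of that rule follows; it only replaces CREATE_TS, whereas the notebook function swaps in the whole event row, and all toy values are hypothetical. Timestamps in the 'YYYY-MM-DD HH:MM:SS' form compare correctly as strings, which the sketch relies on.

```python
def first_event_ts(events):
    """Earliest CREATE_TS per (USER_ID, SESSION_ID)."""
    first = {}
    for e in events:
        key = (e["USER_ID"], e["SESSION_ID"])
        if key not in first or e["CREATE_TS"] < first[key]:
            first[key] = e["CREATE_TS"]
    return first


def align_session_start(sessions, events):
    """Set each session's CREATE_TS to its earliest event, if it has events."""
    first = first_event_ts(events)
    return [
        {**s, "CREATE_TS": first.get((s["USER_ID"], s["SESSION_ID"]), s["CREATE_TS"])}
        for s in sessions
    ]


toy_sessions = [
    {"USER_ID": 4, "SESSION_ID": 10, "CREATE_TS": "2022-01-01 08:30:00"},
    {"USER_ID": 4, "SESSION_ID": 11, "CREATE_TS": "2022-01-02 09:00:00"},
]
toy_events = [
    {"USER_ID": 4, "SESSION_ID": 10, "CREATE_TS": "2022-01-01 08:10:05"},
    {"USER_ID": 4, "SESSION_ID": 10, "CREATE_TS": "2022-01-01 08:09:10"},
]
aligned = align_session_start(toy_sessions, toy_events)
```

Session 11 has no events in the toy data, so its CREATE_TS is left unchanged.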


In [13]:
sessions = sessions.orderBy('USER_ID', 'CREATE_TS')
events = events.orderBy('USER_ID', 'CREATE_TS')
sessions.printSchema()
events.printSchema()

root
 |-- USER_ID: integer (nullable = true)
 |-- PLATFORM: string (nullable = true)
 |-- APP_NAME: string (nullable = true)
 |-- CREATE_TS: timestamp (nullable = true)
 |-- INSTALL_TS: timestamp (nullable = true)
 |-- SESSION_ID: long (nullable = true)

root
 |-- USER_ID: integer (nullable = true)
 |-- PLATFORM: string (nullable = true)
 |-- APP_NAME: string (nullable = true)
 |-- CREATE_TS: timestamp (nullable = true)
 |-- INSTALL_TS: timestamp (nullable = true)
 |-- EVENT_NAME: string (nullable = true)
 |-- SESSION_ID: long (nullable = true)



In [15]:
# check entities where create_ts is < install_ts
def invalid_create_ts(df1):
  temp = df1.filter(df1.CREATE_TS < df1.INSTALL_TS)
  #temp.orderBy(temp.USER_ID, temp.CREATE_TS).show(n=50)
  c = temp.count()
  print('There are', c, 'invalid create_ts in the dataframe.')

  if c > 0:
    df1 = df1.filter(F.col('CREATE_TS') >= F.col('INSTALL_TS'))

  return df1
sessions = invalid_create_ts(sessions)
events = invalid_create_ts(events)


There are 13 invalid create_ts in the dataframe.
There are 35 invalid create_ts in the dataframe.


In [16]:
print(events.count())
print(sessions.count())

4499
1397


In [17]:
# check and update wrong SESSION_IDs in sessions and events
def del_wrong_sessions(df1, df2):

  # sessions
  temp_sessions = df1.join(df2, on=['USER_ID', 'SESSION_ID'], how='leftanti')
  c = temp_sessions.count()
  
  if c > 0:

    print('df1 have', c,  'different SESSION IDs from df2.')

    print('Total session count in df1:', df1.count())
    df1 = df1.join(temp_sessions, on=['USER_ID', 'SESSION_ID'], how='leftanti')
    df1 = df1.select('USER_ID', 'PLATFORM', 'APP_NAME', 'CREATE_TS', 'INSTALL_TS', 'SESSION_ID')
    
    print('different session count:', c)
    print('new session count after cleaning:', df1.count())
    df1 = df1.orderBy('USER_ID', 'CREATE_TS')

  else:

    print('df1 dont have different SESSION IDs from df2.')

  # events 
  temp_events = df2.join(df1, on=['USER_ID', 'SESSION_ID'], how='leftanti')
  c = temp_events.count()

  if c > 0:

    print('df2 have', c,  'different SESSION IDs from df1.')
    print('adding these IDs to df1.....')

    w3 = Window.partitionBy('SESSION_ID').orderBy('CREATE_TS')
    t = temp_events.withColumn('row', F.row_number().over(w3)).filter(F.col('row') == 1).drop('row')
    t = t.orderBy('USER_ID', 'CREATE_TS')
    
    #t.show()
    print(t.count())

    t = t.select('USER_ID', 'PLATFORM', 'APP_NAME', 'CREATE_TS', 'INSTALL_TS', 'SESSION_ID')
    df1 = df1.union(t)
    df1 = df1.orderBy('USER_ID', 'CREATE_TS')

  else:
    print('df2 dont have different SESSION IDs from df1.')
  
  return df1


sessions = del_wrong_sessions(sessions, events)

df1 have 327 different SESSION IDs from df2.
Total session count in df1: 1397
different session count: 327
new session count after cleaning: 1070
df2 have 394 different SESSION IDs from df1.
adding these IDs to df1.....
+-------+-------------+--------+--------+-------------------+-------------------+---------------+
|USER_ID|   SESSION_ID|PLATFORM|APP_NAME|          CREATE_TS|         INSTALL_TS|     EVENT_NAME|
+-------+-------------+--------+--------+-------------------+-------------------+---------------+
|      3|1641016206539|     ios|   app_1|2022-01-01 05:50:14|2022-01-01 05:02:46|  level_started|
|      3|1641016256176|     ios|   app_1|2022-01-01 05:50:57|2022-01-01 05:02:46|level_completed|
|      5|1641483452735| android|   app_1|2022-01-06 15:37:47|2022-01-01 10:38:27|  level_started|
|      7|1641058228398| android|   app_1|2022-01-01 17:31:54|2022-01-01 17:31:54|    app_started|
|      8|1641049054667| android|   app_1|2022-01-01 14:57:45|2022-01-01 14:51:29|  level_start

In [56]:
# check datasets
print("---------------installs-------------")
check_df(installs)
print("---------------sessions-------------")
check_df(sessions)
print("---------------events-------------")
check_df(events)

---------------installs-------------
Schema:  StructType([StructField('USER_ID', IntegerType(), True), StructField('PLATFORM', StringType(), True), StructField('APP_NAME', StringType(), True), StructField('INSTALL_TS', TimestampType(), True), StructField('SESSION_ID', LongType(), True)])
Schema: 
root
 |-- USER_ID: integer (nullable = true)
 |-- PLATFORM: string (nullable = true)
 |-- APP_NAME: string (nullable = true)
 |-- INSTALL_TS: timestamp (nullable = true)
 |-- SESSION_ID: long (nullable = true)

First 5 rows: 
+-------+--------+--------+-------------------+-------------+
|USER_ID|PLATFORM|APP_NAME|INSTALL_TS         |SESSION_ID   |
+-------+--------+--------+-------------------+-------------+
|1      |ios     |app_1   |2022-01-01 01:09:32|1640999371742|
|2      |ios     |app_1   |2022-01-01 18:55:17|1641063316272|
|3      |ios     |app_1   |2022-01-01 05:02:46|1641013365666|
|4      |android |app_1   |2022-01-01 08:09:09|1641024547451|
|5      |android |app_1   |2022-01-01 10:3

In [None]:
#check same session IDs for different user IDs

def check_same_session_for_dif_user(df1, df2):
  temp1 = df1.select('USER_ID', 'SESSION_ID').distinct()
  temp2 = df2.select('USER_ID', 'SESSION_ID').distinct()

  t = temp1.join(temp2, 'SESSION_ID')
  print(t.count())
  t = t.select(t.SESSION_ID).distinct()
  print(t.count())

  temp1 = df1.select('USER_ID', 'SESSION_ID').distinct()
  temp2 = df1.select('SESSION_ID').distinct()
  print(temp1.count())
  print(temp2.count())

  temp1 = df2.select('USER_ID', 'SESSION_ID').distinct()
  temp2 = df2.select('SESSION_ID').distinct()
  print(temp1.count())
  print(temp2.count())


check_same_session_for_dif_user(installs, sessions)
check_same_session_for_dif_user(installs, events)
check_same_session_for_dif_user(sessions, events)
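
The function above compares counts of distinct (USER_ID, SESSION_ID) pairs with counts of distinct SESSION_IDs; if the two differ, some session ID is shared by more than one user. The plain-Python sketch below names the offending IDs directly. `sessions_shared_by_users` and the toy pairs are hypothetical and only illustrate the check.

```python
from collections import defaultdict


def sessions_shared_by_users(pairs):
    """Map each SESSION_ID that appears with more than one USER_ID
    to the sorted list of those users."""
    users_by_session = defaultdict(set)
    for user_id, session_id in pairs:
        users_by_session[session_id].add(user_id)
    return {
        session_id: sorted(users)
        for session_id, users in users_by_session.items()
        if len(users) > 1
    }


# Hypothetical (USER_ID, SESSION_ID) pairs; session 500 is used by two users.
toy_pairs = [(1, 100), (1, 100), (2, 200), (3, 500), (7, 500)]
shared = sessions_shared_by_users(toy_pairs)
```

An empty result means SESSION_ID alone identifies a session, which is what the joins on SESSION_ID elsewhere in the notebook implicitly assume.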

In [18]:
# write dataframes to new csv files
def write_df_to_csv(df1, df2, df3):
  df1.repartition(1).write.csv(path="/content/drive/MyDrive/Colab Notebooks/RollicDataEngineeringCase/installs_cleared.csv", 
                               header="true")
  df2.repartition(1).write.csv(path="/content/drive/MyDrive/Colab Notebooks/RollicDataEngineeringCase/sessions_cleared.csv", 
                               header="true")
  df3.repartition(1).write.csv(path="/content/drive/MyDrive/Colab Notebooks/RollicDataEngineeringCase/events_cleared.csv", 
                               header="true")
  


installs = installs.distinct()
installs = installs.orderBy('USER_ID')
sessions = sessions.distinct()
sessions = sessions.orderBy('USER_ID', 'CREATE_TS')
events = events.distinct()
events = events.orderBy('USER_ID', 'CREATE_TS')
write_df_to_csv(installs, sessions, events)
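
`write_df_to_csv` relies on the writer's default save mode, which raises an error when the target path already exists, so rerunning this cell fails unless the old folders are deleted or `.mode("overwrite")` is set. Task 4 goes further and asks for output that is idempotent per create_date. The plain-Python sketch below illustrates that idea only: each `create_date=<date>` folder touched by a run is replaced as a whole, and rows are written in a fixed order. The file name `part-00000.csv`, the helper names, and the toy rows are hypothetical.

```python
import csv
import os
import shutil
import tempfile
from collections import defaultdict


def write_partitioned(rows, out_dir, fieldnames):
    """Write rows into out_dir/create_date=<date>/, replacing each partition it touches."""
    by_date = defaultdict(list)
    for row in rows:
        by_date[row["create_date"]].append(row)
    for create_date, part_rows in by_date.items():
        part_dir = os.path.join(out_dir, f"create_date={create_date}")
        shutil.rmtree(part_dir, ignore_errors=True)  # drop a previous run's output
        os.makedirs(part_dir)
        with open(os.path.join(part_dir, "part-00000.csv"), "w", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=fieldnames, extrasaction="ignore")
            writer.writeheader()
            # Sort so the file content does not depend on input order.
            writer.writerows(sorted(part_rows, key=lambda r: r["session_id"]))


def snapshot(out_dir):
    """Return {relative_path: file_content} for every file under out_dir."""
    files = {}
    for root, _dirs, names in os.walk(out_dir):
        for name in names:
            path = os.path.join(root, name)
            with open(path, newline="") as fh:
                files[os.path.relpath(path, out_dir)] = fh.read()
    return files


toy_rows = [
    {"user_id": 1, "session_id": 11, "create_date": "2022-01-01"},
    {"user_id": 2, "session_id": 12, "create_date": "2022-01-01"},
    {"user_id": 1, "session_id": 13, "create_date": "2022-01-02"},
]
out_dir = tempfile.mkdtemp()
write_partitioned(toy_rows, out_dir, ["user_id", "session_id"])
first_run = snapshot(out_dir)
write_partitioned(list(reversed(toy_rows)), out_dir, ["user_id", "session_id"])
second_run = snapshot(out_dir)
```

Comparing `first_run` with `second_run` is a cheap way to confirm that a second run, even with the input in a different order, leaves the output unchanged.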

In [36]:
path = ['/content/drive/MyDrive/Colab Notebooks/RollicDataEngineeringCase' +
        '/installs_cleared.csv/part-00000-d3ee878b-b5ce-4f40-afd9-39230fc6e32f-c000.csv',
        '/content/drive/MyDrive/Colab Notebooks/RollicDataEngineeringCase/' +
        'sessions_cleared.csv/part-00000-31be7df7-60c6-4a85-b212-87d7b6029c1c-c000.csv', 
        '/content/drive/MyDrive/Colab Notebooks/RollicDataEngineeringCase' +
        '/events_cleared.csv/part-00000-a62a07d9-4d95-4112-8440-e1efee6fcec6-c000.csv']

installs_new = spark.read.format('csv').options(header='true', inferSchema='true').load(path[0])
sessions_new = spark.read.format('csv').options(header='true', inferSchema='true').load(path[1])
events_new = spark.read.format('csv').options(header='true', inferSchema='true').load(path[2])
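
The paths above hard-code part-file names that contain a UUID, and Spark generates a new name on every write, so this cell breaks as soon as the cleaned files are written again. Spark can read the output folder itself (for example by passing `.../installs_cleared.csv` to `load`), which avoids the problem. If the individual files are needed, they can be located by pattern instead. The sketch below shows this with the standard library; `find_part_files` and the demo file names are hypothetical.

```python
import glob
import os
import tempfile


def find_part_files(output_dir):
    """Return the CSV part files inside a Spark output folder, sorted by name."""
    return sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))


# Simulate a Spark output folder: one data file plus the _SUCCESS marker.
demo_dir = tempfile.mkdtemp()
for name in ("_SUCCESS", "part-00000-hypothetical-uuid-c000.csv"):
    open(os.path.join(demo_dir, name), "w").close()

part_files = find_part_files(demo_dir)
```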



In [37]:
# check datasets
print("---------------installs-------------")
check_df(installs_new)
print("---------------sessions-------------")
check_df(sessions_new)
print("---------------events-------------")
check_df(events_new)


# check null values
print('---------installs---------')
check_null(installs_new)
print('---------sessions---------')
check_null(sessions_new)
print('---------events---------')
check_null(events_new)

check_difference(installs_new, sessions_new)
check_difference(installs_new, events_new)
check_difference(sessions_new, events_new)


---------------installs-------------
Schema:  StructType([StructField('USER_ID', IntegerType(), True), StructField('PLATFORM', StringType(), True), StructField('APP_NAME', StringType(), True), StructField('INSTALL_TS', TimestampType(), True), StructField('SESSION_ID', LongType(), True)])
Schema: 
root
 |-- USER_ID: integer (nullable = true)
 |-- PLATFORM: string (nullable = true)
 |-- APP_NAME: string (nullable = true)
 |-- INSTALL_TS: timestamp (nullable = true)
 |-- SESSION_ID: long (nullable = true)

First 5 rows: 
+-------+--------+--------+-------------------+-------------+
|USER_ID|PLATFORM|APP_NAME|INSTALL_TS         |SESSION_ID   |
+-------+--------+--------+-------------------+-------------+
|1      |ios     |app_1   |2022-01-01 01:09:32|1640999371742|
|2      |ios     |app_1   |2022-01-01 18:55:17|1641063316272|
|3      |ios     |app_1   |2022-01-01 05:02:46|1641013365666|
|4      |android |app_1   |2022-01-01 08:09:09|1641024547451|
|5      |android |app_1   |2022-01-01 10:3

In [38]:
# check distinct column count
print('---------installs---------')
check_dist_count(installs_new)
print('---------sessions---------')
check_dist_count(sessions_new)
print('---------events---------')
check_dist_count(events_new)

event_create_ts = events_new.select('CREATE_TS')
print(event_create_ts.count())
print(sessions_new.count())

---------installs---------
distinct USER_ID : 100
distinct PLATFORM : 2
distinct APP_NAME : 1
distinct INSTALL_TS : 100
distinct SESSION_ID : 100
---------sessions---------
distinct USER_ID : 100
distinct PLATFORM : 2
distinct APP_NAME : 1
distinct CREATE_TS : 1143
distinct INSTALL_TS : 100
distinct SESSION_ID : 1143
---------events---------
distinct USER_ID : 100
distinct PLATFORM : 2
distinct APP_NAME : 1
distinct CREATE_TS : 4489
distinct INSTALL_TS : 100
distinct EVENT_NAME : 4
distinct SESSION_ID : 1143
4499
1143


In [39]:
# find different session IDs
temp1 = sessions_new.select('SESSION_ID').distinct()
temp2 = events_new.select('SESSION_ID').distinct()
# NOTE: despite the label, this counts `sessions`, not `sessions_new`
print('sessions_new:', sessions.count())
print(temp1.count())
print(temp2.count())

dif=temp1.subtract(temp2)
print('---------> sessions_new difference from events_new')
print(dif.count())

temp = sessions_new.join(events_new, on='SESSION_ID', how='leftanti')
temp.orderBy('USER_ID', 'CREATE_TS').show()
temp.filter(F.col('CREATE_TS') < F.col('INSTALL_TS')).show()
print(temp.count())

dif=temp2.subtract(temp1)
print('---------> events_new difference from sessions_new')
print(dif.count())

temp = events_new.join(sessions_new, on='SESSION_ID', how='leftanti')
temp.orderBy('USER_ID', 'CREATE_TS').show()
temp.filter(F.col('CREATE_TS') < F.col('INSTALL_TS')).show()
print(temp.count())

temp = sessions_new.filter(sessions_new.CREATE_TS < sessions_new.INSTALL_TS)
temp.show()
print(temp.count())
temp = temp.select('SESSION_ID')
x = events_new.join(temp, on='SESSION_ID', how='leftsemi')
x.show()
x.filter(F.col('EVENT_NAME') == 'app_started').show()

sessions_new: 1369
1143
1143
---------> sessions_new difference from events_new
0
+----------+-------+--------+--------+---------+----------+
|SESSION_ID|USER_ID|PLATFORM|APP_NAME|CREATE_TS|INSTALL_TS|
+----------+-------+--------+--------+---------+----------+
+----------+-------+--------+--------+---------+----------+

+----------+-------+--------+--------+---------+----------+
|SESSION_ID|USER_ID|PLATFORM|APP_NAME|CREATE_TS|INSTALL_TS|
+----------+-------+--------+--------+---------+----------+
+----------+-------+--------+--------+---------+----------+

0
---------> events_new difference from sessions_new
0
+----------+-------+--------+--------+---------+----------+----------+
|SESSION_ID|USER_ID|PLATFORM|APP_NAME|CREATE_TS|INSTALL_TS|EVENT_NAME|
+----------+-------+--------+--------+---------+----------+----------+
+----------+-------+--------+--------+---------+----------+----------+

+----------+-------+--------+--------+---------+----------+----------+
|SESSION_ID|USER_ID|PLATF
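
The two checks in this cell, `subtract` on the distinct IDs and a `leftanti` join, answer the same question: which SESSION_IDs appear on one side but not on the other. A minimal plain-Python sketch of that idea follows; it does not use Spark, and the rows and the helper names `distinct_ids` and `left_anti` are made up for illustration.

```python
# Hypothetical rows; only SESSION_ID matters for the comparison.
sessions_rows = [
    {"SESSION_ID": 1, "USER_ID": 10},
    {"SESSION_ID": 2, "USER_ID": 10},
    {"SESSION_ID": 3, "USER_ID": 11},
]
events_rows = [
    {"SESSION_ID": 1, "EVENT_NAME": "app_started"},
    {"SESSION_ID": 1, "EVENT_NAME": "level_started"},
    {"SESSION_ID": 3, "EVENT_NAME": "app_started"},
]


def distinct_ids(rows):
    """Set of distinct SESSION_IDs, like select('SESSION_ID').distinct()."""
    return {row["SESSION_ID"] for row in rows}


def left_anti(left, right):
    """Rows of `left` whose SESSION_ID has no match in `right` (a left anti join)."""
    right_ids = distinct_ids(right)
    return [row for row in left if row["SESSION_ID"] not in right_ids]


# IDs present in sessions but missing from events (like temp1.subtract(temp2)).
print(distinct_ids(sessions_rows) - distinct_ids(events_rows))
# Full session rows without any event (like the leftanti join).
print(left_anti(sessions_rows, events_rows))
# The opposite direction: events whose session is unknown.
print(left_anti(events_rows, sessions_rows))
```

The set difference only reports the IDs, while the anti join keeps the full rows, which is why the cell runs both.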

In [56]:
path_output = '/content/drive/MyDrive/Colab Notebooks/RollicDataEngineeringCase/output/'
# DAU: For the first task, find the daily active users
# (the number of unique users who installed or played on each day). The result should consist of these fields:
# app_name, create_date, user_count. The sessions data is to be used.
# Results must be exported in the following partitions: app_name, platform.
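def dau():

  # createOrReplaceTempView does not fail if the view is left over from an earlier run
  sessions_new.createOrReplaceTempView("sessions")

  # one row per app, platform and day, counting each user at most once per day
  dau_df = spark.sql("""
      SELECT APP_NAME AS app_name,
             PLATFORM AS platform,
             DATE(CREATE_TS) AS create_date,
             COUNT(DISTINCT USER_ID) AS user_count
      FROM sessions
      GROUP BY app_name, platform, create_date
      ORDER BY app_name, platform, create_date
  """)

  spark.catalog.dropTempView("sessions")

  dau_df.show()

  # mode('overwrite') lets the cell be re-run without failing on an existing output path
  dau_df.write.partitionBy('app_name', 'platform').mode('overwrite').format("csv").save(path_output + 'dau', header = 'true')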


dau()

+--------+--------+-----------+----------+
|app_name|platform|create_date|user_count|
+--------+--------+-----------+----------+
|   app_1| android| 2022-01-01|        25|
|   app_1| android| 2022-01-02|        26|
|   app_1| android| 2022-01-03|        20|
|   app_1| android| 2022-01-04|         6|
|   app_1| android| 2022-01-05|         7|
|   app_1| android| 2022-01-06|         7|
|   app_1| android| 2022-01-07|         5|
|   app_1|     ios| 2022-01-01|        13|
|   app_1|     ios| 2022-01-02|        27|
|   app_1|     ios| 2022-01-03|        21|
|   app_1|     ios| 2022-01-04|        14|
|   app_1|     ios| 2022-01-05|        14|
|   app_1|     ios| 2022-01-06|         8|
|   app_1|     ios| 2022-01-07|         9|
+--------+--------+-----------+----------+
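
The aggregation can be checked by hand on a few rows. The sketch below is plain Python rather than Spark, and the sample rows and the function name `daily_active_users` are made up for illustration; it only mirrors the `COUNT(DISTINCT USER_ID)` per app, platform and day logic of the query.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical session rows: (USER_ID, PLATFORM, APP_NAME, CREATE_TS)
sample_sessions = [
    (1, "ios", "app_1", "2022-01-01 01:09:32"),
    (1, "ios", "app_1", "2022-01-01 09:15:00"),  # same user, same day: counted once
    (2, "ios", "app_1", "2022-01-01 18:55:17"),
    (2, "ios", "app_1", "2022-01-02 21:11:41"),
    (4, "android", "app_1", "2022-01-01 08:09:09"),
]


def daily_active_users(rows):
    """Count distinct users per (app_name, platform, create_date)."""
    users = defaultdict(set)
    for user_id, platform, app_name, create_ts in rows:
        create_date = datetime.strptime(create_ts, "%Y-%m-%d %H:%M:%S").date()
        # A set keeps each user only once per group, like COUNT(DISTINCT ...).
        users[(app_name, platform, create_date.isoformat())].add(user_id)
    return {key: len(ids) for key, ids in sorted(users.items())}


dau_counts = daily_active_users(sample_sessions)
for (app_name, platform, create_date), user_count in dau_counts.items():
    print(app_name, platform, create_date, user_count)
```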



In [57]:
# Retention: Find the user retention, i.e. how many users returned to the game
# on each day after their installation date for this game.
# The result should consist of these fields: app_name, install_date, returned_user_count, day (create date - install date).
# The installs and sessions data are to be used. Results must be exported in the following partitions: retention, app_name, platform.
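def retention():

  sessions_new.createOrReplaceTempView("sessions")

  # DATEDIFF counts whole calendar days, so it stays correct across month boundaries
  # (DAY(a) - DAY(b) only compares day-of-month values).
  # "> 0" keeps only sessions on a later day than the install date.
  retention_df = spark.sql("""
      SELECT APP_NAME AS app_name, PLATFORM AS platform,
             DATE(INSTALL_TS) AS install_date,
             DATEDIFF(DATE(CREATE_TS), DATE(INSTALL_TS)) AS day,
             COUNT(DISTINCT USER_ID) AS returned_user_count
      FROM sessions
      WHERE DATEDIFF(DATE(CREATE_TS), DATE(INSTALL_TS)) > 0
      GROUP BY app_name, platform, install_date, day
      ORDER BY app_name, platform, install_date, day
  """)

  spark.catalog.dropTempView("sessions")

  retention_df.show(n = 50)

  # mode('overwrite') lets the cell be re-run without failing on an existing output path
  retention_df.write.partitionBy('day', 'app_name', 'platform').mode('overwrite').format("csv").save(path_output + 'retention', header = 'true')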

retention()

+--------+--------+------------+---+-------------------+
|app_name|platform|install_date|day|returned_user_count|
+--------+--------+------------+---+-------------------+
|   app_1| android|  2022-01-01|  1|                 10|
|   app_1| android|  2022-01-01|  2|                  7|
|   app_1| android|  2022-01-01|  3|                  2|
|   app_1| android|  2022-01-01|  4|                  4|
|   app_1| android|  2022-01-01|  5|                  3|
|   app_1| android|  2022-01-01|  6|                  3|
|   app_1| android|  2022-01-02|  1|                  3|
|   app_1| android|  2022-01-02|  2|                  1|
|   app_1| android|  2022-01-02|  3|                  1|
|   app_1| android|  2022-01-02|  4|                  1|
|   app_1| android|  2022-01-03|  1|                  1|
|   app_1| android|  2022-01-03|  2|                  1|
|   app_1| android|  2022-01-03|  3|                  1|
|   app_1| android|  2022-01-03|  4|                  1|
|   app_1| android|  2022-01-04
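
The `day` column is the number of calendar days between the install date and the session date. A day-of-month subtraction (`DAY(a) - DAY(b)`) gives that value only while both timestamps fall in the same month, as they do in this January 2022 sample, whereas a date difference stays correct across month boundaries. A minimal plain-Python sketch of the two calculations, with made-up timestamps and hypothetical helper names:

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def day_of_month_diff(create_ts, install_ts):
    """Mimics DAY(create_ts) - DAY(install_ts): compares day-of-month values only."""
    return datetime.strptime(create_ts, FMT).day - datetime.strptime(install_ts, FMT).day


def calendar_day_diff(create_ts, install_ts):
    """Mimics DATEDIFF(DATE(create_ts), DATE(install_ts)): whole calendar days."""
    create_date = datetime.strptime(create_ts, FMT).date()
    install_date = datetime.strptime(install_ts, FMT).date()
    return (create_date - install_date).days


# Same month: both calculations agree.
same_month = ("2022-01-03 01:40:04", "2022-01-01 18:55:17")
# Hypothetical pair across a month boundary: they disagree.
cross_month = ("2022-02-01 10:00:00", "2022-01-31 23:00:00")

print(day_of_month_diff(*same_month), calendar_day_diff(*same_month))
print(day_of_month_diff(*cross_month), calendar_day_diff(*cross_month))
```

Both calculations ignore the time of day, so a session shortly after midnight on the day after install counts as day 1 even if less than 24 hours have passed.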

In [59]:
# Level Completion Time: Level completion time is the duration in seconds between the create timestamps of the
# level_started and level_completed events. It is calculated per user within the same session,
# and the values are then averaged. The result should consist of these fields: app_name, level, average_completion_time.
# The events data is to be used. Results must be exported in the following partitions: results, level, app_name, platform.
def level_comp_time():
  
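  # createOrReplaceTempView does not fail if a view is left over from an earlier run
  sessions_new.createOrReplaceTempView("sessions")
  events_new.createOrReplaceTempView("events")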

  level_comp_time = spark.sql("WITH started as( \
                              SELECT USER_ID as user_id, APP_NAME as app, PLATFORM as plat, \
                              CREATE_TS as create_time, EVENT_NAME as event_name, \
                              SESSION_ID as session \
                              FROM events WHERE EVENT_NAME == 'level_started' \
                              ), \
                              completed as( \
                              SELECT USER_ID as user_id, APP_NAME as app, PLATFORM as plat, \
                              CREATE_TS as create_time, EVENT_NAME as event_name, \
                              SESSION_ID as session \
                              FROM events WHERE EVENT_NAME == 'level_completed' \
                              ), \
                              x as( \
                              SELECT started.app as app_name, started.plat as platform, \
                              started.create_time as start_time, completed.create_time as complete_time, \
                              started.session as session_id, \
                              TIMESTAMPDIFF(SECOND, started.create_time, completed.create_time) as time_dif \
                              FROM started INNER JOIN completed on started.session == completed.session \
                              WHERE completed.create_time > started.create_time \
                              GROUP BY app_name, platform, start_time, complete_time, session_id \
                              ORDER BY session_id, start_time, complete_time \
                              ), \
                              completion_time as( \
                              SELECT app_name, platform, start_time, \
                              session_id, MIN(time_dif) as completion_time \
                              FROM x GROUP BY app_name, platform, start_time, session_id \
                              ORDER BY start_time \
                              ) \
                              SELECT app_name, platform, session_id, \
                              AVG(completion_time) as average_completion_time \
                              FROM completion_time \
                              GROUP BY app_name, platform, session_id \
                              ORDER BY app_name, platform, session_id")
  
  spark.catalog.dropTempView("sessions")
  spark.catalog.dropTempView("events")
                              
  level_comp_time.show()

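  # mode('overwrite') lets the cell be re-run without failing on an existing output path
  level_comp_time.write.partitionBy('app_name', 'platform').mode('overwrite').format("csv").save(path_output + 'level', header = 'true')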

level_comp_time()

+--------+--------+-------------+-----------------------+
|app_name|platform|   session_id|average_completion_time|
+--------+--------+-------------+-----------------------+
|   app_1| android|1640998429081|                   14.0|
|   app_1| android|1640998501136|                    8.0|
|   app_1| android|1640998567508|                    9.0|
|   app_1| android|1640998630515|                   23.0|
|   app_1| android|1640998732759|                   13.0|
|   app_1| android|1640998806033|                    9.0|
|   app_1| android|1640998881859|                   14.0|
|   app_1| android|1641005273704|                    7.0|
|   app_1| android|1641005377900|                   10.5|
|   app_1| android|1641008210548|                   29.0|
|   app_1| android|1641011640300|                    8.0|
|   app_1| android|1641013603785|                   20.0|
|   app_1| android|1641016838195|                   20.0|
|   app_1| android|1641026476627|                   20.0|
|   app_1| and
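
The query pairs every `level_started` event with the nearest later `level_completed` event of the same session (the `MIN(time_dif)` step) and then averages those durations per session. The same pairing can be sketched in plain Python; the events below and the function name `average_completion_time` are made up for illustration, and this is not the Spark code itself.

```python
from collections import defaultdict
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# Hypothetical events: (SESSION_ID, EVENT_NAME, CREATE_TS)
sample_events = [
    (101, "app_started", "2022-01-01 00:00:00"),
    (101, "level_started", "2022-01-01 00:00:05"),
    (101, "level_completed", "2022-01-01 00:00:19"),  # 14 s after its start
    (101, "level_started", "2022-01-01 00:00:30"),
    (101, "level_completed", "2022-01-01 00:00:38"),  # 8 s after its start
    (202, "level_started", "2022-01-01 01:00:00"),    # never completed
]


def average_completion_time(events):
    """Average seconds from each level start to the next completion, per session."""
    starts, completes = defaultdict(list), defaultdict(list)
    for session_id, event_name, create_ts in events:
        ts = datetime.strptime(create_ts, FMT)
        if event_name == "level_started":
            starts[session_id].append(ts)
        elif event_name == "level_completed":
            completes[session_id].append(ts)

    averages = {}
    for session_id, start_times in starts.items():
        durations = []
        for start in start_times:
            # Keep only completions after this start, then take the nearest one.
            later = [(c - start).total_seconds() for c in completes[session_id] if c > start]
            if later:
                durations.append(min(later))
        if durations:
            averages[session_id] = sum(durations) / len(durations)
    return averages


session_averages = average_completion_time(sample_events)
print(session_averages)
```

One limitation of nearest-completion pairing, in this sketch and in any query built the same way: a level that was started but abandoned is still matched to the next completion in the session, if there is one, which inflates the average.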

In [60]:
# Sessions Data: For the last task, the sessions data is enriched.
# Each session is flagged as the user's first session or not, based on the 'app_started' events,
# and the flag is stored as a boolean. The install_ts column is dropped. The result should consist
# of these fields: user_id, platform, app_name, create_ts, session_id, is_first_session. Events data must be used.
# Results must be exported in the following partition: create_date. Results have to be idempotent in terms of
# create date (the data must not change if the notebook is run again).
def sessions_data():

  # createOrReplaceTempView does not fail if the view is left over from an earlier run
  events_new.createOrReplaceTempView("events")

  install_data = spark.sql("WITH installs as( \
                            SELECT USER_ID, APP_NAME, PLATFORM, \
                            MIN(CREATE_TS) as CREATE_TS, INSTALL_TS, EVENT_NAME, \
                            SESSION_ID \
                            FROM events WHERE events.EVENT_NAME == 'app_started' and events.CREATE_TS == events.INSTALL_TS \
                            GROUP BY USER_ID, APP_NAME, PLATFORM, INSTALL_TS, EVENT_NAME, SESSION_ID \
                            ORDER BY USER_ID \
                            ) \
                            SELECT * FROM installs ORDER BY user_id \
                            ")
  
  spark.catalog.dropTempView("events")


  install_data = install_data.select('USER_ID', 'APP_NAME', 'PLATFORM', 'CREATE_TS', 'SESSION_ID')

  temp = sessions_new.select('USER_ID', 'APP_NAME', 'PLATFORM', 'CREATE_TS', 'SESSION_ID')

  temp_false = temp.subtract(install_data)

  temp_true = temp.join(temp_false, on=['USER_ID', 'SESSION_ID'], how='leftanti')
  temp_true = temp_true.select('USER_ID', 'APP_NAME', 'PLATFORM', 'CREATE_TS', 'SESSION_ID')

  temp_false = temp_false.withColumn('is_first_session', F.lit(False))
  temp_true = temp_true.withColumn('is_first_session', F.lit(True))

  temp = temp_true.union(temp_false).orderBy('USER_ID', 'CREATE_TS')

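  temp = temp.withColumn('create_date', F.to_date(F.col('CREATE_TS')))

  temp.show()

  # mode('overwrite') makes a re-run replace the previous output instead of failing,
  # which keeps the export idempotent
  temp.write.partitionBy('create_date').mode('overwrite').format("csv").save(path_output + 'sessions_data', header = 'true')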

sessions_data()

+-------+--------+--------+-------------------+-------------+----------------+-----------+
|USER_ID|APP_NAME|PLATFORM|          CREATE_TS|   SESSION_ID|is_first_session|create_date|
+-------+--------+--------+-------------------+-------------+----------------+-----------+
|      1|   app_1|     ios|2022-01-01 01:09:32|1640999371742|            true| 2022-01-01|
|      2|   app_1|     ios|2022-01-01 18:55:17|1641063316272|            true| 2022-01-01|
|      2|   app_1|     ios|2022-01-01 18:55:33|1641063328206|           false| 2022-01-01|
|      2|   app_1|     ios|2022-01-01 18:58:17|1641063488674|           false| 2022-01-01|
|      2|   app_1|     ios|2022-01-01 19:28:03|1641065283271|           false| 2022-01-01|
|      2|   app_1|     ios|2022-01-02 21:11:41|1641157900472|           false| 2022-01-02|
|      2|   app_1|     ios|2022-01-03 01:40:04|1641174004106|           false| 2022-01-03|
|      2|   app_1|     ios|2022-01-04 03:28:57|1641266936380|           false| 2022-01-04|