In [123]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, max as max_
from pyspark.sql.window import Window
from pyspark.sql.functions import lag
from pyspark.sql.functions import countDistinct

In [4]:
# Start (or reuse) the Spark session that every later cell reads data through.
spark = (
    SparkSession.builder
    .appName("Premium vs Freemium")
    .getOrCreate()
)

In [5]:
# All three CSV inputs are read with the same settings: first row is the
# header, and column types are inferred from the data.
csv_options = {'header': 'True', 'InferSchema': 'True'}

df_acc_input = spark.read.options(**csv_options).csv('ms_acc_dimension.csv')
df_user_input = spark.read.options(**csv_options).csv('ms_user_dimension.csv')
df_download_input = spark.read.options(**csv_options).csv('ms_download_facts.csv')

In [6]:
# Preview each raw input (accounts, users, downloads) in load order.
for input_frame in (df_acc_input, df_user_input, df_download_input):
    input_frame.show()

+------+---------------+
|acc_id|paying_customer|
+------+---------------+
|   700|             no|
|   701|             no|
|   702|             no|
|   703|             no|
|   704|             no|
|   705|             no|
|   706|             no|
|   707|             no|
|   708|             no|
|   709|             no|
|   710|             no|
|   711|             no|
|   712|             no|
|   713|             no|
|   714|             no|
|   715|             no|
|   716|             no|
|   717|             no|
|   718|             no|
|   719|             no|
+------+---------------+
only showing top 20 rows

+-------+------+
|user_id|acc_id|
+-------+------+
|      1|   716|
|      2|   749|
|      3|   713|
|      4|   744|
|      5|   726|
|      6|   706|
|      7|   750|
|      8|   732|
|      9|   706|
|     10|   729|
|     11|   748|
|     12|   731|
|     13|   739|
|     14|   740|
|     15|   705|
|     16|   706|
|     17|   701|
|     18|   746|
|     19|   726|


In [27]:
# Attach the account's paying_customer flag to every user via an inner join
# on acc_id. Both inputs carry an acc_id column, so the select names the
# account-side one explicitly.
acc_user_match = df_acc_input.acc_id == df_user_input.acc_id
df_acc_user = (
    df_acc_input
    .join(df_user_input, acc_user_match, 'inner')
    .orderBy('user_id')
    .select(df_acc_input['acc_id'], 'user_id', 'paying_customer')
)

In [44]:
# Join every download record to its user's account and paying status.
# Both user_id and acc_id are taken from df_acc_user, the frame actually
# being joined here. The original pulled acc_id from df_acc_input, which is
# not an operand of this join and only resolved through attribute lineage.
df_user_download = (
    df_acc_user
    .join(
        df_download_input,
        df_acc_user.user_id == df_download_input.user_id,
        'inner',
    )
    .orderBy(df_acc_user['user_id'])
    .select(
        df_acc_user['user_id'],
        df_acc_user['acc_id'],
        'paying_customer',
        'date',
        'downloads',
    )
)

In [45]:
# Preview the joined user / account / download rows (show() prints only the top 20 rows).
df_user_download.show()

+-------+------+---------------+----------+---------+
|user_id|acc_id|paying_customer|      date|downloads|
+-------+------+---------------+----------+---------+
|      1|   716|             no|24-08-2020|        6|
|      2|   749|            yes|22-08-2020|        6|
|      3|   713|             no|18-08-2020|        2|
|      4|   744|            yes|24-08-2020|        4|
|      5|   726|            yes|19-08-2020|        7|
|      6|   706|             no|21-08-2020|        3|
|      7|   750|            yes|24-08-2020|        1|
|      8|   732|            yes|24-08-2020|        8|
|      9|   706|             no|17-08-2020|        5|
|     10|   729|            yes|16-08-2020|        4|
|     11|   748|            yes|22-08-2020|        8|
|     12|   731|            yes|19-08-2020|        6|
|     13|   739|            yes|15-08-2020|        3|
|     14|   740|            yes|21-08-2020|        0|
|     15|   705|             no|24-08-2020|        0|
|     16|   706|            

In [117]:
# Total downloads per (date, paying_customer) pair.
# The date column holds dd-MM-yyyy strings (see the preview above), so sorting
# on the raw string would order lexicographically by day-of-month and misplace
# rows once the data spans more than one month. Sort on the parsed date instead.
# The aggregate is aliased directly rather than by referencing Spark's
# generated "sum(downloads)" column name.
df_sum = (
    df_user_download
    .groupBy('date', 'paying_customer')
    .agg(F.sum('downloads').alias('total'))
    .orderBy(F.to_date(col('date'), 'dd-MM-yyyy'))
    .select('date', 'paying_customer', 'total')
)

In [130]:
# Split the daily totals by customer type, renaming the total column so the
# two halves can later sit side by side: "paying" for paying_customer == 'yes',
# "non_paying" for paying_customer == 'no'.
df_payable = (
    df_sum
    .where(df_sum['paying_customer'] == 'yes')
    .select('date', 'paying_customer', col('total').alias('paying'))
)
df_non_payable = (
    df_sum
    .where(df_sum['paying_customer'] == 'no')
    .select('date', 'paying_customer', col('total').alias('non_paying'))
)

In [135]:
# Preview the paying totals first, then the non-paying totals.
for split_frame in (df_payable, df_non_payable):
    split_frame.show()

+----------+---------------+------+
|      date|paying_customer|paying|
+----------+---------------+------+
|15-08-2020|            yes|    19|
|16-08-2020|            yes|    14|
|17-08-2020|            yes|     9|
|18-08-2020|            yes|     7|
|19-08-2020|            yes|    13|
|20-08-2020|            yes|    28|
|21-08-2020|            yes|    17|
|22-08-2020|            yes|    48|
|23-08-2020|            yes|    23|
|24-08-2020|            yes|    39|
|25-08-2020|            yes|    30|
+----------+---------------+------+

+----------+---------------+----------+
|      date|paying_customer|non_paying|
+----------+---------------+----------+
|15-08-2020|             no|        11|
|16-08-2020|             no|        15|
|17-08-2020|             no|        45|
|18-08-2020|             no|        10|
|19-08-2020|             no|        13|
|20-08-2020|             no|        13|
|21-08-2020|             no|        32|
|22-08-2020|             no|        15|
|23-08-2020|       

In [134]:
# Side-by-side daily totals: one row per date with the non-paying and paying
# download counts.
#
# Fixes relative to the original one-liner:
#   * show() returns None, so chaining it onto the assignment left df_output
#     bound to None. The DataFrame is now kept and displayed separately.
#   * Both inputs derive from df_sum, so comparing df_payable.date with
#     df_non_payable.date is a self-join on the same underlying column.
#     Joining on the column name avoids that ambiguity and yields a single
#     date column.
#   * date holds dd-MM-yyyy strings, so rows are ordered by the parsed date
#     rather than lexicographically.
#
# NOTE(review): the inner join keeps only dates present in both halves; a date
# with downloads from just one customer type is dropped. Confirm whether a
# full outer join with zero-fill is wanted instead.
df_output = (
    df_payable
    .join(df_non_payable.select('date', 'non_paying'), on='date', how='inner')
    .orderBy(F.to_date(col('date'), 'dd-MM-yyyy'))
    .select('date', 'non_paying', 'paying')
)
df_output.show()

+----------+----------+------+
|      date|non_paying|paying|
+----------+----------+------+
|15-08-2020|        11|    19|
|16-08-2020|        15|    14|
|17-08-2020|        45|     9|
|18-08-2020|        10|     7|
|19-08-2020|        13|    13|
|20-08-2020|        13|    28|
|21-08-2020|        32|    17|
|22-08-2020|        15|    48|
|23-08-2020|        12|    23|
|24-08-2020|         6|    39|
|25-08-2020|        23|    30|
+----------+----------+------+

