<a href="https://colab.research.google.com/github/cht123/Login-streak-calculator/blob/master/login_streak.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Load Spark

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
from pyspark.sql.functions import to_date, floor
import pyspark.sql.functions as F
from pyspark.sql.window import Window


# Purpose

The purpose of this notebook is to define an approach for monitoring monthly login streaks for customers.  

1. The period is monthly
2. A streak is only extended when logins happen in consecutive months


# Method

1. The month being evaluated is the reference month for all other months in the dataset. It is the main input parameter for the process.  The reference day is the last day of the reference month.
2. Create a column date_diff which is calculated as the floor of the rounded difference between the reference month's last day and the login date for each customer.  This is calle the reference day.
3. Filter all logins that occur a after the reference day.  These will have a negative date_diff.
4. Do a distinct on customer and date_diff.  This will provide a consecutive integer for each prior month the customer has logged in.
5. Partition by customer and do a count and a row count
6. create a flag (good flag) that indicates whether the date_diff = the row count.  If the logins are consecutive then they will be equal.
7. Get the max of the count for a customer where the good flag = 1.  This is the length of the customers login streak. 

# Create the counts step by step

## Create Sample Dataframe

In [0]:
hbdf = spark.createDataFrame([('27-DEC-19','A',), 
                            ('15-DEC-19','A',),
                            ('23-NOV-19','A',),
                            ('15-OCT-19','A',),
                            ('12-DEC-19','B',),
                            ('07-DEC-19','B',),
                            ('03-NOV-19','B',),
                            ('15-NOV-19','B',),
                            ('18-NOV-19','C',),
                            ('18-OCT-19','C',),
                            ('18-DEC-19','D',),
                            ('18-OCT-19','D',),
                            ('18-AUG-19','D',),
                            ('23-DEC-19','K',),
                            ('24-NOV-19','K',),
                            ('06-OCT-19','K',),
                            ('11-SEP-19','K',),
                            ('13-AUG-19','K',),
                            ('13-JAN-19','K',)
                            ], X C['LOGIN_STRING', 'cust'])


## Fix the date

In [0]:
# convert the login date to a date type
df = df.withColumn('LOGIN_DATE', F.to_date(df.LOGIN_STRING, 'dd-MMM-yy'))

In [0]:
df = df.drop('LOGIN_STRING')

In [0]:
df.show()

+----+----------+
|cust|LOGIN_DATE|
+----+----------+
|   A|2019-12-27|
|   A|2019-12-15|
|   A|2019-11-23|
|   A|2019-10-15|
|   B|2019-12-12|
|   B|2019-12-07|
|   B|2019-11-03|
|   B|2019-11-15|
|   C|2019-11-18|
|   C|2019-10-18|
|   D|2019-12-18|
|   D|2019-10-18|
|   D|2019-08-18|
|   K|2019-12-23|
|   K|2019-11-24|
|   K|2019-10-06|
|   K|2019-09-11|
|   K|2019-08-13|
|   K|2019-01-13|
+----+----------+



## Calculate the streak

In [0]:
# get the date diff from the reference start_date
start_date = "2019-12-31"
df = df.withColumn('date_diff', floor(F.months_between( to_date(F.lit(start_date)), df.LOGIN_DATE)))

In [0]:
df.show()

+----+----------+---------+
|cust|LOGIN_DATE|date_diff|
+----+----------+---------+
|   A|2019-12-27|        0|
|   A|2019-12-15|        0|
|   A|2019-11-23|        1|
|   A|2019-10-15|        2|
|   B|2019-12-12|        0|
|   B|2019-12-07|        0|
|   B|2019-11-03|        1|
|   B|2019-11-15|        1|
|   C|2019-11-18|        1|
|   C|2019-10-18|        2|
|   D|2019-12-18|        0|
|   D|2019-10-18|        2|
|   D|2019-08-18|        4|
|   K|2019-12-23|        0|
|   K|2019-11-24|        1|
|   K|2019-10-06|        2|
|   K|2019-09-11|        3|
|   K|2019-08-13|        4|
|   K|2019-01-13|       11|
+----+----------+---------+



In [0]:
# remove logins that occur after the reference date
df = df.filter(df['date_diff'] >= 0)

In [0]:
df.show()

+----+----------+---------+
|cust|LOGIN_DATE|date_diff|
+----+----------+---------+
|   A|2019-12-27|        0|
|   A|2019-12-15|        0|
|   A|2019-11-23|        1|
|   A|2019-10-15|        2|
|   B|2019-12-12|        0|
|   B|2019-12-07|        0|
|   B|2019-11-03|        1|
|   B|2019-11-15|        1|
|   C|2019-11-18|        1|
|   C|2019-10-18|        2|
|   D|2019-12-18|        0|
|   D|2019-10-18|        2|
|   D|2019-08-18|        4|
|   K|2019-12-23|        0|
|   K|2019-11-24|        1|
|   K|2019-10-06|        2|
|   K|2019-09-11|        3|
|   K|2019-08-13|        4|
|   K|2019-01-13|       11|
+----+----------+---------+



In [0]:
# get the disting login months relative to the reference date
df = df.select('cust', 'date_diff').distinct().orderBy(*['cust', 'date_diff'])

In [0]:
df.show()

+----+---------+
|cust|date_diff|
+----+---------+
|   A|        0|
|   A|        1|
|   A|        2|
|   B|        0|
|   B|        1|
|   C|        1|
|   C|        2|
|   D|        0|
|   D|        2|
|   D|        4|
|   K|        0|
|   K|        1|
|   K|        2|
|   K|        3|
|   K|        4|
|   K|       11|
+----+---------+



In [0]:
# partition the date by customer and order by the date diff, this is done so the rownums will be equal if they are consecutive
window = Window.orderBy("date_diff").partitionBy("cust")


In [0]:
# add this so you have the correct number of consecutive months, you could jsut add one to the row_num
df = df.withColumn( "consecutive_count", F.count(df["date_diff"]).over(window))

In [0]:
# add the row_num so you can compare it to the date_diff
df = df.withColumn("row_num", F.row_number().over(window) - 1)

In [0]:
# check if the date_diff == the row_num, if it does then the month is consecutive
df = df.withColumn("good_flag", F.when(df.date_diff == df.row_num, 1).otherwise(0))

In [0]:
df.show()

+----+---------+-----------------+-------+---------+
|cust|date_diff|consecutive_count|row_num|good_flag|
+----+---------+-----------------+-------+---------+
|   K|        0|                1|      0|        1|
|   K|        1|                2|      1|        1|
|   K|        2|                3|      2|        1|
|   K|        3|                4|      3|        1|
|   K|        4|                5|      4|        1|
|   K|       11|                6|      5|        0|
|   B|        0|                1|      0|        1|
|   B|        1|                2|      1|        1|
|   D|        0|                1|      0|        1|
|   D|        2|                2|      1|        0|
|   D|        4|                3|      2|        0|
|   C|        1|                1|      0|        0|
|   C|        2|                2|      1|        0|
|   A|        0|                1|      0|        1|
|   A|        1|                2|      1|        1|
|   A|        2|                3|      2|    

In [0]:
# limit the date to only the months where the are consecutive, then get the max consecutive count for each customer
df = df.filter(df.good_flag == 1).groupBy('cust').agg(F.max('consecutive_count').alias('max_streak'))

In [0]:
df.show()

+----+----------+
|cust|max_streak|
+----+----------+
|   K|         5|
|   B|         2|
|   D|         1|
|   A|         3|
+----+----------+



# Convert it to a function

The funciton can be run in a loop on a list of months, it does all the same stuff as above

## Create the sample dataframe

In [0]:
d2 = spark.createDataFrame([('27-DEC-19','A',), 
                            ('15-DEC-19','A',),
                            ('23-NOV-19','A',),
                            ('15-OCT-19','A',),
                            ('12-DEC-19','B',),
                            ('07-DEC-19','B',),
                            ('03-NOV-19','B',),
                            ('15-NOV-19','B',),
                            ('18-NOV-19','C',),
                            ('18-OCT-19','C',),
                            ('18-DEC-19','D',),
                            ('18-OCT-19','D',),
                            ('18-AUG-19','D',),
                            ('23-DEC-19','K',),
                            ('24-NOV-19','K',),
                            ('06-OCT-19','K',),
                            ('11-SEP-19','K',),
                            ('13-AUG-19','K',),
                            ('13-JAN-19','K',)
                            ], ['LOGIN_STRING', 'cust'])

## Convert the date

In [0]:
d2 = d2.withColumn('LOGIN_DATE', F.to_date(d2.LOGIN_STRING, 'dd-MMM-yy'))

In [0]:
d2 = d2.drop('LOGIN_STRING')

## Create the function that calculates the streak

In [0]:
def get_login_streaks(df, last_day_of_month):

  '''
  Calculates the length of the consecutive login streak for customers

  df -- the dataframe that has the login data for customers

  last_day_of_month -- the final date of the final month of the login streak being calculated (e.g., "2019-12-31") 

  '''
  df2 = df.withColumn('date_diff', floor(F.months_between( to_date(F.lit(last_day_of_month)), df.LOGIN_DATE)))\
          .filter(F.col('date_diff') >= 0)\
          .select('cust', 'date_diff').distinct().orderBy(*['cust', 'date_diff'])\
          .withColumn('row_num', F.row_number().over(Window.orderBy('date_diff').partitionBy('cust'))-1)\
          .withColumn( "consecutive_count", F.count('cust').over(Window.orderBy('date_diff').partitionBy('cust')))\
          .withColumn('good_flag', F.when(F.col('date_diff') == F.col('row_num'), 1).otherwise(0))\
          .filter(F.col('good_flag') == 1).groupBy('cust').agg(F.max('consecutive_count').alias('max_streak'))
          # do a group by max_Streak and count customers
          # send it toPandas()
          # add logic to get the 2 and 3, if needed
          # make it a row to append to a dataframe
          # run in loop for dates
  return df2



In [0]:
tbl = get_login_streaks(d2, "2019-12-31")

In [0]:
tbl.show()

+----+----------+
|cust|max_streak|
+----+----------+
|   K|         5|
|   B|         2|
|   D|         1|
|   A|         3|
+----+----------+



## Scratch

In [0]:
# confirm months_between is 1 for non-30 day months
dfa = spark.createDataFrame([('2019-12-31', '2019-10-31')], ['t', 'd'])
dfa.select(F.months_between(dfa.t, dfa.d).alias('months')).collect()

[Row(months=2.0)]

In [0]:
dfa.show()

+----------+----------+
|         t|         d|
+----------+----------+
|2019-12-31|2019-10-31|
+----------+----------+

