# Demonstration of customer churn analysis on streaming data through Spark Streaming

## Uploading 10 sample files and checking that they are present, to simulate a data stream

In [0]:
# List the DBFS directory that the stream reader further down loads from,
# to confirm that the sample CSV files have been uploaded.
dbutils.fs.ls('/FileStore/shared_uploads/kukka006@umn.edu/stream/')

Out[34]: [FileInfo(path='dbfs:/FileStore/shared_uploads/kukka006@umn.edu/stream/Sample1.csv', name='Sample1.csv', size=581, modificationTime=1651298618000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/kukka006@umn.edu/stream/Sample10.csv', name='Sample10.csv', size=570, modificationTime=1651342086000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/kukka006@umn.edu/stream/Sample2.csv', name='Sample2.csv', size=1410, modificationTime=1651298618000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/kukka006@umn.edu/stream/Sample3.csv', name='Sample3.csv', size=734, modificationTime=1651298619000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/kukka006@umn.edu/stream/Sample4.csv', name='Sample4.csv', size=319, modificationTime=1651298619000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/kukka006@umn.edu/stream/Sample5.csv', name='Sample5.csv', size=570, modificationTime=1651298620000),
 FileInfo(path='dbfs:/FileStore/shared_uploads/kukka006@umn.edu/stream/Sample6.csv', name='Sample6.c

## Streaming data processing requires the schema of the incoming data to be specified in advance

In [0]:
# Import the schema types in this cell so that it runs on a fresh kernel:
# the notebook's pyspark imports only appear in a later cell, which would
# otherwise make this cell fail with a NameError under "Run All".
from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType

# Explicit schema for the streamed CSV files; it is handed to
# `readStream.schema(...)` when the streaming source is defined.
# StructField arguments are (column name, data type, nullable).
schema1 = StructType([
    StructField('label', IntegerType(), True),
    StructField('downgraded', IntegerType(), True),
    StructField('registered_days', IntegerType(), True),
    StructField('avg_daily_items', DoubleType(), True),
    StructField('avg_session', DoubleType(), True),
    StructField('avg_daily_song', DoubleType(), True),
    StructField('gender_index', DoubleType(), False),
    StructField('level_preChurn_index', DoubleType(), False),
    StructField('agent_index', DoubleType(), False),
    StructField('state_index', DoubleType(), False),
])

## Specifying the source of the streaming data - in this case, the directory containing the sample files

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *  
# Use 2 shuffle partitions for the aggregations run on this stream
# (presumably because the sample data is tiny - confirm before reusing elsewhere).
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Streaming reader over the directory of sample CSV files. The schema is
# supplied explicitly, the first row of each file is treated as a header, and
# at most one new file is consumed per trigger so the files arrive one by one.
streamingDataframe = (
    spark.readStream
    .format("csv")
    .schema(schema1)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .load('/FileStore/shared_uploads/kukka006@umn.edu/stream/')
)

## Specifying the analyses that need to be run on the streaming data - these analyses are applied automatically as new data streams in. In this case we are checking the churn prediction distribution by state in real time.

In [0]:
# Apply the model to the streaming DataFrame, then average the `prediction`
# column per `state_index` and expose the result as `churn_rate`.
# NOTE(review): `modelRF` is not defined in the visible part of this notebook -
# presumably a fitted model/pipeline from an earlier step; confirm it is in scope.
churn_rate = (
    modelRF.transform(streamingDataframe)
    .groupBy('state_index')
    .agg(avg('prediction').alias('churn_rate'))
)

# Displayed as the cell output: True when the result is a streaming DataFrame.
churn_rate.isStreaming

Out[37]: True

## Specifying the output mode of the streaming data - in this case we are storing the results in memory and naming the resulting table "counts". This also starts the stream.

In [0]:
# Make this cell safe to re-run: Spark refuses to start a second active query
# with the same name, so stop any still-running "counts" query left over from
# a previous execution of this cell before starting a new one.
for active_query in spark.streams.active:
    if active_query.name == "counts":
        active_query.stop()

# Use 2 shuffle partitions for the streaming aggregation.
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Start the streaming query and keep the handle so it can be stopped later.
query = (
    churn_rate
    .writeStream
    .format("memory")        # memory sink: results kept in an in-memory table (debugging / small data only)
    .queryName("counts")     # name of the in-memory table that can be queried with SQL
    .outputMode("complete")  # complete = rewrite the full aggregated result on every trigger
    .start()
)

## Querying the live streaming data for state-level customer churn - the results update automatically when this query is re-run after a few seconds

In [0]:
%sql SELECT state_index AS state, churn_rate FROM counts ORDER BY state_index

state,churn_rate
0.0,0.1
1.0,0.1666666666666666
2.0,0.1666666666666666
3.0,0.0
4.0,0.0
5.0,0.0
8.0,0.0
9.0,0.0
10.0,0.5
13.0,0.0


## Stopping the stream

In [0]:
# Stop the streaming query that was started by the writeStream cell above.
query.stop()