# Customer churn analysis

#### We are tasked with building an end-to-end machine learning pipeline using Snowpark for customer churn prediction at a telecom company.

In this notebook we will load, clean, and transform the raw Parquet dataset.

Update the config.py file with your Snowflake connection details before moving on to the next cell.
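
The notebook imports `snowflake_conn_prop` from config.py and passes it to `Session.builder.configs(...)`. A minimal sketch of what that file might contain is shown below. Every value is a placeholder, and the `missing_keys` helper is a hypothetical addition for spotting unfilled entries; it is not part of Snowpark.

```python
# config.py (sketch): connection properties consumed by Session.builder.configs().
# All values below are placeholders; replace them with your own account details.
snowflake_conn_prop = {
    "account": "<your_account_identifier>",
    "user": "<your_user>",
    "password": "<your_password>",
    "role": "<your_role>",
    "warehouse": "<your_warehouse>",
    "database": "<your_database>",
    "schema": "<your_schema>",
}


def missing_keys(conn_prop):
    """Hypothetical helper: list keys that are absent or still hold a placeholder."""
    required = ["account", "user", "password", "warehouse", "database", "schema"]
    return [
        key for key in required
        if not conn_prop.get(key) or conn_prop[key].startswith("<")
    ]


if __name__ == "__main__":
    print("Unfilled keys:", missing_keys(snowflake_conn_prop))
```

Running the file directly lists the entries that still need real values, which can save a failed connection attempt later in the notebook.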



<img src="arch.jpg"/>


In [1]:
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *

import pandas as pd

from sklearn import linear_model

import matplotlib.pyplot as plt

%matplotlib inline
import datetime as dt
import numpy as np
import seaborn as sns

#Snowflake connection info is saved in config.py
from config import snowflake_conn_prop


# Let's import some transformation functions
from snowflake.snowpark.functions import udf, col, lit, translate, is_null, iff

In [2]:
from snowflake.snowpark import version
print(version.VERSION)

(0, 5, 0, None)


In [3]:
from snowflake.snowpark import version
print(version.VERSION)
#session.close()
session = Session.builder.configs(snowflake_conn_prop).create()
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

(0, 5, 0, None)
[Row(CURRENT_WAREHOUSE()='LAB_S_WH', CURRENT_DATABASE()='DEMO', CURRENT_SCHEMA()='TEST')]


## Infer file schema & load data into Snowflake

In [4]:
filename = "raw_telco_data.parquet"
stagename = "rawdata"
rawtable = "RAW_PARQUET_DATA"

In [5]:
session.sql(f"create or replace stage {stagename} DIRECTORY = (ENABLE = TRUE);").collect()
session.file.put(filename,stagename)

[PutResult(source='raw_telco_data.parquet', target='raw_telco_data.parquet', source_size=3037540, target_size=3037552, source_compression='PARQUET', target_compression='PARQUET', status='UPLOADED', message='')]

In [6]:
# session.sql() is lazy, so collect() is needed to actually create the file format
session.sql("CREATE OR REPLACE FILE FORMAT MY_PARQUET_FORMAT TYPE = PARQUET;").collect()

session.sql(f"CREATE OR REPLACE \
            TABLE {rawtable} USING TEMPLATE ( \
                SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*)) \
                FROM \
                    TABLE( INFER_SCHEMA( \
                    LOCATION => '@{stagename}/{filename}', \
                    FILE_FORMAT => 'MY_PARQUET_FORMAT' \
                    ) \
                ) \
            );  ").collect()

[Row(status='Table RAW_PARQUET_DATA successfully created.')]

## For incremental load 

You might want to delete all existing rows and then load the new data into this table.

In [7]:
dfClear = session.table(rawtable).delete()

In [8]:
dfRaw = session.read.option("compression","snappy").parquet(f"@{stagename}/{filename}")
dfRaw.copy_into_table(rawtable,MATCH_BY_COLUMN_NAME='CASE_SENSITIVE',FORCE= True)

[Row(file='rawdata/raw_telco_data.parquet', status='LOADED', rows_parsed=100000, rows_loaded=100000, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]

In [9]:
dfR = session.table(rawtable).sample(n=5)
dfR.toPandas()

Unnamed: 0,COUNTRY,CITY,PHONE SERVICE,MULTIPLE LINES,LATITUDE,ONLINE SECURITY,SENIOR CITIZEN,MONTHLY CHARGES,STREAMING MOVIES,PAYMENT METHOD,...,CHURN SCORE,GENDER,LONGITUDE,ONLINE BACKUP,TOTAL CHARGES,CLTV,CHURN REASON,DEVICE PROTECTION,STATE,ZIP CODE
0,United States,Van Nuys,Yes,Yes,34.195685,No,False,69.0,No,Mailed check,...,0,Female,-118.490752,No,1108.0,4439,do not know,Yes,California,91406
1,United States,Hornitos,Yes,No,37.479926,No internet service,False,20.0,No internet service,Mailed check,...,0,Female,-120.230424,No internet service,196.35,5837,do not know,No internet service,California,95325
2,United States,Los Angeles,Yes,Yes,34.061918,Yes,False,79.75,Yes,Credit card (automatic),...,0,Male,-118.277939,No,4217.8,4142,do not know,No,California,90057
3,United States,Weaverville,Yes,No,40.759401,No internet service,False,20.1,No internet service,Mailed check,...,0,Male,-122.939337,No internet service,279.5,2185,do not know,No internet service,California,96093
4,United States,Capistrano Beach,Yes,Yes,33.458754,No,False,84.2,Yes,Credit card (automatic),...,0,Male,-117.665104,Yes,5956.85,4753,do not know,Yes,California,92624


# Snowpark Transformations

The Snowpark API provides programming language constructs for building SQL statements. It's a new developer experience that lets us build code with:

<b><li> The language of our choice </li></b>
<b><li> The tool of our choice </li></b>
<b><li> Lazy execution, which avoids multiple network hops to the server </li></b>
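
Lazy execution means that DataFrame operations only describe a query, and nothing is sent to the server until an action asks for results. The toy class below is a sketch of that idea in plain Python. `LazyQuery` and `fake_execute` are hypothetical names used for illustration and do not reflect how Snowpark is implemented internally.

```python
class LazyQuery:
    """Toy stand-in for a lazily evaluated DataFrame (not Snowpark's implementation)."""

    def __init__(self, table, columns=None, predicates=None):
        self.table = table
        self.columns = list(columns or ["*"])
        self.predicates = list(predicates or [])

    def select(self, *columns):
        # Returns a new object; no SQL is sent anywhere at this point.
        return LazyQuery(self.table, columns, self.predicates)

    def filter(self, predicate):
        # Also lazy: the predicate is only remembered.
        return LazyQuery(self.table, self.columns, self.predicates + [predicate])

    def to_sql(self):
        sql = f"SELECT {', '.join(self.columns)} FROM {self.table}"
        if self.predicates:
            sql += " WHERE " + " AND ".join(self.predicates)
        return sql

    def collect(self, execute):
        # The only step that would reach the server: one statement, one round trip.
        return execute(self.to_sql())


sent = []  # records every statement "sent" to the hypothetical server


def fake_execute(sql):
    sent.append(sql)
    return []


query = LazyQuery("SERVICES").select("CUSTOMERID", "CONTRACT").filter("TENUREMONTHS > 12")
assert sent == []            # nothing has been executed yet
query.collect(fake_execute)  # the action triggers a single statement
print(sent)
```

The chained `select` and `filter` calls cost no network traffic; only `collect` produces a statement, which is why several transformations can be combined into one round trip.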

Once the customer data is loaded into the raw table, we can use Snowpark to create dimension and fact tables. We will use the RAW_PARQUET_DATA table to create the following tables:
    
<li> DEMOGRAPHICS </li>
<li> LOCATION </li>
<li> STATUS </li>
<li> SERVICES </li>

We will also transform and clean the data using the Snowpark DataFrame API.

In [10]:
dfR = session.table(rawtable)

In [11]:
dfDemographics = dfR.select(col("CUSTOMERID"),
                             col("COUNT").alias("COUNT"),
                             # Default missing GENDER values; translate() only swaps characters and leaves NULLs untouched
                             iff(is_null(col("GENDER")),lit("Male"),col("GENDER")).alias("GENDER"),
                             col("SENIOR CITIZEN").alias("SENIORCITIZEN"),
                             col("PARTNER"),
                             col("DEPENDENTS")          
                            )


dfDemographics.write.mode('overwrite').saveAsTable('DEMOGRAPHICS')


In [12]:
dfLocation = dfR.select(col("CUSTOMERID"),
                         col("COUNTRY").name("COUNTRY"),
                         col("STATE").name("STATE"),
                         col("CITY").name("CITY"),
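                         # Default missing ZIP CODE values to 0; translate() only swaps characters and leaves NULLs untouched
                         iff(is_null(col("ZIP CODE")),lit(0),col("ZIP CODE")).name("ZIPCODE"),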
                         col("LAT LONG").name("LATLONG"),
                         col("LATITUDE").name("LATITUDE"),
                         col("LONGITUDE").name("LONGITUDE")       
                        )

dfLocation.write.mode('overwrite').saveAsTable('LOCATION')


#### You can run transformations on the data using similar DataFrame API constructs, for example:

In [13]:
dfServices = dfR.select(col("CUSTOMERID"),
                       col("TENURE MONTHS").name("TENUREMONTHS"),
                       iff(is_null(col("PHONE SERVICE")),lit("No"),col("PHONE SERVICE")).name("PHONESERVICE"),
                       iff(is_null(col("MULTIPLE LINES")),lit("No"),col("MULTIPLE LINES")).name("MULTIPLELINES"),
                       iff(is_null(col("INTERNET SERVICE")),lit("No"),col("INTERNET SERVICE")).name("INTERNETSERVICE"),
                       iff(is_null(col("ONLINE SECURITY")),lit("No"),col("ONLINE SECURITY")).name("ONLINESECURITY"),
                       iff(is_null(col("ONLINE BACKUP")),lit("No"),col("ONLINE BACKUP")).name("ONLINEBACKUP"),
                       iff(is_null(col("DEVICE PROTECTION")),lit("No"),col("DEVICE PROTECTION")).name("DEVICEPROTECTION"),
                       iff(is_null(col("TECH SUPPORT")),lit("No"),col("TECH SUPPORT")).name("TECHSUPPORT"),
                       iff(is_null(col("STREAMING TV")),lit("No"),col("STREAMING TV")).name("STREAMINGTV"),
                       iff(is_null(col("STREAMING MOVIES")),lit("No"),col("STREAMING MOVIES")).name("STREAMINGMOVIES"),
                       iff(is_null(col("CONTRACT")),lit("Month-to-month"),col("CONTRACT")).name("CONTRACT"),
                       iff(is_null(col("PAPERLESS BILLING")),lit('Y'),col("PAPERLESS BILLING")).name("PAPERLESSBILLING"),
                       iff(is_null(col("PAYMENT METHOD")),lit("Mailed check"),col("PAYMENT METHOD")).name("PAYMENTMETHOD"),
                       col("MONTHLY CHARGES").name("MONTHLYCHARGES"),
                       col("TOTAL CHARGES").name("TOTALCHARGES"),
                       col("CHURN VALUE").name("CHURNVALUE")        

                      )

dfServices.write.mode('overwrite').saveAsTable('SERVICES')
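
The repeated `iff(is_null(col(...)), lit(...), col(...))` pattern in the cell above expresses a SQL `IFF(<column> IS NULL, <default>, <column>)` for each column. The sketch below renders that expression as text for a few of the same defaults, to make the intent explicit. `null_default_expr` is a hypothetical helper, and the exact SQL that Snowpark generates may be formatted differently.

```python
def null_default_expr(column, default, alias):
    """Render IFF("<column>" IS NULL, '<default>', "<column>") AS <alias> as SQL text."""
    quoted = '"' + column.replace('"', '""') + '"'            # quoted identifier
    literal = "'" + str(default).replace("'", "''") + "'"     # string literal
    return f"IFF({quoted} IS NULL, {literal}, {quoted}) AS {alias}"


# Subset of the defaults used in the SERVICES cell above.
defaults = {
    "MULTIPLE LINES": "No",
    "CONTRACT": "Month-to-month",
    "PAYMENT METHOD": "Mailed check",
}

# The alias is the source column name with spaces removed, as in the notebook.
select_list = [
    null_default_expr(column, default, column.replace(" ", ""))
    for column, default in defaults.items()
]

for expr in select_list:
    print(expr)
```

Keeping the defaults in one mapping like this also makes it easier to check that they match the values that actually occur in each column.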

In [14]:
dfStatus = dfR.select(col("CUSTOMERID"),
                    iff(is_null(col("CHURN LABEL")),lit('N'),col("CHURN LABEL")).name("CHURNLABEL"),
                    col("CHURN VALUE").name("CHURNVALUE"),
                    col("CHURN SCORE").name("CHURNSCORE"),
                    col("CLTV").name("CLTV"),
                    iff(is_null(col("CHURN REASON")),lit("do not know"),col("CHURN REASON")).name("CHURNREASON")          
                    )

dfStatus.write.mode('overwrite').saveAsTable('STATUS')


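# Let's check the data using an example query

This shows one of many uses of Snowpark. DataFrames are evaluated lazily: the statements below only build a query, and nothing runs in Snowflake until an action such as show() is called.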

In [15]:
# Let's run a query as a quick sanity check
# This query shows the total revenue by city and contract term

dfLoc = session.table("LOCATION")
dfServ = session.table("SERVICES")

dfJoin = dfLoc.join(dfServ,dfLoc.col("CUSTOMERID") == dfServ.col("CUSTOMERID"))

dfResult = dfJoin.select(col("CITY"),
                         col("CONTRACT"),
                         col("TOTALCHARGES")).groupBy(col("CITY"),col("CONTRACT")).sum(col("TOTALCHARGES"))

dfResult.show()

-----------------------------------------------------------------
|"CITY"                  |"CONTRACT"      |"SUM(TOTALCHARGES)"  |
-----------------------------------------------------------------
|La Habra                |Month-to-month  |6828.35              |
|Glendale                |Month-to-month  |460483.05            |
|Ontario                 |Two year        |57487.6              |
|Oceanside               |Month-to-month  |49559.5              |
|March Air Reserve Base  |Month-to-month  |28670.95             |
|Lemon Cove              |Month-to-month  |40099.65             |
|Lancaster               |Month-to-month  |53851.799999999996   |
|Miramonte               |Two year        |194143.34999999998   |
|Fremont                 |Month-to-month  |77389.2              |
|Stevenson Ranch         |Month-to-month  |34417.45             |
-----------------------------------------------------------------
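
The DataFrame chain above corresponds roughly to a single SQL statement with a join and a GROUP BY. The sketch below reproduces that shape with Python's built-in sqlite3 module as a local stand-in for Snowflake, so it can be run without a connection. The rows are made up for illustration and are not taken from the telco dataset.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE LOCATION (CUSTOMERID TEXT, CITY TEXT);
    CREATE TABLE SERVICES (CUSTOMERID TEXT, CONTRACT TEXT, TOTALCHARGES REAL);
    -- Made-up rows for illustration only.
    INSERT INTO LOCATION VALUES ('c1', 'Fremont'), ('c2', 'Fremont'), ('c3', 'Ontario');
    INSERT INTO SERVICES VALUES
        ('c1', 'Month-to-month', 100.0),
        ('c2', 'Month-to-month', 50.5),
        ('c3', 'Two year', 200.0);
""")

# Join on CUSTOMERID, then total the charges per city and contract term.
rows = conn.execute("""
    SELECT L.CITY, S.CONTRACT, SUM(S.TOTALCHARGES) AS TOTAL
    FROM LOCATION L
    JOIN SERVICES S ON L.CUSTOMERID = S.CUSTOMERID
    GROUP BY L.CITY, S.CONTRACT
    ORDER BY L.CITY, S.CONTRACT
""").fetchall()

for row in rows:
    print(row)

conn.close()
```

Comparing a small hand-checked result like this with the Snowpark output is one way to confirm that the join key and grouping columns are the intended ones.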



### Let's create a view for the data science team to begin data analysis

In [16]:
print(session.sql('CREATE OR REPLACE VIEW TRAIN_DATASET \
                  AS \
                  SELECT D.CUSTOMERID, D.GENDER,D.SENIORCITIZEN,D.PARTNER,D.DEPENDENTS, \
                  S.PHONESERVICE, S.MULTIPLELINES, S.INTERNETSERVICE,S.ONLINESECURITY, \
                  S.ONLINEBACKUP, S.DEVICEPROTECTION, S.TECHSUPPORT, S.STREAMINGTV, \
                  S.STREAMINGMOVIES ,S.CONTRACT, S.PAPERLESSBILLING, S.PAYMENTMETHOD, \
                  S.TENUREMONTHS, S.MONTHLYCHARGES, S.TOTALCHARGES, \
                  S.CHURNVALUE \
                  FROM DEMOGRAPHICS D LEFT OUTER JOIN \
                  SERVICES S ON S.CUSTOMERID = D.CUSTOMERID').collect())


[Row(status='View TRAIN_DATASET successfully created.')]


In [17]:
%%time

raw = session.table('TRAIN_DATASET').sample(n = 20)
data = raw.toPandas()

CPU times: user 8.84 ms, sys: 3.73 ms, total: 12.6 ms
Wall time: 745 ms


# Off to ~02 notebook for exploratory data analysis

In [18]:
pd.set_option('display.max_columns', None)
data.head()

Unnamed: 0,CUSTOMERID,GENDER,SENIORCITIZEN,PARTNER,DEPENDENTS,PHONESERVICE,MULTIPLELINES,INTERNETSERVICE,ONLINESECURITY,ONLINEBACKUP,DEVICEPROTECTION,TECHSUPPORT,STREAMINGTV,STREAMINGMOVIES,CONTRACT,PAPERLESSBILLING,PAYMENTMETHOD,TENUREMONTHS,MONTHLYCHARGES,TOTALCHARGES,CHURNVALUE
0,3965-sjF5m,Female,False,True,False,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,True,Bank transfer (automatic),7,69.2,477.55,0.0
1,9465-FZ3JF,Male,False,True,False,Yes,No,No,No internet service,No internet service,No internet service,No internet service,No internet service,No internet service,One year,False,Credit card (automatic),33,20.1,620.55,0.0
2,8209-JgHyt,Female,False,False,False,Yes,Yes,Fiber optic,Yes,No,No,No,Yes,No,Month-to-month,True,Bank transfer (automatic),23,90.05,2169.8,1.0
3,4277-2shsd,Female,False,False,False,Yes,Yes,Fiber optic,Yes,No,No,Yes,Yes,Yes,Month-to-month,False,Bank transfer (automatic),30,106.4,3211.9,0.0
4,1422-3Gzdo,Male,True,True,False,Yes,Yes,Fiber optic,No,Yes,Yes,No,Yes,Yes,Month-to-month,True,Electronic check,43,103.0,4414.3,1.0
