# Customer churn analysis

#### We are tasked to build an end to end machine learning pipeline using snowpark for customer churn prediction in a telecom company.

In this notebook we will load, clean and transform the raw parquet dataset

**Update the config.py file before moving on to the next cell**


In [1]:
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *

import pandas as pd

from sklearn import linear_model

import matplotlib.pyplot as plt

%matplotlib inline
import datetime as dt
import numpy as np
import seaborn as sns

#Snowflake connection info is saved in config.py
from config import snowflake_conn_prop


# lets import some tranformations functions
from snowflake.snowpark.functions import udf, col, lit, translate, is_null, iff

In [2]:
from snowflake.snowpark import version
print(version.VERSION)

(1, 0, 0)


Let's configure our Snowpark Session and initialize the database, warehouse, and schema that we will use for the remainder of the quickstart.

In [3]:
from snowflake.snowpark import version
print(version.VERSION)
#session.close()
session = Session.builder.configs(snowflake_conn_prop).create()
session.sql("use role accountadmin").collect()
session.sql("create database if not exists  {}".format(snowflake_conn_prop['database'])).collect()
session.sql("use database {}".format(snowflake_conn_prop['database'])).collect()
session.sql("create schema if not exists {}".format(snowflake_conn_prop['schema'])).collect()
session.sql("use schema {}".format(snowflake_conn_prop['schema'])).collect()
session.sql("create or replace warehouse {} with \
                WAREHOUSE_SIZE = XSMALL \
                AUTO_SUSPEND = 120 \
                AUTO_RESUME = TRUE".format(snowflake_conn_prop['warehouse'])).collect()
session.sql("use warehouse {}".format(snowflake_conn_prop['warehouse']))
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

(1, 0, 0)
[Row(CURRENT_WAREHOUSE()='LAB_S_WH', CURRENT_DATABASE()='FEAST_SF', CURRENT_SCHEMA()='PUBLIC')]


## Infer file schema & Load Data into snowflake

In [4]:
filename = "raw_telco_data_dt.parquet"
stagename = "rawdata"
rawtable = "RAW_PARQUET_DATA"

In [5]:
session.sql(f"create or replace stage {stagename} DIRECTORY = (ENABLE = TRUE);").collect()
session.file.put(filename,stagename)

[PutResult(source='raw_telco_data_dt.parquet', target='raw_telco_data_dt.parquet', source_size=3039492, target_size=3039504, source_compression='PARQUET', target_compression='PARQUET', status='UPLOADED', message='')]

In [6]:
session.sql("CREATE OR REPLACE FILE FORMAT MY_PARQUET_FORMAT TYPE = PARQUET;").collect()

session.sql(f"CREATE OR REPLACE \
            TABLE {rawtable} USING TEMPLATE ( \
                SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*)) \
                FROM \
                    TABLE( INFER_SCHEMA( \
                    LOCATION => '@{stagename}/{filename}', \
                    FILE_FORMAT => 'MY_PARQUET_FORMAT' \
                    ) \
                ) \
            );  ").collect()

[Row(status='Table RAW_PARQUET_DATA successfully created.')]

## For incremental load 

you might want to just delete all rows and load new data to this table

In [7]:
dfClear = session.table(rawtable).delete()

In [8]:
dfRaw = session.read.option("compression","snappy").parquet(f"@{stagename}/{filename}")
dfRaw.copy_into_table(rawtable,FORCE= True)

[Row(file='rawdata/raw_telco_data_dt.parquet', status='LOADED', rows_parsed=100000, rows_loaded=100000, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]

In [9]:
dfR = session.table(rawtable).sample(n=5)
dfR.to_pandas()

Unnamed: 0,COUNTRY,CITY,PHONE SERVICE,MULTIPLE LINES,LATITUDE,ONLINE SECURITY,SENIOR CITIZEN,MONTHLY CHARGES,STREAMING MOVIES,PAYMENT METHOD,...,LONGITUDE,ONLINE BACKUP,TOTAL CHARGES,CLTV,CHURN REASON,DEVICE PROTECTION,STATE,ZIP CODE,EVENT_TIMESTAMP,CREATED
0,United States,Orinda,Yes,No,37.873916,Yes,False,47.85,No,Electronic check,...,-122.20522,No,1886.4,5531,do not know,No,California,94563,2022-10-07,2022-10-07
1,United States,Tulelake,No,No phone service,41.813521,No,False,50.95,Yes,Credit card (automatic),...,-121.492666,Yes,229.4,3573,do not know,No,California,96134,2022-10-07,2022-10-07
2,United States,El Cajon,Yes,Yes,32.785165,No,False,91.5,No,Credit card (automatic),...,-116.862648,No,3236.35,3617,do not know,No,California,92019,2022-10-07,2022-10-07
3,United States,Borrego Springs,Yes,Yes,33.200369,No,True,54.8,No,Credit card (automatic),...,-116.192313,Yes,1291.3,4330,do not know,No,California,92004,2022-10-07,2022-10-07
4,United States,Angelus Oaks,Yes,No,34.1678,No internet service,False,19.9,No internet service,Mailed check,...,-116.86433,No internet service,19.9,5441,Competitor offered more data,No internet service,California,92305,2022-10-07,2022-10-07


# Snowpark Transformations

The Snowpark API provides programming language constructs for building SQL statements. It's a new developer experience which enables us to build code in :-

<b><li>  Language of our choice </li></b>
<b><li> Tool of our choice and </li></b>
<b><li> Lazy execution to prevent multiple network hops to server </li></b>

Once the customer data is available in the RAW schema, we can use snowpark to create dimensions and fact tables. We will use the RAW_PARQUET table to create following tables -
    
<li> DEMOGRAPHICS </li>
<li> LOCATION </li>
<li> STATUS </li>
<li> SERVICES </li>

We will also transform and clean the data using Snowpark dataframe API

In [10]:
dfR = session.table(rawtable)

In [11]:
dfDemographics = dfR.select(col("CUSTOMERID"),
                             col("COUNT").alias("COUNT"),
                             translate(col("GENDER"),lit("NULL"),lit("Male")).alias("GENDER"),
                             col("SENIOR CITIZEN").alias("SENIORCITIZEN"),
                             col("PARTNER"),
                             col("DEPENDENTS"),
                             col('EVENT_TIMESTAMP'),
                             col('CREATED')
                            )


dfDemographics.write.mode('overwrite').saveAsTable('DEMOGRAPHICS')
dfDemographics.show()


------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMERID"  |"COUNT"  |"GENDER"  |"SENIORCITIZEN"  |"PARTNER"  |"DEPENDENTS"  |"EVENT_TIMESTAMP"    |"CREATED"            |
------------------------------------------------------------------------------------------------------------------------------
|7090-ZyCMx    |1        |Female    |False            |False      |True          |2022-10-07 00:00:00  |2022-10-07 00:00:00  |
|1364-wJXMS    |1        |Female    |False            |False      |True          |2022-10-07 00:00:00  |2022-10-07 00:00:00  |
|6564-sLgIC    |1        |Male      |True             |False      |True          |2022-10-07 00:00:00  |2022-10-07 00:00:00  |
|7853-2xheR    |1        |Male      |False            |False      |True          |2022-10-07 00:00:00  |2022-10-07 00:00:00  |
|8457-E9FuW    |1        |Female    |False            |False      |True          |2022-10-07 00:00:00  |2022-10

In [12]:
dfLocation = dfR.select(col("CUSTOMERID"),
                         col("COUNTRY").name("COUNTRY"),
                         col("STATE").name("STATE"),
                         col("CITY").name("CITY"),
                         translate(col("ZIP CODE"),lit("NULL"),lit(0)).name("ZIPCODE"),
                         col("LAT LONG").name("LATLONG"),
                         col("LATITUDE").name("LATITUDE"),
                         col("LONGITUDE").name("LONGITUDE")       
                        )

dfLocation.write.mode('overwrite').saveAsTable('LOCATION')
dfLocation.show()


-------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMERID"  |"COUNTRY"      |"STATE"     |"CITY"           |"ZIPCODE"  |"LATLONG"               |"LATITUDE"  |"LONGITUDE"  |
-------------------------------------------------------------------------------------------------------------------------------
|7090-ZyCMx    |United States  |California  |Los Angeles      |90005      |34.059281, -118.30742   |34.059281   |-118.307420  |
|1364-wJXMS    |United States  |California  |Los Angeles      |90006      |34.048013, -118.293953  |34.048013   |-118.293953  |
|6564-sLgIC    |United States  |California  |Los Angeles      |90065      |34.108833, -118.229715  |34.108833   |-118.229715  |
|7853-2xheR    |United States  |California  |La Habra         |90631      |33.940619, -117.9513    |33.940619   |-117.951300  |
|8457-E9FuW    |United States  |California  |Glendale         |91206      |34.162515, -118.203869  |34.1

#### you can run transformation on data using similar dataframe API constructs, for example -

In [13]:
dfServices = dfR.select(col("CUSTOMERID"),
                       col("TENURE MONTHS").name("TENUREMONTHS"),
                       iff(is_null(col("PHONE SERVICE")),lit('N'),col("PHONE SERVICE")).name("PHONESERVICE"),
                       iff(is_null(col("MULTIPLE LINES")),lit("No"),col("MULTIPLE LINES")).name("MULTIPLELINES"),
                       iff(is_null(col("INTERNET SERVICE")),lit("No"),col("INTERNET SERVICE")).name("INTERNETSERVICE"),
                       iff(is_null(col("ONLINE SECURITY")),lit("No"),col("ONLINE SECURITY")).name("ONLINESECURITY"),
                       iff(is_null(col("ONLINE BACKUP")),lit("No"),col("ONLINE BACKUP")).name("ONLINEBACKUP"),
                       iff(is_null(col("DEVICE PROTECTION")),lit("No"),col("DEVICE PROTECTION")).name("DEVICEPROTECTION"),
                       iff(is_null(col("TECH SUPPORT")),lit('N'),col("TECH SUPPORT")).name("TECHSUPPORT"),
                       iff(is_null(col("STREAMING TV")),lit("No"),col("STREAMING TV")).name("STREAMINGTV"),
                       iff(is_null(col("STREAMING MOVIES")),lit("No"),col("STREAMING MOVIES")).name("STREAMINGMOVIES"),
                       iff(is_null(col("CONTRACT")),lit("Month-to-month"),col("CONTRACT")).name("CONTRACT"),
                       iff(is_null(col("PAPERLESS BILLING")),lit('Y'),col("PAPERLESS BILLING")).name("PAPERLESSBILLING"),
                       iff(is_null(col("PAYMENT METHOD")),lit("Mailed check"),col("PAYMENT METHOD")).name("PAYMENTMETHOD"),
                       col("MONTHLY CHARGES").name("MONTHLYCHARGES"),
                       col("TOTAL CHARGES").name("TOTALCHARGES"),
                       col("CHURN VALUE").name("CHURNVALUE"),
                       col("EVENT_TIMESTAMP").name("EVENT_TIMESTAMP"),
                       col("CREATED").name("CREATED")
                      )

dfServices.write.mode('overwrite').saveAsTable('SERVICES')
dfServices.show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMERID"  |"TENUREMONTHS"  |"PHONESERVICE"  |"MULTIPLELINES"  |"INTERNETSERVICE"  |"ONLINESECURITY"     |"ONLINEBACKUP"       |"DEVICEPROTECTION"   |"TECHSUPPORT"        |"STREAMINGTV"        |"STREAMINGMOVIES"    |"CONTRACT"      |"PAPERLESSBILLING"  |"PAYMENTMETHOD"   |"MONTHLYCHARGES"  |"TOTALCHARGES"  |"CHURNVALUE"  |"EVENT_TIMESTAMP"    |"CREATED"            |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [14]:
dfStatus = dfR.select(col("CUSTOMERID"),
                    iff(is_null(col("CHURN LABEL")),lit('N'),col("CHURN LABEL")).name("CHURNLABEL"),
                    col("CHURN VALUE").name("CHURNVALUE"),
                    col("CHURN SCORE").name("CHURNSCORE"),
                    col("CLTV").name("CLTV"),
                    iff(is_null(col("CHURN REASON")),lit("do not know"),col("CHURN REASON")).name("CHURNREASON")          
                    )

dfStatus.write.mode('overwrite').saveAsTable('STATUS')
dfStatus.show()


-----------------------------------------------------------------------------------------------------------------
|"CUSTOMERID"  |"CHURNLABEL"  |"CHURNVALUE"  |"CHURNSCORE"  |"CLTV"  |"CHURNREASON"                             |
-----------------------------------------------------------------------------------------------------------------
|7090-ZyCMx    |true          |1.0           |1             |2701    |Moved                                     |
|1364-wJXMS    |true          |1.0           |1             |5372    |Moved                                     |
|6564-sLgIC    |true          |1.0           |1             |3179    |Competitor made better offer              |
|7853-2xheR    |true          |1.0           |1             |4415    |Product dissatisfaction                   |
|8457-E9FuW    |true          |1.0           |1             |5142    |Price too high                            |
|5718-ykxBT    |true          |1.0           |1             |2484    |Poor expertise of 

# Lets check the data using an example query

This shows one of many uses of snowpark. You can build and query dataframes lazily.

In [15]:
# Lets run a query for quick sanity check
# This Query will show us the total revenue by city and contract term

dfLoc = session.table("LOCATION")
dfServ = session.table("SERVICES")

dfJoin = dfLoc.join(dfServ,dfLoc.col("CUSTOMERID") == dfServ.col("CUSTOMERID"))

dfResult = dfJoin.select(col("CITY"),
                         col("CONTRACT"),
                         col("TOTALCHARGES")).groupBy(col("CITY"),col("CONTRACT")).sum(col("TOTALCHARGES"))

dfResult.show()

----------------------------------------------------------
|"CITY"           |"CONTRACT"      |"SUM(TOTALCHARGES)"  |
----------------------------------------------------------
|Los Angeles      |Month-to-month  |3931004.7            |
|La Habra         |Month-to-month  |6828.35              |
|Glendale         |Month-to-month  |460483.05            |
|Burbank          |Month-to-month  |378354.4             |
|Ontario          |Two year        |57487.6              |
|Alpine           |Month-to-month  |69186.04999999999    |
|Borrego Springs  |Month-to-month  |94737.0              |
|Oceanside        |Month-to-month  |49559.5              |
|Niland           |Month-to-month  |24946.0              |
|San Bernardino   |Month-to-month  |253583.3             |
----------------------------------------------------------



### We have the base datasetup done. Let's move on to notebook 02

We will setup Feast to create offline feature store that will be configured to extract features from `DEMOGRAPHICS` and `SERVICES` tables for model training.

In [16]:
session.close()