# Customer churn analysis

#### We are tasked to build an end to end machine learning pipeline using snowpark for customer churn prediction in a telecom company.

In this notebook we will load, clean and transform the raw parquet dataset

Update the config.py file before moving on to the next cell



<img src="arch.jpg"/>


In [17]:
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *

import pandas as pd

from sklearn import linear_model

import matplotlib.pyplot as plt

%matplotlib inline
import datetime as dt
import numpy as np
import seaborn as sns

#Snowflake connection info is saved in config.py
from config import snowflake_conn_prop


# lets import some tranformations functions
from snowflake.snowpark.functions import udf, col, lit, translate, is_null, iff

In [18]:
# snowflake_conn_prop['account']='FY45510'
print(snowflake_conn_prop)

{'account': 'FY45510', 'user': 'AOLSON2', 'password': 'Password123', 'role': 'ACCOUNTADMIN', 'database': 'snowpark_quickstart', 'schema': 'TELCO', 'warehouse': 'sp_qs_wh'}


Let's configure our Snowpark Session and initialize the database, warehouse, and schema that we will use for the remainder of the quickstart.

In [12]:
from snowflake.snowpark import version
print(version.VERSION)
#session.close()
session = Session.builder.configs(snowflake_conn_prop).create()
session.sql("use role accountadmin").collect()
session.sql("create database if not exists  {}".format(snowflake_conn_prop['database'])).collect()
session.sql("use database {}".format(snowflake_conn_prop['database'])).collect()
session.sql("create schema if not exists {}".format(snowflake_conn_prop['schema'])).collect()
session.sql("use schema {}".format(snowflake_conn_prop['schema'])).collect()
session.sql("create or replace warehouse {} with \
                WAREHOUSE_SIZE = XSMALL \
                AUTO_SUSPEND = 120 \
                AUTO_RESUME = TRUE".format(snowflake_conn_prop['warehouse'])).collect()
session.sql("use warehouse {}".format(snowflake_conn_prop['warehouse']))
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

(0, 7, 0)


ForbiddenError: 250001 (08001): Failed to connect to DB. Verify the account name is correct: FY45510.snowflakecomputing.com:443. 000403: HTTP 403: Forbidden

## Infer file schema & Load Data into snowflake

In [None]:
filename = "raw_telco_data.parquet"
stagename = "rawdata"
rawtable = "RAW_PARQUET_DATA"

In [None]:
session.sql(f"create or replace stage {stagename} DIRECTORY = (ENABLE = TRUE);").collect()
session.file.put(filename,stagename)

In [None]:
session.sql("CREATE OR REPLACE FILE FORMAT MY_PARQUET_FORMAT TYPE = PARQUET;").collect()

session.sql(f"CREATE OR REPLACE \
            TABLE {rawtable} USING TEMPLATE ( \
                SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*)) \
                FROM \
                    TABLE( INFER_SCHEMA( \
                    LOCATION => '@{stagename}/{filename}', \
                    FILE_FORMAT => 'MY_PARQUET_FORMAT' \
                    ) \
                ) \
            );  ").collect()

## For incremental load 

you might want to just delete all rows and load new data to this table

In [None]:
dfClear = session.table(rawtable).delete()

In [None]:
dfRaw = session.read.option("compression","snappy").parquet(f"@{stagename}/{filename}")
dfRaw.copy_into_table(rawtable,FORCE= True)

In [None]:
dfR = session.table(rawtable).sample(n=5)
dfR.toPandas()

# Snowpark Transformations

The Snowpark API provides programming language constructs for building SQL statements. It's a new developer experience which enables us to build code in :-

<b><li>  Language of our choice </li></b>
<b><li> Tool of our choice and </li></b>
<b><li> Lazy execution to prevent multiple network hops to server </li></b>

Once the customer data is available in the RAW schema, we can use snowpark to create dimensions and fact tables. We will use the RAW_PARQUET table to create following tables -
    
<li> DEMOGRAPHICS </li>
<li> LOCATION </li>
<li> STATUS </li>
<li> SERVICES </li>

We will also transform and clean the data using Snowpark dataframe API

In [None]:
dfR = session.table(rawtable)

In [None]:
dfDemographics = dfR.select(col("CUSTOMERID"),
                             col("COUNT").alias("COUNT"),
                             translate(col("GENDER"),lit("NULL"),lit("Male")).alias("GENDER"),
                             col("SENIOR CITIZEN").alias("SENIORCITIZEN"),
                             col("PARTNER"),
                             col("DEPENDENTS")          
                            )


dfDemographics.write.mode('overwrite').saveAsTable('DEMOGRAPHICS')
dfDemographics.show()


In [None]:
dfLocation = dfR.select(col("CUSTOMERID"),
                         col("COUNTRY").name("COUNTRY"),
                         col("STATE").name("STATE"),
                         col("CITY").name("CITY"),
                         translate(col("ZIP CODE"),lit("NULL"),lit(0)).name("ZIPCODE"),
                         col("LAT LONG").name("LATLONG"),
                         col("LATITUDE").name("LATITUDE"),
                         col("LONGITUDE").name("LONGITUDE")       
                        )

dfLocation.write.mode('overwrite').saveAsTable('LOCATION')
dfLocation.show()


#### you can run transformation on data using similar dataframe API constructs, for example -

In [None]:
dfServices = dfR.select(col("CUSTOMERID"),
                       col("TENURE MONTHS").name("TENUREMONTHS"),
                       iff(is_null(col("PHONE SERVICE")),lit('N'),col("PHONE SERVICE")).name("PHONESERVICE"),
                       iff(is_null(col("MULTIPLE LINES")),lit("No"),col("MULTIPLE LINES")).name("MULTIPLELINES"),
                       iff(is_null(col("INTERNET SERVICE")),lit("No"),col("INTERNET SERVICE")).name("INTERNETSERVICE"),
                       iff(is_null(col("ONLINE SECURITY")),lit("No"),col("ONLINE SECURITY")).name("ONLINESECURITY"),
                       iff(is_null(col("ONLINE BACKUP")),lit("No"),col("ONLINE BACKUP")).name("ONLINEBACKUP"),
                       iff(is_null(col("DEVICE PROTECTION")),lit("No"),col("DEVICE PROTECTION")).name("DEVICEPROTECTION"),
                       iff(is_null(col("TECH SUPPORT")),lit('N'),col("TECH SUPPORT")).name("TECHSUPPORT"),
                       iff(is_null(col("STREAMING TV")),lit("No"),col("STREAMING TV")).name("STREAMINGTV"),
                       iff(is_null(col("STREAMING MOVIES")),lit("No"),col("STREAMING MOVIES")).name("STREAMINGMOVIES"),
                       iff(is_null(col("CONTRACT")),lit("Month-to-month"),col("CONTRACT")).name("CONTRACT"),
                       iff(is_null(col("PAPERLESS BILLING")),lit('Y'),col("PAPERLESS BILLING")).name("PAPERLESSBILLING"),
                       iff(is_null(col("PAYMENT METHOD")),lit("Mailed check"),col("PAYMENT METHOD")).name("PAYMENTMETHOD"),
                       col("MONTHLY CHARGES").name("MONTHLYCHARGES"),
                       col("TOTAL CHARGES").name("TOTALCHARGES"),
                       col("CHURN VALUE").name("CHURNVALUE")        

                      )

dfServices.write.mode('overwrite').saveAsTable('SERVICES')
dfServices.show()

In [None]:
dfStatus = dfR.select(col("CUSTOMERID"),
                    iff(is_null(col("CHURN LABEL")),lit('N'),col("CHURN LABEL")).name("CHURNLABEL"),
                    col("CHURN VALUE").name("CHURNVALUE"),
                    col("CHURN SCORE").name("CHURNSCORE"),
                    col("CLTV").name("CLTV"),
                    iff(is_null(col("CHURN REASON")),lit("do not know"),col("CHURN REASON")).name("CHURNREASON")          
                    )

dfStatus.write.mode('overwrite').saveAsTable('STATUS')
dfStatus.show()


# Lets check the data using an example query

This shows one of many uses of snowpark. You can build and query dataframes lazily.

In [None]:
# Lets run a query for quick sanity check
# This Query will show us the total revenue by city and contract term

dfLoc = session.table("LOCATION")
dfServ = session.table("SERVICES")

dfJoin = dfLoc.join(dfServ,dfLoc.col("CUSTOMERID") == dfServ.col("CUSTOMERID"))

dfResult = dfJoin.select(col("CITY"),
                         col("CONTRACT"),
                         col("TOTALCHARGES")).groupBy(col("CITY"),col("CONTRACT")).sum(col("TOTALCHARGES"))

dfResult.show()

### Let's create a view for data science team to begin data analysis

To do so, join up the `DEMOGRAPHICS` and `SERVICES` tables based on `CUSTOMERID`

In [None]:
dfD = session.table('DEMOGRAPHICS')
dfS = session.table('SERVICES')
dfJ = dfD.join(dfS, using_columns='CUSTOMERID', join_type = 'left')
dfJ.select(col('GENDER'),
              col('SENIORCITIZEN'),
              col('PARTNER'),
              col('DEPENDENTS'),
              col('MULTIPLELINES'),
              col('INTERNETSERVICE'),
              col('ONLINESECURITY'),
              col('ONLINEBACKUP'),
              col('DEVICEPROTECTION'),
              col('TECHSUPPORT'),
              col('STREAMINGTV'),
              col('STREAMINGMOVIES'),
              col('CONTRACT'),
              col('PAPERLESSBILLING'),
              col('PAYMENTMETHOD'),
              col('TENUREMONTHS'),
              col('MONTHLYCHARGES'),
              col('TOTALCHARGES'),
              col('CHURNVALUE'))
dfJ.create_or_replace_view('TRAIN_DATASET')

In [None]:
%%time

raw = session.table('TRAIN_DATASET').sample(n = 20)
data = raw.toPandas()

# Off to ~02 notebook for exploratory data analysis

In [None]:
pd.pandas.set_option('display.max_columns', None)
data.head()

In [None]:
session.close()