# Telecom Churn
- Team member: Tashi Chotso (20BDA01)

## Problem statement
- In the telecom industry, customers can choose from multiple service providers and actively switch from one operator to another. In this highly competitive market, the industry experiences an average annual churn rate of 15-25%. Because acquiring a new customer costs 5-10 times more than retaining an existing one, customer retention has become even more important than customer acquisition. To reduce churn, telecom companies need to predict which customers are at high risk of leaving.


In [1]:
import pyspark
import os
from random import random
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
import numpy as np
from pyspark.mllib.stat import Statistics

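The first thing a Spark program must do is create a SparkContext object, which tells Spark how to access a cluster. Traditionally you build a SparkConf object with your application's settings and pass it to SparkContext; in current PySpark, `SparkSession.builder` does both, creating (or reusing) the underlying SparkContext, which is then available as `spark.sparkContext`.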

# 1. Extract: load the data and read it as a text-file RDD

In [102]:
spark = SparkSession.builder.master("local").\
        appName("SparkApplication").\
        config("spark.driver.bindAddress","localhost").\
        config("spark.ui.port","4041").\
        getOrCreate()

In [3]:
spark

### Read the CSV file into a single RDD

In [103]:
sc = spark.sparkContext

In [5]:
# read the CSV file as an RDD of lines (one string per row)
data = sc.textFile("telecomChurn.csv")

In [6]:
print('\n', data)  # which file the RDD reads and how it was created


 telecomChurn.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0


In [7]:
print('\n',type(data)) # variable type


 <class 'pyspark.rdd.RDD'>


In [8]:
print('\n', dir(data))  # attributes and methods available on an RDD



 ['__add__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getnewargs__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_computeFractionForSampleSize', '_defaultReducePartitions', '_id', '_is_barrier', '_jrdd', '_jrdd_deserializer', '_memory_limit', '_pickled', '_reserialize', '_to_java_object_rdd', 'aggregate', 'aggregateByKey', 'barrier', 'cache', 'cartesian', 'checkpoint', 'coalesce', 'cogroup', 'collect', 'collectAsMap', 'collectWithJobGroup', 'combineByKey', 'context', 'count', 'countApprox', 'countApproxDistinct', 'countByKey', 'countByValue', 'ctx', 'distinct', 'filter', 'first', 'flatMap', 'flatMapValues', 'fold', 'foldByKey', 'foreach', 'foreachPartition', 'fullOuterJoin', 'getCheckpointFile', 'getNumPartitions', 'getResourc

In [9]:
#header
header=data.first()
print(header)


customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn


                                                                                

In [14]:
# remove header
rdd1= data.filter(lambda line: line !=header)

In [15]:
# total record count (header excluded)
print('\n file has:', rdd1.count(), 'rows')



 file has: 7043 rows


In [16]:
# first record after removing the header
print('\n file line:',rdd1.first())


 file line: 7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No


In [17]:
rdd1.take(5)  # first five records

['7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No',
 '5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No',
 '3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes',
 '7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,No,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No',
 '9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes']

In [18]:
# number of distinct records (equal to the total count, so there are no duplicate rows)
rdd1.distinct().count()

                                                                                

7043

In [19]:
print("initial partition count:" + str(rdd1.getNumPartitions()))

initial partition count:1
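Why one partition? As far as the Spark API documents it, `sc.textFile()` uses `SparkContext.defaultMinPartitions` when no `minPartitions` is given, and that value is `min(defaultParallelism, 2)`. With `master("local")` the default parallelism is 1. The plain-Python sketch below only mirrors that rule; the function name is hypothetical, and a large input file may still be split into more partitions than this minimum.

```python
# Plain-Python mirror of Spark's rule for the default minimum number of
# partitions used by sc.textFile(): min(defaultParallelism, 2).

def default_min_partitions(default_parallelism: int) -> int:
    """Hypothetical helper reproducing Spark's defaultMinPartitions rule."""
    return min(default_parallelism, 2)

# master("local") runs one worker thread; master("local[4]") runs four
for master, parallelism in [("local", 1), ("local[4]", 4)]:
    print(master, "->", default_min_partitions(parallelism))
```

To spread the work over more partitions you can pass `minPartitions` to `sc.textFile()` or call `rdd1.repartition(n)`.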


ii. Transform: exploratory data analysis using RDDs

    - Replace Contract column values:

        - Month-to-month: 1m

        - One year: 1y

        - Two year: 2y

        - everything else: Others

    - Unique customer count

    - Describe the categorical and numerical columns separately

    - Group by Contract and average TotalCharges

    - Sum TotalCharges using an accumulator

iii. Load: save the analysis report

    - Save the Contract-wise average TotalCharges as files



In [20]:
step1= rdd1.map(lambda line: line.split(",")) # split by ,

In [21]:
step1.take(2)

[['7590-VHVEG',
  'Female',
  '0',
  'Yes',
  'No',
  '1',
  'No',
  'No phone service',
  'DSL',
  'No',
  'Yes',
  'No',
  'No',
  'No',
  'No',
  'Month-to-month',
  'Yes',
  'Electronic check',
  '29.85',
  '29.85',
  'No'],
 ['5575-GNVDE',
  'Male',
  '0',
  'No',
  'No',
  '34',
  'Yes',
  'No',
  'DSL',
  'Yes',
  'No',
  'Yes',
  'No',
  'No',
  'No',
  'One year',
  'No',
  'Mailed check',
  '56.95',
  '1889.5',
  'No']]
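`line.split(",")` works here because, judging from the records above, no field in this file contains a comma. If a field were quoted and contained a comma, the naive split would break it apart. A minimal sketch using Python's standard `csv` module handles that case; `parse_line` and the sample line are hypothetical, and the function could be passed to `rdd1.map(parse_line)` in place of the lambda.

```python
import csv

def parse_line(line: str) -> list:
    """Split one CSV line into fields, keeping double-quoted fields intact."""
    # csv.reader accepts any iterable of lines; feed it a one-element list
    return next(csv.reader([line]))

# hypothetical line with a quoted field that contains a comma
sample = 'C1,Male,"Bank transfer, automatic",42.3'
print(sample.split(","))   # naive split: the quoted field is cut in two
print(parse_line(sample))  # csv parsing: the quoted field stays whole
```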

# 2. Transform: exploratory data analysis using RDDs

- Replace Contract column values:

    - Month-to-month: 1m

    - One year: 1y

    - Two year: 2y

    - everything else: Others

In [22]:
def replace(column_val):
    if column_val=="Month-to-month":
        column_val="1m"
    elif column_val=="One year":
        column_val="1y"
    elif column_val=="Two year":
        column_val="2y"
    else:
        column_val= "Others"
    return column_val
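The same mapping can also be written as a lookup table, which keeps the codes in one place and falls back to "Others" for any unlisted value. A minimal sketch; `CONTRACT_CODES` and `replace_contract` are hypothetical names that behave like `replace()` above for these inputs.

```python
# Lookup-table version of replace(): unknown contract types map to "Others"
CONTRACT_CODES = {"Month-to-month": "1m", "One year": "1y", "Two year": "2y"}

def replace_contract(column_val: str) -> str:
    return CONTRACT_CODES.get(column_val, "Others")

print([replace_contract(v) for v in ["Month-to-month", "One year", "Two year", "Weekly"]])
```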

In [23]:
step2=step1.map(lambda x: (x[0],x[1],x[2],x[3],x[4],x[5],
                           x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],replace(x[15]),x[16],x[17],x[18],x[19],x[20]))

In [24]:
step2.take(2)

[('7590-VHVEG',
  'Female',
  '0',
  'Yes',
  'No',
  '1',
  'No',
  'No phone service',
  'DSL',
  'No',
  'Yes',
  'No',
  'No',
  'No',
  'No',
  '1m',
  'Yes',
  'Electronic check',
  '29.85',
  '29.85',
  'No'),
 ('5575-GNVDE',
  'Male',
  '0',
  'No',
  'No',
  '34',
  'Yes',
  'No',
  'DSL',
  'Yes',
  'No',
  'Yes',
  'No',
  'No',
  'No',
  '1y',
  'No',
  'Mailed check',
  '56.95',
  '1889.5',
  'No')]
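Rebuilding the whole 21-element tuple just to change the Contract field is easy to get wrong. A small helper that replaces one position by slicing is an alternative; `with_field` is a hypothetical name, and with it the map above could be written as `step1.map(lambda x: with_field(x, 15, replace))`.

```python
def with_field(row, index, fn):
    """Return a copy of `row` as a tuple with row[index] replaced by fn(row[index])."""
    row = tuple(row)
    return row[:index] + (fn(row[index]),) + row[index + 1:]

codes = {"Month-to-month": "1m", "One year": "1y", "Two year": "2y"}
row = ["7590-VHVEG", "Female", "Month-to-month", "No"]  # shortened example row
print(with_field(row, 2, lambda v: codes.get(v, "Others")))
```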

In [120]:
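## convert a numeric string to int; int(float(...)) truncates decimals
## (29.85 -> 29) and any non-numeric value (e.g. a blank) becomes 0
def string_to_int(val):
    try:
        return int(float(val))
    except (TypeError, ValueError):
        return 0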
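Because `string_to_int` truncates MonthlyCharges and TotalCharges and turns unparseable values into 0, the means and averages computed later are slightly biased. A float-preserving alternative that marks bad values as `None` (so they can be filtered out instead of counted as 0) might look like this; `string_to_float` is a hypothetical helper, not part of the original notebook.

```python
def string_to_float(val, default=None):
    """Parse a numeric string as float; return `default` for blanks or bad values."""
    try:
        return float(val)
    except (TypeError, ValueError):
        return default

print(string_to_float("29.85"))  # keeps the decimals
print(string_to_float(" "))      # blank field -> None instead of 0
```

In the RDD pipeline, rows with a `None` TotalCharges could then be dropped with `.filter(lambda x: x[19] is not None)`.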

In [123]:
step3=step2.map(lambda x: (x[0],x[1],string_to_int(x[2]),x[3],x[4],string_to_int(x[5]),
                           x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],x[15],x[16],x[17],string_to_int(x[18]),string_to_int(x[19]),x[20]))

In [28]:
step3.take(1)

[('7590-VHVEG',
  'Female',
  0,
  'Yes',
  'No',
  1,
  'No',
  'No phone service',
  'DSL',
  'No',
  'Yes',
  'No',
  'No',
  'No',
  'No',
  '1m',
  'Yes',
  'Electronic check',
  29,
  29,
  'No')]

# 3. Describe the categorical and numerical columns separately


## Numerical data

In [29]:
num_data=step3.map(lambda x: (x[2],x[5],x[18],x[19]))

In [30]:
num_data.take(4)

[(0, 1, 29, 29), (0, 34, 56, 1889), (0, 2, 53, 108), (0, 45, 42, 1840)]

## mean and variance 

In [301]:
import numpy as np
from pyspark.mllib.stat import Statistics
num_data=step3.map(lambda x: (x[2],x[5],x[18],x[19]))
num_data.take(5)
summary = Statistics.colStats(num_data)
print("mean :",summary.mean())  # a dense vector containing the mean value for each column
print("variance :",summary.variance())  # column-wise variance


mean : [1.62146812e-01 3.23711487e+01 6.42958966e+01 2.27926509e+03]
variance : [1.35874516e-01 6.03168108e+02 9.05572188e+02 5.13834082e+06]
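To see what `colStats` reports, the sketch below computes the same two statistics in plain Python for the four rows printed by `num_data.take(4)`. It assumes, as the MLlib documentation describes, that the variance is the unbiased sample variance (divide by n - 1); `column_mean_variance` is a hypothetical helper.

```python
from statistics import mean, variance

def column_mean_variance(rows):
    """Per-column mean and sample variance (n - 1 denominator) of numeric tuples."""
    columns = list(zip(*rows))  # transpose rows into columns
    return [mean(c) for c in columns], [variance(c) for c in columns]

# the four rows printed by num_data.take(4) above
rows = [(0, 1, 29, 29), (0, 34, 56, 1889), (0, 2, 53, 108), (0, 45, 42, 1840)]
means, variances = column_mean_variance(rows)
print(means)
print(variances)
```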



## Maximum and minimum entry  

In [32]:
# RDD.max()/min() compare whole tuples lexicographically, so they return the
# largest and smallest rows, not the maximum/minimum of each column
print("Largest row (lexicographic):", num_data.max())
print("Smallest row (lexicographic):", num_data.min())

Largest row (lexicographic): (1, 72, 117, 8436)
Smallest row (lexicographic): (0, 0, 19, 0)
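Per-column extremes need an element-wise comparison. The sketch below shows the difference on hypothetical rows; the same `column_max` / `column_min` functions could be passed to `num_data.reduce(...)`, and `summary.max()` / `summary.min()` from the `colStats` result above also give column-wise values.

```python
from functools import reduce

def column_max(a, b):
    """Element-wise maximum of two equal-length tuples."""
    return tuple(max(x, y) for x, y in zip(a, b))

def column_min(a, b):
    """Element-wise minimum of two equal-length tuples."""
    return tuple(min(x, y) for x, y in zip(a, b))

rows = [(1, 3, 50), (0, 72, 118), (1, 10, 20)]  # hypothetical rows
print(max(rows))                 # lexicographic: returns one whole row
print(reduce(column_max, rows))  # per-column maxima
print(reduce(column_min, rows))  # per-column minima
```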


## Categorical data

In [260]:
cat_data=step3.map(lambda x:(x[0],x[1],x[3],x[4],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],x[15],x[16],x[17],x[20]))
cat_data.take(1)

[('7590-VHVEG',
  'Female',
  'Yes',
  'No',
  'No',
  'No phone service',
  'DSL',
  'No',
  'Yes',
  'No',
  'No',
  'No',
  'No',
  '1m',
  'Yes',
  'Electronic check',
  'No')]

In [34]:
cat_data.count()  # number of categorical records

7043

In [242]:
## helper functions
# convert a numeric string to int (truncates decimals; non-numeric values -> 0)
def string_to_int(val):
    try:
        return int(float(val))
    except (TypeError, ValueError):
        return 0

# sum the second element of the (key, value) pairs in one group and return "key,total";
# note: despite its name, this is a SUM of the values, not a record count
def count(x):
    pairs = list(x)
    total = sum(val for _, val in pairs)
    return str(pairs[0][0]) + "," + str(total)

# average of the second element of the (key, value) pairs, rounded to 2 decimals
def mean_val(x):
    values = [val for _, val in x]
    return round(sum(values) / len(values), 2)

## map Contract values to short codes
def replace(column_val):
    if column_val=="Month-to-month":
        column_val="1m"
    elif column_val=="One year":
        column_val="1y"
    elif column_val=="Two year":
        column_val="2y"
    else:
        column_val= "Others"
    return column_val


### Churn (sum of tenure per class)

In [251]:
cat_count = data.filter(lambda line: line != header).\
            map(lambda line: line.split(",")).\
            map(lambda x: (x[20], string_to_int(x[5]))).\
            groupBy(lambda x: x[0]).\
            map(lambda x: count(x[1])).coalesce(1)

cat_count.collect()

['No,194387', 'Yes,33603']
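Since `count()` adds up the tenure values, the figures above are total tenure months per Churn class rather than numbers of customers. The plain-Python sketch below contrasts the two on the (Churn, tenure) pairs of the first five records from `rdd1.take(5)`. In Spark a true count could be obtained with `rdd.map(lambda x: (x[20], 1)).reduceByKey(lambda a, b: a + b)` or `countByKey()`; the variable names here are illustrative.

```python
from collections import Counter, defaultdict

# (Churn, tenure) pairs for the first five records shown by rdd1.take(5)
records = [("No", 1), ("No", 34), ("Yes", 2), ("No", 45), ("Yes", 2)]

# what the count() helper produces: the SUM of tenure per key
tenure_sum = defaultdict(int)
for churn, tenure in records:
    tenure_sum[churn] += tenure

# a true record count per key: pair each record with 1 and add the ones
record_count = Counter(churn for churn, _ in records)

print(dict(tenure_sum))
print(dict(record_count))
```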

In [277]:
cat_ge = data.filter(lambda line: line != header).\
            map(lambda line: line.split(",")).\
           map(
    lambda x: ((x[1]), string_to_int(x[5]))).\
    groupBy(lambda x: x[0]).\
    map(lambda x: (count(x[1]))).coalesce(1)

cat_ge.collect()

['Female,112469', 'Male,115521']

### InternetService (sum of tenure per category)

In [272]:
cat_service = data.filter(lambda line: line != header).\
            map(lambda line: line.split(",")).map(
    lambda x: ((x[8]), string_to_int(x[5]))).\
    groupBy(lambda x: x[0]).\
    map(lambda x: (count(x[1]))).coalesce(1)

cat_service.collect()

['DSL,79461', 'Fiber optic,101914', 'No,46615']

### PaymentMethod (sum of tenure per method)

In [273]:
cat_pyMethod = data.filter(lambda line: line != header).\
            map(lambda line: line.split(",")).map(
    lambda x: ((x[17]), string_to_int(x[5]))).\
    groupBy(lambda x: x[0]).\
    map(lambda x: (count(x[1]))).coalesce(1)

cat_pyMethod.collect()

['Electronic check,59538',
 'Mailed check,35190',
 'Bank transfer (automatic),67406',
 'Credit card (automatic),65856']

### MultipleLines (sum of tenure per value)

In [274]:
cat_lines = data.filter(lambda line: line != header).\
            map(lambda line: line.split(",")).map(
    lambda x: ((x[7]), string_to_int(x[5]))).\
    groupBy(lambda x: x[0]).\
    map(lambda x: (count(x[1]))).coalesce(1)

cat_lines.collect()

['No phone service,21645', 'No,81817', 'Yes,124528']

### Churn by gender (sum of tenure per group)

In [257]:
cat_gender = data.filter(lambda line: line != header).\
            map(lambda line: line.split(",")).map(
    lambda x: ((x[20],x[1]), string_to_int(x[5]))).\
    groupBy(lambda x: x[0]).\
    map(lambda x: (count(x[1]))).coalesce(1)

cat_gender.collect()

["('No', 'Female'),96502",
 "('No', 'Male'),97885",
 "('Yes', 'Male'),17636",
 "('Yes', 'Female'),15967"]

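# 4. Unique customer count

In [279]:
# every customerID occurs once, so count() returns each customer's tenure;
# the number of unique customers is the distinct count computed earlier (7043)
cat_customer = data.filter(lambda line: line != header).\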
            map(lambda line: line.split(",")).map(
    lambda x: ((x[0]), string_to_int(x[5]))).\
    groupBy(lambda x: x[0]).\
    map(lambda x: (count(x[1]))).coalesce(1)

cat_customer.collect()

['7590-VHVEG,1',
 '5575-GNVDE,34',
 '3668-QPYBK,2',
 '7795-CFOCW,45',
 '9237-HQITU,2',
 '9305-CDSKC,8',
 '1452-KIOVK,22',
 '6713-OKOMC,10',
 '7892-POOKP,28',
 '6388-TABGU,62',
 '9763-GRSKD,13',
 '7469-LKBCI,16',
 '8091-TTVAX,58',
 '0280-XJGEX,49',
 '5129-JLPIS,25',
 '3655-SNQYZ,69',
 '8191-XWSZG,52',
 '9959-WOFKT,71',
 '4190-MFLUW,10',
 '4183-MYFRB,21',
 '8779-QRDMV,1',
 '1680-VDCWW,12',
 '1066-JKSGK,1',
 '3638-WEABW,58',
 '6322-HRPFA,49',
 '6865-JZNKO,30',
 '6467-CHFZW,47',
 '8665-UTDHZ,1',
 '5248-YGIJN,72',
 '8773-HHUOZ,17',
 '3841-NFECX,71',
 '4929-XIHVW,2',
 '6827-IEAUQ,27',
 '7310-EGVHZ,1',
 '3413-BMNZE,1',
 '6234-RAAPL,72',
 '6047-YHPVI,5',
 '6572-ADKRS,46',
 '5380-WJKOV,34',
 '8168-UQWWF,11',
 '8865-TNMNX,10',
 '9489-DEDVP,70',
 '9867-JCZSP,17',
 '4671-VJLCL,63',
 '4080-IIARD,13',
 '3714-NTNFO,49',
 '5948-UJZLF,2',
 '7760-OYPDY,2',
 '7639-LIAYI,52',
 '2954-PIBKO,69',
 '8012-SOUDQ,43',
 '9420-LOJKX,15',
 '6575-SUVOI,25',
 '7495-OOKFY,8',
 '4667-QONEA,60',
 '1658-BYGOY,18',
 '8769
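The unique customer count itself is the number of distinct IDs, which in Spark could be computed as `rdd1.map(lambda line: line.split(",")[0]).distinct().count()`. A plain-Python sketch of the same check, which also reports any repeated IDs, using the first five IDs listed above (`id_summary` is a hypothetical helper):

```python
from collections import Counter

def id_summary(ids):
    """Return (number of distinct IDs, sorted list of IDs that occur more than once)."""
    counts = Counter(ids)
    return len(counts), sorted(i for i, n in counts.items() if n > 1)

ids = ["7590-VHVEG", "5575-GNVDE", "3668-QPYBK", "7795-CFOCW", "9237-HQITU"]
print(id_summary(ids))
```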

# 5. Group by Contract and average TotalCharges

In [225]:
step4=step3.map(lambda x: (x[15],x[19]))
step4.take(2)

[('1m', 29), ('1y', 1889)]

In [224]:
# group by contract and average TotalCharges (mean_val is defined in the helper cell);
# TotalCharges were already truncated to int by string_to_int, non-numeric values as 0
step5 = step4.groupBy(lambda x: x[0]).\
            map(lambda x: (x[0], mean_val(x[1]))).coalesce(1)

step5.collect()
##step5.saveAsTextFile("data/avg_totalCharges")

[('1m', 1368.79), ('1y', 3032.14), ('2y', 3706.47)]
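`groupBy` ships every value of a key to one place before averaging. An alternative is to keep only a running (sum, count) per key, which is the pattern behind `step4.aggregateByKey((0, 0), seq_op, comb_op).mapValues(lambda t: round(t[0] / t[1], 2))`. The plain-Python sketch below shows the two functions such a call would use; the names and the five (contract, TotalCharges) pairs, taken from the first records above, are illustrative.

```python
def seq_op(acc, value):
    """Fold one TotalCharges value into a (sum, count) accumulator."""
    return acc[0] + value, acc[1] + 1

def comb_op(a, b):
    """Merge two (sum, count) accumulators, e.g. from different partitions."""
    return a[0] + b[0], a[1] + b[1]

def mean_by_key(pairs):
    """Average value per key, keeping only a running (sum, count) per key."""
    acc = {}
    for key, value in pairs:
        acc[key] = seq_op(acc.get(key, (0, 0)), value)
    return {key: round(s / n, 2) for key, (s, n) in acc.items()}

# (contract, TotalCharges) pairs for the first five records, as step4 produces them
pairs = [("1m", 29), ("1y", 1889), ("1m", 108), ("1y", 1840), ("1m", 151)]
print(mean_by_key(pairs))
```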

# 6. Sum TotalCharges using an accumulator

In [321]:
totalcharges = step4.map(lambda x: x[1])

# 1) accumulator updated with an explicit add() call
accum = spark.sparkContext.accumulator(0)
totalcharges.foreach(lambda x: accum.add(x))
print(accum.value)

# 2) same sum via += inside a named function (global is needed for +=)
accuSum = spark.sparkContext.accumulator(0)
def countFun(x):
    global accuSum
    accuSum += x
totalcharges.foreach(countFun)
print(accuSum.value)

# 3) accumulator used as a record counter
accumCount = spark.sparkContext.accumulator(0)
totalcharges.foreach(lambda x: accumCount.add(1))
print(accumCount.value)

16052864
16052864
7043
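Conceptually, each task adds to its own local copy of the accumulator and the driver merges those partial totals, so the result equals an ordinary sum (the `totalcharges.sum()` action would give the same total). The plain-Python sketch below only illustrates that merge on five sample values; `partitioned_sum` is a hypothetical helper, not Spark's implementation. Note that Spark guarantees once-only accumulator updates only inside actions such as `foreach`; updates made inside transformations like `map` may be repeated if a task is re-run.

```python
def partitioned_sum(values, num_partitions):
    """Illustrate accumulator merging: each partition sums its own values
    locally, then the per-partition totals are combined on the 'driver'."""
    if not values:
        return 0
    size = -(-len(values) // num_partitions)  # ceiling division: values per partition
    chunks = [values[i:i + size] for i in range(0, len(values), size)]
    local_totals = [sum(chunk) for chunk in chunks]  # task-side updates
    return sum(local_totals)                         # driver-side merge

values = [29, 1889, 108, 1840, 151]
print(partitioned_sum(values, 2))
```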


# 7. Load: save the analysis report

- Save the Contract-wise average TotalCharges as files

In [314]:
#step5.saveAsTextFile("Report/avg_totalCharges")
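`saveAsTextFile` writes `str()` of each element, so the tuples would be stored as lines like `('1m', 1368.79)`, and the call fails if the output directory already exists. Formatting each record as a CSV line first gives a cleaner report, e.g. `step5.map(to_csv_line).saveAsTextFile("Report/avg_totalCharges")`. A minimal sketch; `to_csv_line` and the header text are hypothetical, and the values come from the `step5.collect()` output.

```python
def to_csv_line(record):
    """Format one (contract, average TotalCharges) pair as a CSV line."""
    contract, avg = record
    return f"{contract},{avg}"

report = [("1m", 1368.79), ("1y", 3032.14), ("2y", 3706.47)]  # step5.collect() output
lines = [to_csv_line(r) for r in report]
print("\n".join(["Contract,AvgTotalCharges"] + lines))
```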
