In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pyspark
import os
from random import random
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
import numpy as np
from pyspark.mllib.stat import Statistics

### 1. Extract,Load data,Read data as text files as RDD Transform

In [3]:
spark = SparkSession.builder.master("local").\
        appName("SparkApplication").\
        config("spark.driver.bindAddress","localhost").\
        config("spark.ui.port","4041").\
        getOrCreate()

In [4]:
spark

#### To read CSV in Spark into single RDD.

In [5]:
sc = spark.sparkContext

In [7]:
#2.read csv file in rdd
data=sc.textFile("C:/Users/merry/Desktop/churn.csv")

In [8]:
print('\n',data) #what file rdd


 C:/Users/merry/Desktop/churn.csv MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0


In [9]:
print('\n',type(data)) # variable type


 <class 'pyspark.rdd.RDD'>


In [10]:
print('\n',dir(data))# what attributes are avaiable 


 ['__add__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getnewargs__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_computeFractionForSampleSize', '_defaultReducePartitions', '_id', '_is_barrier', '_jrdd', '_jrdd_deserializer', '_memory_limit', '_pickled', '_reserialize', '_to_java_object_rdd', 'aggregate', 'aggregateByKey', 'barrier', 'cache', 'cartesian', 'checkpoint', 'coalesce', 'cogroup', 'collect', 'collectAsMap', 'collectWithJobGroup', 'combineByKey', 'context', 'count', 'countApprox', 'countApproxDistinct', 'countByKey', 'countByValue', 'ctx', 'distinct', 'filter', 'first', 'flatMap', 'flatMapValues', 'fold', 'foldByKey', 'foreach', 'foreachPartition', 'fullOuterJoin', 'getCheckpointFile', 'getNumPartitions', 'getResourc

In [11]:
#header
header=data.first()
print(header)

customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn


In [12]:
# remove header
rdd1= data.filter(lambda line: line !=header)

In [13]:
#total record counts
print('\n file has:',rdd1.count(),'row') #counts


 file has: 7043 row


In [14]:
#filter first row
print('\n file line:',rdd1.first())


 file line: 7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No


In [15]:
rdd1.take(5) #take file element

['7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No',
 '5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No',
 '3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes',
 '7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,No,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No',
 '9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes']

In [16]:
#total unique records count
rdd1.distinct().count()

7043

In [17]:
print("initial partition count:"+str(rdd1.getNumPartitions()))
#Outputs: initial partition count:2

initial partition count:1


#### Transform: Exploratory data analysis using rdd

- replace Contract column values

    - Month-to-month -1m

    - One Year - 1y

    - Two Year - 2y

    - rest all - Others

- Unique customer count

- describe the categorical and numerical columns seperately

- GroupBy contract and avg of totalcharges

- using accumulator add the totalcharges

#### Load: Save analysis report

- GroupBy contract and avg of totalcharges save as files

In [19]:
step1= rdd1.map(lambda line: line.split(",")) # split by ,

In [20]:
step1.take(2)

[['7590-VHVEG',
  'Female',
  '0',
  'Yes',
  'No',
  '1',
  'No',
  'No phone service',
  'DSL',
  'No',
  'Yes',
  'No',
  'No',
  'No',
  'No',
  'Month-to-month',
  'Yes',
  'Electronic check',
  '29.85',
  '29.85',
  'No'],
 ['5575-GNVDE',
  'Male',
  '0',
  'No',
  'No',
  '34',
  'Yes',
  'No',
  'DSL',
  'Yes',
  'No',
  'Yes',
  'No',
  'No',
  'No',
  'One year',
  'No',
  'Mailed check',
  '56.95',
  '1889.5',
  'No']]

### 2.Transform: Exploratory data analysis using rdd

replace Contract column values

Month-to-month -1m

One Year - 1y

Two Year - 2y

rest all - Others

In [21]:
def replace(column_val):
    if column_val=="Month-to-month":
        column_val="1m"
    elif column_val=="One year":
        column_val="1y"
    elif column_val=="Two year":
        column_val="2y"
    else:
        column_val= "Others"
    return column_val

In [22]:
step2=step1.map(lambda x: (x[0],x[1],x[2],x[3],x[4],x[5],
                           x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],replace(x[15]),x[16],x[17],x[18],x[19],x[20]))

In [23]:
step2.take(2)

[('7590-VHVEG',
  'Female',
  '0',
  'Yes',
  'No',
  '1',
  'No',
  'No phone service',
  'DSL',
  'No',
  'Yes',
  'No',
  'No',
  'No',
  'No',
  '1m',
  'Yes',
  'Electronic check',
  '29.85',
  '29.85',
  'No'),
 ('5575-GNVDE',
  'Male',
  '0',
  'No',
  'No',
  '34',
  'Yes',
  'No',
  'DSL',
  'Yes',
  'No',
  'Yes',
  'No',
  'No',
  'No',
  '1y',
  'No',
  'Mailed check',
  '56.95',
  '1889.5',
  'No')]

In [24]:
## function to convert numerical columns from string to int
def string_to_int(val):
    try:
        return int(float(val))
    except:
        return 0

In [25]:
step3=step2.map(lambda x: (x[0],x[1],string_to_int(x[2]),x[3],x[4],string_to_int(x[5]),
                           x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],x[15],x[16],x[17],string_to_int(x[18]),string_to_int(x[19]),x[20]))

In [26]:
step3.take(1)

[('7590-VHVEG',
  'Female',
  0,
  'Yes',
  'No',
  1,
  'No',
  'No phone service',
  'DSL',
  'No',
  'Yes',
  'No',
  'No',
  'No',
  'No',
  '1m',
  'Yes',
  'Electronic check',
  29,
  29,
  'No')]

### 3. describe the categorical and numerical columns seperately

#### Numerical data

In [27]:
num_data=step3.map(lambda x: (x[2],x[5],x[18],x[19]))

In [28]:
num_data.take(4)

[(0, 1, 29, 29), (0, 34, 56, 1889), (0, 2, 53, 108), (0, 45, 42, 1840)]

#### Maximum and minimum entry

In [29]:
# decribe num
print(" Maximum  entry  in each column",num_data.max())
print("Minimum  entery  in each column",num_data.min())
#print("Mean numbers in each columns",num_data.mean())
#print("Standard deviation of each columns", num_data.std)

 Maximum  entry  in each column (1, 72, 117, 8436)
Minimum  entery  in each column (0, 0, 19, 0)


#### Categorical data

In [30]:
cat_data=step3.map(lambda x:(x[0],x[1],x[3],x[4],x[6],[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],x[15],x[16],x[17],x[20]))
cat_data.take(1)

[('7590-VHVEG',
  'Female',
  'Yes',
  'No',
  'No',
  [7],
  'DSL',
  'No',
  'Yes',
  'No',
  'No',
  'No',
  'No',
  '1m',
  'Yes',
  'Electronic check',
  'No')]

In [31]:
# decribe function or few analysis seems better, 

In [32]:
cat_data.count() #frequence, distinct,

7043

In [33]:
##functions 
# Module for changing string to int
def string_to_int(val):
    try:
        return int(float(val))
    except:
        return 0

# Module getting the count
def count(x):
    temp=0
    for val in list(x):
        temp = temp + val[1]
        out = str(val[0])+","+str(temp)
    return (out)

#module for average
def mean_val(x):
    sums=0
    l=0
    for i in x:
        sums= sums + i[1]
        l=l+1
        avg=round(sums/l,2)
        
    return (avg)

## module for replace value
def replace(column_val):
    if column_val=="Month-to-month":
        column_val="1m"
    elif column_val=="One year":
        column_val="1y"
    elif column_val=="Two year":
        column_val="2y"
    else:
        column_val= "Others"
    return column_val

### 4 .Unique customer count

In [34]:
cat_customer = data.filter(lambda line: line != header).\
            map(lambda line: line.split(",")).map(
    lambda x: ((x[0]), string_to_int(x[5]))).\
    groupBy(lambda x: x[0]).\
    map(lambda x: (count(x[1]))).coalesce(1)

cat_customer.take(4)

['7590-VHVEG,1', '5575-GNVDE,34', '3668-QPYBK,2', '7795-CFOCW,45']

### 5. -GroupBy contract and avg of totalcharges

In [35]:
step4=step3.map(lambda x: (x[15],x[19]))
step4.take(2)

[('1m', 29), ('1y', 1889)]

In [36]:
#groupby contract and avg totalcharges   
def mean_val(x):
    sums=0
    l=0
    for i in x:
        sums= sums + i[1]
        l=l+1
        avg=round(sums/l,2)
        
    return (avg)
    
step5= step4.map(lambda x:((x[0]),string_to_int(x[1])))\
            .map(lambda x: (x[0],x[1]))\
            .groupBy(lambda x: (x[0])).\
            map(lambda x: (x[0],mean_val(x[1]))).coalesce(1)

           
step5.collect() 
##step5.saveAsTextFile("data/avg_totalCharges")

[('1m', 1368.79), ('1y', 3032.14), ('2y', 3706.47)]

### 6. using accumulator add the totalcharges

In [37]:
totalcharges=step4.map(lambda x: x[1])
totalcharges.take(5)
accum=spark.sparkContext.accumulator(0)
totalcharges.foreach(lambda x:accum.add(x))
print(accum.value)

accuSum=spark.sparkContext.accumulator(0)
def countFun(x):
    global accuSum
    accuSum+=x
totalcharges.foreach(countFun)
print(accuSum.value)

accumCount=spark.sparkContext.accumulator(0)
totalcharges.foreach(lambda x:accumCount.add(1))
print(accumCount.value)

16052864
16052864
7043


### 7.Load: Save analysis report

GroupBy contract and avg of totalcharges save as files

In [38]:
step5.saveAsTextFile("Report/avg_totalCharges")

referencs

https://www.nbshare.io/notebook/403283317/How-To-Analyze-Data- Using-Pyspark-RDD/

https://github.com/sathishmtech01/pyspark_learning/blob/master/scripts/spark/project/main_file.py

https://youtu.be/ou0MYgLnftg