In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import pipeline

In [2]:
spark=SparkSession.builder.appName('Big Data').getOrCreate()

In [3]:
spark

### Importando datos con PySpark

In [4]:
df=spark.read.csv("telco_customer_churn.csv",header=True,inferSchema=True)

In [5]:
var_cat=[nC for nC,dt in df.dtypes if dt=='string']   
var_num=[nC for nC,dt in df.dtypes if dt in ['int','double']]

In [6]:
df.select(var_cat).limit(5).toPandas()

Unnamed: 0,customerID,gender,Partner,Dependents,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,TotalCharges,Churn
0,7590-VHVEG,Female,Yes,No,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,No
1,5575-GNVDE,Male,No,No,Yes,No,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,1889.5,No
2,3668-QPYBK,Male,No,No,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,108.15,Yes
3,7795-CFOCW,Male,No,No,No,No phone service,DSL,Yes,No,Yes,Yes,No,No,One year,No,Bank transfer (automatic),1840.75,No
4,9237-HQITU,Female,No,No,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,151.65,Yes


In [7]:
var_cat.remove('customerID')
var_cat.remove('TotalCharges')
var_cat.remove('Churn')

In [8]:
# var_num.append('TotalCharges')

In [9]:
var_cat

['gender',
 'Partner',
 'Dependents',
 'PhoneService',
 'MultipleLines',
 'InternetService',
 'OnlineSecurity',
 'OnlineBackup',
 'DeviceProtection',
 'TechSupport',
 'StreamingTV',
 'StreamingMovies',
 'Contract',
 'PaperlessBilling',
 'PaymentMethod']

In [10]:
lista_etapas=[]

for cat in var_cat:
    strIdx = StringIndexer(inputCol=cat,outputCol=cat+'_index')
    encoder = OneHotEncoder(inputCol=cat+'_index',outputCol=cat+'_oneHot')
    lista_etapas+=[strIdx,encoder]

In [11]:
lista_etapas

[StringIndexer_81da8ba5142f,
 OneHotEncoder_d60e45c9a168,
 StringIndexer_a756266ae5e3,
 OneHotEncoder_a91d03aa21c1,
 StringIndexer_afbacf32af79,
 OneHotEncoder_56fbb168a052,
 StringIndexer_32236a20d42c,
 OneHotEncoder_6061eb275d95,
 StringIndexer_7225d340b186,
 OneHotEncoder_2851c7adf6a1,
 StringIndexer_1e97fd1a0ace,
 OneHotEncoder_1851a6222061,
 StringIndexer_ed07bd1c6423,
 OneHotEncoder_6b7e10d0f2a2,
 StringIndexer_ef0474e5b276,
 OneHotEncoder_5cce59f5373a,
 StringIndexer_1fef198bfa7a,
 OneHotEncoder_04ee9f290ca4,
 StringIndexer_7f1d20e42982,
 OneHotEncoder_3049e32a30c8,
 StringIndexer_6e0e483e7a63,
 OneHotEncoder_277fd214d1e5,
 StringIndexer_0b0c354302b9,
 OneHotEncoder_db9c90dcc13f,
 StringIndexer_a327ca7dd71a,
 OneHotEncoder_81742b0588a6,
 StringIndexer_30c3d7ee9248,
 OneHotEncoder_196f1648c7a4,
 StringIndexer_fed613aac03d,
 OneHotEncoder_d49d28c30980]

In [12]:
print(lista_etapas[0].getInputCol(),'|',lista_etapas[0].getOutputCol())      # Elemento de StringIndexer

gender | gender_index


In [13]:
print(lista_etapas[1].getInputCol(),'|',lista_etapas[1].getOutputCol())      # Elemento de OneHotEncoder

gender_index | gender_oneHot


## Tratamiento para la variable de salida Y (Churn)   

In [14]:
strIndx2 = StringIndexer(inputCol='Churn',outputCol='Y')
lista_etapas.append(strIndx2)

In [15]:
print(lista_etapas[-1].getInputCol(),'|',lista_etapas[-1].getOutputCol())

Churn | Y


### Uniendo los vectores OneHot de variables Cat con var_num

In [16]:
[c+'_oneHot'for c in var_cat]+var_num

['gender_oneHot',
 'Partner_oneHot',
 'Dependents_oneHot',
 'PhoneService_oneHot',
 'MultipleLines_oneHot',
 'InternetService_oneHot',
 'OnlineSecurity_oneHot',
 'OnlineBackup_oneHot',
 'DeviceProtection_oneHot',
 'TechSupport_oneHot',
 'StreamingTV_oneHot',
 'StreamingMovies_oneHot',
 'Contract_oneHot',
 'PaperlessBilling_oneHot',
 'PaymentMethod_oneHot',
 'SeniorCitizen',
 'tenure',
 'MonthlyCharges']

In [17]:
columnasVectores=[c+'_oneHot'for c in var_cat]+var_num  

ensamblador=VectorAssembler(inputCols=columnasVectores,outputCol='X')       #Todas las columnas OneHot en un solo vector

lista_etapas.append(ensamblador)

In [18]:
lista_etapas

[StringIndexer_81da8ba5142f,
 OneHotEncoder_d60e45c9a168,
 StringIndexer_a756266ae5e3,
 OneHotEncoder_a91d03aa21c1,
 StringIndexer_afbacf32af79,
 OneHotEncoder_56fbb168a052,
 StringIndexer_32236a20d42c,
 OneHotEncoder_6061eb275d95,
 StringIndexer_7225d340b186,
 OneHotEncoder_2851c7adf6a1,
 StringIndexer_1e97fd1a0ace,
 OneHotEncoder_1851a6222061,
 StringIndexer_ed07bd1c6423,
 OneHotEncoder_6b7e10d0f2a2,
 StringIndexer_ef0474e5b276,
 OneHotEncoder_5cce59f5373a,
 StringIndexer_1fef198bfa7a,
 OneHotEncoder_04ee9f290ca4,
 StringIndexer_7f1d20e42982,
 OneHotEncoder_3049e32a30c8,
 StringIndexer_6e0e483e7a63,
 OneHotEncoder_277fd214d1e5,
 StringIndexer_0b0c354302b9,
 OneHotEncoder_db9c90dcc13f,
 StringIndexer_a327ca7dd71a,
 OneHotEncoder_81742b0588a6,
 StringIndexer_30c3d7ee9248,
 OneHotEncoder_196f1648c7a4,
 StringIndexer_fed613aac03d,
 OneHotEncoder_d49d28c30980,
 StringIndexer_371e51d83bc2,
 VectorAssembler_f15c0d5abf7c]

### Aplicando las etapas al dataset - Pipeline

In [19]:
procesadorEtapas=pipeline.Pipeline(stages=lista_etapas)

In [20]:
modelo=procesadorEtapas.fit(df)

In [21]:
df2=modelo.transform(df)

In [22]:
df2.limit(5).toPandas().transpose()

Unnamed: 0,0,1,2,3,4
customerID,7590-VHVEG,5575-GNVDE,3668-QPYBK,7795-CFOCW,9237-HQITU
gender,Female,Male,Male,Male,Female
SeniorCitizen,0,0,0,0,0
Partner,Yes,No,No,No,No
Dependents,No,No,No,No,No
tenure,1,34,2,45,2
PhoneService,No,Yes,Yes,No,Yes
MultipleLines,No phone service,No,No,No phone service,No
InternetService,DSL,DSL,DSL,DSL,Fiber optic
OnlineSecurity,No,Yes,Yes,Yes,No


In [23]:
df2.select(['X','Y']).limit(5).toPandas()

Unnamed: 0,X,Y
0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, ...",0.0
1,"(1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, ...",0.0
2,"(1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, ...",1.0
3,"(1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...",0.0
4,"(0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0, ...",1.0


In [24]:
### Tarea
1. Transformación de la variable categórica(TotalCharges) a numérica con pySpark
2. Construir el ensamblador con la nueva variable
3. Normalizar las variables numéricas entre valores 0 y 1
4. Construir el ensamblador con las variables normalizadas

SyntaxError: invalid syntax (<ipython-input-24-2cf190d5b4fc>, line 2)

In [103]:
# 1)
df=spark.read.csv("telco_customer_churn.csv",header=True,inferSchema=True)
var_cat=[nC for nC,dt in df.dtypes if dt=='string']   
var_num=[nC for nC,dt in df.dtypes if dt in ['int','double']]

In [104]:
df = df.withColumn('TotalCharges', df['TotalCharges'].cast('double'))    # cambiar variable str a double

In [105]:
df.describe

<bound method DataFrame.describe of DataFrame[customerID: string, gender: string, SeniorCitizen: int, Partner: string, Dependents: string, tenure: int, PhoneService: string, MultipleLines: string, InternetService: string, OnlineSecurity: string, OnlineBackup: string, DeviceProtection: string, TechSupport: string, StreamingTV: string, StreamingMovies: string, Contract: string, PaperlessBilling: string, PaymentMethod: string, MonthlyCharges: double, TotalCharges: double, Churn: string]>

In [106]:
var_cat.remove('customerID')
var_cat.remove('TotalCharges')
var_cat.remove('Churn')

In [107]:
var_num.append('TotalCharges')

In [108]:
var_num

['SeniorCitizen', 'tenure', 'MonthlyCharges', 'TotalCharges']

In [109]:
df.select(var_num)           # TotalCharges está como double

DataFrame[SeniorCitizen: int, tenure: int, MonthlyCharges: double, TotalCharges: double]

In [110]:
lista_etapas=[]

for cat in var_cat:
    strIdx = StringIndexer(inputCol=cat,outputCol=cat+'_index')
    encoder = OneHotEncoder(inputCol=cat+'_index',outputCol=cat+'_oneHot')
    lista_etapas+=[strIdx,encoder]

In [111]:
strIndx2 = StringIndexer(inputCol='Churn',outputCol='Y')
lista_etapas.append(strIndx2)

In [112]:
[c+'_oneHot'for c in var_cat]+var_num

['gender_oneHot',
 'Partner_oneHot',
 'Dependents_oneHot',
 'PhoneService_oneHot',
 'MultipleLines_oneHot',
 'InternetService_oneHot',
 'OnlineSecurity_oneHot',
 'OnlineBackup_oneHot',
 'DeviceProtection_oneHot',
 'TechSupport_oneHot',
 'StreamingTV_oneHot',
 'StreamingMovies_oneHot',
 'Contract_oneHot',
 'PaperlessBilling_oneHot',
 'PaymentMethod_oneHot',
 'SeniorCitizen',
 'tenure',
 'MonthlyCharges',
 'TotalCharges']

In [113]:
#2)
columnasVectores=[c+'_oneHot'for c in var_cat]+var_num

ensamblador=VectorAssembler(inputCols=columnasVectores,outputCol='X')       #Todas las columnas OneHot en un solo vector

lista_etapas.append(ensamblador)

In [114]:
procesadorEtapas=pipeline.Pipeline(stages=lista_etapas)

In [115]:
modelo=procesadorEtapas.fit(df)

In [116]:
df2=modelo.transform(df)

In [117]:
df2.limit(5).toPandas().transpose()

Unnamed: 0,0,1,2,3,4
customerID,7590-VHVEG,5575-GNVDE,3668-QPYBK,7795-CFOCW,9237-HQITU
gender,Female,Male,Male,Male,Female
SeniorCitizen,0,0,0,0,0
Partner,Yes,No,No,No,No
Dependents,No,No,No,No,No
tenure,1,34,2,45,2
PhoneService,No,Yes,Yes,No,Yes
MultipleLines,No phone service,No,No,No phone service,No
InternetService,DSL,DSL,DSL,DSL,Fiber optic
OnlineSecurity,No,Yes,Yes,Yes,No


In [56]:
#3)  NORMALIZACIÓN
from pyspark.ml.functions import vector_to_array

df=spark.read.csv("telco_customer_churn.csv",header=True,inferSchema=True)
var_cat=[nC for nC,dt in df.dtypes if dt=='string']   
var_num=[nC for nC,dt in df.dtypes if dt in ['int','double']]

df = df.withColumn('TotalCharges', df['TotalCharges'].cast('double'))    # cambiar variable str a double

var_cat.remove('customerID')
var_cat.remove('TotalCharges')
var_cat.remove('Churn')
var_num.append('TotalCharges')

df = df.na.drop(subset=["TotalCharges"])       # Eliminar datos nulos sino el fit sale error

In [57]:
# Normalización de variables numéricas
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StandardScaler

lista_etapasNum=[]

for var in var_num:
    ensam=VectorAssembler(inputCols=[var],outputCol=var+'_vec')     
    norm=StandardScaler(inputCol=var+'_vec', outputCol=var+'_scaled')
    lista_etapasNum+=[ensam,norm]
    
etapasNum = pipeline.Pipeline(stages=lista_etapasNum)
model = etapasNum.fit(df)
df_normNum = model.transform(df)

In [58]:
df_normNum.select(var_num).limit(10).toPandas().transpose()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
SeniorCitizen,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
tenure,1.0,34.0,2.0,45.0,2.0,8.0,22.0,10.0,28.0,62.0
MonthlyCharges,29.85,56.95,53.85,42.3,70.7,99.65,89.1,29.75,104.8,56.15
TotalCharges,29.85,1889.5,108.15,1840.75,151.65,820.5,1949.4,301.9,3046.05,3487.95


In [59]:
# Procedimiento para obtener solo el valor normalizado
for col in var_num:
    df_normNum = df_normNum.drop(col+"_vec")        # Borrar todos los var+'_vec'
    
for col in var_num:
    df_normNum = df_normNum.withColumn(col, vector_to_array(col+"_scaled"))  # Pasar de vector a array las columnas normalizadas

In [60]:
for col in var_num:
    df_normNum = df_normNum.drop(col+"_scaled")
    
for col in var_num:
    df_normNum = df_normNum.withColumn(col, df_normNum[col].getItem(0))

In [61]:
df_normNum.select(var_num).limit(10).toPandas().transpose()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
SeniorCitizen,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
tenure,0.040741,1.385196,0.081482,1.833348,0.081482,0.325929,0.896303,0.407411,1.14075,2.525946
MonthlyCharges,0.992157,1.892909,1.789871,1.405971,2.349932,3.312175,2.961513,0.988833,3.483351,1.866318
TotalCharges,0.013169,0.833564,0.047711,0.812058,0.066901,0.361969,0.85999,0.133185,1.343784,1.53873


In [62]:
#4) Construyendo el ensamblador

lista_etapas=[]

for cat in var_cat:
    strIdx = StringIndexer(inputCol=cat,outputCol=cat+'_index')
    encoder = OneHotEncoder(inputCol=cat+'_index',outputCol=cat+'_oneHot')
    lista_etapas+=[strIdx,encoder]
    
strIndx2 = StringIndexer(inputCol='Churn',outputCol='Y')
lista_etapas.append(strIndx2)

In [63]:
columnasVectores = [c+'_oneHot' for c in var_cat] + var_num
ensamblador = VectorAssembler(inputCols=columnasVectores, outputCol="X")
lista_etapas.append(ensamblador)

In [64]:
procesadorEtapas = pipeline.Pipeline(stages=lista_etapas)
df_normNumFinal = procesadorEtapas.fit(df_normNum).transform(df_normNum)

In [65]:
df_normNumFinal.select(["X", "Y"]).limit(10).toPandas()

Unnamed: 0,X,Y
0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, ...",0.0
1,"(1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, ...",0.0
2,"(1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, ...",1.0
3,"(1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...",0.0
4,"(0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0, ...",1.0
5,"(0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, ...",1.0
6,"(1.0, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, ...",0.0
7,"(0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...",0.0
8,"(0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, ...",1.0
9,"(1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, ...",0.0


In [70]:
df_normNumFinal.select(["X", "Y"]).limit(10).toPandas()

Unnamed: 0,X,Y
0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, ...",0.0
1,"(1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, ...",0.0
2,"(1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, ...",1.0
3,"(1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...",0.0
4,"(0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0, ...",1.0
5,"(0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, ...",1.0
6,"(1.0, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, ...",0.0
7,"(0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...",0.0
8,"(0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, ...",1.0
9,"(1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, ...",0.0


### Modelo con estandarización de variables numéricas y categóricas

In [78]:
df=spark.read.csv("telco_customer_churn.csv",header=True,inferSchema=True)
var_cat=[nC for nC,dt in df.dtypes if dt=='string']   
var_num=[nC for nC,dt in df.dtypes if dt in ['int','double']]

df = df.withColumn('TotalCharges', df['TotalCharges'].cast('double'))    # cambiar variable str a double

var_cat.remove('customerID')
var_cat.remove('TotalCharges')
var_cat.remove('Churn')
var_num.append('TotalCharges')

df = df.na.drop(subset=["TotalCharges"])  

In [79]:
# Normalización en variables numéricas

lista_etapasNum=[]

for var in var_num:
    ensam=VectorAssembler(inputCols=[var],outputCol=var+'_vec')     
    norm=StandardScaler(inputCol=var+'_vec', outputCol=var+'_scaled')
    lista_etapasNum+=[ensam,norm]
    
etapasNum = pipeline.Pipeline(stages=lista_etapasNum)
model = etapasNum.fit(df)
df_normNum = model.transform(df)

In [80]:
for col in var_num:
    df_normNum = df_normNum.drop(col+"_vec")        # Borrar todos los var+'_vec'
    
for col in var_num:
    df_normNum = df_normNum.withColumn(col, vector_to_array(col+"_scaled"))

for col in var_num:
    df_normNum = df_normNum.drop(col+"_scaled")
    
for col in var_num:
    df_normNum = df_normNum.withColumn(col, df_normNum[col].getItem(0))

In [82]:
# Normalización en variables categóricas

lista_etapas=[]

for cat in var_cat:
    strIdx = StringIndexer(inputCol=cat,outputCol=cat+'_index')
    encoder = OneHotEncoder(inputCol=cat+'_index',outputCol=cat+'_oneHot')
    standard = StandardScaler(inputCol=cat+'_oneHot',outputCol=cat+'_scaled')
    lista_etapas+=[strIdx,encoder,standard]
    
strIndx2 = StringIndexer(inputCol='Churn',outputCol='Y')
lista_etapas.append(strIndx2)

In [83]:
#4) Construyendo el ensamblador
columnasVectores = [c+'_scaled' for c in var_cat] + var_num
ensamblador = VectorAssembler(inputCols=columnasVectores, outputCol="X")
lista_etapas.append(ensamblador)

In [84]:
procesadorEtapas = pipeline.Pipeline(stages=lista_etapas)
df_normNumFinal = procesadorEtapas.fit(df_normNum).transform(df_normNum)

In [85]:
df_normNumFinal.select(["X", "Y"]).limit(10).toPandas()

Unnamed: 0,X,Y
0,"(0.0, 0.0, 2.1851748981300347, 0.0, 0.0, 0.0, ...",0.0
1,"(1.9999458781640924, 2.001082630964643, 2.1851...",0.0
2,"(1.9999458781640924, 2.001082630964643, 2.1851...",1.0
3,"(1.9999458781640924, 2.001082630964643, 2.1851...",0.0
4,"(0.0, 2.001082630964643, 2.1851748981300347, 3...",1.0
5,"(0.0, 2.001082630964643, 2.1851748981300347, 3...",1.0
6,"(1.9999458781640924, 2.001082630964643, 0.0, 3...",0.0
7,"(0.0, 2.001082630964643, 2.1851748981300347, 0...",0.0
8,"(0.0, 0.0, 2.1851748981300347, 3.3832826290967...",1.0
9,"(1.9999458781640924, 2.001082630964643, 0.0, 3...",0.0
