In [1]:
# Importando todos os componentes da biblioteca spark

import pyspark

In [2]:
#Lendo os arquivos csv com o comando textFile(), e utilizando u zipWithIndex() que junta uma string do texto de cada linha do csv
#vinculada a uma tupla onde a segunda posição representa um indice
RDDtreino = sc.textFile('projeto4_telecom_treino.csv').zipWithIndex()
RDDteste = sc.textFile('projeto4_telecom_teste.csv').zipWithIndex()

In [3]:
#Visualizando o resultado inicial, onde temos uma "lista" (RDD) de tuplas, onde o primeiro elemento de cada tupla é uma string
#referente a toda a linha do csv, e o segundo elemento o índice]

RDDtreino.take(5)

[('"","state","account_length","area_code","international_plan","voice_mail_plan","number_vmail_messages","total_day_minutes","total_day_calls","total_day_charge","total_eve_minutes","total_eve_calls","total_eve_charge","total_night_minutes","total_night_calls","total_night_charge","total_intl_minutes","total_intl_calls","total_intl_charge","number_customer_service_calls","churn"',
  0),
 ('"1","KS",128,"area_code_415","no","yes",25,265.1,110,45.07,197.4,99,16.78,244.7,91,11.01,10,3,2.7,1,"no"',
  1),
 ('"2","OH",107,"area_code_415","no","yes",26,161.6,123,27.47,195.5,103,16.62,254.4,103,11.45,13.7,3,3.7,1,"no"',
  2),
 ('"3","NJ",137,"area_code_415","no","no",0,243.4,114,41.38,121.2,110,10.3,162.6,104,7.32,12.2,5,3.29,0,"no"',
  3),
 ('"4","OH",84,"area_code_408","yes","no",0,299.4,71,50.9,61.9,88,5.26,196.9,89,8.86,6.6,7,1.78,2,"no"',
  4)]

In [4]:
#Primeiramente separando os nomes das colunas, que após utilizado o metodo zipWithIndex, se encontra na tupla onde o elemento [1]
#, o indice, é igual a zero

#Após isso é utilizado o metodo split() para separar o valor unico que era a string, em diversos elementos dentro de uma lista
#separa por virgulas


#O filter percorre cada uma da tuplas, checando 
columns = RDDtreino.filter(lambda x: x[1] == 0).map(lambda x: x[0].replace('\"','')).map(lambda x: x.split(',')).collect()

In [5]:
columns = columns[0]

In [6]:
#De forma análoga ao que foi feito com as colunas, foram filtradas todas as tuplas na lista (RDD) que são maiores que zero
#e realizada a divisao também por virgulas
data = RDDtreino.filter(lambda x: x[1] > 0).map(lambda x: x[0].replace('\"', '')).map(lambda x: x.split(","))

In [7]:
data = data.map(lambda x: (x)).toDF(schema = columns)



In [8]:
data.show()

+---+-----+--------------+-------------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+-----------------------------+-----+
|   |state|account_length|    area_code|international_plan|voice_mail_plan|number_vmail_messages|total_day_minutes|total_day_calls|total_day_charge|total_eve_minutes|total_eve_calls|total_eve_charge|total_night_minutes|total_night_calls|total_night_charge|total_intl_minutes|total_intl_calls|total_intl_charge|number_customer_service_calls|churn|
+---+-----+--------------+-------------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+--------------

In [9]:
#Analisando o tipo de dados que contém em cada coluna

data.printSchema()

root
 |-- : string (nullable = true)
 |-- state: string (nullable = true)
 |-- account_length: string (nullable = true)
 |-- area_code: string (nullable = true)
 |-- international_plan: string (nullable = true)
 |-- voice_mail_plan: string (nullable = true)
 |-- number_vmail_messages: string (nullable = true)
 |-- total_day_minutes: string (nullable = true)
 |-- total_day_calls: string (nullable = true)
 |-- total_day_charge: string (nullable = true)
 |-- total_eve_minutes: string (nullable = true)
 |-- total_eve_calls: string (nullable = true)
 |-- total_eve_charge: string (nullable = true)
 |-- total_night_minutes: string (nullable = true)
 |-- total_night_calls: string (nullable = true)
 |-- total_night_charge: string (nullable = true)
 |-- total_intl_minutes: string (nullable = true)
 |-- total_intl_calls: string (nullable = true)
 |-- total_intl_charge: string (nullable = true)
 |-- number_customer_service_calls: string (nullable = true)
 |-- churn: string (nullable = true)



In [22]:
from pyspark.sql.functions import col 
from pyspark.sql.types import StringType,BooleanType,DateType, FloatType

#looping que percorre todas as colunas do dataframe, e quando encontrados os nomes numbers e total, a coluna é transformada
#de string para booleano, para alterar colunas é utilizado o método withColumn, e para muda o tipo de dados o método cast

for i in data.columns:
    if "number" in i:
        data = data.withColumn(i, col(i).cast(FloatType()))
    elif "total" in i:
        data = data.withColumn(i, col(i).cast(FloatType()))
    elif "length" in i:
        data = data.withColumn(i, col(i).cast(FloatType()))

In [23]:
#Visualizando se as transformações foram realizadas de forma correta

data.printSchema()

root
 |-- : string (nullable = true)
 |-- state: string (nullable = true)
 |-- account_length: float (nullable = true)
 |-- area_code: string (nullable = true)
 |-- international_plan: string (nullable = true)
 |-- voice_mail_plan: string (nullable = true)
 |-- number_vmail_messages: float (nullable = true)
 |-- total_day_minutes: float (nullable = true)
 |-- total_day_calls: float (nullable = true)
 |-- total_day_charge: float (nullable = true)
 |-- total_eve_minutes: float (nullable = true)
 |-- total_eve_calls: float (nullable = true)
 |-- total_eve_charge: float (nullable = true)
 |-- total_night_minutes: float (nullable = true)
 |-- total_night_calls: float (nullable = true)
 |-- total_night_charge: float (nullable = true)
 |-- total_intl_minutes: float (nullable = true)
 |-- total_intl_calls: float (nullable = true)
 |-- total_intl_charge: float (nullable = true)
 |-- number_customer_service_calls: float (nullable = true)
 |-- churn: string (nullable = true)



In [24]:
#Criando uma função para lista as colunas de acordo com suas variáveis, para usar em um select

def listVarByType(type):
    booleanVars = []

    for i in data.dtypes:
        if i[1] == f"{type}":
            booleanVars.append(i[0])
    return booleanVars

In [25]:
booleanVars

['number_vmail_messages',
 'total_day_minutes',
 'total_day_calls',
 'total_day_charge',
 'total_eve_minutes',
 'total_eve_calls',
 'total_eve_charge',
 'total_night_minutes',
 'total_night_calls',
 'total_night_charge',
 'total_intl_minutes',
 'total_intl_calls',
 'total_intl_charge',
 'number_customer_service_calls']

In [26]:
data.select(listVarByType('string')).show()

+---+-----+-------------+------------------+---------------+-----+
|   |state|    area_code|international_plan|voice_mail_plan|churn|
+---+-----+-------------+------------------+---------------+-----+
|  1|   KS|area_code_415|                no|            yes|   no|
|  2|   OH|area_code_415|                no|            yes|   no|
|  3|   NJ|area_code_415|                no|             no|   no|
|  4|   OH|area_code_408|               yes|             no|   no|
|  5|   OK|area_code_415|               yes|             no|   no|
|  6|   AL|area_code_510|               yes|             no|   no|
|  7|   MA|area_code_510|                no|            yes|   no|
|  8|   MO|area_code_415|               yes|             no|   no|
|  9|   LA|area_code_408|                no|             no|   no|
| 10|   WV|area_code_415|               yes|            yes|   no|
| 11|   IN|area_code_415|                no|             no|  yes|
| 12|   RI|area_code_415|                no|             no|  

In [36]:
data.groupBy('state').avg('total_day_minutes').show()

+-----+----------------------+
|state|avg(total_day_minutes)|
+-----+----------------------+
|   AZ|     171.6046872138977|
|   SC|     166.4416673342387|
|   LA|    178.37647082758886|
|   MN|    183.35476157778785|
|   NJ|    196.22499982048484|
|   DC|    171.37962934705945|
|   OR|    176.24615434499887|
|   VA|    177.24415608195517|
|   RI|    167.47846128023588|
|   WY|    180.17012903287812|
|   KY|    173.75423793469446|
|   NH|    177.32857264791215|
|   MI|     180.5931496554858|
|   NV|    176.42575651226622|
|   WI|    179.13076868424048|
|   ID|    178.61917814489914|
|   CA|    183.56470691456514|
|   NE|    177.46557423325837|
|   CT|    175.14054107666016|
|   MT|     174.0073528289795|
+-----+----------------------+
only showing top 20 rows

