In [None]:
%pip install spark

In [None]:
%pip install pyspark

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.master('local[*]').appName('transformacao com Spark 1').getOrCreate()

In [None]:
spark

In [None]:
dados = spark.read\
    .format('csv')\
    .option('inferSchema', 'true')\
    .option('sep', ',')\
    .option('header', 'true')\
    .load('dados/dados_clientes.csv')

### Explorando dados

In [None]:
dados

In [None]:
dados.limit(15).toPandas()

In [None]:
dados.count()

In [None]:
dados.groupBy('Churn').count().show()

In [None]:
dados.printSchema()

### Colunas binárias

In [None]:
colunasBinarias = [
    'Churn',
    'Conjuge',
    'Dependentes',
    'TelefoneFixo',
    'MaisDeUmaLinhaTelefonica',
    'SegurancaOnline',
    'BackupOnline',
    'SeguroDispositivo',
    'SuporteTecnico',
    'TVaCabo',
    'StreamingFilmes',
    'ContaCorreio'
]

In [None]:
from pyspark.sql import functions as f

In [None]:
todasColunas = [f.when(f.col(c)== 'Sim', 1).otherwise(0).alias(c) for c in colunasBinarias] 

In [None]:
todasColunas
# Podemos ler a primeira linha como: quando Churn for igual a "Sim", 
# seu valor será 1; caso contrário, será igual a 0. "AS Churn" é a atribuição do apelido para a coluna.

In [None]:
for coluna in reversed(dados.columns): 
    if coluna not in colunasBinarias:
        todasColunas.insert(0, coluna)
todasColunas

In [None]:
dados.select(todasColunas).show()

In [None]:
dataset = dados.select(todasColunas)

In [None]:
dataset.printSchema()

### Criando dummies

In [None]:
dados.select(['Internet', 'TipoContrato', 'MetodoPagamento']).show()

In [None]:
dataset.groupBy('id').pivot('Internet').agg(f.lit(1)).na.fill(0).show()

In [None]:
Internet = dataset.groupBy('id').pivot('Internet').agg(f.lit(1)).na.fill(0)
TipoContrato = dataset.groupBy('id').pivot('TipoContrato').agg(f.lit(1)).na.fill(0)
MetodoPagamento = dataset.groupBy('id').pivot('MetodoPagamento').agg(f.lit(1)).na.fill(0)

In [None]:
dataset = dataset\
    .join(Internet, 'id', how='inner')\
    .join(TipoContrato, 'id', how='inner')\
    .join(MetodoPagamento, 'id', how='inner')\
    .select(
        '*',
        f.col('DSL').alias('Internet_DSL'), 
        f.col('FibraOptica').alias('Internet_FibraOptica'), 
        f.col('Nao').alias('Internet_Nao'), 
        f.col('Mensalmente').alias('TipoContrato_Mensalmente'), 
        f.col('UmAno').alias('TipoContrato_UmAno'), 
        f.col('DoisAnos').alias('TipoContrato_DoisAnos'), 
        f.col('DebitoEmConta').alias('MetodoPagamento_DebitoEmConta'), 
        f.col('CartaoCredito').alias('MetodoPagamento_CartaoCredito'), 
        f.col('BoletoEletronico').alias('MetodoPagamento_BoletoEletronico'), 
        f.col('Boleto').alias('MetodoPagamento_Boleto')        
    ).drop(
        'Internet', 'TipoContrato', 'MetodoPagamento', 'DSL', 
        'FibraOptica', 'Nao', 'Mensalmente', 'UmAno', 'DoisAnos', 
        'DebitoEmConta', 'CartaoCredito', 'BoletoEletronico', 'Boleto'
    )

In [None]:
dataset.printSchema()

### Salvando

In [None]:
dataset.write.mode("overwrite").format('csv').save('/dados/')