<a href="https://colab.research.google.com/github/andcore20/Fifteen_Hundred_Curso_Dados/blob/main/Desafio_Final_Testes_com_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PYSPARK ON COLAB



In [None]:
!apt-get update -qq

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

!tar xf spark-3.1.2-bin-hadoop2.7.tgz

!pip install -q findspark

W: https://cloud.r-project.org/bin/linux/ubuntu/jammy-cran40/InRelease: Key is stored in legacy trusted.gpg keyring (/etc/apt/trusted.gpg), see the DEPRECATION section in apt-key(8) for details.


In [None]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
import findspark

findspark.init()

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
  .master('local[*]')\
  .appName("Iniciando com Spark")\
  .getOrCreate()

# Importando dados com pandas

In [None]:
import pandas as pd
from google.colab import drive
drive.mount('/content/drive')

path_bank = '/content/drive/MyDrive/17.UNI1500th/Projetos em grupo AWS UNI1500/Desafio final/bank-full.csv'
path_bank_ad = '/content/drive/MyDrive/17.UNI1500th/Projetos em grupo AWS UNI1500/Desafio final/bank-additional-full.csv'

bank = pd.read_csv(path_bank, sep=';')
bank_ad = pd.read_csv(path_bank_ad, sep=';')

Mounted at /content/drive


# Tratamento Perfil de Cliente

## Filtro poutcome = 'success' ou y = 'yes'

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Crie uma sessão do Spark
spark = SparkSession.builder.getOrCreate()

# Converta os DataFrames pandas em DataFrames Spark
bank_ad_spark = spark.createDataFrame(bank_ad)
bank_spark = spark.createDataFrame(bank)
bank_spark = bank_spark[['job', 'age', 'marital', 'education', 'default', 'housing', 'loan', 'poutcome', 'y']]
bank_ad_spark = bank_ad_spark[['job', 'age', 'marital', 'education', 'default', 'housing', 'loan', 'poutcome', 'y']]

# Defina as condições
cond1 = (col("poutcome") == "success") | (col("y") == "yes")
cond2 = (col("poutcome") == "success") | (col("y") == "yes")

# Aplique as condições e faça a concatenação dos DataFrames
bank_final = bank_ad_spark.filter(cond1).union(bank_spark.filter(cond2))

# Exiba o DataFrame resultante
bank_final.show()


+------------+---+--------+-------------------+-------+-------+----+-----------+---+
|         job|age| marital|          education|default|housing|loan|   poutcome|  y|
+------------+---+--------+-------------------+-------+-------+----+-----------+---+
| blue-collar| 41|divorced|           basic.4y|unknown|    yes|  no|nonexistent|yes|
|entrepreneur| 49| married|  university.degree|unknown|    yes|  no|nonexistent|yes|
|  technician| 49| married|           basic.9y|     no|     no|  no|nonexistent|yes|
|  technician| 41| married|professional.course|unknown|    yes|  no|nonexistent|yes|
| blue-collar| 45| married|           basic.9y|unknown|    yes|  no|nonexistent|yes|
| blue-collar| 42| married|           basic.9y|     no|    yes| yes|nonexistent|yes|
|   housemaid| 39| married|           basic.9y|     no|    yes|  no|nonexistent|yes|
|     unknown| 28|  single|            unknown|unknown|    yes| yes|nonexistent|yes|
|    services| 44| married|        high.school|     no|    yes|  

In [None]:
bank_final.count()

10941

## Ajustando alguns valores

In [None]:

# Exibir valores únicos de cada coluna
#for column in bank_final.columns:
#    unique_values = bank_final.select(column).distinct().collect()
#    print(f"Valores únicos da coluna '{column}':")
#    for value in unique_values:
#        print(value[0])
#    print()


In [None]:


# Tratar valores 'unknown', 'other' e 'nonexistent' como 'failure'
#bank_final = bank_final.withColumn("poutcome", when(bank_final["poutcome"].isin(["unknown", "other", "nonexistent"]), "failure").otherwise(bank_final["poutcome"]))
#bank_final.select("poutcome").distinct().collect()

## Ajustando coluna education

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Dicionário de substituição
substitution_dict = {
    'basic.4y': 'basic',
    'basic.6y': 'basic',
    'basic.9y': 'basic',
    'high.school': 'secondary',
    'university.degree': 'tertiary',
    'professional.course': 'tertiary'
}

# Substituir os valores na coluna "education" usando a função when() e col()
bank_final = bank_final.withColumn(
    'education',
    when(col('education').isin(list(substitution_dict.keys())),
         col('education')).otherwise(col('education'))
)

# Substituir os valores na coluna "education" usando a função replace()
for old_value, new_value in substitution_dict.items():
    bank_final = bank_final.replace(old_value, new_value, 'education')

# Exibir o DataFrame resultante
bank_final.count()

10941

## Coluna faixa_idades

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Defina as condições para cada faixa de idade
conditions = [
    (col("age").between(17, 25)),
    (col("age").between(26, 39)),
    (col("age").between(40, 59)),
    (col("age").between(60, 98))
]

# Defina os valores correspondentes para cada faixa de idade
values = ['17 - 25', '26 - 39', '40 - 59', '60 - 98']

# Use a função 'when' para atribuir os valores corretos com base nas condições
bank_final = bank_final.withColumn("faixa_idades", when(conditions[0], values[0])
                                                        .when(conditions[1], values[1])
                                                        .when(conditions[2], values[2])
                                                        .when(conditions[3], values[3])
                                                        .otherwise(None))

# Mostre o DataFrame resultante
bank_final.show()


+------------+---+--------+---------+-------+-------+----+-----------+---+------------+
|         job|age| marital|education|default|housing|loan|   poutcome|  y|faixa_idades|
+------------+---+--------+---------+-------+-------+----+-----------+---+------------+
| blue-collar| 41|divorced|    basic|unknown|    yes|  no|nonexistent|yes|     40 - 59|
|entrepreneur| 49| married| tertiary|unknown|    yes|  no|nonexistent|yes|     40 - 59|
|  technician| 49| married|    basic|     no|     no|  no|nonexistent|yes|     40 - 59|
|  technician| 41| married| tertiary|unknown|    yes|  no|nonexistent|yes|     40 - 59|
| blue-collar| 45| married|    basic|unknown|    yes|  no|nonexistent|yes|     40 - 59|
| blue-collar| 42| married|    basic|     no|    yes| yes|nonexistent|yes|     40 - 59|
|   housemaid| 39| married|    basic|     no|    yes|  no|nonexistent|yes|     26 - 39|
|     unknown| 28|  single|  unknown|unknown|    yes| yes|nonexistent|yes|     26 - 39|
|    services| 44| married|secon

In [None]:
bank_final.filter(col('faixa_idades') == '17 - 25').count()

726

In [None]:
bank_final.filter((bank_final['faixa_idades'] == '17 - 25') & (bank_final['y'] == 'yes') & (bank_final['poutcome'] == 'success')).count()

140

## Agrupamento faixa idade micro

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Defina as faixas de idades desejadas
idade_inicial = 17
idade_final = 98
intervalo_idades = 3

# Crie uma lista de tuplas para armazenar as condições e os valores
faixas_idade = []

# Use um loop for para criar as condições e os valores para todas as faixas de idades
for i in range(idade_inicial, idade_final + 1, intervalo_idades):
    faixa_idade = f"{i} - {i + intervalo_idades - 1}"
    condition = col("age").between(i, i + intervalo_idades - 1)
    faixas_idade.append((condition, faixa_idade))

# Crie uma coluna inicializada com None
faixa_idades_micro_col = None

# Use a função 'when' com a lista de tuplas de condições e valores para criar a coluna 'faixa_idades_micro'
for condition, value in faixas_idade:
    if faixa_idades_micro_col is None:
        faixa_idades_micro_col = when(condition, value)
    else:
        faixa_idades_micro_col = faixa_idades_micro_col.when(condition, value)

# Atribua o valor padrão None usando a função otherwise
faixa_idades_micro_col = faixa_idades_micro_col.otherwise(None)

# Adicione a coluna 'faixa_idades_micro' ao DataFrame
bank_final = bank_final.withColumn("faixa_idades_micro", faixa_idades_micro_col)

# Mostre o DataFrame resultante
bank_final.show()


## Tabela Perfil_Geral

In [None]:
perfil_geral = bank_final

## Tabela perfil_faixa_idades

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import Window

# Agrupamento por faixa de idade e contagem
grouped_df = bank_final.groupBy('faixa_idades').agg(
    F.count(F.lit(1)).alias('count'),
    F.round((F.sum(F.when(F.col('poutcome') == 'success', 1).otherwise(0)) / F.count(F.lit(1))), 2).alias('conversão_anterior'),
    F.round((F.sum(F.when(F.col('y') == 'yes', 1).otherwise(0)) / F.count(F.lit(1))), 2).alias('conversão_atual'),
    F.round((F.sum(F.when(F.col('poutcome') == 'failure', 1).otherwise(0)) / F.count(F.lit(1))), 2).alias('churn_anterior'),
    F.round((F.sum(F.when(F.col('y') == 'yes', 1).otherwise(0)) / F.count(F.lit(1))), 2).alias('churn_atual')
)

# Cálculo da média da coluna age
average_age = bank_final.select(F.avg('age')).collect()[0][0]
average_age = int(average_age + 0.5)  # Arredonda para cima caso .5 ou maior

# Valores que aparecem com maior frequência em cada coluna
most_frequent_job = bank_final.groupBy('job').agg(F.count(F.lit(1)).alias('count')).orderBy(F.desc('count')).select('job').collect()[0][0]
most_frequent_marital = bank_final.groupBy('marital').agg(F.count(F.lit(1)).alias('count')).orderBy(F.desc('count')).select('marital').collect()[0][0]
most_frequent_education = bank_final.groupBy('education').agg(F.count(F.lit(1)).alias('count')).orderBy(F.desc('count')).select('education').collect()[0][0]
most_frequent_default = bank_final.groupBy('default').agg(F.count(F.lit(1)).alias('count')).orderBy(F.desc('count')).select('default').collect()[0][0]
most_frequent_housing = bank_final.groupBy('housing').agg(F.count(F.lit(1)).alias('count')).orderBy(F.desc('count')).select('housing').collect()[0][0]
most_frequent_loan = bank_final.groupBy('loan').agg(F.count(F.lit(1)).alias('count')).orderBy(F.desc('count')).select('loan').collect()[0][0]

# Criação do DataFrame com os resultados
teste = grouped_df.withColumn('age', F.lit(average_age)) \
                     .withColumn('job', F.lit(most_frequent_job)) \
                     .withColumn('marital', F.lit(most_frequent_marital)) \
                     .withColumn('education', F.lit(most_frequent_education)) \
                     .withColumn('default', F.lit(most_frequent_default)) \
                     .withColumn('housing', F.lit(most_frequent_housing)) \
                     .withColumn('loan', F.lit(most_frequent_loan))

teste.show()


+------------+-----+------------------+---------------+--------------+-----------+---+------+-------+---------+-------+-------+----+
|faixa_idades|count|conversão_anterior|conversão_atual|churn_anterior|churn_atual|age|   job|marital|education|default|housing|loan|
+------------+-----+------------------+---------------+--------------+-----------+---+------+-------+---------+-------+-------+----+
|     60 - 98| 1185|              0.37|            0.9|          0.12|        0.9| 41|admin.|married| tertiary|     no|     no|  no|
|     17 - 25|  726|              0.27|           0.92|          0.11|       0.92| 41|admin.|married| tertiary|     no|     no|  no|
|     26 - 39| 5326|              0.25|            0.9|          0.11|        0.9| 41|admin.|married| tertiary|     no|     no|  no|
|     40 - 59| 3704|              0.25|           0.91|          0.11|       0.91| 41|admin.|married| tertiary|     no|     no|  no|
+------------+-----+------------------+---------------+--------------

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, round, avg
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rank

# Criar uma sessão no Spark
spark = SparkSession.builder.getOrCreate()

# Agrupar por faixa_idades e contar a quantidade de vezes que aparece cada faixa
grouped_df = bank_final.groupBy('faixa_idades').agg(count('faixa_idades').alias('count'))

# Calcular a taxa de conversão por faixa de idades
conversao_df = bank_final.filter((bank_final['poutcome'] == 'success') & (bank_final['y'] == 'yes')) \
    .groupBy('faixa_idades').agg((count('faixa_idades') / bank_final.count()).alias('conversão'))

# Obter o valor mais frequente de 'job' para cada faixa de idades
job_df = bank_final.groupBy('faixa_idades', 'job').agg(count('job').alias('job_count')) \
    .withColumn('rank', rank().over(Window.partitionBy('faixa_idades').orderBy(desc('job_count')))) \
    .filter(col('rank') == 1).select('faixa_idades', 'job')

# Calcular a média da coluna 'age' e arredondar para inteiro para cada faixa de idades
age_df = bank_final.groupBy('faixa_idades').agg(round(avg('age')).cast('integer').alias('age'))

# Obter o valor mais frequente de 'marital' para cada faixa de idades
marital_df = bank_final.groupBy('faixa_idades', 'marital').agg(count('marital').alias('marital_count')) \
    .withColumn('rank', rank().over(Window.partitionBy('faixa_idades').orderBy(desc('marital_count')))) \
    .filter(col('rank') == 1).select('faixa_idades', 'marital')

# Obter o valor mais frequente de 'education' para cada faixa de idades
education_df = bank_final.groupBy('faixa_idades', 'education').agg(count('education').alias('education_count')) \
    .withColumn('rank', rank().over(Window.partitionBy('faixa_idades').orderBy(desc('education_count')))) \
    .filter(col('rank') == 1).select('faixa_idades', 'education')

# Obter o valor mais frequente de 'default' para cada faixa de idades
default_df = bank_final.groupBy('faixa_idades', 'default').agg(count('default').alias('default_count')) \
    .withColumn('rank', rank().over(Window.partitionBy('faixa_idades').orderBy(desc('default_count')))) \
    .filter(col('rank') == 1).select('faixa_idades', 'default')

# Obter o valor mais frequente de 'housing' para cada faixa de idades
housing_df = bank_final.groupBy('faixa_idades', 'housing').agg(count('housing').alias('housing_count')) \
    .withColumn('rank', rank().over(Window.partitionBy('faixa_idades').orderBy(desc('housing_count')))) \
    .filter(col('rank') == 1).select('faixa_idades', 'housing')

# Obter o valor mais frequente de 'loan' para cada faixa de idades
loan_df = bank_final.groupBy('faixa_idades', 'loan').agg(count('loan').alias('loan_count')) \
    .withColumn('rank', rank().over(Window.partitionBy('faixa_idades').orderBy(desc('loan_count')))) \
    .filter(col('rank') == 1).select('faixa_idades', 'loan')

# Juntar todos os dataframes em um único dataframe
result_df = grouped_df.join(conversao_df, 'faixa_idades', 'left') \
    .join(job_df, 'faixa_idades', 'left') \
    .join(age_df, 'faixa_idades', 'left') \
    .join(marital_df, 'faixa_idades', 'left') \
    .join(education_df, 'faixa_idades', 'left') \
    .join(default_df, 'faixa_idades', 'left') \
    .join(housing_df, 'faixa_idades', 'left') \
    .join(loan_df, 'faixa_idades', 'left')

# Ordenar o DataFrame resultante na ordem '17 - 25', '26 - 39', '40 - 59', '60 - 98'
result_df = result_df.orderBy(col("faixa_idades").asc())

# Multiplicar a coluna 'nome_da_coluna' por 100
result_df = result_df.withColumn('conversão', col('conversão') * 100).drop('conversao')

result_df.show()

NameError: ignored

In [None]:
result_df.filter(col('faixa_idades') == '17 - 25').count()

NameError: ignored

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, round, avg, col, when
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rank

# Criar uma sessão no Spark
spark = SparkSession.builder.getOrCreate()

# Recreate a coluna 'faixa_idades' usando a coluna 'age'
bank_final = bank_final.withColumn('faixa_idades',
                        when((col('age') >= 17) & (col('age') <= 25), '17 - 25')
                       .when((col('age') >= 26) & (col('age') <= 39), '26 - 39')
                       .when((col('age') >= 40) & (col('age') <= 59), '40 - 59')
                       .when((col('age') >= 60) & (col('age') <= 98), '60 - 98')
                       .otherwise('Outra'))

# Agrupar por faixa_idades e contar a quantidade de vezes que aparece cada faixa
grouped_df = bank_final.groupBy('faixa_idades').agg(count('faixa_idades').alias('count'))

# Calcular a taxa de conversão por faixa de idades
conversao_df = bank_final.filter((bank_final['poutcome'] == 'success') & (bank_final['y'] == 'yes')) \
    .groupBy('faixa_idades').agg((count('faixa_idades') / grouped_df.select('count').collect()[0][0]).alias('conversão'))

# Obter o valor mais frequente de 'job' para cada faixa de idades
job_df = bank_final.groupBy('faixa_idades', 'job').agg(count('job').alias('job_count')) \
    .withColumn('rank', rank().over(Window.partitionBy('faixa_idades').orderBy(desc('job_count')))) \
    .filter(col('rank') == 1).select('faixa_idades', 'job')

# Calcular a média da coluna 'age' e arredondar para inteiro para cada faixa de idades
age_df = bank_final.groupBy('faixa_idades').agg(round(avg('age')).cast('integer').alias('age'))

# Obter o valor mais frequente de 'marital' para cada faixa de idades
marital_df = bank_final.groupBy('faixa_idades', 'marital').agg(count('marital').alias('marital_count')) \
    .withColumn('rank', rank().over(Window.partitionBy('faixa_idades').orderBy(desc('marital_count')))) \
    .filter(col('rank') == 1).select('faixa_idades', 'marital')

# Obter o valor mais frequente de 'education' para cada faixa de idades
education_df = bank_final.groupBy('faixa_idades', 'education').agg(count('education').alias('education_count')) \
    .withColumn('rank', rank().over(Window.partitionBy('faixa_idades').orderBy(desc('education_count')))) \
    .filter(col('rank') == 1).select('faixa_idades', 'education')

# Obter o valor mais frequente de 'default' para cada faixa de idades
default_df = bank_final.groupBy('faixa_idades', 'default').agg(count('default').alias('default_count')) \
    .withColumn('rank', rank().over(Window.partitionBy('faixa_idades').orderBy(desc('default_count')))) \
    .filter(col('rank') == 1).select('faixa_idades', 'default')

# Obter o valor mais frequente de 'housing' para cada faixa de idades
housing_df = bank_final.groupBy('faixa_idades', 'housing').agg(count('housing').alias('housing_count')) \
    .withColumn('rank', rank().over(Window.partitionBy('faixa_idades').orderBy(desc('housing_count')))) \
    .filter(col('rank') == 1).select('faixa_idades', 'housing')

# Obter o valor mais frequente de 'loan' para cada faixa de idades
loan_df = bank_final.groupBy('faixa_idades', 'loan').agg(count('loan').alias('loan_count')) \
    .withColumn('rank', rank().over(Window.partitionBy('faixa_idades').orderBy(desc('loan_count')))) \
    .filter(col('rank') == 1).select('faixa_idades', 'loan')

# Juntar todos os dataframes em um único dataframe
perfil_faixa_idades = grouped_df.join(conversao_df, 'faixa_idades', 'left') \
    .join(job_df, 'faixa_idades', 'left') \
    .join(age_df, 'faixa_idades', 'left') \
    .join(marital_df, 'faixa_idades', 'left') \
    .join(education_df, 'faixa_idades', 'left') \
    .join(default_df, 'faixa_idades', 'left') \
    .join(housing_df, 'faixa_idades', 'left') \
    .join(loan_df, 'faixa_idades', 'left')

# Ordenar o DataFrame resultante na ordem '17 - 25', '26 - 39', '40 - 59', '60 - 98'
perfil_faixa_idades = perfil_faixa_idades.orderBy(col("faixa_idades").asc())

# Multiplicar a coluna 'conversão' por 100
perfil_faixa_idades = perfil_faixa_idades.withColumn('conversão', col('conversão') * 100)


perfil_faixa_idades.show()


NameError: ignored

## Tabela perfil_job

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col

# Agrupar por job e contar a quantidade de vezes que cada job aparece
grouped_df = bank_final.groupBy('job').agg(count('job').alias('count'))

# Calcular a conversão por job
conversao_df = bank_final.filter((bank_final['poutcome'] == 'success') & (bank_final['y'] == 'yes')) \
    .groupBy('job').agg((count('job') / bank_final.count()).alias('conversão'))

# Juntar os dois dataframes em um único dataframe
perfil_job = grouped_df.join(conversao_df, 'job', 'left').orderBy(col("count").desc())

perfil_job = perfil_job.withColumn('conversão', col('conversão') * 100)

# Exibir a tabela resultante
perfil_job.show()


+-------------+-----+-------------------+
|          job|count|          conversão|
+-------------+-----+-------------------+
|       admin.| 2207| 3.7290924047162046|
|   management| 1779|  3.034457545014167|
|   technician| 1749| 2.5317612649666392|
|  blue-collar| 1469| 1.3161502604880724|
|      retired| 1040| 2.2118636322091216|
|     services|  746| 0.9231331688145508|
|      student|  603| 1.3344301252170734|
|   unemployed|  381| 0.8317338451695457|
|self-employed|  369|0.47527648295402614|
| entrepreneur|  274|0.18279864729001005|
|    housemaid|  238| 0.4021570240380221|
|      unknown|   86|0.13709898546750754|
+-------------+-----+-------------------+



## tabela perfil_marital

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col

# Agrupar por marital e contar a quantidade de vezes que cada marital aparece
grouped_df = bank_final.groupBy('marital').agg(count('marital').alias('count'))

# Calcular a conversão por marital
conversao_df = bank_final.filter((bank_final['poutcome'] == 'success') & (bank_final['y'] == 'yes')) \
    .groupBy('marital').agg((count('marital') / bank_final.count()).alias('conversão'))

# Juntar os dois dataframes em um único dataframe
perfil_marital = grouped_df.join(conversao_df, 'marital', 'left').orderBy(col("count").desc())

perfil_marital = perfil_marital.withColumn('conversão', col('conversão') * 100)

# Exibir a tabela resultante
perfil_marital.show()


+--------+-----+--------------------+
| marital|count|           conversão|
+--------+-----+--------------------+
| married| 5830|  0.0928617128233251|
|  single| 3904|0.060963348871218355|
|divorced| 1193|0.017000274197970934|
| unknown|   14|2.741979709350150...|
+--------+-----+--------------------+



## Tabela perfil_education

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col

# Agrupar por education e contar a quantidade de vezes que cada education aparece
grouped_df = bank_final.groupBy('education').agg(count('education').alias('count'))

# Calcular a conversão por education
conversao_df = bank_final.filter((bank_final['poutcome'] == 'success') & (bank_final['y'] == 'yes')) \
    .groupBy('education').agg((count('education') / bank_final.count()).alias('conversão'))

# Juntar os dois dataframes em um único dataframe
perfil_education = grouped_df.join(conversao_df, 'education', 'left').orderBy(col("count").desc())

perfil_education = perfil_education.withColumn('conversão', col('conversão') * 100)

# Exibir a tabela resultante
perfil_education.show()


+----------+-----+--------------------+
| education|count|           conversão|
+----------+-----+--------------------+
|  tertiary| 4712| 0.08107120007311946|
| secondary| 3833|0.056393382688968104|
|     basic| 1200|0.015172287725070835|
|   primary|  643|0.007403345215245407|
|   unknown|  549|0.010967918837400604|
|illiterate|    4|9.139932364500503E-5|
+----------+-----+--------------------+



## teste k-means chat bing

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.sql import SparkSession

# Criando uma sessão Spark
spark = SparkSession.builder.getOrCreate()

# Selecionando as colunas relevantes do DataFrame
data = bank_final.select("age", "job", "marital", "education", "default", "housing", "loan", "poutcome", "y")

# Convertendo a coluna "age" para tipo inteiro
data = data.withColumn("age", data["age"].cast("integer"))

# Removendo quaisquer linhas com valores nulos
data = data.dropna()

# Criando um vetor de recursos a partir das colunas relevantes
assembler = VectorAssembler(inputCols=["age"], outputCol="features")
data = assembler.transform(data)

# Aplicando o algoritmo K-means para definir 3 clusters
kmeans = KMeans(k=3, seed=123)
model = kmeans.fit(data)

# Obtendo as previsões dos clusters
predictions = model.transform(data)

# Exibindo os resultados
predictions.show()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, first, count

# Inicialize o SparkSession
spark = SparkSession.builder.getOrCreate()

# Carregue a base de dados 'bank_final' para um DataFrame
bank_final = spark.read.csv('caminho_do_arquivo/bank_final.csv', header=True, inferSchema=True)

# Tabela 1: Agrupe por faixa_idades_frequencia, job com maior frequência e contagem
tabela1 = bank_final.groupBy("faixa_idades_frequencia") \
                    .agg(first("job").alias("job_maior_frequencia"),
                         count("job").alias("quantidade")) \
                    .orderBy("faixa_idades_frequencia")

# Tabela 2: marital com maior frequência e contagem
tabela2 = bank_final.groupBy("marital") \
                    .agg(first("marital").alias("marital_maior_frequencia"),
                         count("marital").alias("quantidade")) \
                    .orderBy("marital")

# Tabela 3: education com maior frequência e contagem
tabela3 = bank_final.groupBy("education") \
                    .agg(first("education").alias("education_maior_frequencia"),
                         count("education").alias("quantidade")) \
                    .orderBy("education")

# Mostre as tabelas resultantes
tabela1.show()
tabela2.show()
tabela3.show()


## Salvando

In [None]:
perfil_geral.to_csv('/content/drive/MyDrive/17.UNI1500th/Projetos em grupo AWS UNI1500/Desafio final/Analise Perfil Cliente/perfil_geral.csv',index=False)
result_df.to_csv('/content/drive/MyDrive/17.UNI1500th/Projetos em grupo AWS UNI1500/Desafio final/Analise Perfil Cliente/perfil_faixa_idades.csv',index=False)
perfil_job.to_csv('/content/drive/MyDrive/17.UNI1500th/Projetos em grupo AWS UNI1500/Desafio final/Analise Perfil Cliente/perfil_job.csv',index=False)
perfil_marital.to_csv('/content/drive/MyDrive/17.UNI1500th/Projetos em grupo AWS UNI1500/Desafio final/Analise Perfil Cliente/perfil_marital.csv',index=False)
perfil_education.to_csv('/content/drive/MyDrive/17.UNI1500th/Projetos em grupo AWS UNI1500/Desafio final/Analise Perfil Cliente/perfil_education.csv',index=False)

AttributeError: ignored

# Analise evolução da campanha

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Crie uma sessão do Spark
spark = SparkSession.builder.getOrCreate()

bank_ad_campanha = spark.createDataFrame(bank_ad)
bank_ad_campanha = bank_ad_campanha.drop('age','job','marital','education','default','housing','loan')

  for column, series in pdf.iteritems():


In [None]:
bank_ad_campanha.show()

+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|  contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp.var.rate|cons.price.idx|cons.conf.idx|euribor3m|nr.employed|  y|
+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|telephone|  may|        mon|     261|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
|telephone|  may|        mon|     149|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
|telephone|  may|        mon|     226|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
|telephone|  may|        mon|     151|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|

## Tabela campanha_geral

In [None]:
campanha_geral = bank_ad_campanha

## Tabela campanha_month_previous_campaing

## Tabela campanha_month_yes

In [None]:
Tabela campanha_month_yes

## Tabela campanha_conversão

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Criar uma sessão no Spark
spark = SparkSession.builder.getOrCreate()

# Supondo que 'bank_ad_campanha' é o DataFrame que contém os dados
df = bank_ad_campanha

# Agrupar por month e contar a quantidade de vezes que aparece cada month
grouped_df = df.groupBy('month').agg(F.count('month').alias('count'))

# Calcular a taxa de conversão anterior (poutcome = success)
conversao_anterio_df = df.filter(df['poutcome'] == 'success') \
    .groupBy('month').agg((F.count('poutcome') / df.filter(df['poutcome'].isNotNull()).count() * 100).alias('conversão_anterio'))

# Calcular a taxa de conversão atual (y = yes)
conversao_atual_df = df.filter(df['y'] == 'yes') \
    .groupBy('month').agg((F.count('y') / df.filter(df['y'].isNotNull()).count() * 100).alias('conversão_atual'))

# Juntar os DataFrames em um único DataFrame
result_df = grouped_df.join(conversao_anterio_df, 'month', 'left') \
    .join(conversao_atual_df, 'month', 'left')

# Ordenar o DataFrame resultante por mês (de janeiro a dezembro)
meses_ordenados = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
result_df = result_df.withColumn("month", F.substring("month", 0, 3)) \
    .withColumn("month", F.when(F.col("month").isin(meses_ordenados), F.col("month"))) \
    .orderBy(F.when(F.col("month") == "jan", 1)
              .when(F.col("month") == "feb", 2)
              .when(F.col("month") == "mar", 3)
              .when(F.col("month") == "apr", 4)
              .when(F.col("month") == "may", 5)
              .when(F.col("month") == "jun", 6)
              .when(F.col("month") == "jul", 7)
              .when(F.col("month") == "aug", 8)
              .when(F.col("month") == "sep", 9)
              .when(F.col("month") == "oct", 10)
              .when(F.col("month") == "nov", 11)
              .when(F.col("month") == "dec", 12)
              .otherwise(F.lit(99)))

# Exibir o resultado
result_df.show()

+-----+-----+-------------------+-------------------+
|month|count|  conversão_anterio|    conversão_atual|
+-----+-----+-------------------+-------------------+
|  mar|  546|0.19665922113236864| 0.6700980868214043|
|  apr| 2632| 0.2694959696999126|  1.308633582596873|
|  may|13769| 0.5584150723511703| 2.1511119743614646|
|  jun| 5318|0.34961639312421094| 1.3571914149752355|
|  jul| 7174| 0.2622122948431582| 1.5757016606778675|
|  aug| 6178|0.49771778187821697|  1.590269010391376|
|  sep|  570|0.35932795959988345| 0.6215402544430416|
|  oct|  718| 0.3301932601728659| 0.7647858599592114|
|  nov| 4101| 0.4006021171214917| 1.0100029134699426|
|  dec|  182|0.10925512285131592|0.21608235408371368|
+-----+-----+-------------------+-------------------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Criar uma sessão no Spark
spark = SparkSession.builder.getOrCreate()

# Supondo que 'bank_ad_campanha' é o DataFrame que contém os dados
df = bank_ad_campanha

# Agrupar por month e contar a quantidade de vezes que aparece cada month
grouped_df = df.groupBy('month').agg(F.count('month').alias('count'))

# Contar a quantidade de vezes que poutcome = success aparece em cada month
count_anterio_df = df.filter(df['poutcome'] == 'success') \
    .groupBy('month').agg(F.count('poutcome').alias('count_anterio'))

# Contar a quantidade de vezes que y = yes aparece em cada month
count_atual_df = df.filter(df['y'] == 'yes') \
    .groupBy('month').agg(F.count('y').alias('count_atual'))

# Contar a quantidade total de linhas em poutcome e y
total_poutcome = df.filter(df['poutcome'].isNotNull()).count()
total_y = df.filter(df['y'].isNotNull()).count()

# Calcular a taxa de conversão anterior (poutcome = success)
conversao_anterio_df = count_anterio_df.withColumn('conversão_anterio', (F.col('count_anterio') / total_poutcome * 100))

# Calcular a taxa de conversão atual (y = yes)
conversao_atual_df = count_atual_df.withColumn('conversão_atual', (F.col('count_atual') / total_y * 100))

# Juntar todos os DataFrames em um único DataFrame
result_df = grouped_df.join(count_anterio_df, 'month', 'left') \
    .join(count_atual_df, 'month', 'left') \
    .join(conversao_anterio_df, 'month', 'left') \
    .join(conversao_atual_df, 'month', 'left')

# Ordenar o DataFrame resultante por mês (de janeiro a dezembro)
meses_ordenados = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
result_df = result_df.withColumn("month", F.substring("month", 0, 3)) \
    .withColumn("month", F.when(F.col("month").isin(meses_ordenados), F.col("month"))) \
    .orderBy(F.when(F.col("month") == "jan", 1)
              .when(F.col("month") == "feb", 2)
              .when(F.col("month") == "mar", 3)
              .when(F.col("month") == "apr", 4)
              .when(F.col("month") == "may", 5)
              .when(F.col("month") == "jun", 6)
              .when(F.col("month") == "jul", 7)
              .when(F.col("month") == "aug", 8)
              .when(F.col("month") == "sep", 9)
              .when(F.col("month") == "oct", 10)
              .when(F.col("month") == "nov", 11)
              .when(F.col("month") == "dec", 12)
              .otherwise(F.lit(99)))

# Exibir o resultado
result_df.show()


+-----+-----+-------------+-----------+-------------+-------------------+-----------+-------------------+
|month|count|count_anterio|count_atual|count_anterio|  conversão_anterio|count_atual|    conversão_atual|
+-----+-----+-------------+-----------+-------------+-------------------+-----------+-------------------+
|  mar|  546|           81|        276|           81|0.19665922113236864|        276| 0.6700980868214043|
|  apr| 2632|          111|        539|          111| 0.2694959696999126|        539|  1.308633582596873|
|  may|13769|          230|        886|          230| 0.5584150723511703|        886| 2.1511119743614646|
|  jun| 5318|          144|        559|          144|0.34961639312421094|        559| 1.3571914149752355|
|  jul| 7174|          108|        649|          108| 0.2622122948431582|        649| 1.5757016606778675|
|  aug| 6178|          205|        655|          205|0.49771778187821697|        655|  1.590269010391376|
|  sep|  570|          148|        256|       

In [None]:
# Renomear as colunas
bank_ad_campanha = bank_ad_campanha.withColumnRenamed("emp.var.rate", "emp_var_rate") \
           .withColumnRenamed("cons.price.idx", "cons_price_idx") \
           .withColumnRenamed("cons.conf.idx", "cons_conf_idx") \
           .withColumnRenamed("nr.employed", "nr_employed")

bank_ad_campanha.show()

+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|  contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|
+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|telephone|  may|        mon|     261|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
|telephone|  may|        mon|     149|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
|telephone|  may|        mon|     226|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
|telephone|  may|        mon|     151|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Criar uma sessão no Spark
spark = SparkSession.builder.getOrCreate()

# Supondo que 'bank_ad_campanha' é o DataFrame que contém os dados
df = bank_ad_campanha

# Filtrar os valores para 'total', 'positiva_anterior' (poutcome = success) e 'positiva_atual' (y = yes)
total_df = df.groupBy().agg(F.lit('total').alias('campanha'),
                            F.mean('emp_var_rate').alias('taxa_variacao_emprego_media'),
                            F.mean('cons_price_idx').alias('indice_preco_consumidor_medio'),
                            F.mean('cons_conf_idx').alias('indice_confianca_consumidor_medio'),
                            F.mean('euribor3m').alias('eurbor3m_media'),
                            F.mean('nr_employed').alias('n_funcionarios_medio'))

positiva_anterior_df = df.filter(df['poutcome'] == 'success').groupBy().agg(F.lit('positiva_anterior').alias('campanha'),
                                                                             F.mean('emp_var_rate').alias('taxa_variacao_emprego_media'),
                                                                             F.mean('cons_price_idx').alias('indice_preco_consumidor_medio'),
                                                                             F.mean('cons_conf_idx').alias('indice_confianca_consumidor_medio'),
                                                                             F.mean('euribor3m').alias('eurbor3m_media'),
                                                                             F.mean('nr_employed').alias('n_funcionarios_medio'))

positiva_atual_df = df.filter(df['y'] == 'yes').groupBy().agg(F.lit('positiva_atual').alias('campanha'),
                                                              F.mean('emp_var_rate').alias('taxa_variacao_emprego_media'),
                                                              F.mean('cons_price_idx').alias('indice_preco_consumidor_medio'),
                                                              F.mean('cons_conf_idx').alias('indice_confianca_consumidor_medio'),
                                                              F.mean('euribor3m').alias('eurbor3m_media'),
                                                              F.mean('nr_employed').alias('n_funcionarios_medio'))

# Unir os DataFrames em um único DataFrame
result_df = total_df.union(positiva_anterior_df).union(positiva_atual_df)

# Exibir o resultado
result_df.show()


+-----------------+---------------------------+-----------------------------+---------------------------------+------------------+--------------------+
|         campanha|taxa_variacao_emprego_media|indice_preco_consumidor_medio|indice_confianca_consumidor_medio|    eurbor3m_media|n_funcionarios_medio|
+-----------------+---------------------------+-----------------------------+---------------------------------+------------------+--------------------+
|            total|        0.08188550063146356|            93.57566436825361|               -40.50260027191408|3.6212908128580406|   5167.035910943787|
|positiva_anterior|        -2.0912600145666183|            93.33435542607359|               -38.38856518572522|0.9965630007283336|   5030.622432629189|
|   positiva_atual|        -1.2334482758620777|             93.3543859913807|              -39.789784482758215|2.1231351293103584|   5095.115991379242|
+-----------------+---------------------------+-----------------------------+-----------

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Criar uma sessão no Spark
spark = SparkSession.builder.getOrCreate()

# Supondo que 'bank_ad_campanha' é o DataFrame que contém os dados
df = bank_ad_campanha

# Agrupar por month e contar a quantidade de vezes que aparece cada month
grouped_df = df.groupBy('month').agg(
    F.count('month').alias('count'),
    F.count(F.when(df['poutcome'] == 'success', True)).alias('count_anterio'),
    F.count(F.when(df['y'] == 'yes', True)).alias('count_atual')
)

# Calcular a quantidade total de linhas em poutcome e y
total_poutcome = df.filter(df['poutcome'].isNotNull()).count()
total_y = df.filter(df['y'].isNotNull()).count()

# Calcular a taxa de conversão anterior (poutcome = success)
conversao_anterio_df = grouped_df.withColumn('conversão_anterio', (F.col('count_anterio') / total_poutcome * 100))

# Calcular a taxa de conversão atual (y = yes)
conversao_atual_df = conversao_anterio_df.withColumn('conversão_atual', (F.col('count_atual') / total_y * 100))

# Juntar os DataFrames em um único DataFrame
result_df = conversao_atual_df.drop('count_anterio', 'count_atual')

# Ordenar o DataFrame resultante por mês (de janeiro a dezembro)
meses_ordenados = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
result_df = result_df.withColumn("month_num", F.month(F.from_unixtime(F.unix_timestamp("month", "MMM"))))
windowSpec = Window.orderBy(F.when(F.col("month_num").isin(meses_ordenados), F.col("month_num")))
result_df = result_df.withColumn("month_num", F.row_number().over(windowSpec)).drop("month_num")
result_df = result_df.orderBy(F.col("month_num"))

# Exibir o resultado
result_df.show()


+-----+-----+-------------------+-------------------+
|month|count|  conversão_anterio|    conversão_atual|
+-----+-----+-------------------+-------------------+
|  jun| 5318|0.34961639312421094| 1.3571914149752355|
|  aug| 6178|0.49771778187821697|  1.590269010391376|
|  may|13769| 0.5584150723511703| 2.1511119743614646|
|  sep|  570|0.35932795959988345| 0.6215402544430416|
|  mar|  546|0.19665922113236864| 0.6700980868214043|
|  oct|  718| 0.3301932601728659| 0.7647858599592114|
|  jul| 7174| 0.2622122948431582| 1.5757016606778675|
|  nov| 4101| 0.4006021171214917| 1.0100029134699426|
|  apr| 2632| 0.2694959696999126|  1.308633582596873|
|  dec|  182|0.10925512285131592|0.21608235408371368|
+-----+-----+-------------------+-------------------+

