<a href="https://colab.research.google.com/github/CarolinaNovaesGN/projeto-dataset-markt/blob/main/pyspark_mini_projeto.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Instalando pyspark**


In [None]:
pip install pyspark



**Importanto SparkSession e outros...** 

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import *
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import *
from pyspark.sql.window import Window

**Configurando SparkSession**

In [None]:
spark = (SparkSession.builder
        .master("local")
        .appName("mini-projeto")
        .config("spark.ui.port","4050")
        .getOrCreate())

**Definindo a estrutura com StructType**

In [None]:
schema = StructType([
              StructField('row', IntegerType(), False),
              StructField('ID', IntegerType(), False),
              StructField('ano_nascimento', IntegerType(), False),
              StructField('educacao', StringType(), False),
              StructField('status_conjugal', StringType(), True),
              StructField('renda', StringType(), True),
              StructField('criancas_casa', IntegerType(), False),
              StructField('adolescentes_casa', IntegerType(), False),
              StructField('data_cliente', DateType(), False),
              StructField('vinhos', IntegerType(), False),
              StructField('frutas', IntegerType(), False),
              StructField('carne_produtos', IntegerType(), False),
              StructField('peixe_produtos', IntegerType(), False),
              StructField('doces_produtos', IntegerType(), False),
              StructField('ouro_produtos', IntegerType(), False),
              StructField('qtd_oferta_compras', IntegerType(), False),
              StructField('qtd_web_compras', IntegerType(), False),
              StructField('qtd_produtos_catalogo', IntegerType(), False),
              StructField('qtd_produtos_armazenados', IntegerType(), False),
              StructField('qtd_web_visitantes_mensal', IntegerType(), False),
              StructField('reclamacoes', IntegerType(), False)
])

df = spark.read.format("csv").option("inferSchema","false").option("header","true").option("sep",",").load("/content/pandas.csv",schema=schema)

In [None]:
df.show()

+---+----+--------------+-------------+---------------+-------+-------------+-----------------+------------+------+------+--------------+--------------+--------------+-------------+------------------+---------------+---------------------+------------------------+-------------------------+-----------+
|row|  ID|ano_nascimento|     educacao|status_conjugal|  renda|criancas_casa|adolescentes_casa|data_cliente|vinhos|frutas|carne_produtos|peixe_produtos|doces_produtos|ouro_produtos|qtd_oferta_compras|qtd_web_compras|qtd_produtos_catalogo|qtd_produtos_armazenados|qtd_web_visitantes_mensal|reclamacoes|
+---+----+--------------+-------------+---------------+-------+-------------+-----------------+------------+------+------+--------------+--------------+--------------+-------------+------------------+---------------+---------------------+------------------------+-------------------------+-----------+
|  0|5524|          1957|    graduacao|       Solteiro|58138.0|            0|                0

In [None]:
#filtrando ano de nascimento após ver resultados estranhos
df = df.filter(F.col("ano_nascimento") > 1930)

In [None]:
df.printSchema()

root
 |-- nmr_linha: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- ano_nascimento: integer (nullable = true)
 |-- educacao: string (nullable = true)
 |-- status_conjugal: string (nullable = true)
 |-- renda: string (nullable = true)
 |-- criancas_casa: integer (nullable = true)
 |-- adolescentes_casa: integer (nullable = true)
 |-- data_cliente: date (nullable = true)
 |-- vinhos: integer (nullable = true)
 |-- frutas: integer (nullable = true)
 |-- carne_produtos: integer (nullable = true)
 |-- peixe_produtos: integer (nullable = true)
 |-- doces_produtos: integer (nullable = true)
 |-- ouro_produtos: integer (nullable = true)
 |-- qtd_oferta_compras: integer (nullable = true)
 |-- qtd_web_compras: integer (nullable = true)
 |-- qtd_produtos_catalogo: integer (nullable = true)
 |-- qtd_produtos_armazenados: integer (nullable = true)
 |-- qtd_web_visitantes_mensal: integer (nullable = true)
 |-- reclamacoes: integer (nullable = true)
 |-- total_produtos: integer (nu

**Mudando nome da primeira e segunda coluna**

In [None]:

df = df.withColumnRenamed("row","nmr_linha")
df = df.withColumnRenamed("ID","id")


**Criando novas colunas**

In [None]:
def montar_soma_colunas(termo,dataframe):
  return '+'.join([x for x in dataframe.columns if termo in x])


soma_produtos = montar_soma_colunas('produto',df)
soma_compras=montar_soma_colunas('compra',df)
df=df.withColumn('total_produtos',F.expr(soma_produtos)).withColumn('total_compras',F.expr(soma_compras))


**Filtrando valores nulos**

In [None]:
df = df.filter(F.col("status_conjugal") != 'null').filter(F.col("renda") != 'null')
df.show()

+---------+----+--------------+-------------+---------------+-------+-------------+-----------------+------------+------+------+--------------+--------------+--------------+-------------+------------------+---------------+---------------------+------------------------+-------------------------+-----------+--------------+-------------+----+
|nmr_linha|  id|ano_nascimento|     educacao|status_conjugal|  renda|criancas_casa|adolescentes_casa|data_cliente|vinhos|frutas|carne_produtos|peixe_produtos|doces_produtos|ouro_produtos|qtd_oferta_compras|qtd_web_compras|qtd_produtos_catalogo|qtd_produtos_armazenados|qtd_web_visitantes_mensal|reclamacoes|total_produtos|total_compras| ano|
+---------+----+--------------+-------------+---------------+-------+-------------+-----------------+------------+------+------+--------------+--------------+--------------+-------------+------------------+---------------+---------------------+------------------------+-------------------------+-----------+---------

**Criando uma coluna apenas com os anos das compras**

In [None]:
df = df.withColumn("ano",F.substring(F.col("data_cliente"), 1, 4))

In [None]:
df.write.format('csv').save('content/csv/py_df_geral2',header = True)
df.show()

+---------+----+--------------+-------------+---------------+-------+-------------+-----------------+------------+------+------+--------------+--------------+--------------+-------------+------------------+---------------+---------------------+------------------------+-------------------------+-----------+--------------+-------------+----+
|nmr_linha|  id|ano_nascimento|     educacao|status_conjugal|  renda|criancas_casa|adolescentes_casa|data_cliente|vinhos|frutas|carne_produtos|peixe_produtos|doces_produtos|ouro_produtos|qtd_oferta_compras|qtd_web_compras|qtd_produtos_catalogo|qtd_produtos_armazenados|qtd_web_visitantes_mensal|reclamacoes|total_produtos|total_compras| ano|
+---------+----+--------------+-------------+---------------+-------+-------------+-----------------+------------+------+------+--------------+--------------+--------------+-------------+------------------+---------------+---------------------+------------------------+-------------------------+-----------+---------

**Mostrando id que não comprou nada.** 

In [None]:
df_sem_compras = df.filter(F.col("total_compras")== 0)
df_sem_compras.show() 
df_sem_compras.write.format('csv').save('content/csv/py_df_sem_compras2',header = True)

+---------+-----+--------------+---------+---------------+--------+-------------+-----------------+------------+------+------+--------------+--------------+--------------+-------------+------------------+---------------+---------------------+------------------------+-------------------------+-----------+--------------+-------------+----+
|nmr_linha|   id|ano_nascimento| educacao|status_conjugal|   renda|criancas_casa|adolescentes_casa|data_cliente|vinhos|frutas|carne_produtos|peixe_produtos|doces_produtos|ouro_produtos|qtd_oferta_compras|qtd_web_compras|qtd_produtos_catalogo|qtd_produtos_armazenados|qtd_web_visitantes_mensal|reclamacoes|total_produtos|total_compras| ano|
+---------+-----+--------------+---------+---------------+--------+-------------+-----------------+------------+------+------+--------------+--------------+--------------+-------------+------------------+---------------+---------------------+------------------------+-------------------------+-----------+--------------+

**Mostrando quantidades de compras por ano**

In [None]:
df.printSchema()

root
 |-- nmr_linha: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- ano_nascimento: integer (nullable = true)
 |-- educacao: string (nullable = true)
 |-- status_conjugal: string (nullable = true)
 |-- renda: string (nullable = true)
 |-- criancas_casa: integer (nullable = true)
 |-- adolescentes_casa: integer (nullable = true)
 |-- data_cliente: date (nullable = true)
 |-- vinhos: integer (nullable = true)
 |-- frutas: integer (nullable = true)
 |-- carne_produtos: integer (nullable = true)
 |-- peixe_produtos: integer (nullable = true)
 |-- doces_produtos: integer (nullable = true)
 |-- ouro_produtos: integer (nullable = true)
 |-- qtd_oferta_compras: integer (nullable = true)
 |-- qtd_web_compras: integer (nullable = true)
 |-- qtd_produtos_catalogo: integer (nullable = true)
 |-- qtd_produtos_armazenados: integer (nullable = true)
 |-- qtd_web_visitantes_mensal: integer (nullable = true)
 |-- reclamacoes: integer (nullable = true)
 |-- total_produtos: integer (nu

**Mostrando quantidade de compras por ano**

In [None]:
from pyspark.sql.functions import sum
import pyspark.sql.functions as F
df_qtd_compras_anual = df.groupby(F.col("ano")).agg(sum("total_compras").alias("total_compras_ano")).orderBy(F.col('total_compras_ano').desc())
df_qtd_compras_anual.show()
df_qtd_compras_anual.write.format('csv').save('content/csv/py_df_qtd_compras_anual2',header = True)

+----+-----------------+
| ano|total_compras_ano|
+----+-----------------+
|2013|            16662|
|2012|             8172|
|2014|             6519|
+----+-----------------+



**Mostrando a quantidade de pessoas por grau de escolaridade que realizaram alguma interação:**


In [None]:

df_qtd_compras_escolaridade = df.groupby(F.col("educacao")).count()
df_qtd_compras_escolaridade.show()
df_qtd_compras_escolaridade.write.format('csv').save('content/csv/py_df_qtd_compras_escolaridade2',header = True)

+-------------+-----+
|     educacao|count|
+-------------+-----+
|       basico|   40|
|       mestre|  261|
|    doutorado|  362|
|segundo ciclo|  142|
|    graduacao|  829|
+-------------+-----+



**Média de produtos comprados anualmente**

In [None]:
df_media_anual_produtos = df.groupby(F.col("ano")).mean("total_compras")
df_media_anual_produtos.show()
df_media_anual_produtos.write.format('csv').save('content/csv/py_df_media_anual_produtos2',header = True)

+----+------------------+
| ano|avg(total_compras)|
+----+------------------+
|2012|22.574585635359117|
|2014|15.822815533980583|
|2013| 19.37441860465116|
+----+------------------+



**Mostrando transações que houveram reclamações**

In [None]:
df_reclamacoes = df.filter(F.col("reclamacoes") != 0)
df_reclamacoes.show()

+---------+-----+--------------+-------------+---------------+-------+-------------+-----------------+------------+------+------+--------------+--------------+--------------+-------------+------------------+---------------+---------------------+------------------------+-------------------------+-----------+--------------+-------------+----+
|nmr_linha|   id|ano_nascimento|     educacao|status_conjugal|  renda|criancas_casa|adolescentes_casa|data_cliente|vinhos|frutas|carne_produtos|peixe_produtos|doces_produtos|ouro_produtos|qtd_oferta_compras|qtd_web_compras|qtd_produtos_catalogo|qtd_produtos_armazenados|qtd_web_visitantes_mensal|reclamacoes|total_produtos|total_compras| ano|
+---------+-----+--------------+-------------+---------------+-------+-------------+-----------------+------------+------+------+--------------+--------------+--------------+-------------+------------------+---------------+---------------------+------------------------+-------------------------+-----------+------

**Particionando pelo ano e quantidade de compras**

In [None]:
w0 = Window.partitionBy(F.col("ano")).orderBy("total_compras")
df_wp_number_row = df.withColumn("row_number",F.row_number().over(w0))
df_wp_number_row.write.format('csv').save('content/csv/py_df_wp_number_row2',header = True)

**Fazendo rankeamento por total de compras sem ''lacunas''**

In [None]:
df_rank_total_compras = df.withColumn("rank",F.dense_rank().over(w0))
df_rank_total_compras.write.format('csv').save('content/csv/py_df_rank_total_compras2',header = True)


In [None]:
pwd

'/content'