# Descomplicando a análise de grandes bases de dados com PySpark



## O que é PySpark?

Uma API em Python para executar o Apache Spark. Sua linguagem nativa é em Scala.

## Vantagens

- Era Big Data: possibilita a análise de grandes volumes de dados em uma velocidade maior, pela capacidade de processamento e computação paralela.
- Ferramenta open source.
- Possui várias bibliotecas de machine learning.
- Pode ser integrado com pandas e o scikit-learn.

## Materiais de estudo:
- [Documentação PySpark](https://spark.apache.org/docs/latest/api/python/index.html)
- [Curso Udemy](https://www.udemy.com/share/109rKi3@BlHd2R3ilxuCjNjgJopHIwo2PvBkcVWjC3nn5rqzjnPi-kUgD9BWDs0JfIKabpJ_/)
- Livro "Advanced Analytics with PySpark: Patterns for Learning from Data at Scale using python and spark"
- Medium artigos, linkedin...

## Instalação do PySpark aqui no Colab


In [None]:
# 1. Instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
# 2. Configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
# 3. Começar a usar!

#encontrar o spark aqui no ambiente
import findspark
findspark.init()

#criar uma sessão spark aqui no local
from pyspark.sql import SparkSession
sc = SparkSession.builder.master("local[*]").getOrCreate()

## Carregando os dados

Como exemplo estou utilizando os dados sobre preços de roupas, presente no Kaggle: https://www.kaggle.com/datasets/mrsimple07/clothes-price-prediction?resource=download

In [None]:
from pyspark.sql import functions as F

df = sc.read.csv("/content/clothes_price_prediction_data.csv", header = True)

In [None]:
df.show(5, truncate=False)

+------------+--------+-----+----+--------+-----+
|Brand       |Category|Color|Size|Material|Price|
+------------+--------+-----+----+--------+-----+
|New Balance |Dress   |White|XS  |Nylon   |182  |
|New Balance |Jeans   |Black|XS  |Silk    |57   |
|Under Armour|Dress   |Red  |M   |Wool    |127  |
|Nike        |Shoes   |Green|M   |Cotton  |77   |
|Adidas      |Sweater |White|M   |Nylon   |113  |
+------------+--------+-----+----+--------+-----+
only showing top 5 rows



## Comandos iniciais

In [None]:
df.printSchema()

root
 |-- Brand: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Material: string (nullable = true)
 |-- Price: string (nullable = true)



In [None]:
type(df)

pyspark.sql.dataframe.DataFrame

In [None]:
df.count()

1000

In [None]:
#sc.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [None]:
df

Brand,Category,Color,Size,Material,Price
New Balance,Dress,White,XS,Nylon,182
New Balance,Jeans,Black,XS,Silk,57
Under Armour,Dress,Red,M,Wool,127
Nike,Shoes,Green,M,Cotton,77
Adidas,Sweater,White,M,Nylon,113
Reebok,Jacket,Red,XL,Nylon,19
Puma,Jacket,Red,XXL,Polyester,31
Adidas,Dress,Red,XS,Denim,46
Reebok,Dress,Black,S,Wool,97
Adidas,Jeans,Yellow,L,Wool,80


In [None]:
df.columns

['Brand', 'Category', 'Color', 'Size', 'Material', 'Price']

In [None]:
df_teste = df.select('Brand', 'Size', 'Price')

In [None]:
df_teste.show(10)

+------------+----+-----+
|       Brand|Size|Price|
+------------+----+-----+
| New Balance|  XS|  182|
| New Balance|  XS|   57|
|Under Armour|   M|  127|
|        Nike|   M|   77|
|      Adidas|   M|  113|
|      Reebok|  XL|   19|
|        Puma| XXL|   31|
|      Adidas|  XS|   46|
|      Reebok|   S|   97|
|      Adidas|   L|   80|
+------------+----+-----+
only showing top 10 rows



In [None]:
df.select('Color', 'Size').show()

+------+----+
| Color|Size|
+------+----+
| White|  XS|
| Black|  XS|
|   Red|   M|
| Green|   M|
| White|   M|
|   Red|  XL|
|   Red| XXL|
|   Red|  XS|
| Black|   S|
|Yellow|   L|
| White|  XL|
| White|  XL|
| White|   S|
|  Blue|  XL|
| Green|   S|
| Green| XXL|
|   Red|   S|
|  Blue|  XS|
|Yellow|  XL|
|  Blue|  XL|
+------+----+
only showing top 20 rows



In [None]:
df.select('Brand','Price').describe().show()

+-------+------------+-----------------+
|summary|       Brand|            Price|
+-------+------------+-----------------+
|  count|        1000|             1000|
|   mean|        null|          106.289|
| stddev|        null|53.69544375158548|
|    min|      Adidas|               10|
|    max|Under Armour|               99|
+-------+------------+-----------------+



In [None]:
df.select(F.col('Brand'), F.col('Price')).describe().show()

+-------+------------+-----------------+
|summary|       Brand|            Price|
+-------+------------+-----------------+
|  count|        1000|             1000|
|   mean|        null|          106.289|
| stddev|        null|53.69544375158548|
|    min|      Adidas|               10|
|    max|Under Armour|               99|
+-------+------------+-----------------+



In [None]:
#convertendo para um dataframe em pandas
df_pd = df.toPandas()

In [None]:
df_pd.head()

Unnamed: 0,Brand,Category,Color,Size,Material,Price
0,New Balance,Dress,White,XS,Nylon,182
1,New Balance,Jeans,Black,XS,Silk,57
2,Under Armour,Dress,Red,M,Wool,127
3,Nike,Shoes,Green,M,Cotton,77
4,Adidas,Sweater,White,M,Nylon,113


In [None]:
type(df_pd)

pandas.core.frame.DataFrame

In [None]:
#convertendo para um dataframe spark
df_spark = sc.createDataFrame(df_pd)
df_spark.show()

  for column, series in pdf.iteritems():


+------------+--------+------+----+---------+-----+
|       Brand|Category| Color|Size| Material|Price|
+------------+--------+------+----+---------+-----+
| New Balance|   Dress| White|  XS|    Nylon|  182|
| New Balance|   Jeans| Black|  XS|     Silk|   57|
|Under Armour|   Dress|   Red|   M|     Wool|  127|
|        Nike|   Shoes| Green|   M|   Cotton|   77|
|      Adidas| Sweater| White|   M|    Nylon|  113|
|      Reebok|  Jacket|   Red|  XL|    Nylon|   19|
|        Puma|  Jacket|   Red| XXL|Polyester|   31|
|      Adidas|   Dress|   Red|  XS|    Denim|   46|
|      Reebok|   Dress| Black|   S|     Wool|   97|
|      Adidas|   Jeans|Yellow|   L|     Wool|   80|
|        Nike|  Jacket| White|  XL|     Silk|   98|
|        Puma|  Jacket| White|  XL|     Silk|  150|
|Under Armour|  Jacket| White|   S|    Nylon|   68|
|Under Armour| T-shirt|  Blue|  XL|Polyester|   49|
| New Balance|  Jacket| Green|   S|    Nylon|   97|
|Under Armour|  Jacket| Green| XXL|     Silk|  184|
| New Balanc

## Começando as análises

In [None]:
#selecionando valores distintos
df.select('Brand').distinct().show()

+------------+
|       Brand|
+------------+
|        Nike|
|        Puma|
|      Reebok|
|Under Armour|
|      Adidas|
| New Balance|
+------------+



In [None]:
df.groupBy('Brand').count().orderBy('count', desc=True).show()

+------------+-----+
|       Brand|count|
+------------+-----+
|      Reebok|  158|
| New Balance|  164|
|        Nike|  165|
|      Adidas|  166|
|        Puma|  168|
|Under Armour|  179|
+------------+-----+



In [None]:
df.groupBy('Category').count().orderBy('count', desc=True).show()

+--------+-----+
|Category|count|
+--------+-----+
| T-shirt|  144|
| Sweater|  160|
|   Dress|  166|
|   Jeans|  167|
|   Shoes|  172|
|  Jacket|  191|
+--------+-----+



In [None]:
df.groupBy('Material').count().orderBy('count', desc=True).show()

+---------+-----+
| Material|count|
+---------+-----+
|    Nylon|  155|
|   Cotton|  162|
|    Denim|  163|
|     Wool|  172|
|     Silk|  173|
|Polyester|  175|
+---------+-----+



In [None]:
df.groupBy('Brand').agg(F.avg(F.col('Price'))).show()

+------------+------------------+
|       Brand|        avg(Price)|
+------------+------------------+
|        Nike| 101.9090909090909|
|        Puma|106.13690476190476|
|      Reebok|106.49367088607595|
|Under Armour| 103.9608938547486|
|      Adidas|104.05421686746988|
| New Balance|115.45731707317073|
+------------+------------------+



In [None]:
df.select(F.mean('Price').alias('Preço médio')).show()

+-----------+
|Preço médio|
+-----------+
|    106.289|
+-----------+



In [None]:
#groupBy
df.groupBy('Size').count().show()

+----+-----+
|Size|count|
+----+-----+
|  XL|  167|
|   M|  157|
|   L|  141|
|  XS|  196|
|   S|  166|
| XXL|  173|
+----+-----+



In [None]:
#filter
df.filter(F.col('Brand') == 'New Balance').show()

+-----------+--------+------+----+---------+-----+
|      Brand|Category| Color|Size| Material|Price|
+-----------+--------+------+----+---------+-----+
|New Balance|   Dress| White|  XS|    Nylon|  182|
|New Balance|   Jeans| Black|  XS|     Silk|   57|
|New Balance|  Jacket| Green|   S|    Nylon|   97|
|New Balance|   Jeans|   Red|   S|   Cotton|   98|
|New Balance|   Jeans| Green|   S|   Cotton|   82|
|New Balance|   Shoes|  Blue|   M|   Cotton|   92|
|New Balance|   Dress|Yellow|  XL|    Nylon|   39|
|New Balance|   Shoes|  Blue|  XL|     Silk|  124|
|New Balance|   Dress|   Red|   L|   Cotton|   44|
|New Balance|  Jacket| White|  XS|     Silk|   27|
|New Balance| Sweater| Black|   L|    Nylon|  142|
|New Balance|  Jacket|   Red|  XL|   Cotton|   75|
|New Balance| T-shirt| Black|   M|Polyester|  112|
|New Balance| T-shirt|Yellow|  XL|     Wool|  152|
|New Balance|   Dress|Yellow|  XS|     Silk|  150|
|New Balance|   Dress| White| XXL|Polyester|   57|
|New Balance|   Shoes|  Blue|  

In [None]:
#where
df.where((F.col('Color') == 'White') & (F.col('Size') == 'XL')).show()

+-----------+--------+-----+----+---------+-----+
|      Brand|Category|Color|Size| Material|Price|
+-----------+--------+-----+----+---------+-----+
|       Nike|  Jacket|White|  XL|     Silk|   98|
|       Puma|  Jacket|White|  XL|     Silk|  150|
|       Nike|   Shoes|White|  XL|     Wool|  131|
|       Nike| Sweater|White|  XL|     Silk|   68|
|       Puma|  Jacket|White|  XL|   Cotton|  199|
|     Reebok| T-shirt|White|  XL|     Silk|  111|
|     Reebok| Sweater|White|  XL|    Denim|   83|
|       Puma|   Dress|White|  XL|     Wool|  126|
|       Nike|  Jacket|White|  XL|    Nylon|  146|
|       Puma| Sweater|White|  XL|Polyester|   78|
|     Adidas| Sweater|White|  XL|    Denim|   86|
|New Balance| T-shirt|White|  XL|     Silk|  143|
|       Nike| T-shirt|White|  XL|    Denim|  110|
|       Nike| T-shirt|White|  XL|    Nylon|  171|
|       Puma|   Dress|White|  XL|Polyester|  128|
|     Reebok|  Jacket|White|  XL|     Wool|   58|
|     Reebok| Sweater|White|  XL|   Cotton|  164|


In [None]:
#buscas específicas
df.select('Brand', 'Category', 'Size').where(((F.col('Size') == 'XXL') | (F.col('Size') == 'L')) & (F.col('Brand') == 'Under Armour')).show()

+------------+--------+----+
|       Brand|Category|Size|
+------------+--------+----+
|Under Armour|  Jacket| XXL|
|Under Armour| T-shirt| XXL|
|Under Armour|  Jacket| XXL|
|Under Armour| Sweater|   L|
|Under Armour|  Jacket| XXL|
|Under Armour|  Jacket| XXL|
|Under Armour|   Jeans| XXL|
|Under Armour| T-shirt| XXL|
|Under Armour|   Dress|   L|
|Under Armour|  Jacket| XXL|
|Under Armour|   Dress|   L|
|Under Armour|  Jacket| XXL|
|Under Armour|   Shoes|   L|
|Under Armour| Sweater| XXL|
|Under Armour|   Jeans| XXL|
|Under Armour| T-shirt|   L|
|Under Armour|  Jacket|   L|
|Under Armour|  Jacket|   L|
|Under Armour|   Shoes| XXL|
|Under Armour|   Dress|   L|
+------------+--------+----+
only showing top 20 rows



In [None]:
df.select('Brand', 'Category', 'Color', 'Size').where((F.col('Color') == 'White') & (F.col('Size') == 'XL')).show(truncate=False)

+-----------+--------+-----+----+
|Brand      |Category|Color|Size|
+-----------+--------+-----+----+
|Nike       |Jacket  |White|XL  |
|Puma       |Jacket  |White|XL  |
|Nike       |Shoes   |White|XL  |
|Nike       |Sweater |White|XL  |
|Puma       |Jacket  |White|XL  |
|Reebok     |T-shirt |White|XL  |
|Reebok     |Sweater |White|XL  |
|Puma       |Dress   |White|XL  |
|Nike       |Jacket  |White|XL  |
|Puma       |Sweater |White|XL  |
|Adidas     |Sweater |White|XL  |
|New Balance|T-shirt |White|XL  |
|Nike       |T-shirt |White|XL  |
|Nike       |T-shirt |White|XL  |
|Puma       |Dress   |White|XL  |
|Reebok     |Jacket  |White|XL  |
|Reebok     |Sweater |White|XL  |
|Adidas     |Jeans   |White|XL  |
|Adidas     |Sweater |White|XL  |
|New Balance|Shoes   |White|XL  |
+-----------+--------+-----+----+



In [None]:
#is in
df.select('Brand','Category', 'Size', 'Material','Color').filter(F.col('Material').isin(['Cotton'])).show()

+------------+--------+----+--------+------+
|       Brand|Category|Size|Material| Color|
+------------+--------+----+--------+------+
|        Nike|   Shoes|   M|  Cotton| Green|
| New Balance|   Jeans|   S|  Cotton|   Red|
|        Puma| Sweater|   L|  Cotton|  Blue|
| New Balance|   Jeans|   S|  Cotton| Green|
| New Balance|   Shoes|   M|  Cotton|  Blue|
|        Puma|   Shoes|   M|  Cotton| Black|
| New Balance|   Dress|   L|  Cotton|   Red|
|Under Armour|  Jacket| XXL|  Cotton| Black|
|        Puma|   Shoes|  XL|  Cotton|Yellow|
|      Adidas|   Shoes| XXL|  Cotton| Green|
|Under Armour|  Jacket| XXL|  Cotton|Yellow|
| New Balance|  Jacket|  XL|  Cotton|   Red|
|Under Armour|  Jacket|   M|  Cotton| White|
|Under Armour|   Dress|   S|  Cotton| Black|
|        Nike|   Shoes|   L|  Cotton| White|
|      Reebok| Sweater|  XS|  Cotton| White|
|Under Armour|   Jeans|  XL|  Cotton|Yellow|
|Under Armour|   Dress|   L|  Cotton|  Blue|
|Under Armour| Sweater|  XS|  Cotton|  Blue|
|        N

In [None]:
#like
df.select('Brand', 'Size', 'Color').where((F.col('Color')).like('%Blue%')).show()

+------------+----+-----+
|       Brand|Size|Color|
+------------+----+-----+
|Under Armour|  XL| Blue|
|      Reebok|  XS| Blue|
|        Puma|  XL| Blue|
|        Puma|   L| Blue|
| New Balance|   M| Blue|
|      Reebok|   M| Blue|
| New Balance|  XL| Blue|
|      Adidas| XXL| Blue|
|        Puma| XXL| Blue|
|Under Armour| XXL| Blue|
|Under Armour|   L| Blue|
| New Balance|   S| Blue|
|      Reebok|   L| Blue|
|Under Armour|  XS| Blue|
|        Puma|   S| Blue|
|        Puma|   L| Blue|
|        Nike| XXL| Blue|
|      Reebok|  XS| Blue|
| New Balance|  XS| Blue|
|Under Armour| XXL| Blue|
+------------+----+-----+
only showing top 20 rows



In [None]:
#between
df.select('Price', 'Brand', 'Material').where(F.col('Price').between(100,300)).show()

+-----+------------+---------+
|Price|       Brand| Material|
+-----+------------+---------+
|  182| New Balance|    Nylon|
|  127|Under Armour|     Wool|
|  113|      Adidas|    Nylon|
|  150|        Puma|     Silk|
|  184|Under Armour|     Silk|
|  175|      Adidas|    Denim|
|  158|        Puma|     Wool|
|  125|        Puma|    Denim|
|  185|      Adidas|Polyester|
|  109|      Adidas|    Denim|
|  187|      Reebok|    Denim|
|  157|      Reebok|     Wool|
|  157|      Reebok|    Nylon|
|  152|        Puma|   Cotton|
|  177|Under Armour|Polyester|
|  195|Under Armour|    Nylon|
|  137|        Nike|     Silk|
|  161|      Adidas|    Nylon|
|  173|      Adidas|    Nylon|
|  124| New Balance|     Silk|
+-----+------------+---------+
only showing top 20 rows



## Manipulação de dados

In [None]:
#renomear colunas
df = df.withColumnRenamed('Category', 'categoria')
df.show(5)

+------------+---------+-----+----+--------+-----+
|       Brand|categoria|Color|Size|Material|Price|
+------------+---------+-----+----+--------+-----+
| New Balance|    Dress|White|  XS|   Nylon|  182|
| New Balance|    Jeans|Black|  XS|    Silk|   57|
|Under Armour|    Dress|  Red|   M|    Wool|  127|
|        Nike|    Shoes|Green|   M|  Cotton|   77|
|      Adidas|  Sweater|White|   M|   Nylon|  113|
+------------+---------+-----+----+--------+-----+
only showing top 5 rows



In [None]:
#outra forma
novo_nome = ['marca', 'categoria', 'cor', 'tamanho', 'material', 'preco']
novo_df = df.toDF(*novo_nome)
novo_df.show()

+------------+---------+------+-------+---------+-----+
|       marca|categoria|   cor|tamanho| material|preco|
+------------+---------+------+-------+---------+-----+
| New Balance|    Dress| White|     XS|    Nylon|  182|
| New Balance|    Jeans| Black|     XS|     Silk|   57|
|Under Armour|    Dress|   Red|      M|     Wool|  127|
|        Nike|    Shoes| Green|      M|   Cotton|   77|
|      Adidas|  Sweater| White|      M|    Nylon|  113|
|      Reebok|   Jacket|   Red|     XL|    Nylon|   19|
|        Puma|   Jacket|   Red|    XXL|Polyester|   31|
|      Adidas|    Dress|   Red|     XS|    Denim|   46|
|      Reebok|    Dress| Black|      S|     Wool|   97|
|      Adidas|    Jeans|Yellow|      L|     Wool|   80|
|        Nike|   Jacket| White|     XL|     Silk|   98|
|        Puma|   Jacket| White|     XL|     Silk|  150|
|Under Armour|   Jacket| White|      S|    Nylon|   68|
|Under Armour|  T-shirt|  Blue|     XL|Polyester|   49|
| New Balance|   Jacket| Green|      S|    Nylon

In [None]:
#criar coluna nova com condição
df.withColumn('Flag_preco', F.when((F.col('Price') > 100), 'CARO').otherwise(F.lit('BARATO'))).show(10)

+------------+---------+------+----+---------+-----+----------+
|       Brand|categoria| Color|Size| Material|Price|Flag_preco|
+------------+---------+------+----+---------+-----+----------+
| New Balance|    Dress| White|  XS|    Nylon|  182|      CARO|
| New Balance|    Jeans| Black|  XS|     Silk|   57|    BARATO|
|Under Armour|    Dress|   Red|   M|     Wool|  127|      CARO|
|        Nike|    Shoes| Green|   M|   Cotton|   77|    BARATO|
|      Adidas|  Sweater| White|   M|    Nylon|  113|      CARO|
|      Reebok|   Jacket|   Red|  XL|    Nylon|   19|    BARATO|
|        Puma|   Jacket|   Red| XXL|Polyester|   31|    BARATO|
|      Adidas|    Dress|   Red|  XS|    Denim|   46|    BARATO|
|      Reebok|    Dress| Black|   S|     Wool|   97|    BARATO|
|      Adidas|    Jeans|Yellow|   L|     Wool|   80|    BARATO|
+------------+---------+------+----+---------+-----+----------+
only showing top 10 rows



In [None]:
#filtrando nulos
df_limpo = df.dropna()

In [None]:
df_limpo.show()

+------------+---------+------+----+---------+-----+
|       Brand|categoria| Color|Size| Material|Price|
+------------+---------+------+----+---------+-----+
| New Balance|    Dress| White|  XS|    Nylon|  182|
| New Balance|    Jeans| Black|  XS|     Silk|   57|
|Under Armour|    Dress|   Red|   M|     Wool|  127|
|        Nike|    Shoes| Green|   M|   Cotton|   77|
|      Adidas|  Sweater| White|   M|    Nylon|  113|
|      Reebok|   Jacket|   Red|  XL|    Nylon|   19|
|        Puma|   Jacket|   Red| XXL|Polyester|   31|
|      Adidas|    Dress|   Red|  XS|    Denim|   46|
|      Reebok|    Dress| Black|   S|     Wool|   97|
|      Adidas|    Jeans|Yellow|   L|     Wool|   80|
|        Nike|   Jacket| White|  XL|     Silk|   98|
|        Puma|   Jacket| White|  XL|     Silk|  150|
|Under Armour|   Jacket| White|   S|    Nylon|   68|
|Under Armour|  T-shirt|  Blue|  XL|Polyester|   49|
| New Balance|   Jacket| Green|   S|    Nylon|   97|
|Under Armour|   Jacket| Green| XXL|     Silk|

In [None]:
#outra opção
df_limpo = df.filter(df.Brand.isNotNull())

In [None]:
df.fillna({'Brand': 'sem marca', 'Price': 106}).show()

In [None]:
#removendo duplicados
df_limpo2 = df.dropDuplicates()

# Modelagem