<a href="https://colab.research.google.com/github/LaisST/FIAP_202501_HandsOn_data_analytics/blob/main/BigData_Aula2_OperacoesBasicas.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Phase 3 - Lesson 2
# Basic Operations in Apache Spark

## Environment Setup

In [1]:
import os

In [2]:
! pip install -q findspark

In [3]:
import findspark

Apache Spark 3.5.7
https://dlcdn.apache.org/spark/spark-3.5.7/spark-3.5.7-bin-hadoop3.tgz

In [4]:
# Install Java
! apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [5]:
# Download, unpack, and configure Apache Spark

! wget -q https://dlcdn.apache.org/spark/spark-3.5.7/spark-3.5.7-bin-hadoop3.tgz

In [6]:
! tar xf spark-3.5.7-bin-hadoop3.tgz

In [7]:
! rm -rf spark-3.5.7-bin-hadoop3.tgz

In [8]:
# Configure the environment variables
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.5.7-bin-hadoop3'

In [12]:
# Initialize Spark and create a session

findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').appName('PySpark DataFrame From RDD').getOrCreate()


In [13]:
from pyspark import SparkContext

In [17]:
sc = SparkContext.getOrCreate()

In [10]:
# Test
df = spark.sql('''select 'spark' as hello''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



## Importing the Libraries

In [14]:
# Import the Spark libraries
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, expr, lit, substring, concat, concat_ws, when, coalesce
from pyspark.sql import functions as F
from functools import reduce


## Creating a PySpark DataFrame from an Existing RDD

In [19]:
rdd = sc.parallelize([('C',85,76,87,91), ('B',85,76,87,91), ("A", 85,78,96,92), ("A", 92,76,89,96)], 4)

In [20]:
print(type(rdd))

<class 'pyspark.rdd.RDD'>


In [21]:
sub = ['Division','English','Mathematics','Physics','Chemistry']
marks_df = spark.createDataFrame(rdd, schema=sub)

In [22]:
print(type(marks_df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [23]:
marks_df.printSchema()

root
 |-- Division: string (nullable = true)
 |-- English: long (nullable = true)
 |-- Mathematics: long (nullable = true)
 |-- Physics: long (nullable = true)
 |-- Chemistry: long (nullable = true)



In [24]:
marks_df.show()

+--------+-------+-----------+-------+---------+
|Division|English|Mathematics|Physics|Chemistry|
+--------+-------+-----------+-------+---------+
|       C|     85|         76|     87|       91|
|       B|     85|         76|     87|       91|
|       A|     85|         78|     96|       92|
|       A|     92|         76|     89|       96|
+--------+-------+-----------+-------+---------+



## Spark to Pandas



In [30]:
csv_file = spark.read.csv('/content/cereal.csv', sep = ',', inferSchema = True, header = True)

In [28]:
df = csv_file.toPandas()

In [29]:
type(df)

## Reading the Schema

The schema defines the structure of the DataFrame, specifying each column's name, data type, and nullability. Example: StructType([StructField("nome", StringType(), True)])

In [32]:
csv_file.printSchema()

root
 |-- name: string (nullable = true)
 |-- mfr: string (nullable = true)
 |-- type: string (nullable = true)
 |-- calories: integer (nullable = true)
 |-- protein: integer (nullable = true)
 |-- fat: integer (nullable = true)
 |-- sodium: integer (nullable = true)
 |-- fiber: double (nullable = true)
 |-- carbo: double (nullable = true)
 |-- sugars: integer (nullable = true)
 |-- potass: integer (nullable = true)
 |-- vitamins: integer (nullable = true)
 |-- shelf: integer (nullable = true)
 |-- weight: double (nullable = true)
 |-- cups: double (nullable = true)
 |-- rating: double (nullable = true)



In [33]:
df = csv_file

### select()

Selects specific columns from a DataFrame and can also apply expressions to them.

Example: df.select("nome", "idade")

In [34]:
df.select('name', 'mfr', 'rating').show()

+--------------------+---+---------+
|                name|mfr|   rating|
+--------------------+---+---------+
|           100% Bran|  N|68.402973|
|   100% Natural Bran|  Q|33.983679|
|            All-Bran|  K|59.425505|
|All-Bran with Ext...|  K|93.704912|
|      Almond Delight|  R|34.384843|
|Apple Cinnamon Ch...|  G|29.509541|
|         Apple Jacks|  K|33.174094|
|             Basic 4|  G|37.038562|
|           Bran Chex|  R|49.120253|
|         Bran Flakes|  P|53.313813|
|        Cap'n'Crunch|  Q|18.042851|
|            Cheerios|  G|50.764999|
|Cinnamon Toast Cr...|  G|19.823573|
|            Clusters|  G|40.400208|
|         Cocoa Puffs|  G|22.736446|
|           Corn Chex|  R|41.445019|
|         Corn Flakes|  K|45.863324|
|           Corn Pops|  K|35.782791|
|       Count Chocula|  G|22.396513|
|  Cracklin' Oat Bran|  K|40.448772|
+--------------------+---+---------+
only showing top 20 rows



### withColumn()

Creates a new column or replaces an existing one by applying a transformation.

Example: df.withColumn("idade_mais_10", df["idade"] + 10)

In [35]:
df.withColumn("Calories",df['calories'].cast("Integer")).printSchema()

root
 |-- name: string (nullable = true)
 |-- mfr: string (nullable = true)
 |-- type: string (nullable = true)
 |-- Calories: integer (nullable = true)
 |-- protein: integer (nullable = true)
 |-- fat: integer (nullable = true)
 |-- sodium: integer (nullable = true)
 |-- fiber: double (nullable = true)
 |-- carbo: double (nullable = true)
 |-- sugars: integer (nullable = true)
 |-- potass: integer (nullable = true)
 |-- vitamins: integer (nullable = true)
 |-- shelf: integer (nullable = true)
 |-- weight: double (nullable = true)
 |-- cups: double (nullable = true)
 |-- rating: double (nullable = true)



### groupBy()

Groups the rows by one or more columns; it is usually combined with aggregate functions.

Example: df.groupBy("cidade").count()

In [36]:
df.groupBy("calories").count().show()

+--------+-----+
|calories|count|
+--------+-----+
|     140|    3|
|     120|   10|
|     100|   17|
|     130|    2|
|      50|    3|
|      80|    1|
|     160|    1|
|      70|    2|
|      90|    7|
|     110|   29|
|     150|    2|
+--------+-----+



### orderBy()

Sorts the DataFrame by one or more columns, in ascending or descending order.

Example: df.orderBy("idade", ascending=False)

In [37]:
df.orderBy("calories").show(50)

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|All-Bran with Ext...|  K|   C|      50|      4|  0|   140| 14.0|  8.0|     0|   330|      25|    3|   1.0| 0.5|93.704912|
|         Puffed Rice|  Q|   C|      50|      1|  0|     0|  0.0| 13.0|     0|    15|       0|    3|   0.5| 1.0|60.756112|
|        Puffed Wheat|  Q|   C|      50|      2|  0|     0|  1.0| 10.0|     0|    50|       0|    3|   0.5| 1.0|63.005645|
|           100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973|
|            All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505|
|      Shredded 

### Case When()

Creates conditional logic inside expressions, similar to if-else. Rows that match no condition receive NULL unless otherwise() supplies a default value.

In [38]:
from pyspark.sql.functions import when

In [39]:
df.select("name", df.vitamins, when(df.vitamins >= 25, "rich in vitamins")).show(50)

+--------------------+--------+----------------------------------------------------+
|                name|vitamins|CASE WHEN (vitamins >= 25) THEN rich in vitamins END|
+--------------------+--------+----------------------------------------------------+
|           100% Bran|      25|                                    rich in vitamins|
|   100% Natural Bran|       0|                                                NULL|
|            All-Bran|      25|                                    rich in vitamins|
|All-Bran with Ext...|      25|                                    rich in vitamins|
|      Almond Delight|      25|                                    rich in vitamins|
|Apple Cinnamon Ch...|      25|                                    rich in vitamins|
|         Apple Jacks|      25|                                    rich in vitamins|
|             Basic 4|      25|                                    rich in vitamins|
|           Bran Chex|      25|                                  

### filter()

Filters rows based on a boolean condition.

Example: df.filter(df["idade"] > 18)

In [40]:
df.filter(df.calories >= 100).show(50)

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|   100% Natural Bran|  Q|   C|     120|      3|  5|    15|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|
|      Almond Delight|  R|   C|     110|      2|  2|   200|  1.0| 14.0|     8|    -1|      25|    3|   1.0|0.75|34.384843|
|Apple Cinnamon Ch...|  G|   C|     110|      2|  2|   180|  1.5| 10.5|    10|    70|      25|    1|   1.0|0.75|29.509541|
|         Apple Jacks|  K|   C|     110|      2|  0|   125|  1.0| 11.0|    14|    30|      25|    2|   1.0| 1.0|33.174094|
|             Basic 4|  G|   C|     130|      3|  2|   210|  2.0| 18.0|     8|   100|      25|    3|  1.33|0.75|37.038562|
|        Cap'n'C

### isNull()/isNotNull()

Checks whether the values in a column are null or not null. Example:

    df.filter(df["email"].isNull())

    df.filter(df["email"].isNotNull())

In [41]:
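# isNull() and isNotNull() are Column methods, so no wildcard import is needed;
# "import *" from pyspark.sql.functions would shadow Python built-ins such as sum and max.
from pyspark.sql import functions as F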

In [42]:
df.filter(df.name.isNotNull()).show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|           100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973|
|   100% Natural Bran|  Q|   C|     120|      3|  5|    15|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|
|            All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505|
|All-Bran with Ext...|  K|   C|      50|      4|  0|   140| 14.0|  8.0|     0|   330|      25|    3|   1.0| 0.5|93.704912|
|      Almond Delight|  R|   C|     110|      2|  2|   200|  1.0| 14.0|     8|    -1|      25|    3|   1.0|0.75|34.384843|
|Apple Cinnamon 

In [43]:
df.filter(df.name.isNull()).show()

+----+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+------+
|name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|rating|
+----+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+------+
+----+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+------+

