<a href="https://colab.research.google.com/github/AndersonGabrielCalasans/AnaliseDados-BootCampGeracaoTech-DIO/blob/master/SoulCode/Introdu%C3%A7%C3%A3o_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Instalação da biblioteca PySpark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 43 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 54.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=9b66a9c571be550527c83ddf4295c5b20ab4126b3227f84124070074a946ee08
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [31]:
# Importando a SparkSession
from pyspark.sql import SparkSession

# Import as functions
import pyspark.sql.functions as F

In [3]:
# Configurando a variável de ambiente (sessão) Spark 
spark = (SparkSession.builder
                    .master('local')
                    .appName('Intro-pyspark')
                    .config('spark.ui.port', '4050')
                    .getOrCreate()
)

# master: local de execução
# appname: nome da aplicação
# config: porta local
# getorcreate: retorna ou cria caso não exista

In [4]:
# Testando se iniciou a variável
spark

In [5]:
# Como funciona os DataFrames no Spark
schema = ['id', 'nome', 'cpf', 'celular']

dados = [
    (1, 'Anderson Gabriel', '999.999.999-99','(81)9.9999-9999'),
    (2, 'Ligia Paula', '111.111.111-11','(81)9.1111-1111'),
    (3, 'Tales Paulo', '222.222.222-22', '(82)9.8888-8888'),
    (4, 'Maria das Gracas', '333.333.333-33', '(21)9.8585-8585')
]

# Gerando o DataFrame
df = spark.createDataFrame(dados, schema)

In [8]:
# Mostrando o DataFrame
df.show()

+---+----------------+--------------+---------------+
| id|            nome|           cpf|        celular|
+---+----------------+--------------+---------------+
|  1|Anderson Gabriel|999.999.999-99|(81)9.9999-9999|
|  2|     Ligia Paula|111.111.111-11|(81)9.1111-1111|
|  3|     Tales Paulo|222.222.222-22|(82)9.8888-8888|
|  4|Maria das Gracas|333.333.333-33|(21)9.8585-8585|
+---+----------------+--------------+---------------+



No spark caso você não defina os tipos das variáveis, ele infere por si só.

In [9]:
# Schema do DataFrame
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- nome: string (nullable = true)
 |-- cpf: string (nullable = true)
 |-- celular: string (nullable = true)



In [34]:
schema2 = 'id INT, nome STRING, cpf STRING, celular STRING, cidade STRING'

dados2 = [
    (1, 'Anderson Gabriel', '999.999.999-99','(81)9.9999-9999', 'Recife'),
    (2, 'Ligia Paula', '111.111.111-11','(81)9.1111-1111', 'Jaboatao'),
    (3, 'Tales Paulo', '222.222.222-22', '(82)9.8888-8888', 'Escada'),
    (4, 'Maria das Gracas', '333.333.333-33', '(21)9.8585-8585','Escada')
]

# Gerando o DataFrame
df2 = spark.createDataFrame(dados2, schema2)

Ao realizar a inferência do schema manualmente você ganha em relação ao processamento (pois o spark não necessita percorrer a coluna para verificar o tipo dos dados) e evita a tipagem errada dos mesmo.

In [35]:
# Schema do DataFrame
df2.printSchema()

root
 |-- id: integer (nullable = true)
 |-- nome: string (nullable = true)
 |-- cpf: string (nullable = true)
 |-- celular: string (nullable = true)
 |-- cidade: string (nullable = true)



In [36]:
# verificar quantidade de linhas do DataFrame
df2.count()

4

In [37]:
# Visualizar colunas
df2.columns

['id', 'nome', 'cpf', 'celular', 'cidade']

Os DataFrames são imutáveis. O comando SELECT não altera o DataFrame, apenas mostra em tempo de execução

In [38]:
# 1º transformação dos dados - SELECT
df2.select('nome', 'cpf').show()

+----------------+--------------+
|            nome|           cpf|
+----------------+--------------+
|Anderson Gabriel|999.999.999-99|
|     Ligia Paula|111.111.111-11|
|     Tales Paulo|222.222.222-22|
|Maria das Gracas|333.333.333-33|
+----------------+--------------+



In [39]:
# Outra forma de fazer SELECT (com F.col)
df2.select(F.col('nome'), F.col('cpf')).show()

+----------------+--------------+
|            nome|           cpf|
+----------------+--------------+
|Anderson Gabriel|999.999.999-99|
|     Ligia Paula|111.111.111-11|
|     Tales Paulo|222.222.222-22|
|Maria das Gracas|333.333.333-33|
+----------------+--------------+



In [40]:
df2.show()

+---+----------------+--------------+---------------+--------+
| id|            nome|           cpf|        celular|  cidade|
+---+----------------+--------------+---------------+--------+
|  1|Anderson Gabriel|999.999.999-99|(81)9.9999-9999|  Recife|
|  2|     Ligia Paula|111.111.111-11|(81)9.1111-1111|Jaboatao|
|  3|     Tales Paulo|222.222.222-22|(82)9.8888-8888|  Escada|
|  4|Maria das Gracas|333.333.333-33|(21)9.8585-8585|  Escada|
+---+----------------+--------------+---------------+--------+



In [41]:
# Salvando as alterações em um novo DF
df4 = df2.select('nome', 'cpf')
df4.show()

+----------------+--------------+
|            nome|           cpf|
+----------------+--------------+
|Anderson Gabriel|999.999.999-99|
|     Ligia Paula|111.111.111-11|
|     Tales Paulo|222.222.222-22|
|Maria das Gracas|333.333.333-33|
+----------------+--------------+



In [42]:
# Verificando o tipo do objeto
type(df4)

pyspark.sql.dataframe.DataFrame

In [45]:
# Aplicando um filtro para apenas cidade = Escada
df2.filter(F.col('cidade')=='Escada').show()

+---+----------------+--------------+---------------+------+
| id|            nome|           cpf|        celular|cidade|
+---+----------------+--------------+---------------+------+
|  3|     Tales Paulo|222.222.222-22|(82)9.8888-8888|Escada|
|  4|Maria das Gracas|333.333.333-33|(21)9.8585-8585|Escada|
+---+----------------+--------------+---------------+------+



In [64]:
# Filtrando e mostrando apenas colunas nome e cpf onde cidade = Escada
df2.select(F.col('nome'), F.col('cpf')).filter(F.col('cidade')=='Escada').show()

+----------------+--------------+
|            nome|           cpf|
+----------------+--------------+
|     Tales Paulo|222.222.222-22|
|Maria das Gracas|333.333.333-33|
+----------------+--------------+



In [65]:
# Filtrando e mostrando apenas colunas nome e cpf onde cidade = Escada ou Recife
df2.select(F.col('nome'), F.col('cpf')).filter("cidade='Escada' or cidade='Recife'").show()

+----------------+--------------+
|            nome|           cpf|
+----------------+--------------+
|Anderson Gabriel|999.999.999-99|
|     Tales Paulo|222.222.222-22|
|Maria das Gracas|333.333.333-33|
+----------------+--------------+



In [66]:
# Filtrando e mostrando apenas colunas nome e cpf onde cidade = Escada ou Recife (outra forma com | para 'or' e & para 'and')
df2.select(F.col('nome'), F.col('cpf')).filter((F.col('cidade')=='Escada') | (F.col('cidade')=='Recife')).show()

+----------------+--------------+
|            nome|           cpf|
+----------------+--------------+
|Anderson Gabriel|999.999.999-99|
|     Tales Paulo|222.222.222-22|
|Maria das Gracas|333.333.333-33|
+----------------+--------------+



In [67]:
# Filtrando e mostrando apenas colunas nome e cpf onde cidade = Escada e nome = Tales Paulo (outra forma com | para 'or' e & para 'and')
df2.select(F.col('nome'), F.col('cpf')).filter((F.col('cidade')=='Escada') & (F.col('nome')=='Tales Paulo')).show()

+-----------+--------------+
|       nome|           cpf|
+-----------+--------------+
|Tales Paulo|222.222.222-22|
+-----------+--------------+

