<a href="https://colab.research.google.com/github/Vinibianchi/Spark/blob/main/Spark_Tratamento_Dados_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 50.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=161bd7f34361edcb48b47911c35bbf0582771ba68b6ce3b085f1128115fe6654
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


# Importando a biblioteca

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [None]:
# Obtain the SparkSession for this notebook via the builder (default configuration)
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()


## Mudando Tipo de Colunas

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Two people whose ages arrive as strings
pessoas = [('Vinicius', '27'), ('Larissa', '24')]
df = spark.createDataFrame(pessoas, schema=['nome', 'idade'])

# Cast by type name ...
idade_por_nome = col('idade').cast('int')
df.select('nome', idade_por_nome)

# ... or by passing a DataType instance (the form needed for complex types)
idade_por_tipo = col('idade').cast(IntegerType())
df.select('nome', idade_por_tipo)

DataFrame[nome: string, idade: int]

In [None]:
# Display df; the select() calls above were not assigned back, so df itself was not replaced
df.show()

+--------+-----+
|    nome|idade|
+--------+-----+
|Vinicius|   27|
| Larissa|   24|
+--------+-----+



# Definição de schema

In [None]:
from pyspark.sql.types import *

# Build the schema programmatically: one StructField (column name, type) per column
campos = [
    StructField('nome', StringType()),
    StructField('idade', IntegerType()),
]
schema = StructType(campos)

In [None]:
# Second way: describe the columns and their types in a single string

schema = 'nome STRING, idade INT'

# Criando DataFrame Spark

In [None]:
import numpy as np

# Fixed seed so that Restart & Run All reproduces the same grades
SEED = 42
rng = np.random.default_rng(SEED)

nomes = ['Vinicius', 'Valéria', 'Larissa', 'Lucas']
# One integer grade in [1, 10] per student, converted to plain Python ints
# (presumably because Spark's schema verification does not accept NumPy integers — TODO confirm)
notas = [int(v) for v in rng.integers(1, 11, size=len(nomes))]

In [None]:
# Rows are built as a list of (nome, nota) tuples; the string schema names and types the columns
data = list(zip(nomes, notas))
schema = 'nome STRING, nota INT'
df_custom = spark.createDataFrame(data, schema=schema)
df_custom.show()

+--------+----+
|    nome|nota|
+--------+----+
|Vinicius|   1|
| Valéria|   8|
| Larissa|   3|
|   Lucas|  10|
+--------+----+



In [None]:
# spark.range(10) produces a one-column DataFrame ("id") holding the values 0 through 9
spark.range(10).show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



# Leitura de Arquivos  
__Há duas formas de ler arquivos de dados.
A primeira usa diretamente o método específico do formato (por exemplo, `spark.read.csv`); a segunda usa `spark.read.format(...)` e permite parametrizar a leitura de acordo com a fonte de dados utilizada.__

In [None]:
# First approach: the format-specific reader method
## Reading a tab-separated file whose first line holds the column names

df = spark.read.csv(
    '/content/ratings.tsv',
    header=True,
    sep='\t',
)

df.show()

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1858|
|tt0000002|          6.0|     243|
|tt0000003|          6.5|    1630|
|tt0000004|          6.0|     158|
|tt0000005|          6.2|    2455|
|tt0000006|          5.2|     163|
|tt0000007|          5.4|     764|
|tt0000008|          5.5|    2003|
|tt0000009|          5.9|     193|
|tt0000010|          6.9|    6712|
|tt0000011|          5.2|     341|
|tt0000012|          7.5|   11521|
|tt0000013|          5.8|    1779|
|tt0000014|          7.1|    5174|
|tt0000015|          6.2|     982|
|tt0000016|          5.9|    1382|
|tt0000017|          4.6|     303|
|tt0000018|          5.3|     556|
|tt0000019|          5.2|      30|
|tt0000020|          4.9|     328|
+---------+-------------+--------+
only showing top 20 rows



In [None]:
# Second approach: the generic reader, spark.read.format(...), configured through options
## Form 1: one .option(key, value) call per setting
df1 = (
    spark.read.format('csv')
    .option('header', 'True')
    .option('inferSchema', 'True')  # key was misspelled 'infraSchema', so types were never inferred
    .option('sep', '\t')
    .load('/content/ratings.tsv')
)

## Form 2: all settings at once as keyword arguments to .options
df2 = (
    spark.read.format('csv')
    .options(header=True,
             inferSchema=True,
             sep='\t')
    .load('/content/ratings.tsv')
)

## Form 3: settings kept in a dict and unpacked into .options
options_dict = {
    'header': True,
    'inferSchema': True,
    'sep': '\t',
}
df3 = (
    spark.read.format('csv')
    .options(**options_dict)
    .load('/content/ratings.tsv')
)

In [None]:
# Preview the first 5 rows of the DataFrame read with chained .option() calls
df1.show(5)

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1858|
|tt0000002|          6.0|     243|
|tt0000003|          6.5|    1630|
|tt0000004|          6.0|     158|
|tt0000005|          6.2|    2455|
+---------+-------------+--------+
only showing top 5 rows



In [None]:
# Preview the first 5 rows of the DataFrame read with keyword .options()
df2.show(5)

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1858|
|tt0000002|          6.0|     243|
|tt0000003|          6.5|    1630|
|tt0000004|          6.0|     158|
|tt0000005|          6.2|    2455|
+---------+-------------+--------+
only showing top 5 rows



In [None]:
# Preview the first 5 rows of the DataFrame read with the unpacked options dict
df3.show(5)

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1858|
|tt0000002|          6.0|     243|
|tt0000003|          6.5|    1630|
|tt0000004|          6.0|     158|
|tt0000005|          6.2|    2455|
+---------+-------------+--------+
only showing top 5 rows



# Escrevendo Dados / Save Dataframe

In [None]:
# Placeholder base location — replace with a real output directory.
path = 'caminho do palvamento'

# Each format is written to its own sub-path: with the default save mode a second
# write to an already existing location fails.
df.write.parquet(f'{path}/parquet')
df.write.format('csv').save(f'{path}/csv')  # writers use .save(); .load() exists only on readers
df.write.format('orc').save(f'{path}/orc')  # format name is 'orc' (was misspelled 'ocr')
# etc.

In [None]:
# Saving through pandas is also possible. toPandas() (lower-case "t") converts the
# Spark DataFrame into a pandas DataFrame, which is then written with pandas' to_csv.

path = 'caminho'
df.toPandas().to_csv(path, sep=';')

# Operações básicas com DataFrames

## Colunas e Expressões

In [None]:
# Small (nome, nota) sample used by the column examples
schema = 'nome STRING, nota INTEGER'
data = [
    ['Pedro', 7],
    ['João', 5],
    ['Barbara', 10],
]
df = spark.createDataFrame(data, schema)
df.show()

+-------+----+
|   nome|nota|
+-------+----+
|  Pedro|   7|
|   João|   5|
|Barbara|  10|
+-------+----+



In [None]:
# Adding a named column with withColumn: nota_plus holds nota + 6
df.withColumn('nota_plus', col('nota') +6).show()

+-------+----+---------+
|   nome|nota|nota_plus|
+-------+----+---------+
|  Pedro|   7|       13|
|   João|   5|       11|
|Barbara|  10|       16|
+-------+----+---------+



## Seleção de Colunas SELECT

In [None]:
from pyspark.sql.functions import expr, col, round

In [None]:
# Select a single column by name
df.select('nota').show()

+----+
|nota|
+----+
|   7|
|   5|
|  10|
+----+



In [None]:
# select() can also produce derived columns: nota shown next to nota + 10

schema = 'nome STRING, nota INTEGER'
data = [
    ['Pedro', 7],
    ['João', 5],
    ['Barbara', 10],
]
df = spark.createDataFrame(data, schema)

nota_mais_dez = col('nota') + 10
df.select('nota', nota_mais_dez).show()

+----+-----------+
|nota|(nota + 10)|
+----+-----------+
|   7|         17|
|   5|         15|
|  10|         20|
+----+-----------+



In [None]:
# Naming a derived column
## with the .alias() method

# Variant 1: SQL expression string (the resulting DataFrame is built but not displayed)
plus_five_expr = expr('nota +5').alias('nota_plus_five')
df.select('nome', 'nota', plus_five_expr)

# Variant 2: Column arithmetic
plus_five_col = (col('nota') + 5).alias('nota_plus_five')
df.select('nome', 'nota', plus_five_col).show()



+-------+----+--------------+
|   nome|nota|nota_plus_five|
+-------+----+--------------+
|  Pedro|   7|            12|
|   João|   5|            10|
|Barbara|  10|            15|
+-------+----+--------------+



In [None]:
# Or name the column inside the SQL expression itself with "as"
df.select('nome','nota',expr('nota + 5 as nota_plus_five')).show()

+-------+----+--------------+
|   nome|nota|nota_plus_five|
+-------+----+--------------+
|  Pedro|   7|            12|
|   João|   5|            10|
|Barbara|  10|            15|
+-------+----+--------------+



### Distinct e dropDuplicates()

In [None]:
# Sample in which two students share the same grade (7), used by the de-duplication examples
schema = 'nome STRING, nota INTEGER'
data = [
    ['Pedro', 7],
    ['João', 7],
    ['Barbara', 10],
]
df = spark.createDataFrame(data, schema)
df.show()

+-------+----+
|   nome|nota|
+-------+----+
|  Pedro|   7|
|   João|   7|
|Barbara|  10|
+-------+----+



In [None]:
# Unique values of the nota column
df.select('nota').distinct().show()

+----+
|nota|
+----+
|   7|
|  10|
+----+



In [None]:
# dropDuplicates() called without arguments on the single selected column
df.select('nota').dropDuplicates().show()

+----+
|nota|
+----+
|   7|
|  10|
+----+



In [None]:
# Display df again; the distinct()/dropDuplicates() results above were not assigned back to it
df.show()

+-------+----+
|   nome|nota|
+-------+----+
|  Pedro|   7|
|   João|   7|
|Barbara|  10|
+-------+----+



## Renomeando as Colunas

### df.withColumnRenamed('nome antigo', 'nome novo')

In [None]:
# Generate the dataset: three students ...
alunos = [['João'], ['Lucas'], ['Lidiane']]
df = spark.createDataFrame(alunos, 'aluno STRING')

# ... plus two columns computed from rand() through SQL expressions
df = df.select(
    '*',
    expr('round(rand()*10) as nota'),
    expr('round(rand()*10 + 10) as idade'),
)
df.show()

+-------+----+-----+
|  aluno|nota|idade|
+-------+----+-----+
|   João| 3.0| 11.0|
|  Lucas| 4.0| 11.0|
|Lidiane| 3.0| 18.0|
+-------+----+-----+



In [None]:
## The .columns attribute returns the DataFrame's column names
col_names = df.columns

## Append the '2021' suffix to every column except 'aluno'.
## The loop variable is deliberately not named `col`: that would overwrite the `col`
## function imported from pyspark.sql.functions for every cell that runs afterwards.
for col_name in col_names:
  if col_name != 'aluno':
    df = df.withColumnRenamed(col_name, col_name + '2021')

df.show()

+-------+--------+---------+
|  aluno|nota2021|idade2021|
+-------+--------+---------+
|   João|     3.0|     11.0|
|  Lucas|     4.0|     11.0|
|Lidiane|     3.0|     18.0|
+-------+--------+---------+



# Filtro de Linhas

In [None]:
# Rebuild the students DataFrame with the original column names (aluno, nota, idade)
estudantes = [['João'], ['Lucas'], ['Lidiane']]
df = spark.createDataFrame(estudantes, 'aluno STRING')

colunas_aleatorias = [
    expr('round(rand()*10) as nota'),
    expr('round(rand()*10 + 10) as idade'),
]
df = df.select('*', *colunas_aleatorias)

In [None]:
# A filter condition can be given as a SQL expression string, e.g. df.filter('nota > 2').show(),

# or as a Column expression:
df.filter(df.nota > 2).show()

+-----+----+-----+
|aluno|nota|idade|
+-----+----+-----+
| João| 7.0| 19.0|
|Lucas| 5.0| 15.0|
+-----+----+-----+



In [None]:
# Display df itself; the filter() result above was only shown, not assigned back
df.show()

+-------+----+-----+
|  aluno|nota|idade|
+-------+----+-----+
|   João| 7.0| 19.0|
|  Lucas| 5.0| 15.0|
|Lidiane| 2.0| 14.0|
+-------+----+-----+



In [None]:
# Combine Column conditions with & (and), | (or) and ~ (not), wrapping each comparison in
# parentheses. Python's `and`/`or` try to turn a Column into a bool, which PySpark
# rejects with a ValueError.
df.filter((df.nota > 3) & (df.idade < 18)).show()

['aluno', 'nota', 'idade']