In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 51 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 64.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=fc30289c671ad077628c09c71d17b51207d120e40455059ce9adc30e1e56a7da
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [6]:
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
# Mount the user's Google Drive in Colab; its contents become reachable under /content/drive.
# NOTE(review): the data paths defined later (C:/projects/aula) do not point inside
# /content/drive — confirm which location this notebook is meant to read from.
from google.colab import drive 
drive.mount('/content/drive') 

Mounted at /content/drive


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

Setup

In [7]:
import findspark

# findspark.init() is meant to locate a Spark installation and make pyspark importable.
# NOTE(review): pyspark was pip-installed and already imported in an earlier cell,
# so this call is presumably redundant here — confirm before removing.
findspark.init()

In [8]:
# Reuse the active SparkSession if there is one, otherwise create a new one.
spark = SparkSession.builder.getOrCreate()

In [9]:
# Both columns are declared STRING in the DDL schema, so 'id' starts out as a string column.
df = spark.createDataFrame([('Pedro', '4'), ('João', '5')], schema = 'nome STRING, id STRING')

In [10]:
# Print the DataFrame as a text table.
df.show()

+-----+---+
| nome| id|
+-----+---+
|Pedro|  4|
| João|  5|
+-----+---+



Acessando os tipos do Spark

In [11]:
from pyspark.sql.types import *

In [12]:
# An instance of Spark's integer SQL type (reported as 'int' by df.dtypes).
int_type = IntegerType()

In [13]:
# A Spark array type whose elements are IntegerType.
array_type = ArrayType(IntegerType())

Convertendo os tipos das colunas

In [14]:
# Display the DataFrame again before converting column types.
df.show()

+-----+---+
| nome| id|
+-----+---+
|Pedro|  4|
| João|  5|
+-----+---+



In [16]:
# (column, type) pairs: both columns are currently strings.
df.dtypes

[('nome', 'string'), ('id', 'string')]

In [17]:
# Cast 'id' to IntegerType in a projection; df itself is left unchanged.
df.select('nome', col('id').cast(IntegerType())).dtypes

[('nome', 'string'), ('id', 'int')]

Aula de schemas

In [22]:
# No schema given: Spark names the columns _1, _2 and infers their types.
df = spark.createDataFrame([('Emily', 4), ('Luna', 5), ('Cinthia', 2), ('Spike', 1)])

In [23]:
# Show the auto-named columns.
df.show()

+-------+---+
|     _1| _2|
+-------+---+
|  Emily|  4|
|   Luna|  5|
|Cinthia|  2|
|  Spike|  1|
+-------+---+



In [21]:
# Inferred types: Python str -> string, Python int -> bigint.
df.dtypes

[('_1', 'string'), ('_2', 'bigint')]

In [24]:
# A list of names as schema sets only the column names; the types are still inferred.
df = spark.createDataFrame([('Emily', 4), ('Luna', 5), ('Cinthia', 2), ('Spike', 1)], schema = ['nome', 'id'])

In [25]:
# Same data, now with named columns.
df.show()

+-------+---+
|   nome| id|
+-------+---+
|  Emily|  4|
|   Luna|  5|
|Cinthia|  2|
|  Spike|  1|
+-------+---+



In [26]:
# 'id' is still bigint, because only the names were supplied.
df.dtypes

[('nome', 'string'), ('id', 'bigint')]

Criando schemas programaticamente

In [29]:
# Explicit schema built in code: 'nome' as string, 'id' as integer
# (StructField's nullable flag is left at its default).
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (('nome', StringType()), ('id', IntegerType()))
])

In [30]:
# Build the DataFrame with the explicit StructType schema.
df = spark.createDataFrame([('Emily', 4), ('Luna', 5), ('Cinthia', 2), ('Spike', 1)], schema = schema)

In [31]:
# 'id' is now int (IntegerType) instead of the inferred bigint.
df.dtypes

[('nome', 'string'), ('id', 'int')]

Criando schemas com DDL

In [32]:
# The same schema expressed as a DDL string.
schema = 'nome STRING, id INT'

In [33]:
# Create the DataFrame from the DDL schema.
df = spark.createDataFrame([('Emily', 4), ('Luna', 5), ('Cinthia', 2), ('Spike', 1)], schema = schema)

In [34]:
# Same column types as the StructType version.
df.dtypes

[('nome', 'string'), ('id', 'int')]

In [35]:
# Display the DataFrame built from the DDL schema.
df.show()

+-------+---+
|   nome| id|
+-------+---+
|  Emily|  4|
|   Luna|  5|
|Cinthia|  2|
|  Spike|  1|
+-------+---+



Criando DataFrames

In [36]:
# Rows as (name, id) tuples, reused by the next cells.
data = [('Emily', 4), ('Luna', 5), ('Cinthia', 2), ('Spike', 1)]

In [37]:
# No schema: the column names default to _1, _2.
df = spark.createDataFrame(data)

In [38]:
# Types inferred from the Python values.
df.dtypes

[('_1', 'string'), ('_2', 'bigint')]

In [39]:
# The full StructType, including each field's nullable flag.
df.schema

StructType([StructField('_1', StringType(), True), StructField('_2', LongType(), True)])

In [40]:
# Tree view of the same schema.
df.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



In [41]:
# spark.range(n) creates a single 'id' column with values 0..n-1;
# show() prints only the first 20 rows by default.
spark.range(100).show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
only showing top 20 rows



### Leitura e escrita de DataFrames

In [None]:
# Base directory for the files read and written below. The trailing '/' matters:
# later cells build paths as data_path + 'name', which without it would yield
# e.g. 'C:/projects/auladf_cnae'.
# NOTE(review): this is a local Windows path; in Colab the data presumably lives
# under /content/drive/... — adjust as needed.
data_path = 'C:/projects/aula/'

In [None]:
# CSV file used by the reading examples below.
file_path = 'C:/projects/aula/dados.csv'

### DataFrame reader

In [None]:
# DataFrameReader template (placeholders in <>, not executable as written):
#   spark.read.format(<format>).option(<key>, <value>).load(<path>)
# .option() takes ONE key/value pair; use .options(**kwargs) to pass several at once.
# Concrete reads for csv, json, orc and parquet follow in the next sections.

### DataFrame writer

In [None]:
# DataFrameWriter template (placeholders in <>, not executable as written):
#   df.write.format(<format>).option(<key>, <value>).mode(<save mode>).save(<path>)
# Concrete writes for csv, json, orc and parquet follow in the next sections.

### Lendo e escrevendo CSV

Opções mais comuns: 
- header
- inferSchema
- sep
- encoding

In [None]:
# Naive read: no separator, encoding, header or schema options are passed, so Spark's
# CSV defaults apply. The following cells correct sep and encoding for this file.
df = spark.read.format('csv').load(file_path)

In [None]:
# Peek at the first rows to see how the naive read parsed the file.
df.limit(5).show()

#### Definindo o schema

In [None]:
# DDL schema for the file: a code column and a description column, both strings.
schema = 'cod_cnae STRING, descricao STRING'

Opção de correção 1

In [None]:
# encoding latino para corrigir acentos e cedilha
# Latin encoding (ISO-8859-1) fixes accented characters and 'ç';
# ';' is the field separator, and the DDL schema names and types the columns.
df = spark.read.csv(file_path, schema=schema, sep=';', encoding='ISO-8859-1')
df.limit(15).show()

Opção de correção 2

In [None]:
# Same read as option 1, configured through chained .option(key, value) calls.
df = (spark.read
      .format('csv')
      .option('sep', ';')
      .option('encoding', 'ISO-8859-1')
      .schema(schema)
      .load(file_path)
      )
df.limit(5).show()

Opção de correção 3

In [None]:
# Same read as options 1 and 2, passing all reader options in a single call.
df = (spark.read
      .format('csv')
      # .options() accepts keyword arguments; .option() takes a single (key, value)
      # pair, so calling it with sep=/encoding= raises a TypeError.
      .options(sep=';', encoding='ISO-8859-1')
      .schema(schema)
      .load(file_path)
      )
df.limit(5).show()

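Obs.: utilizando o método `options` (no plural), podemos parametrizar melhor a leitura usando um dicionário desempacotado com `**`; o método `option` (no singular) aceita apenas um par chave/valor por chamada.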

In [None]:
# Reader options collected in a dict so they can be reused or parameterized.
options_dict = {
    'sep': ';',
    'encoding': 'ISO-8859-1',
}

df = (spark.read
      .format('csv')
      # Unpacking a dict requires .options(**kwargs); .option() takes one key/value pair.
      .options(**options_dict)
      .schema(schema)
      .load(file_path)
      )

In [None]:
# Column names and types come from the DDL schema passed to the reader.
df.printSchema()

In [None]:
# Write as CSV with a header row. mode('overwrite') keeps the cell re-runnable:
# the default save mode raises an error when the target path already exists.
df.write.format('csv').mode('overwrite').save(data_path + 'df_cnae_teste', header=True)

In [None]:
# Read back the CSV written in the previous cell (same directory name; first row is the header).
spark.read.format('csv').load(data_path + 'df_cnae_teste', header=True).printSchema()

Lendo e escrevendo em JSON

In [None]:
# Write as JSON; mode('overwrite') keeps the cell re-runnable
# (the default save mode errors if the path already exists).
df.write.format('json').mode('overwrite').save(data_path + 'df_cnae.json')

In [None]:
# Load the JSON output back; no schema is passed, so Spark infers it from the records.
df_json = spark.read.format('json').load(data_path + 'df_cnae.json')

In [None]:
# Show the DataFrame that was just read back from JSON.
df_json.show()

In [None]:
# Schema of the DataFrame read back from JSON.
df_json.printSchema()

Lendo e escrevendo em formato ORC

In [None]:
# The ORC data source is named 'orc' ('ord' is not a Spark format and fails to load).
# The '.' makes the output directory '<data_path>df_cnae.orc'; mode('overwrite')
# keeps the cell re-runnable.
formato = 'orc'
df.write.format(formato).mode('overwrite').save(data_path + 'df_cnae.' + formato)

In [None]:
# Read the ORC output. .show() returns None, so it is called separately;
# otherwise df_orc would be None instead of a DataFrame.
df_orc = spark.read.format('orc').load(data_path + 'df_cnae.orc')
df_orc.show()

In [None]:
# Schema of the DataFrame read back from ORC.
df_orc.printSchema()

Lendo e escrevendo em Parquet 

- formato padrão do Spark
- preserva metadados (inclusive o schema)
- suporte para dados estruturados
- consegue armazenar tipos complexos
- compressão de dados
- otimizado para trabalhar com volumes gigantescos de dados
- integração com ferramentas de cloud como Amazon Athena, Amazon Redshift, Google BigQuery e Google Dataproc

In [None]:
# Write as Parquet; mode('overwrite') keeps the cell re-runnable
# (the default save mode errors if the path already exists).
df.write.format('parquet').mode('overwrite').save(data_path + 'df_cnae')

In [None]:
# Read the Parquet output back; no schema or reader options are passed.
df_parquet = spark.read.format('parquet').load(data_path + 'df_cnae')

In [None]:
# Schema of the DataFrame read back from Parquet.
df_parquet.printSchema()