# Reader & Writer
##### Objetivos
1. Leia de arquivos CSV
1. Leia de arquivos JSON
1. Gravar DataFrame em arquivos
1. Gravar DataFrame nas tabelas
1. Gravar DataFrame em uma tabela Delta

##### Métodos
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#input-and-output" target="_blank">DataFrameReader</a>: `csv`, `json`, `option`, `schema`
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#input-and-output" target="_blank">DataFrameWriter</a>: `mode`, `option`, `parquet`, `format`, `saveAsTable`
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html#pyspark.sql.types.StructType" target="_blank">StructType</a>: `toDDL`

##### Tipos Spark
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#data-types" target="_blank">Types</a>: `ArrayType`, `DoubleType`, `IntegerType`, `LongType`, `StringType`, `StructType`, `StructField`

## DataFrameReader

Interface usada para carregar um DataFrame de sistemas de armazenamento externos

```
spark.read.parquet("caminho/para/arquivos")
```

DataFrameReader é acessível através do atributo SparkSession `read`. Esta classe inclui métodos para carregar DataFrames de diferentes sistemas de armazenamento externo.

### Ler de arquivos CSV

Leia do CSV com o método `csv` do DataFrameReader e as seguintes opções:

Separador de tabulação, use a primeira linha como cabeçalho, infira o esquema

In [0]:
df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/jchambyd@gmail.com/users.csv")

In [0]:
display(df1)

# timestamp : representação numérica de um DataTime


user_id,user_first_touch_timestamp,email
UA000000102357305,1592182691348767,
UA000000102357308,1592183287634953,
UA000000102357309,1592183302736627,
UA000000102357321,1592184604178702,david23@orozco-parker.com
UA000000102357325,1592185154063628,
UA000000102357335,1592186122660210,
UA000000102357338,1592186300091435,
UA000000102357348,1592187663145345,phillipmorgan@hotmail.com
UA000000102357350,1592187732257656,
UA000000102357356,1592188311375015,


In [0]:
usersCsvPath = "dbfs:/FileStore/shared_uploads/jchambyd@gmail.com/users.csv"

usersDF = ( spark.read.format("csv")
    .option("header", True)
    .option("sep", ",")
    .load(usersCsvPath)
)

display(usersDF)

user_id,user_first_touch_timestamp,email
UA000000102357305,1592182691348767,
UA000000102357308,1592183287634953,
UA000000102357309,1592183302736627,
UA000000102357321,1592184604178702,david23@orozco-parker.com
UA000000102357325,1592185154063628,
UA000000102357335,1592186122660210,
UA000000102357338,1592186300091435,
UA000000102357348,1592187663145345,phillipmorgan@hotmail.com
UA000000102357350,1592187732257656,
UA000000102357356,1592188311375015,


In [0]:
usersDF.display()

user_id,user_first_touch_timestamp,email
UA000000102357305,1592182691348767,
UA000000102357308,1592183287634953,
UA000000102357309,1592183302736627,
UA000000102357321,1592184604178702,david23@orozco-parker.com
UA000000102357325,1592185154063628,
UA000000102357335,1592186122660210,
UA000000102357338,1592186300091435,
UA000000102357348,1592187663145345,phillipmorgan@hotmail.com
UA000000102357350,1592187732257656,
UA000000102357356,1592188311375015,


In [0]:
usersDF.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- user_first_touch_timestamp: string (nullable = true)
 |-- email: string (nullable = true)



A API Python do Spark também permite que você especifique as opções DataFrameReader como parâmetros para o método `csv`

In [0]:
#usersDF = ( spark.read.format("csv")
#    .option("header", True)
#    .option("sep", ",")
#    .load(usersCsvPath)
#)

usersDF = spark.read.csv(usersCsvPath, sep=",", header=True, inferSchema=True)

usersDF.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- user_first_touch_timestamp: long (nullable = true)
 |-- email: string (nullable = true)



In [0]:
# int -> long

# int32 -> int64

Defina manualmente o esquema criando um `StructType` com nomes de colunas e tipos de dados e leia dados mais rapidamente

In [0]:
from pyspark.sql.types import LongType, StringType, StructType, StructField

userDefinedSchema = StructType(
    [
        StructField("user_id", StringType()),
        StructField("user_first_touch_timestamp", LongType()),
        StructField("email", StringType()),
    ]
)

Leia de CSV usando este esquema definido pelo usuário em vez de inferir o esquema

In [0]:
usersDF = spark.read.csv(usersCsvPath, sep=",", schema=userDefinedSchema, header=True)

usersDF.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- user_first_touch_timestamp: long (nullable = true)
 |-- email: string (nullable = true)



Como alternativa, defina o esquema usando a sintaxe <a href="https://en.wikipedia.org/wiki/Data_definition_language" target="_blank">linguagem de definição de dados (DDL)</a>.

In [0]:
DDLSchema = "user_id string, user_first_touch_timestamp long, email string"
# CREATE TABLE user (
#    user_id string, 
#    user_first_touch_timestamp long, 
#    email string
#)

usersDF = (
    spark.read.format("csv")
    .option("header", True)
    .option("sep", ",")
    .schema(DDLSchema)
    .load(usersCsvPath)
)

## Tratamento de dados


In [0]:
import pandas as pd 

df_pandas = pd.DataFrame([[1,2], [3,4]], columns=['col1', 'col2'])

display(df_pandas)

col1,col2
1,2
3,4


In [0]:
df_spark = spark.createDataFrame([(1,2), (3,4)], ['col1', 'col2'])

display(df_spark)

col1,col2
1,2
3,4


Databricks visualization. Run in Databricks to view.

In [0]:
df_pandas.describe()

Unnamed: 0,col1,col2
count,2.0,2.0
mean,2.0,3.0
std,1.414214,1.414214
min,1.0,2.0
25%,1.5,2.5
50%,2.0,3.0
75%,2.5,3.5
max,3.0,4.0


In [0]:
#info()
#df_pandas.describe()
#display(df_spark.describe())

display(df_spark.summary())

summary,col1,col2
count,2.0,2.0
mean,2.0,3.0
stddev,1.4142135623730951,1.4142135623730951
min,1.0,2.0
25%,1.0,2.0
50%,1.0,2.0
75%,3.0,4.0
max,3.0,4.0


Databricks visualization. Run in Databricks to view.

In [0]:
df_spark_pandas = df_spark.pandas_api() # interface para usar metodos de pandas

In [0]:
df_spark_pandas.loc[1,:]

Out[24]: col1    3
col2    4
Name: 1, dtype: int64

### LOC / ILOC em pyspark?

In [0]:
mask = (df_pandas['col1'] == 1)


#loc/iloc -> [linhas, colunas]
# df_pandas.loc[ : , ['col2'] ]   # nomes de linhas: indice/index,   nome das colunas
# df_pandas.iloc[ : , [0, 1] ]   # indice numerico da linha/ numero da linha,  indice posicional da coluna

# OBS: .loc aceita filtragem por mascara (condicao)
df_pandas.loc[mask, ['col2']]

# loc e iloc super TOP (pandas) : metodos para seleção/filtragem/acesso de dados do dataframe

Unnamed: 0,col2
0,2


In [0]:
df_spark = spark.createDataFrame([('Jorge', 28, 'SP'), ('Cristina', 24, 'RJ'), ('Mario', 29, 'MG')], 
                                 ['nome', 'idade', 'estado'])


df_pandas = df_spark.toPandas()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   nome    3 non-null      object
 1   idade   3 non-null      int64 
 2   estado  3 non-null      object
dtypes: int64(1), object(2)
memory usage: 200.0+ bytes


In [0]:
# metodos para filtrar colunas
#     df_pandas[['nome', 'idade']]
#     df_pandas.drop(columns=['nome'])
# Spark: metodo 1
df_spark[['nome', 'idade']].show()
# Spark: metodo 2
df_spark.select(['nome', 'idade']).show()
# Spark: metodo 3
df_spark.drop('estado').show()

+--------+-----+
|    nome|idade|
+--------+-----+
|   Jorge|   28|
|Cristina|   24|
|   Mario|   29|
+--------+-----+

+--------+-----+
|    nome|idade|
+--------+-----+
|   Jorge|   28|
|Cristina|   24|
|   Mario|   29|
+--------+-----+

+--------+-----+
|    nome|idade|
+--------+-----+
|   Jorge|   28|
|Cristina|   24|
|   Mario|   29|
+--------+-----+



In [0]:
mask = (df_pandas['nome'] == "Jorge") | (df_pandas['idade'] == 24)

df_pandas.loc[mask]

Unnamed: 0,nome,idade,estado
0,Jorge,28,SP
1,Cristina,24,RJ


In [0]:
def filter(input):
  a = 19 
  # monte de codigo ....
  return a

def where(input):
  return filter(input)

In [0]:

from pyspark.sql.functions import col, substring

# metodos para filtrar linhas
# Spark: metodo 1 (string com condicionais)
# df_spark.where('idade = 24').show()
df_spark.where('SUBSTR(nome,1, 1) = "M" or idade = 24').show()
# Spark: metodo 2 (condicionais)
mask = (substring(col('nome'),1,1) == "M") | (col('idade') == 24)
df_spark.where(mask).show()
# Spark: metodo 3 (condicionais)
df_spark.filter('SUBSTR(nome,1, 1) = "M" or idade = 24').show() 

+--------+-----+------+
|    nome|idade|estado|
+--------+-----+------+
|Cristina|   24|    RJ|
|   Mario|   29|    MG|
+--------+-----+------+

+--------+-----+------+
|    nome|idade|estado|
+--------+-----+------+
|Cristina|   24|    RJ|
|   Mario|   29|    MG|
+--------+-----+------+

+--------+-----+------+
|    nome|idade|estado|
+--------+-----+------+
|Cristina|   24|    RJ|
|   Mario|   29|    MG|
+--------+-----+------+



In [0]:
from pyspark.sql.functions import lit, concat, when

# Criando colunas novas
# df_pandas['nova_coluna'] = 'A'

df_spark_2 = df_spark.withColumn('nova_coluna', lit('A'))

df_spark_2.withColumn('idade_nova', col('idade') + 10)#.show()


# criar uma coluna que faça concatenação da coluna estado e nome:
# Exemplo: SP-Jorge, RJ-Cristina ...
df_spark_2.withColumn('concat', concat(col('estado'), lit('-'), col('nome')))#.show()


# crie uma coluna nova com valor 'menor' quando a pessoa tem menos de 25 anos,
# e 'maior' quando a pessoa tem 25 ou mais anos

# funcao "when" :  when(condicao, valor).otherwise(outro_valor)

df_spark_2.withColumn('faixa_idade', when( col('idade') < 25, lit('menor')).otherwise(lit('maior'))).show()

df_spark_2.withColumn('faixa_idade', when(df_spark_2.idade < 25, "Menor").otherwise('maior'))#.show()

# Atualize a coluna 'nova_coluna' com o valor 'B' quando a pessoa mora no 'RJ'

# when(condicao_1, valor_1).when(condicao_2, valor_2).otherwise(outro_valor)

# UF?

# "RJ" , "RJN"

df_spark_2.withColumn('nova_coluna', 
                      when(col('estado') == 'RJ', lit("B"))
                      .when(col('estado') == 'SP', lit("C"))
                      .otherwise(col('nova_coluna'))
                      ).show()

# Substituir 'NULL' como texto
df_spark_2.replace( {'NULL': None}, subset=['estado']).show()


+--------+-----+------+-----------+-----------+
|    nome|idade|estado|nova_coluna|faixa_idade|
+--------+-----+------+-----------+-----------+
|   Jorge|   28|    SP|          A|      maior|
|Cristina|   24|    RJ|          A|      menor|
|   Mario|   29|    MG|          A|      maior|
+--------+-----+------+-----------+-----------+

+--------+-----+------+-----------+
|    nome|idade|estado|nova_coluna|
+--------+-----+------+-----------+
|   Jorge|   28|    SP|          C|
|Cristina|   24|    RJ|          B|
|   Mario|   29|    MG|          A|
+--------+-----+------+-----------+

+--------+-----+------+-----------+
|    nome|idade|estado|nova_coluna|
+--------+-----+------+-----------+
|   Jorge|   28|  null|          A|
|Cristina|   24|    RJ|          A|
|   Mario|   29|    MG|          A|
+--------+-----+------+-----------+



### Tamanho DataFrame

In [0]:
df_pandas.shape

Out[126]: (3, 3)

In [0]:
print('Num linhas:', df_spark.count())
print('Num colunas:', len(df_spark.columns))

Num linhas: 3
Num colunas: 3


In [0]:
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   nome    3 non-null      object
 1   idade   3 non-null      int64 
 2   estado  3 non-null      object
dtypes: int64(1), object(2)
memory usage: 200.0+ bytes


In [0]:
display(df_pandas['nome'])

0       Jorge
1    Cristina
2       Mario
Name: nome, dtype: object

In [0]:
df_pandas['nome'].str.contains('.', regex = False)

df_pandas['nome'].str.contains('\.')

# regex = regular expression
#   .  ->  qualquer caractere 
#   J  ->  J
#  \.  ->  .

Out[150]: 0     True
1    False
2    False
Name: nome, dtype: bool

In [0]:
#   [numeros] .? [numero]

In [0]:
list_valores = [1,2,3,4]

for valor in list_valores:
  if valor < 3:
    print('menor')
  else:
    print(valor)

menor
menor
3
4


In [0]:
df_spark.createOrReplaceTempView('tabela_pessoas')

df_novo = spark.sql('''
SELECT *, 'A' AS col_nova
FROM tabela_pessoas
''')

df_novo.show()

+--------+-----+------+--------+
|    nome|idade|estado|col_nova|
+--------+-----+------+--------+
|   Jorge|   28|    SP|       A|
|Cristina|   24|    RJ|       A|
|   Mario|   29|    MG|       A|
+--------+-----+------+--------+



In [0]:
%sql
SELECT nome, idade
FROM tabela_pessoas
WHERE idade > 20 and SUBSTR(nome, 1, 1) = 'J'
GROUP BY 
ORDER BY 

nome,idade
Jorge,28


## DataFrameWriter
Interface usada para gravar um DataFrame em sistemas de armazenamento externo

```
(df.write                         
  .option("compression", "snappy")
  .mode("overwrite")      
  .parquet(outPath)       
)
```

DataFrameWriter é acessível por meio do atributo SparkSession `write`. Esta classe inclui métodos para gravar DataFrames em diferentes sistemas de armazenamento externo.

### Gravar DataFrames em arquivos

Escreva `usersDF` no parquet com o método `parquet` do DataFrameWriter e as seguintes configurações:

Compressão `snappy`, modo `overwrite`

In [0]:
usersOutputPath = "dbfs:/FileStore/shared_uploads/jchambyd@gmail.com/" + "users.parquet"

(
    usersDF.write
    .mode("overwrite")
    .parquet(usersOutputPath)
)

In [0]:
usersDF2 = spark.read.parquet(usersOutputPath)

display(usersDF2)

In [0]:
display(
    dbutils.fs.ls(usersOutputPath)
)

Assim como o DataFrameReader, a API Python do Spark também permite que você especifique as opções do DataFrameWriter como parâmetros para o método `parquet`

In [0]:
(usersDF.write.parquet(usersOutputPath, compression="snappy", mode="overwrite"))

### Gravar DataFrames em tabelas

Escreva `eventsDF` em uma tabela usando o método DataFrameWriter `saveAsTable`

<img src="https://files.training.databricks.com/images/icon_note_32.png" alt="Note"> Isso cria uma tabela global, ao contrário da visão local criada pelo método DataFrame `createOrReplaceTempView`

In [0]:
usersDF.write.mode('overwrite').saveAsTable("users")

In [0]:
usersDF.createOrReplaceTempView("users1")

In [0]:
%sql

select * from users

## Delta Lake

Em quase todos os casos, a prática recomendada é usar o formato Delta Lake, especialmente sempre que os dados forem referenciados de um espaço de trabalho do Databricks.

<a href="https://delta.io/" target="_blank">Delta Lake</a> é uma tecnologia de código aberto projetada para funcionar com o Spark para trazer confiabilidade aos data lakes.

![delta](https://files.training.databricks.com/images/aspwd/delta_storage_layer.png)

#### Principais recursos do Delta Lake
- Transações ACID
- Handline de metadados escalável
- Streaming unificado e processamento em lote
- Viagem no tempo (versionamento de dados)
- Aplicação e evolução do esquema
- Histórico de auditoria
- Formato parquet
- Compatível com Apache Spark API

### Gravar resultados em uma tabela delta

Escreva `eventsDF` com o método `save` do DataFrameWriter e as seguintes configurações: formato `delta`, modo `overwrite`

In [0]:
usersOutputPath = "dbfs:/FileStore/shared_uploads/jchambyd@gmail.com/" + "/delta/users"

(usersDF.write.format("delta").mode("overwrite").save(usersOutputPath) )

In [0]:
usersDFDelta = (
    spark.read.format("delta")
    .load(usersOutputPath)
)

usersDFDelta.display()

# Laboratório de dados de ingestão

Leia em arquivos CSV contendo dados de produtos.

##### Tarefas
1. Leia com esquema de inferência
2. Leia com esquema definido pelo usuário
3. Leia com esquema como string formatada em DDL
4. Escreva usando o formato Delta