# <font color='blue'>Uni-Facef - Pyspark - Parte 4 </font>

Neste notabook vamos descobrir como funciona o método "readStream()" do DataFrame

In [None]:
!pip install pyspark

In [None]:
# coding: utf-8
import pyspark.sql.types as st
import pyspark.sql.functions as sf
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder \
    .appName('readStream Example') \
    .getOrCreate()

In [None]:
help(spark.readStream)

In [None]:
"""
root
 |-- id_cliente: string (nullable = true)
 |-- nome_cliente: string (nullable = true)
 |-- end_logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- dt_ult_atualizacao: timestamp (nullable = true)
 |-- datalog: string (nullable = false)
"""

SCHEMA = st.StructType([
    st.StructField("id_cliente", st.StringType(), True),
    st.StructField("nome_cliente", st.StringType(), True),
    st.StructField("end_logradouro", st.StringType(), True),
    st.StructField("numero", st.StringType(), True),
    st.StructField("cidade", st.StringType(), True),
    st.StructField("dt_ult_atualizacao", st.TimestampType(), True),
    st.StructField("datalog", st.StringType(), True),
])

In [None]:
df_cliente = spark.readStream \
    .option("delimiter", "|") \
    .csv('cliente_csv', header=True, schema=SCHEMA)

df_cliente.show(5)

#### Vamos criar um processo incremental

Para isso vamos precisar definir uma variável com o caminho/pasta do novo dataset e o caminho/pasta dos metadados do checkpoint do processo.

In [None]:
# Caminho da nossa pasta de trabalho
!pwd

In [None]:
HOME_PATH = "/content"

CHECKPOINT = HOME_PATH + "/checkpoint/cliente_raw/"
DATASET_PATH = HOME_PATH + "/cliente_raw/"

print(CHECKPOINT)
print(DATASET_PATH)

#### Vamos ler os arquivos de origem e criar um novo dataset. 

Com o "readStream" e "writeStream" o processo garante a integridade de processos incrementais, pois os arquivos já processados não serão processados novamente.

In [None]:
# Leitura do Dataset de origem
df_cliente = spark.readStream \
    .option("delimiter", "|") \
    .csv('cliente_csv', header=True, schema=SCHEMA)

# Transformação nos dados
df_cliente = df_cliente \
    .withColumn("datalog", sf.col("dt_ult_atualizacao").cast("date"))

# Escrita 
df_cliente.writeStream.trigger(once=True) \
    .start(
        path=DATASET_PATH,
        format="parquet",
        checkpointLocation=CHECKPOINT,
        partitionBy=["datalog"]).awaitTermination()

#### A pasta do dataset contem agora uma pasta de metadados chamada "_spark_metadata" ao invés de "_SUCCESS"

In [None]:
!ls -l /content/cliente_raw/

#### Pasta criada para armazenar os metadados do checkpoint deste dataset 

In [None]:
!ls -l /content/checkpoint/cliente_raw/

#### Testando a leitura "Full" do dataset e gravando o novo "cliente_duplicado" utilizando o "repartition(1)"

In [None]:
novo_cliente_raw = spark.read.parquet('cliente_raw')

novo_cliente_raw.orderBy("id_cliente").show(10, truncate=False)

In [None]:
# Escreve em formato parquet em apenas 1 arquivo
novo_cliente_raw.repartition(1).write.parquet(
    "cliente_duplicado", 
    mode="overwrite")

Obs: Se estiver utilizando o Google Colab, faça o download do arquivo para utilizá-lo no script "Uni-Facef - Pyspark - Parte 5"

### FIM
###### Documentação: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html