# Exemplo Spark.
Exemplo de dataflow especificado no SPARK instrumentado utilizando o wrapper desenvolvido em python.

## Composição:
Duas atividades de mapeamento que apenas recebem dados de entrada e os jogam na saída.

In [1]:
import uuid
from datetime import datetime
from random import randint
import findspark
findspark.init('/opt/spark-2.2.0-bin-hadoop2.7')
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import DataFrameWriter, Row
import sys
sys.path.append('../')
from PDE import PDE

In [2]:
# Configura o spark
conf = ( SparkConf()
         .setMaster("local[*]")
         .setAppName('pyspark')
        )
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
base_url = "http://localhost:22000/"

In [None]:
class MapTransformation(object):
    def __init__(self,rdd):
        self._rdd = rdd
    
    def do_nothing(self):
        return self._rdd.map(lambda s: s)

In [None]:
# Configura proveniência prospectiva.

# Define o dataflow.
dataflow = PDE.dataflow("spark-example")

# Define as transformações.
dt1 = PDE.transformation("dt1")
dt2 = PDE.transformation("dt2")

# Define os atributos.
att1 = PDE.attribute("att1", PDE.attribute_type().TEXT.value)
att2 = PDE.attribute("att2", PDE.attribute_type().NUMERIC.value)
att3 = PDE.attribute("att3", PDE.attribute_type().TEXT.value)
att4 = PDE.attribute("att4", PDE.attribute_type().NUMERIC.value)
att5 = PDE.attribute("att5", PDE.attribute_type().TEXT.value)
att6 = PDE.attribute("att6", PDE.attribute_type().NUMERIC.value)


# Define os extratores.
ext1 = PDE.extractor("ext1", PDE.extractor_cartridge().EXTRACTION.value, PDE.extractor_extension().CSV.value)
ext2 = PDE.extractor("ext2", PDE.extractor_cartridge().EXTRACTION.value, PDE.extractor_extension().CSV.value)
ext3 = PDE.extractor("ext3", PDE.extractor_cartridge().EXTRACTION.value, PDE.extractor_extension().CSV.value)

# Adiciona os extratores aos atributos.
att1._extractor = ext1._tag
att2._extractor = ext1._tag
att3._extractor = ext2._tag
att4._extractor = ext2._tag
att5._extractor = ext3._tag
att6._extractor = ext3._tag

# Define os Datasets.
ds1 = PDE.set("ds1", PDE.set_type().OUTPUT.value)
ds2 = PDE.set("ds2", PDE.set_type().INPUT.value)
ds3 = PDE.set("ds3", PDE.set_type().OUTPUT.value)

# Adiciona os atributos aos datasets.
ds1._attributes = [att1.get_json(),att2.get_json()]
ds2._attributes = [att3.get_json(),att4.get_json()]
ds3._attributes = [att5.get_json(),att6.get_json()]

program  = PDE.program("testando","path")

# Adiciona os extratores aos datasets.
ds1._extractors = [ext1.get_json()]
ds2._extractors = [ext2.get_json()]
ds3._extractors = [ext3.get_json()]

# Adiciona os datasets às transformações.
dt1._sets = [ds1.get_json()]
dt2._sets = [ds2.get_json(),ds3.get_json()]

dt1._programs = [program.get_json()]
# Adiciona as transformações ao dataflow
dataflow._transformations = [dt1.get_json(),dt2.get_json()]

# Realiza post na api restful do dfa para inserir o dataflow.
PDE.ingest_dataflow_json(base_url,dataflow.get_json())

In [None]:
# Configura proveniência retrospectiva.
t1 = PDE.task("t1",dataflow._tag, dt1._tag, "resource", "workspace",PDE.status_type().READY.value, "1")
t2 = PDE.task("t2",dataflow._tag, dt2._tag, "resource", "workspace",PDE.status_type().READY.value, "2")

# Cria performances para t1.
pf1 = PDE.performance("pf1",PDE.method_type().COMPUTATION.value,"ti" ,t1._id)

# Inicializa a task1
t1._status = PDE.status_type().RUNNING.value

In [None]:
# Cria input.
rdd = sc.parallelize([Row(att1=str(uuid.uuid4()),att2=randint(0,100)) for x in range(5)])

m1 = MapTransformation(rdd)

r2 = m1.do_nothing()

# Escreve em csv.
df1 = sqlContext.createDataFrame(r2)

# Arquivo de saída de t1.
f1 = PDE.file("dt1-output.csv", ".")

df1.coalesce(1).write.save(path=f1._tag, format='csv', mode='append', sep=',')

# Adiciona informações à tarefa 1.
pf1._end_time = "tf"
t1._performances.append(pf1.get_json())
t1._files.append(f1.get_json())
t1._status = PDE.status_type().FINISHED.value

# Armazena t1 na base.
PDE.ingest_task_json(base_url,t1.get_json())

# Lê do csv.

# Adiciona arquivo de entrada à t2.
t2._files.append(f1.get_json())
# Criar arquivo de performance para t2.
pf2 = PDE.performance("pf2",PDE.method_type().COMPUTATION.value,"ti" ,t2._id)

df2 = (sqlContext.read
         .format("com.databricks.spark.csv")
         .option("header", "false")
         .load(f1._tag))

# Carrega informações na task2.
m2 = MapTransformation(df2.rdd)

r3 = m2.do_nothing()

# Adiciona informações à tarefa 2.
pf2._end_time = "tf"
t2._performances.append(pf1.get_json())
t2._status = PDE.status_type().FINISHED.value

# Armazena t2 na base.
PDE.ingest_task_json(base_url,t2.get_json())