# Atividade 01

#### Dependências

In [1]:
!pip install pyspark findspark pydeequ==1.0.1

Looking in indexes: https://pypi.org/simple, http://nexus.olxbr.io/repository/pypi-olx/simple


#### Imports

In [2]:
# findspark.init() is called before pydeequ/pyspark are imported so they resolve
# against the local Spark installation (assumption: located via SPARK_HOME — confirm).
import findspark
findspark.init()

# NOTE(review): importing pydeequ printed "Please set env variable SPARK_VERSION" in the
# saved output; consider setting SPARK_VERSION to the installed Spark version before
# this import — confirm against the pydeequ version in use.
import pydeequ
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

from pyspark.sql import SparkSession, functions as F

Please set env variable SPARK_VERSION


#### Sessão Spark

In [3]:
# Local Spark session (all cores) that downloads the Deequ JAR used by pydeequ via
# spark.jars.packages and excludes the f2j artifact via spark.jars.excludes.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Atividade_01")
    .config(key="spark.jars.packages", value=pydeequ.deequ_maven_coord)
    .config(key="spark.jars.excludes", value=pydeequ.f2j_maven_coord)
    .getOrCreate()
)

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
com.amazon.deequ#deequ added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4d593b9c-317d-4726-82dc-7f1712ebd75f;1.0
	confs: [default]
	found com.amazon.deequ#deequ;1.2.2-spark-3.0 in central
	found org.scalanlp#breeze_2.12;0.13.2 in central
	found org.scalanlp#breeze-macros_2.12;0.13.2 in central
	found org.scala-lang#scala-reflect;2.12.1 in central
	found com.github.fommil.netlib#core;1.1.2 in central
	found net.sf.opencsv#opencsv;2.3 in central
	found com.github.rwl#jtransforms;2.4.0 in central
	found junit#junit;4.8.2 in central
	found org.apache.commons#commons-math3;3.2 in central
	found org.spire-math#spire_2.12;0.13.0 in central
	found org.spire-math#spire-macros_2.12;0.13.0 in central
	found org.typelevel#machinist_2.12;0.6.1 in central
	found com.chuusai#shapeless_2.12;2.3.2 in central
	found org.typelevel#macro-compat_2.12;1.1.1 i

#### Leitura dos dados

In [4]:
# Load the raw iris CSV: comma-separated, first line is the header.
# inferSchema makes numeric columns numeric instead of strings, so the range checks
# below compare numbers directly (no implicit string casts) and the parquet output
# keeps numeric types. A column containing any non-numeric value is still inferred
# as string, so malformed data remains visible to the validation steps.
df = (
    spark.read
    .option("delimiter", ",")
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("./iris.txt")
)

#### Schema de Validação

In [5]:
# Deequ data-quality suite over the raw data, at Warning level.
# It checks that every column used by the validity filter has no missing values, and
# that `class` only takes the three known iris labels. isComplete("class") is needed
# because isContainedIn on its own lets null labels pass.
check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = (
    VerificationSuite(spark)
    .onData(df)
    .addCheck(
        check
        .isComplete("sepal_length")
        .isComplete("sepal_width")
        .isComplete("petal_length")
        .isComplete("petal_width")
        .isComplete("classEncoder")
        .isComplete("class")
        .isContainedIn("class", ['Iris-setosa', 'Iris-versicolor', 'Iris-virginica'])
    )
    .run()
)

In [6]:
# Tabulate the Deequ results: presumably one row per constraint with its status and
# message. truncate=False prints long constraint descriptions and messages in full
# instead of cutting them at 20 characters.
# NOTE(review): the saved output of this cell is empty even though constraints were
# added to the suite — confirm the verification actually ran.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

+------------+-----------+------------+--------------------+-----------------+------------------+
|       check|check_level|check_status|          constraint|constraint_status|constraint_message|
+------------+-----------+------------+--------------------+-----------------+------------------+
+------------+-----------+------------+--------------------+-----------------+------------------+



#### Filtrando dados válidos

In [7]:
# Keep only rows where every measurement lies within its inclusive [min, max] range,
# classEncoder is in [0, 2] and class is one of the three iris labels. A null (or
# non-castable) value makes its condition null, so that row is dropped too.
df_valid = df.filter(
    F.col("sepal_length").between(4.3, 7.9)
    & F.col("sepal_width").between(2.0, 4.4)
    & F.col("petal_length").between(1.0, 6.9)
    & F.col("petal_width").between(0.1, 3.5)
    & F.col("classEncoder").between(0, 2)
    & F.col("class").isin("Iris-setosa", "Iris-versicolor", "Iris-virginica")
)

In [8]:
# Save the valid rows as parquet in a single partition (one part file), replacing
# any previous output at this path.
df_valid.repartition(1).write.parquet("./iris-valid-data", mode="overwrite")

                                                                                

#### Dados Inválidos

In [9]:
# Invalid rows = raw rows minus valid rows. exceptAll is a multiset difference, so
# duplicated invalid rows are all kept (unlike subtract, which deduplicates).
df_invalid = df.exceptAll(other=df_valid)

In [10]:
# Attach to each invalid row the reason it failed validation. Only the FIRST matching
# rule, in the order below, is reported.
# - The validity filter uses inclusive bounds, so a range rule fires only for values
#   strictly outside [min, max]. Boundary values such as classEncoder 0 or 2 are valid.
# - Missing or non-numeric values get their own message. Comparisons against null
#   never match, so without this they would fall through to the `class` message.
# - F.col resolves against df_invalid itself rather than another DataFrame's columns.
df_invalid = df_invalid.withColumn(
    "messageError",
    F.when(F.col("sepal_length").cast("double").isNull(), "sepal_length ausente ou não numérico")
    .when(F.col("sepal_length") < 4.3, "sepal_length menor que 4.3")
    .when(F.col("sepal_length") > 7.9, "sepal_length maior que 7.9")
    .when(F.col("sepal_width").cast("double").isNull(), "sepal_width ausente ou não numérico")
    .when(F.col("sepal_width") < 2.0, "sepal_width menor que 2.0")
    .when(F.col("sepal_width") > 4.4, "sepal_width maior que 4.4")
    .when(F.col("petal_length").cast("double").isNull(), "petal_length ausente ou não numérico")
    .when(F.col("petal_length") < 1.0, "petal_length menor que 1.0")
    .when(F.col("petal_length") > 6.9, "petal_length maior que 6.9")
    .when(F.col("petal_width").cast("double").isNull(), "petal_width ausente ou não numérico")
    .when(F.col("petal_width") < 0.1, "petal_width menor que 0.1")
    .when(F.col("petal_width") > 3.5, "petal_width maior que 3.5")
    .when(F.col("classEncoder").cast("double").isNull(), "classEncoder ausente ou não numérico")
    .when(F.col("classEncoder") < 0, "classEncoder menor que 0")
    .when(F.col("classEncoder") > 2, "classEncoder maior que 2")
    # All numeric rules passed, so the row was rejected for its label (or a null label).
    .otherwise("class não é 'Iris-setosa', 'Iris-versicolor' ou 'Iris-virginica'")
)

In [11]:
# Save the invalid rows, including their messageError column, as CSV with a header
# line, in a single partition, replacing any previous output at this path.
df_invalid.repartition(1).write.csv("./iris-invalid-data", mode="overwrite", header=True)