# Atividade 01
### *ETL utilizando o PySpark*

#### Criação do ambiente

In [None]:
!pip install pyspark==3.1.3

In [None]:
!apt-get update

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://downloads.apache.org/spark/spark-3.1.3/spark-3.1.3-bin-hadoop2.7.tgz
!tar -xvf spark-3.1.3-bin-hadoop2.7.tgz

In [None]:
!pip install -q findspark

In [None]:
!pip install pydeequ==1.0.1

In [7]:
import os 

In [None]:
# Python fallback for fetching the Spark distribution. It is a no-op when the distribution
# is already unpacked in the working directory (e.g. by an earlier cell), and it fails
# loudly instead of silently ignoring a non-zero exit status from wget/tar.
spark_dist = "spark-3.1.3-bin-hadoop2.7"
spark_tgz = spark_dist + ".tgz"
# Superseded Spark releases are served from archive.apache.org, not downloads.apache.org.
spark_url = "https://archive.apache.org/dist/spark/spark-3.1.3/" + spark_tgz

if not os.path.isdir(spark_dist):
    if not os.path.exists(spark_tgz):
        if os.system("wget -q " + spark_url) != 0:
            raise RuntimeError("Failed to download " + spark_url)
    # Extract from the current directory (where wget saved the file), not the filesystem root.
    if os.system("tar xf " + spark_tgz) != 0:
        raise RuntimeError("Failed to extract " + spark_tgz)

In [9]:
# Point Java/Spark tooling at the local JDK and Spark distribution.
# SPARK_VERSION is the variable pydeequ uses to choose the matching Deequ Maven coordinate.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.3-bin-hadoop2.7",
    "SPARK_VERSION": "3.1.3",
})

In [10]:
import findspark

# With no explicit path, findspark locates Spark via the SPARK_HOME environment variable
# and puts its Python libraries on sys.path.
findspark.init(spark_home=None)

In [None]:
from pyspark import SparkContext as sc
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pydeequ
from pydeequ.checks import *
from pydeequ.verification import *
# Local Spark session with the Deequ package on the classpath
# (its f2j dependency is excluded, as in the pydeequ setup instructions).
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("trabalho_01")
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)
spark

In [12]:
# Handle to the SparkContext behind the session.
# NOTE(review): not referenced by any of the cells shown here.
sparkContext = spark.sparkContext

#### Leitura do arquivo

In [32]:
# Read with an explicit schema instead of inferSchema. Inference costs an extra pass over
# the file, and a single malformed token in a numeric column turns the whole column into a
# string, which would break the numeric validation done later. With a fixed schema (default
# PERMISSIVE mode), malformed fields become null and the column types stay as declared.
# With header=True the first line is skipped and the schema is applied by position.
IRIS_SCHEMA = (
    "sepal_length DOUBLE, sepal_width DOUBLE, petal_length DOUBLE, "
    "petal_width DOUBLE, class STRING, classEncoder INT"
)
df_iris = spark.read.csv('iris.csv', header=True, schema=IRIS_SCHEMA)

In [33]:
# Show the column names and types Spark assigned to the raw CSV.
df_iris.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- class: string (nullable = true)
 |-- classEncoder: integer (nullable = true)



In [34]:
# Peek at the first rows of the raw data.
df_iris.show(n=5)

+------------+-----------+------------+-----------+-----------+------------+
|sepal_length|sepal_width|petal_length|petal_width|      class|classEncoder|
+------------+-----------+------------+-----------+-----------+------------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|           0|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|           0|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|           0|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|           0|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|           0|
+------------+-----------+------------+-----------+-----------+------------+
only showing top 5 rows



#### Criar uma task de pré-processamento para validar se os dados estão no formato correto utilizando PySpark:

- sepal_length: intervalo fechado [4.3, 7.9]
- sepal_width: intervalo fechado [2.0, 4.4]
- petal_length: intervalo fechado [1.0, 6.9]
- petal_width: intervalo fechado [0.1, 2.5]
- classEncoder: inteiro no intervalo fechado [0, 2]
- class: um dos valores ['Iris-setosa', 'Iris-versicolor', 'Iris-virginica']

##### Utilizar a biblioteca PyDeequ para gerar o relatório de qualidade dos dados (data quality report).

In [35]:
# Container for the data-quality constraints on the raw dataset; at Warning level a failing
# constraint marks the check with status Warning instead of Error.
check_df_iris = Check(
    spark,
    CheckLevel.Warning,
    "Review Check",
)

In [36]:
# Run the data-quality checks on the raw dataset.
# pydeequ's Check builder methods (hasMin, hasMax, ...) mutate the Check in place and return
# it, so re-running this cell on a Check created elsewhere would accumulate duplicate
# constraints. A fresh Check is built here to keep the cell idempotent.
# hasMin/hasMax aggregate over non-null values and isContainedIn accepts nulls, so the
# isComplete constraints are what surface missing values in the report.
check_df_iris = Check(spark, CheckLevel.Warning, "Review Check")
check_df_iris_result = (
    VerificationSuite(spark)
    .onData(df_iris)
    .addCheck(
        check_df_iris
        .isComplete("sepal_length")
        .isComplete("sepal_width")
        .isComplete("petal_length")
        .isComplete("petal_width")
        .isComplete("classEncoder")
        .isComplete("class")
        .hasMin("sepal_length", lambda x: x >= 4.3)
        .hasMax("sepal_length", lambda x: x <= 7.9)
        .hasMin("sepal_width", lambda x: x >= 2)
        .hasMax("sepal_width", lambda x: x <= 4.4)
        .hasMin("petal_length", lambda x: x >= 1)
        .hasMax("petal_length", lambda x: x <= 6.9)
        .hasMin("petal_width", lambda x: x >= 0.1)
        .hasMax("petal_width", lambda x: x <= 2.5)
        .hasMin("classEncoder", lambda x: x >= 0)
        .hasMax("classEncoder", lambda x: x <= 2)
        .isContainedIn("class", ["Iris-setosa", "Iris-versicolor", "Iris-virginica"])
    )
    .run()
)

- Gerar o report após a leitura do arquivo.

In [37]:
# Convert the verification result into a DataFrame (one row per constraint) and print it
# without truncating the constraint descriptions.
check_df_iris_result_df = VerificationResult.checkResultsAsDataFrame(
    spark, check_df_iris_result
)
check_df_iris_result_df.show(n=20, truncate=False)

+------------+-----------+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+
|check       |check_level|check_status|constraint                                                                                                                                                                           |constraint_status|constraint_message                                   |
+------------+-----------+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+
+------------+-----------+------------+-------------------------------------------------------------------------------

#### Criar dataset tratado no formato parquet.

In [38]:
# Register the raw data as the SQL view `iris` used by the spark.sql queries.
df_iris.createOrReplaceTempView(name='iris')

In [39]:
# Keep only rows whose every field lies inside its allowed domain.
# Column.between is inclusive on both ends, like SQL BETWEEN; rows with a null in any
# checked field are dropped because the combined predicate is then null.
df_iris_tratado = df_iris.where(
    col('sepal_length').between(4.3, 7.9)
    & col('sepal_width').between(2, 4.4)
    & col('petal_length').between(1, 6.9)
    & col('petal_width').between(0.1, 2.5)
    & col('classEncoder').between(0, 2)
    & col('class').isin('Iris-setosa', 'Iris-versicolor', 'Iris-virginica')
)

In [40]:
# Persist the cleaned dataset as Parquet, replacing any previous output.
df_iris_tratado.write.mode('overwrite').parquet('iris_tratado.parquet')

- Gerar o report após ter realizado o tratamento dos dados.

In [41]:
# Container for the data-quality constraints on the cleaned dataset; at Warning level a
# failing constraint marks the check with status Warning instead of Error.
check_df_iris_tratado = Check(
    spark,
    CheckLevel.Warning,
    "Review Check",
)

In [42]:
# Run the same data-quality checks on the cleaned dataset.
# pydeequ's Check builder methods (hasMin, hasMax, ...) mutate the Check in place and return
# it, so re-running this cell on a Check created elsewhere would accumulate duplicate
# constraints. A fresh Check is built here to keep the cell idempotent.
# hasMin/hasMax aggregate over non-null values and isContainedIn accepts nulls, so the
# isComplete constraints are what surface missing values in the report.
check_df_iris_tratado = Check(spark, CheckLevel.Warning, "Review Check")
check_df_iris_tratado_result = (
    VerificationSuite(spark)
    .onData(df_iris_tratado)
    .addCheck(
        check_df_iris_tratado
        .isComplete("sepal_length")
        .isComplete("sepal_width")
        .isComplete("petal_length")
        .isComplete("petal_width")
        .isComplete("classEncoder")
        .isComplete("class")
        .hasMin("sepal_length", lambda x: x >= 4.3)
        .hasMax("sepal_length", lambda x: x <= 7.9)
        .hasMin("sepal_width", lambda x: x >= 2)
        .hasMax("sepal_width", lambda x: x <= 4.4)
        .hasMin("petal_length", lambda x: x >= 1)
        .hasMax("petal_length", lambda x: x <= 6.9)
        .hasMin("petal_width", lambda x: x >= 0.1)
        .hasMax("petal_width", lambda x: x <= 2.5)
        .hasMin("classEncoder", lambda x: x >= 0)
        .hasMax("classEncoder", lambda x: x <= 2)
        .isContainedIn("class", ["Iris-setosa", "Iris-versicolor", "Iris-virginica"])
    )
    .run()
)

In [43]:
# Convert the verification result for the cleaned data into a DataFrame (one row per
# constraint) and print it without truncating the constraint descriptions.
check_df_iris_tratado_result_df = VerificationResult.checkResultsAsDataFrame(
    spark, check_df_iris_tratado_result
)
check_df_iris_tratado_result_df.show(n=20, truncate=False)

+------------+-----------+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
|check       |check_level|check_status|constraint                                                                                                                                                                           |constraint_status|constraint_message|
+------------+-----------+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
+------------+-----------+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--

- Criar um arquivo com os registros removidos contendo a mensagem do erro.

In [44]:
# Rows discarded by the cleaning step, each with the reason(s) it was discarded.
# - The WHERE clause is NOT COALESCE(<cleaning predicate>, false). A null field makes the
#   cleaning predicate null, so such a row fails the cleaning filter; a plain
#   `not between` filter would also evaluate to null and drop it, so the row would vanish
#   from both outputs. Coalescing to false makes this set the exact complement of the
#   cleaned data.
# - `erro` lists every violated rule: one CASE per column, joined with '; '. concat_ws skips
#   the NULL produced by a CASE whose column is valid.
df_iris_removidos = spark.sql('''select *,
concat_ws('; ',
      case when sepal_length is null then 'sepal_length nulo'
           when sepal_length < 4.3 then 'sepal_length menor que 4.3'
           when sepal_length > 7.9 then 'sepal_length maior que 7.9' end,
      case when sepal_width is null then 'sepal_width nulo'
           when sepal_width < 2 then 'sepal_width menor que 2'
           when sepal_width > 4.4 then 'sepal_width maior que 4.4' end,
      case when petal_length is null then 'petal_length nulo'
           when petal_length < 1 then 'petal_length menor que 1'
           when petal_length > 6.9 then 'petal_length maior que 6.9' end,
      case when petal_width is null then 'petal_width nulo'
           when petal_width < 0.1 then 'petal_width menor que 0.1'
           when petal_width > 2.5 then 'petal_width maior que 2.5' end,
      case when classEncoder is null then 'classEncoder nulo'
           when classEncoder < 0 then 'classEncoder menor que 0'
           when classEncoder > 2 then 'classEncoder maior que 2' end,
      case when class is null then 'class nulo'
           when class not in ('Iris-setosa', 'Iris-versicolor', 'Iris-virginica') then 'class diferente de Iris-setosa, Iris-versicolor ou Iris-virginica' end
) as erro
from iris
where not coalesce(
      sepal_length between 4.3 and 7.9
  and sepal_width between 2 and 4.4
  and petal_length between 1 and 6.9
  and petal_width between 0.1 and 2.5
  and classEncoder between 0 and 2
  and class in ('Iris-setosa', 'Iris-versicolor', 'Iris-virginica'),
  false)
''')

In [45]:
# Print the discarded rows with their full error messages.
df_iris_removidos.show(n=20, truncate=False)

+------------+-----------+------------+-----------+-----------+------------+--------------------------+
|sepal_length|sepal_width|petal_length|petal_width|class      |classEncoder|erro                      |
+------------+-----------+------------+-----------+-----------+------------+--------------------------+
|15.0        |3.9        |1.7         |0.4        |Iris-setosa|0           |sepal_length maior que 7.9|
|5.7         |15.0       |1.5         |0.4        |Iris-setosa|0           |sepal_width maior que 4.4 |
|4.6         |15.0       |1.0         |0.2        |Iris-setosa|0           |sepal_width maior que 4.4 |
|5.0         |3.0        |1.6         |20.0       |Iris-setosa|0           |petal_width maior que 2.5 |
|5.2         |3.4        |39.0        |0.2        |Iris-setosa|0           |petal_length maior que 6.9|
+------------+-----------+------------+-----------+-----------+------------+--------------------------+



In [46]:
# Persist the discarded rows (with their error messages) as Parquet, replacing any previous output.
df_iris_removidos.write.mode('overwrite').parquet('iris_removidos.parquet')