
# Data Engineering Pipeline - SAS Test

**Info**

This script was written using Python 3.6 with Spark 2.4.5.

**Getting started**

To begin, we need to set up the environment by installing the packages and libraries the script depends on.


In [0]:
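# Installing Java 8, downloading Spark 2.4.5 (older releases are kept on archive.apache.org) and installing findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz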
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
# Setting the environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
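Called without arguments, `findspark.init()` locates Spark through `SPARK_HOME`, and the Spark launcher uses `JAVA_HOME` to find Java, so a failed download or extraction only surfaces later as a confusing error. A minimal pre-flight check, assuming the two paths set above; `missing_spark_paths` is a hypothetical helper, not part of findspark or PySpark:

```python
import os


def missing_spark_paths(env=None):
    """Hypothetical helper: names of JAVA_HOME / SPARK_HOME that are unset
    or do not point to an existing directory."""
    env = os.environ if env is None else env
    missing = []
    for var in ("JAVA_HOME", "SPARK_HOME"):
        path = env.get(var)
        if not path or not os.path.isdir(path):
            missing.append(var)
    return missing


# Run before findspark.init(): an empty list means both directories exist.
problems = missing_spark_paths()
if problems:
    print("Check these variables before calling findspark.init():", problems)
```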

In [0]:
# Initializing Spark
import findspark
findspark.init()

# Importing the packages needed to create the Spark SQL session with Hive support, used later for the tables
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Pointing Spark at the Hive metastore (thrift://nn1:9083 is environment-specific; adjust it to your metastore).
# This must be set before the session (and its SparkContext) is created.
SparkContext.setSystemProperty("hive.metastore.uris", "thrift://nn1:9083")

# Creating the Spark session with Hive support
spark = (SparkSession
         .builder
         .master("local[*]")
         .enableHiveSupport()
         .getOrCreate())


In [0]:
# Uploading data_question.json, the file we will work with, into Colab
from google.colab import files

# Opens a file picker to upload the file; the file is also available in the project's GitHub repository
files.upload()

# **Creating the Schema**

I chose to define the schema with Spark SQL's **StructType** and **StructField** because it is straightforward, handles complex JSON with nested fields well, and fixes the column types up front instead of relying on schema inference.
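With an explicit schema, Spark's JSON reader returns exactly the declared columns, in the declared order: fields present in the file but absent from the schema are ignored, and declared fields missing from a record come back as null (hence `nullable = true`). The plain-Python sketch below only illustrates that projection on a hypothetical record; `FIELDS` and `apply_schema` are illustrative names, and it does not model type conversion or nested structs.

```python
import json

# A subset of the top-level fields declared in schema_questions, in schema order.
FIELDS = ["answer_key", "difficulty", "grade", "id", "statement"]


def apply_schema(record, fields):
    """Keep declared fields in declared order; missing ones become None (null)."""
    return {name: record.get(name) for name in fields}


# Hypothetical record: "grade" and "statement" are missing, "extra" is not declared.
raw = json.loads('{"id": 0, "answer_key": 3, "difficulty": "EASY", "extra": "x"}')
print(apply_schema(raw, FIELDS))
# {'answer_key': 3, 'difficulty': 'EASY', 'grade': None, 'id': 0, 'statement': None}
```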

In [0]:
# Importing the types used to build the schema
from pyspark.sql.types import *

# Defining the schema
schema_questions = StructType([
    StructField("answer_key", IntegerType(), True),
    StructField("difficulty", StringType(), True),
    StructField("grade", LongType(), True),
    StructField("id", IntegerType(), True),
    StructField("lecture", IntegerType(), True),
    StructField("options", ArrayType(
        StructType([
            StructField("description", StringType(), True),
            StructField("id", IntegerType(), True),
        ])
    ), True),
    StructField("properties", ArrayType(
        StructType([
            StructField("id", StringType(), True),
            StructField("key", StringType(), True),
            StructField("value", IntegerType(), True),
        ])
    ), True),
    StructField("skill", StructType([
        StructField("code", LongType(), True),
        StructField("heading_topic", StructType([
            StructField("id", IntegerType(), True),
            StructField("name", StringType(), True),
        ])),
        StructField("name", StringType(), True),
    ])),
    StructField("statement", StringType(), True),
])



In [0]:
# Loading the JSON with the explicit schema (no schema inference takes place)
df_json = spark.read.option("multiLine", True).json("data_question.json", schema=schema_questions)

# Checking the schema
df_json.printSchema()

root
 |-- answer_key: integer (nullable = true)
 |-- difficulty: string (nullable = true)
 |-- grade: long (nullable = true)
 |-- id: integer (nullable = true)
 |-- lecture: integer (nullable = true)
 |-- options: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |-- properties: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: integer (nullable = true)
 |-- skill: struct (nullable = true)
 |    |-- code: long (nullable = true)
 |    |-- heading_topic: struct (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- statement: string (nullable = true)
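The `multiLine` option used above matters because, by default, Spark's JSON reader expects JSON Lines: one complete object per line. A pretty-printed file, where a single record spans several lines, only parses correctly with `multiLine` set to true. A plain-Python illustration of the two layouts, using hypothetical records (it assumes nothing about the exact layout of data_question.json):

```python
import json

# The same two records written in two layouts.
json_lines = '{"id": 0, "difficulty": "EASY"}\n{"id": 1, "difficulty": "MEDIUM"}\n'
multi_line = """[
  {"id": 0, "difficulty": "EASY"},
  {"id": 1, "difficulty": "MEDIUM"}
]"""


def read_json_lines(text):
    """Default behaviour: every non-empty line must be a complete JSON value."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def read_multi_line(text):
    """multiLine=True: parse the whole file as one document;
    a top-level array yields one record per element."""
    doc = json.loads(text)
    return doc if isinstance(doc, list) else [doc]


print(read_json_lines(json_lines) == read_multi_line(multi_line))  # True
# read_json_lines(multi_line) fails: the first line "[" is not valid JSON on its own.
```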



In [0]:
# Inspecting df_json
df_json.show(5)

+----------+----------+-----+---+-------+--------------------+--------------------+--------------------+--------------------+
|answer_key|difficulty|grade| id|lecture|             options|          properties|               skill|           statement|
+----------+----------+-----+---+-------+--------------------+--------------------+--------------------+--------------------+
|         3|      EASY|    0|  0|      0|[[Government acco...|[[8f24467f-f351-4...|[5897001673776287...|Foreign while bas...|
|         5|    MEDIUM|    0|  1|      0|[[France, Luxembo...|[[dd29752f-83ce-4...|[6712208284592011...|Machines designed...|
|         5|    MEDIUM|    0|  2|      0|[[Is to interim p...|[[0761cdc8-b9ef-4...|[8179027570588556...|Esa to the predom...|
|         1|      HARD|    0|  3|      0|[[A planetary Con...|[[4d8d5899-493c-4...|[7409025453606141...|The critique new ...|
|         3|      EASY|    0|  4|      0|[[Bleues), Edgar ...|[[9ece5940-df29-4...|[7147975464900988...|Sheri killacke

In [0]:
# Creating the exam DataFrame
data_exam = [
    {'id': 0, 'tipo': 'Enem', 'ano': 2019},
    {'id': 1, 'tipo': 'Enem', 'ano': 2020},
    {'id': 2, 'tipo': 'Fuvest', 'ano': 2020},
    {'id': 3, 'tipo': 'Fuvest', 'ano': 2019},
    {'id': 4, 'tipo': 'uerj', 'ano': 2020},
]

# Schema for the exam DataFrame
schema_exame = StructType([
    StructField("id", IntegerType(), True),
    StructField("tipo", StringType(), True),
    StructField("ano", IntegerType(), True)
])

df_exam = spark.createDataFrame(data_exam, schema_exame)

# Displaying the DataFrame
df_exam.show()

+---+------+----+
| id|  tipo| ano|
+---+------+----+
|  0|  Enem|2019|
|  1|  Enem|2020|
|  2|Fuvest|2020|
|  3|Fuvest|2019|
|  4|  uerj|2020|
+---+------+----+



**TABLE 1**

Building the DataFrame that will become Tabela1.
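The next cell combines nested-field extraction, a computed key and a join. A plain-Python sketch of the same logic on two hypothetical questions (not the real data) may make the row shape easier to follow: selecting `properties.id` from an array of structs yields a list of ids, `question_id % 5` assigns each question to one of the five exams, the inner join brings in `tipo`, and `ano` is dropped. `build_tab1` is an illustrative name only.

```python
# Hypothetical sample rows shaped like df_json (not the real data).
questions = [
    {"id": 0, "statement": "Q0",
     "properties": [{"id": "a", "key": "THE"}, {"id": "b", "key": "MAL"}]},
    {"id": 7, "statement": "Q7",
     "properties": [{"id": "c", "key": "IN"}]},
]

# Same content as df_exam: id -> (tipo, ano).
exams = {0: ("Enem", 2019), 1: ("Enem", 2020), 2: ("Fuvest", 2020),
         3: ("Fuvest", 2019), 4: ("uerj", 2020)}


def build_tab1(questions, exams):
    """Hypothetical helper reproducing the Tabela1 steps row by row."""
    rows = []
    for q in questions:
        exam_id = q["id"] % 5              # computed key, as in question_id % 5
        if exam_id not in exams:           # inner join: unmatched questions are dropped
            continue
        tipo, _ano = exams[exam_id]        # ano comes in with the join, then is dropped
        rows.append({
            "question_id": q["id"],
            "question": q["statement"],
            # properties.id / properties.key on an array of structs give lists
            "properties_id": [p["id"] for p in q["properties"]],
            "properties_name": [p["key"] for p in q["properties"]],
            "exam_id": exam_id,
            "tipo": tipo,
        })
    return sorted(rows, key=lambda r: r["question_id"])  # orderBy("question_id")


for row in build_tab1(questions, exams):
    print(row)
```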

In [0]:
# Importing the Spark SQL functions used to transform the data
from pyspark.sql.functions import expr, count

# Building the base DataFrame from which the later tables are derived.
# properties is an array of structs, so properties.id and properties.key
# each return an array (one element per property of the question).
df_base = df_json.select(
    expr("id as question_id"),
    expr("statement as question"),
    expr("properties.id as properties_id"),
    expr("properties.key as properties_name")
)

# Adding the computed column exam_id (question_id modulo 5) to link each question to df_exam
df_0 = df_base.withColumn('exam_id', df_base['question_id'] % 5)

# Joining with df_exam to build the DataFrame that becomes Tabela1, as requested in the test
df_1 = df_0.join(df_exam, df_0.exam_id == df_exam.id).drop(df_exam.id)

# Sorting by question_id and dropping the ano (year) column
tab1 = df_1.orderBy("question_id").drop(df_1.ano)

tab1.show()

+-----------+--------------------+--------------------+--------------------+-------+------+
|question_id|            question|       properties_id|     properties_name|exam_id|  tipo|
+-----------+--------------------+--------------------+--------------------+-------+------+
|          0|Foreign while bas...|[8f24467f-f351-47...|[THE, MAL, THE, F...|      0|  Enem|
|          1|Machines designed...|[dd29752f-83ce-45...|[THERÓN, BROO, OF...|      1|  Enem|
|          2|Esa to the predom...|[0761cdc8-b9ef-43...|[INED, MODGES, CO...|      2|Fuvest|
|          3|The critique new ...|[4d8d5899-493c-47...|[KM, TORLD, INABL...|      3|Fuvest|
|          4|Sheri killackeyal...|[9ece5940-df29-49...|[BUIRLY, ANG, AK)...|      4|  uerj|
|          5|Other tributary n...|[8b58e1b0-93b7-4d...|[FUNCEL, TOGRA,, ...|      0|  Enem|
|          6|One-way movement....|[63a27710-6caa-4a...|[FORTMO, HAROPL, ...|      1|  Enem|
|          7|Involved, or we o...|[7d146dbe-6d7f-41...|[IN, EIRTWE, AND,...|    

**TABLE 2**

Building the DataFrame that will become Tabela2.

In [0]:
# Grouping the data and counting rows per group. Because properties_id and properties_name
# are arrays, each group key is a question's whole list of properties, so properties_size
# counts how many questions share that exact list within each exam.
df_2 = df_1.groupby(
    expr('properties_id'),
    expr('properties_name'),
    expr('exam_id'),
    expr('tipo'),
    expr('ano')
).agg(
    count('properties_id').alias('properties_size')
)

# Reordering the columns
tab2 = df_2.select("properties_id", "properties_name", "properties_size", "exam_id", "tipo", "ano")

tab2.show()

+--------------------+--------------------+---------------+-------+------+----+
|       properties_id|     properties_name|properties_size|exam_id|  tipo| ano|
+--------------------+--------------------+---------------+-------+------+----+
|[93e00441-0796-49...| [LAT, BY, 4, PERTA]|              1|      0|  Enem|2019|
|[c023203d-4b95-47...|       [IND, SUCTED]|              1|      0|  Enem|2019|
|[a4b0bfa7-4ef2-43...|          [UN., LAW]|              1|      0|  Enem|2019|
|[522dcd15-0968-43...|        [BEFUND, OF]|              1|      0|  Enem|2019|
|[4aeeb7a5-2cc8-4a...|[WEACHA, DAYN, CA...|              1|      0|  Enem|2019|
|[77be37cf-dcc9-42...|[OF, ITA, FORIVE,...|              1|      0|  Enem|2019|
|[025d1f4e-3e4f-4c...|[MORDPA, NEUQUE, ...|              1|      0|  Enem|2019|
|[87791be8-d26c-4e...|  [AS, ARST, CLAGAR]|              1|      0|  Enem|2019|
|[22574ef3-518f-48...|        [DITINK, IS]|              1|      1|  Enem|2020|
|[01e5cc4a-7776-4d...|[THE, AS, COVITI,.
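In the groupBy above, properties_id and properties_name are array columns, so each group key is a question's entire list of properties; unless two questions in the same exam share exactly the same list, `properties_size` is 1. If the intended metric is how often each individual property appears per exam (an assumption based on the original wording), the arrays would need to be exploded first; in PySpark 2.4 that could be done with `arrays_zip` followed by `explode`. The plain-Python sketch below contrasts the two counts on hypothetical rows; the helper names are illustrative.

```python
from collections import Counter

# Hypothetical rows shaped like df_1 (array columns per question), not the real data.
rows = [
    {"properties_id": ["a", "b"], "properties_name": ["THE", "MAL"], "exam_id": 0},
    {"properties_id": ["a", "c"], "properties_name": ["THE", "OF"], "exam_id": 0},
    {"properties_id": ["a"], "properties_name": ["THE"], "exam_id": 1},
]


def count_by_whole_array(rows):
    """Mirrors the groupBy above: each question's full lists form the key."""
    return Counter(
        (tuple(r["properties_id"]), tuple(r["properties_name"]), r["exam_id"])
        for r in rows
    )


def count_by_property(rows):
    """Explodes first (one row per property), then counts each (id, name, exam_id)."""
    return Counter(
        (pid, name, r["exam_id"])
        for r in rows
        for pid, name in zip(r["properties_id"], r["properties_name"])
    )


print(count_by_whole_array(rows))  # every key is distinct, so every count is 1
print(count_by_property(rows))     # ("a", "THE", 0) is counted twice
```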

# **Writing the Tables to Hive**

Since Hive support was already enabled on the Spark session at the start of the script, the tables can be written directly with saveAsTable.

In [0]:
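# Creating Tabela1 (mode("overwrite") lets the cell be re-run; the default mode fails if the table already exists)
tab1.write.mode("overwrite").saveAsTable('Tabela1')

# Reading Tabela1 back; row order is not guaranteed without an ORDER BY
spark.sql("SELECT * FROM Tabela1").show()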

+-----------+--------------------+--------------------+--------------------+-------+------+
|question_id|            question|       properties_id|     properties_name|exam_id|  tipo|
+-----------+--------------------+--------------------+--------------------+-------+------+
|       1525|Solve specific su...|[5002eae8-14be-48...|[INTS, THE, ST, W...|      0|  Enem|
|       1526|Main part poor te...|[3cbce0e1-590a-48...|[OF, CONS., LEAGE...|      1|  Enem|
|       1527|Egypt web henry r...|[9cf00a98-b35c-46...|  [A, IT?, 695, THE]|      2|Fuvest|
|       1528|Ivan albright and...|[84cd5fb4-2088-42...|[THED, SOLUDG, IN...|      3|Fuvest|
|       1529|Ottawa, calgary, ...|[2c47ef6d-294f-49...|[FARE, TOR, REMON...|      4|  uerj|
|       1530|Second republic n...|[10e50069-6d11-40...|[MONCH-, CREAL, E...|      0|  Enem|
|       1531|Around mid-may de...|[23cf3fc3-a9f0-47...|[DAYN, BESED, THE...|      1|  Enem|
|       1532|The geological ho...|[dfaf8a1e-2532-47...|[AS, LIND, ETHE, ...|    

In [0]:
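# Creating Tabela2 (mode("overwrite") lets the cell be re-run; the default mode fails if the table already exists)
tab2.write.mode("overwrite").saveAsTable('Tabela2')

# Reading Tabela2 back; row order is not guaranteed without an ORDER BY
spark.sql("SELECT * FROM Tabela2").show()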

+--------------------+--------------------+---------------+-------+------+----+
|       properties_id|     properties_name|properties_size|exam_id|  tipo| ano|
+--------------------+--------------------+---------------+-------+------+----+
|[fa398b5c-24d5-40...|     [WCL-93, PERTA]|              1|      0|  Enem|2019|
|[9af924f5-a6de-40...|         [NUE, SOUT]|              1|      0|  Enem|2019|
|[6b723293-60c0-4e...|[TH'S, UNTIMS, KN...|              1|      0|  Enem|2019|
|[6924781d-aff5-4e...|[COVITI, RECING, ...|              1|      0|  Enem|2019|
|[e101ade9-508d-49...|     [THER., POPLEN]|              1|      0|  Enem|2019|
|[43037bb9-ccf4-48...|[HATION, TO, THE, A]|              1|      0|  Enem|2019|
|[390bc16f-c039-41...|[WIDESS, AL, (SUS...|              1|      1|  Enem|2020|
|[290d0ef1-9fe0-4d...|    [BEIROU, JOSPER]|              1|      1|  Enem|2020|
|[1b5fdd61-3d2a-4b...|      [THEIRD, HANE]|              1|      1|  Enem|2020|
|[680624e2-78a1-42...|               [TH
