# Setup PySpark

In [1]:
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!wget -q http://www-eu.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
#!tar xf spark-3.0.1-bin-hadoop3.2.tgz
#!rm spark-3.0.1-bin-hadoop3.2.tgz
#!pip install -q findspark
#!pip install -q pyspark

In [2]:
import os

# Point the kernel at the Spark and Java installations and record the
# Google Cloud project id, all in a single environment update.
os.environ.update({
    "SPARK_HOME": "/content/spark-3.0.1-bin-hadoop3.2",
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "PROJECT_ID": "sas-project-296818",
})

In [3]:
import findspark

# Initialise findspark before importing pyspark (presumably it relies on the
# SPARK_HOME variable set in the previous cell - confirm).
findspark.init()

from pyspark.sql import SparkSession

# Local Spark session using every available core ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [4]:
import json
import pyspark.sql.functions as f

# Setup BigQuery

In [5]:
#!pip install -q google-auth
#!pip install -q --upgrade google-api-python-client
#!pip install -q datalab

Estabelece conexão com o BigQuery

In [6]:
# Authenticate the current Colab user; presumably this is what lets the
# Google Cloud clients created later in the notebook obtain credentials
# (TODO confirm). Only works when running inside Google Colab.
from google.colab import auth
auth.authenticate_user()
# Simple confirmation that the call above returned without raising.
print('Authenticated')

Authenticated


# Desafio 1 - Criar schema

Cria schema das tabelas

In [7]:
from google.cloud import bigquery

project_id = 'sas-project-296818'
client = bigquery.Client(project=project_id)

# Debug aid: list the datasets visible to this client.
#for dataset in client.list_datasets():
#  print(dataset.dataset_id)

# One DDL script per table. Each script drops and recreates its table so
# the notebook starts from an empty schema on every run.
ddl_scripts = {
    # Create tbquestions
    'tbquestions': '''
  DROP TABLE IF EXISTS `assesment_dataset.tbquestions`;
  CREATE TABLE IF NOT EXISTS `assesment_dataset.tbquestions`
  (id INT64
  ,statement STRING
  ,answer_key INT64
  ,difficulty STRING
  ,grade INT64
  ,lecture INT64)
''',
    # Create tboptions
    'tboptions': '''
  DROP TABLE IF EXISTS `assesment_dataset.tboptions`;
  CREATE TABLE IF NOT EXISTS `assesment_dataset.tboptions`
  (id_question INT64
  ,id_option INT64
  ,description STRING)
''',
    # Create tbproperty
    'tbproperty': '''
  DROP TABLE IF EXISTS `assesment_dataset.tbproperty`;
  CREATE TABLE IF NOT EXISTS `assesment_dataset.tbproperty`
  (id_question INT64
  ,id_property STRING
  ,key STRING
  ,value INT64)
''',
    # Create tbskill
    'tbskill': '''
  DROP TABLE IF EXISTS `assesment_dataset.tbskill`;
  CREATE TABLE IF NOT EXISTS `assesment_dataset.tbskill`
  (id_question INT64
  ,id_skill INT64
  ,name STRING
  ,heading_topic STRING)
''',
}

# client.query() only submits the job and returns a QueryJob immediately.
# Calling result() blocks until the script has finished and raises if it
# failed, so DDL errors surface here and the tables are guaranteed to exist
# before the insert cells further down run.
for ddl in ddl_scripts.values():
    client.query(ddl).result()


<google.cloud.bigquery.job.QueryJob at 0x7fd898ad87f0>

# Importa JSON

In [8]:
# Read the questions file in multi-line mode, i.e. a JSON document is allowed
# to span several lines instead of one record per line.
df_json_data = spark.read.option("multiLine", True).json('data_question.json')

In [10]:
#df_json_data.printSchema()

# Popula tabela de questões

Cria dataframe

In [11]:
# Keep only the top-level question fields; they match the tbquestions
# columns created in the schema cell.
df_questions = df_json_data.select(
    "id",
    "statement",
    "answer_key",
    "difficulty",
    "grade",
    "lecture",
)

# Uncomment to preview the rows:
#df_questions.show()

Insere dataframe com as questões no BigQuery

In [12]:
import google.datalab.bigquery as bq

bigquery_dataset_name = 'assesment_dataset'
bigquery_table_name = 'tbquestions'

# Build the dataset and table objects for the destination
dataset = bq.Dataset(bigquery_dataset_name)
table = bq.Table(f'{bigquery_dataset_name}.{bigquery_table_name}')

# Convert the Spark DataFrame to pandas and insert its rows into the table
table.insert(df_questions.toPandas())

BigQuery Table - name: sas-project-296818.assesment_dataset.tbquestions

# Popula tabela de opções

Cria dataframe

In [13]:
# One row per (question, option): explode the "options" array while keeping
# the owning question id next to each element.
df_options_exp = df_json_data.select(
    f.col("id").alias("id_question"),
    f.explode(f.col("options")).alias("options"),
)

# Flatten each option struct into the tboptions column layout.
df_options = df_options_exp.select(
    "id_question",
    f.col("options.id").alias("id_option"),
    f.col("options.description").alias("description"),
)

# Uncomment to preview the rows:
#df_options.show()

Insere dataframe no BigQuery

In [14]:
bigquery_dataset_name = 'assesment_dataset'
bigquery_table_name = 'tboptions'

# Build the dataset and table objects for the destination
dataset = bq.Dataset(bigquery_dataset_name)
table = bq.Table(f'{bigquery_dataset_name}.{bigquery_table_name}')

# Convert the Spark DataFrame to pandas and insert its rows into the table
table.insert(df_options.toPandas())

BigQuery Table - name: sas-project-296818.assesment_dataset.tboptions

# Popula tabela de propriedades

Cria dataframe

In [15]:
# One row per (question, property): explode the "properties" array while
# keeping the owning question id next to each element.
df_properties_exp = df_json_data.select(
    f.col("id").alias("id_question"),
    f.explode(f.col("properties")).alias("properties"),
)

# Flatten each property struct into the tbproperty column layout.
df_properties = df_properties_exp.select(
    "id_question",
    f.col("properties.id").alias("id_property"),
    f.col("properties.key").alias("key"),
    f.col("properties.value").alias("value"),
)

# Uncomment to preview the rows:
#df_properties.show()

Insere dataframe no BigQuery

In [16]:
bigquery_dataset_name = 'assesment_dataset'
bigquery_table_name = 'tbproperty'

# Build the dataset and table objects for the destination
dataset = bq.Dataset(bigquery_dataset_name)
table = bq.Table(f'{bigquery_dataset_name}.{bigquery_table_name}')

# Convert the Spark DataFrame to pandas and insert its rows into the table
table.insert(df_properties.toPandas())

BigQuery Table - name: sas-project-296818.assesment_dataset.tbproperty

# Popula tabela de habilidades

Cria dataframe

In [17]:
# "skill" is a nested struct (not an array), so its fields are read directly
# with dotted paths; no explode is needed.
df_skill = df_json_data.select(
    f.col("id").alias("id_question"),
    f.col("skill.code").alias("id_skill"),
    f.col("skill.name").alias("name"),
    f.col("skill.heading_topic.name").alias("heading_topic"),
)

# Uncomment to preview the rows:
#df_skill.show()

Insere dataframe no BigQuery

In [18]:
bigquery_dataset_name = 'assesment_dataset'
bigquery_table_name = 'tbskill'

# Build the dataset and table objects for the destination
dataset = bq.Dataset(bigquery_dataset_name)
table = bq.Table(f'{bigquery_dataset_name}.{bigquery_table_name}')

# Convert the Spark DataFrame to pandas and insert its rows into the table
table.insert(df_skill.toPandas())

BigQuery Table - name: sas-project-296818.assesment_dataset.tbskill

# Desafio 2 - Tabela 1

Busca todas as propriedades de cada questão

In [20]:
# Pair every property with the question it belongs to and display the
# report columns (property id/name, question id/statement).
(
    df_questions
    .join(
        df_properties,
        on=df_questions["id"] == df_properties["id_question"],
        how="inner",
    )
    .drop("id_question")  # duplicate of the question "id" after the join
    .select(
        f.col("id_property").alias("property_id"),
        f.col("key").alias("property_name"),
        f.col("id").alias("question_id"),
        f.col("statement").alias("question_statement"),
    )
    .show()
)

+--------------------+-------------+-----------+--------------------+
|         property_id|property_name|question_id|  question_statement|
+--------------------+-------------+-----------+--------------------+
|8f24467f-f351-47a...|          THE|          0|Foreign while bas...|
|52f7abf2-65b8-406...|          MAL|          0|Foreign while bas...|
|cb6e4a29-d88f-45d...|          THE|          0|Foreign while bas...|
|0e028c51-329e-4e7...|       FOUND.|          0|Foreign while bas...|
|bc124a46-891c-4b0...|         2,00|          0|Foreign while bas...|
|dd29752f-83ce-45d...|       THERÓN|          1|Machines designed...|
|541ec6fe-9cc4-4f9...|         BROO|          1|Machines designed...|
|ce09a035-f102-43a...|          OFT|          1|Machines designed...|
|8c73eb74-b05e-4dc...|       LEAGEN|          1|Machines designed...|
|c8e9917d-5545-477...|           OF|          1|Machines designed...|
|0761cdc8-b9ef-43e...|         INED|          2|Esa to the predom...|
|f539cf2d-d16c-4df..

# Desafio 2 - Tabela 2

Lê tabela de **Assesments** e suas dimensões no BigQuery para relacionar com a tabela de propriedades

In [87]:
# SQL for the assessment fact table (all columns).
sql_assesment = '''
  SELECT * 
  FROM `assesment_dataset.tbassesment`
'''

# SQL for the distinct (assessment, year) pairs; the year is extracted from
# the date column on the BigQuery side.
sql_assesment_date = '''
  SELECT assesment_id
      ,EXTRACT(YEAR FROM date) assesment_year
  FROM `assesment_dataset.tbassesment_date`
  GROUP BY assesment_id, EXTRACT(YEAR FROM date)
'''

# SQL for the assessment type dimension (all columns).
sql_assesment_type = '''
  SELECT * 
  FROM `assesment_dataset.tbassesment_type`
'''

# Run each query and download its result as a pandas DataFrame.
df_assesment = client.query(sql_assesment).to_dataframe()
df_assesment_date = client.query(sql_assesment_date).to_dataframe()
df_assesment_type = client.query(sql_assesment_type).to_dataframe()

In [88]:
# Convert the three pandas frames fetched from BigQuery into Spark DataFrames.
# NOTE(review): the same names are rebound from pandas to Spark objects, so
# this cell presumably cannot be re-run without first re-running the query
# cell above.
df_assesment, df_assesment_date, df_assesment_type = (
    spark.createDataFrame(pandas_frame)
    for pandas_frame in (df_assesment, df_assesment_date, df_assesment_type)
)

Resgata ano e tipo de cada Assesment

In [89]:
# No Spark-side year extraction is needed: the SQL that loads
# tbassesment_date already returns one row per (assesment_id, assesment_year).

# Keep only the type id and expose the type name as "assesment_type" so it
# does not clash with the assessment's own "name" column.
df_assesment_type = df_assesment_type.select(
    df_assesment_type["id"],
    df_assesment_type["name"].alias("assesment_type"),
)

# Attach the type (left join: assessments without a type are kept) and the
# year(s) (inner join: assessments without a date row are dropped).
df_assesment = (
    df_assesment
    .join(
        df_assesment_type,
        df_assesment["assesment_type_id"] == df_assesment_type["id"],
        "left",
    )
    .drop(df_assesment_type["id"])
    .join(
        df_assesment_date,
        df_assesment["id"] == df_assesment_date["assesment_id"],
        "inner",
    )
    .drop("assesment_id")
    .select(
        "id",
        "assesment_type",
        "name",
        "assesment_year",
    )
)

In [90]:
# Preview the enriched assessments (first 20 rows, long values truncated).
df_assesment.show(n=20, truncate=True)

+---+--------------+-----------+--------------+
| id|assesment_type|       name|assesment_year|
+---+--------------+-----------+--------------+
|  0|        TIPO 1|PROVA TESTE|          2017|
|  0|        TIPO 1|PROVA TESTE|          2018|
+---+--------------+-----------+--------------+



Retorna características e quantidade de cada propriedade das questões que caem em cada avaliação por ano.

In [93]:
# Count properties per assessment and year.
# NOTE(review): questions are matched to assessments through
# `id_question % 7 == assessment id`; the notebook does not explain this
# mapping - confirm that it is intentional.
(
    df_properties
    .join(
        df_assesment,
        on=(df_properties["id_question"] % 7) == df_assesment["id"],
        how="inner",
    )
    .drop("id_question")
    .select(
        f.col("id_property").alias("property_id"),
        f.col("key").alias("property_name"),
        f.col("id").alias("assesment_id"),
        f.col("assesment_type"),
        f.col("name").alias("assesment_name"),
        f.col("assesment_year"),
    )
    .groupBy(
        "property_id",
        "property_name",
        "assesment_id",
        "assesment_type",
        "assesment_name",
        "assesment_year",
    )
    .agg(f.count("property_id").alias("property_size"))
    .orderBy("assesment_id", "property_id", "assesment_year")
    .show()
)

+--------------------+-------------+------------+--------------+--------------+--------------+-------------+
|         property_id|property_name|assesment_id|assesment_type|assesment_name|assesment_year|property_size|
+--------------------+-------------+------------+--------------+--------------+--------------+-------------+
|0030c029-aafd-465...|           IN|           0|        TIPO 1|   PROVA TESTE|          2017|            1|
|0030c029-aafd-465...|           IN|           0|        TIPO 1|   PROVA TESTE|          2018|            1|
|003e8cb0-ebbd-4dd...|           HE|           0|        TIPO 1|   PROVA TESTE|          2017|            1|
|003e8cb0-ebbd-4dd...|           HE|           0|        TIPO 1|   PROVA TESTE|          2018|            1|
|00414d6a-a7b1-440...|        VIKEY|           0|        TIPO 1|   PROVA TESTE|          2017|            1|
|00414d6a-a7b1-440...|        VIKEY|           0|        TIPO 1|   PROVA TESTE|          2018|            1|
|0042fbcd-ebf8-413.