# Ghi Parquet file từ CSV
### Trong phần bên dưới, chúng ta sẽ chuyển đổi file CSV thành Parquet file
* Sử dụng Spark trên EMR cluster để đọc data từ S3 Bucket
* Thực hiện ghi Parquet file từ dataframe được đọc từ file CSV
* Thực hiện đọc Parquet file
* Thực hiện một số thao tác truy vấn cơ bản với dataframe được tạo từ Parquet file

#### Kiểm tra kết nối giữa AWS EMR Cluster và Notebook instance

In [24]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3,application_1670081014291_0004,spark,idle,Link,Link,,✔


#### Đọc file CSV

In [3]:
val df = spark.read.format("csv").option("header", true).load("s3://group8bucket/train.csv")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

df: org.apache.spark.sql.DataFrame = [row_id: string, timestamp: string ... 8 more fields]


#### Thực hiện một số lệnh cơ bản với dataframe
* Hàm count()
* Hàm show()

In [4]:
df.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res1: Long = 101230332


In [5]:
df.show(50)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+---------+-------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+
|row_id|timestamp|user_id|content_id|content_type_id|task_container_id|user_answer|answered_correctly|prior_question_elapsed_time|prior_question_had_explanation|
+------+---------+-------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+
|     0|        0|    115|      5692|              0|                1|          3|                 1|                       null|                          null|
|     1|    56943|    115|      5716|              0|                2|          2|                 1|                    37000.0|                         False|
|     2|   118363|    115|       128|              0|                0|          0|                 1|                    55000.0|                         False|
|     3|   131167|    115|  

#### Thêm thự viện SaveMode
##### Thư viện này dùng để thay đổi chế độ ghi file parquet bao gồm:
* Append: Dữ liệu sẽ được nối thêm ở địa chỉ ghi
* Overwrite: Dữ liệu có sẵn sẽ được thay thế bởi dữ liệu được ghi mới

In [7]:
import org.apache.spark.sql.SaveMode

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.SaveMode


#### Thực hiện ghi Parquet file với chế độ Append với dữ liệu được lấy từ df (CSV)

In [8]:
df.select("*").write.mode(SaveMode.Append).format("parquet").save("s3://group8bucket/output/")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Biến môi trường để đọc Parquet file

In [9]:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@72f53673


#### Thực hiện đọc Parquet file từ S3 Bucket

In [10]:
val dftrain = sqlContext.read.parquet("s3://group8bucket/output/")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

dftrain: org.apache.spark.sql.DataFrame = [row_id: string, timestamp: string ... 8 more fields]


In [11]:
dftrain.show(15)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------+---------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+
| row_id| timestamp|  user_id|content_id|content_type_id|task_container_id|user_answer|answered_correctly|prior_question_elapsed_time|prior_question_had_explanation|
+-------+----------+---------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+
|4876812|1316378367|104334599|      7888|              0|              249|          0|                 1|                    21000.0|                          True|
|4876813|1320646728|104334599|      8011|              0|              251|          2|                 0|                    22000.0|                         False|
|4876814|1322487215|104334599|      6051|              0|              252|          2|                 0|                    51000.0|                          True|
|487

#### Thực hiện tạo view ParquetTable từ dataframe được đọc từ Parquet file 

In [12]:
dftrain.createOrReplaceTempView("ParquetTable")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Thực hiện một số truy vấn cơ bản với table ParquetTable

In [25]:
spark.sql("SELECT * FROM ParquetTable").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------+---------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+
| row_id| timestamp|  user_id|content_id|content_type_id|task_container_id|user_answer|answered_correctly|prior_question_elapsed_time|prior_question_had_explanation|
+-------+----------+---------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+
|4876812|1316378367|104334599|      7888|              0|              249|          0|                 1|                    21000.0|                          True|
|4876813|1320646728|104334599|      8011|              0|              251|          2|                 0|                    22000.0|                         False|
|4876814|1322487215|104334599|      6051|              0|              252|          2|                 0|                    51000.0|                          True|
|487

In [13]:
spark.sql("SELECT * FROM ParquetTable where user_answer >= 2").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------+---------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+
| row_id| timestamp|  user_id|content_id|content_type_id|task_container_id|user_answer|answered_correctly|prior_question_elapsed_time|prior_question_had_explanation|
+-------+----------+---------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+
|4876813|1320646728|104334599|      8011|              0|              251|          2|                 0|                    22000.0|                         False|
|4876814|1322487215|104334599|      6051|              0|              252|          2|                 0|                    51000.0|                          True|
|4876815|1322505627|104334599|      4436|              0|              253|          2|                 1|                    12000.0|                          True|
|487

In [14]:
spark.sql("SELECT * FROM ParquetTable where answered_correctly = 0").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------+---------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+
| row_id| timestamp|  user_id|content_id|content_type_id|task_container_id|user_answer|answered_correctly|prior_question_elapsed_time|prior_question_had_explanation|
+-------+----------+---------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+
|4876813|1320646728|104334599|      8011|              0|              251|          2|                 0|                    22000.0|                         False|
|4876814|1322487215|104334599|      6051|              0|              252|          2|                 0|                    51000.0|                          True|
|4876830|1325577017|104334599|      7789|              0|              250|          0|                 0|                    17000.0|                         False|
|487

#### Kết thúc bài làm.