# Stack - Trabalhando com Spark

- **Task**
  - Converta os arquivos em csv para parquet e os envie para processing zone.

- **Dataset**
  - Usaremos esse dataset https://www.kaggle.com/nhs/general-practice-prescribing-data

### Modos de leitura
- **permissive**: *Define todos os campos para NULL quando encontra registros corrompidos e coloca todos registros corrompidos em uma coluna chamada _corrupt_record.* (default)

- **dropMalformed**: *Apaga uma linha corrompida ou que este não consiga ler.*

- **failFast**: *Falha imediatamente quando encontra uma linha que não consiga ler.*

In [0]:
# ler arquivos vários arquivos csv do dbfs com spark
# Lendo todos os arquivos .csv do diretório bigdata (>4GB)

df = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema","True")\
.load("/FileStore/tables/RawData/*.csv")

In [0]:
df.printSchema()

root
 |-- practice: integer (nullable = true)
 |-- bnf_code: integer (nullable = true)
 |-- bnf_name: integer (nullable = true)
 |-- items: integer (nullable = true)
 |-- nic: double (nullable = true)
 |-- act_cost: double (nullable = true)
 |-- quantity: integer (nullable = true)



In [0]:
# imprime as 10 primeiras linhas do dataframe
display(df.head(10))

practice,bnf_code,bnf_name,items,nic,act_cost,quantity
5668,8092,592,2,44.1,40.84,189
1596,17512,16983,2,1.64,1.64,35
1596,25587,16124,1,1.26,1.28,42
1596,12551,1282,2,0.86,1.02,42
1596,18938,10575,1,1.85,1.82,56
1596,8777,21507,1,3.31,3.18,56
1596,9369,12008,1,63.15,58.56,56
1596,27926,17643,2,158.66,147.07,56
1596,26148,10230,1,0.35,0.44,14
1596,9148,3381,1,0.26,0.35,7


In [0]:
# conta a quantidade de linhas
df.count()

412902128

#### Leva os dados convertidos para a Processing Zone

- *Atente para NÃO escrever e ler arquivos parquet em versoes diferentes*

In [0]:
# Converte para formato parquet
df.write.format("parquet")\
.mode("overwrite")\
.save("/FileStore/tables/processing/df-parquet-file.parquet")

In [0]:
# lendo arquivos parquet
# atente para a velocidade de leitura

df_parquet = spark.read.format("parquet")\
.load("/FileStore/tables/processing/df-parquet-file.parquet")

In [0]:
# conta a quantidade de linhas do dataframe
df_parquet.count()

412902128

In [0]:
%scala
// script para pegar tamanho em Gigabytes
val path="/FileStore/tables/processing/df-parquet-file.parquet"
val filelist=dbutils.fs.ls(path)
val df_temp = filelist.toDF()
df_temp.createOrReplaceTempView("adlsSize")


In [0]:
%sql
-- consulta a view criada.
select round(sum(size)/(1024*1024*1024),3) as sizeInGB from adlsSize

sizeInGB
4.107


In [0]:
display(df_parquet.head(10))

practice,bnf_code,bnf_name,items,nic,act_cost,quantity
3626,12090,20521,3,8.4,7.82,168
3626,23511,11576,1,32.18,29.81,28
3626,14802,14672,162,141.13,133.93,4760
3626,14590,10011,17,15.01,14.12,532
3626,24483,13726,69,57.57,54.67,2121
3626,7768,22070,155,113.03,109.41,4144
3626,1877,13598,102,68.5,67.4,2370
3626,18110,3990,189,156.66,150.44,5222
3626,14058,2144,23,23.52,22.48,588
3626,4558,5695,32,116.64,109.21,756


In [0]:
#Add columns to DataFrame using SQL
df_parquet.createOrReplaceTempView("view_df_parquet")

spark.sql("SELECT BNF_CODE as Bnf_code \
                  ,SUM(ACT_COST) as Soma_Act_cost \
                  ,SUM(QUANTITY) as Soma_Quantity \
                  ,SUM(ITEMS) as Soma_items \
                  ,SUM(ACT_COST) as Media_Act_cost \
           FROM view_df_parquet \
           GROUP BY bnf_code").show()

+--------+--------------------+-------------+----------+--------------------+
|Bnf_code|       Soma_Act_cost|Soma_Quantity|Soma_items|      Media_Act_cost|
+--------+--------------------+-------------+----------+--------------------+
|   18498|2.5857266359999824E7|       334546|    334028|2.5857266359999824E7|
|   10206|   686586.2599999988|     39952400|     52506|   686586.2599999988|
|    9376|  3462964.9900000207|     13596394|    291217|  3462964.9900000207|
|    8389|   8558775.460000006|     74951150|   5737494|   8558775.460000006|
|    7253|   9033206.659999918|    112079470|   2175438|   9033206.659999918|
|   16861|   793183.7800000156|     16390558|    346210|   793183.7800000156|
|   11141|   919899.5200000005|       743743|      8747|   919899.5200000005|
|   24347|  232912.30999999994|       716567|     17985|  232912.30999999994|
|    5803|  250780.74999999988|        59106|      4324|  250780.74999999988|
|    7982|   417322.7799999998|        27200|     12316|   41732