# ETL Using Apache Spark

In this lab I will pull data from a source system, transform it, and store it in a target system.

## Objectives

After completing this lab, you will be able to:

1.  Pull data from the HMP dataset.
2.  Create a Spark DataFrame from the raw data.
3.  Write the DataFrame to CSV.
4.  Convert the CSV data to Parquet.
5.  Condense the Parquet output into a single file.
6.  Upload the Parquet file to Cloud Object Storage.


I will use the HMP dataset for this notebook. The dataset is publicly available [here](https://github.com/wchill/HMP_Dataset).

This notebook also uses the **CLAIMED** library for AI, machine learning, ETL, and data science work. CLAIMED is a library of Jupyter notebooks, each of which implements a specific or general task as a reusable component.

Example: using CLAIMED to run a SQL statement against a PostgreSQL database and store the results as a CSV file in Cloud Object Storage.

In this example, two notebooks from the CLAIMED library are dragged and dropped onto the **Elyra** pipeline editor canvas. The first notebook reads data from a PostgreSQL database, and the second writes the result to Cloud Object Storage. In the properties panel on the right, eight parameters have to be specified for the code in the notebooks to work. The whole process can be completed without starting from scratch, searching for code snippets along the way, and fixing unexpected bugs as they appear. A further advantage of Elyra is that it can export pipelines to Apache Airflow or Kubeflow Pipelines out of the box.
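Components like these are configured entirely through parameters: in Elyra they are typically set as environment variables on the pipeline node, and on the command line they are passed as `name=value` arguments. The following is a minimal sketch of how a notebook could resolve them, assuming command-line arguments take precedence over environment variables, which take precedence over hard-coded defaults. `resolve_params` is a hypothetical helper for illustration, not CLAIMED's actual code.

```python
import os
import sys


def resolve_params(defaults, argv=None, environ=None):
    """Resolve component parameters with the precedence
    command-line name=value argument > environment variable > default."""
    argv = sys.argv[1:] if argv is None else argv
    environ = os.environ if environ is None else environ
    # Environment variables (e.g. set on a pipeline node) override defaults.
    params = {name: environ.get(name, value) for name, value in defaults.items()}
    # name=value arguments, as in `ipython notebook.ipynb data_dir=./data/`,
    # override both. partition() splits on the first "=" only, so values such
    # as URLs with query strings stay intact.
    for arg in argv:
        name, sep, value = arg.partition("=")
        if not sep:
            raise ValueError(f"expected name=value, got {arg!r}")
        if name not in defaults:
            raise ValueError(f"unknown parameter {name!r}")
        params[name] = value
    return params
```

For example, `resolve_params({"data_dir": "../../data/", "sample": "1.0"}, argv=["data_dir=./data/", "sample=0.01"], environ={})` returns `{'data_dir': './data/', 'sample': '0.01'}`.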

For more information about CLAIMED and Elyra, check out the links below:

- https://elyra.readthedocs.io/
- https://github.com/elyra-ai/component-library

## Import the CLAIMED library to JupyterLab

In [1]:
%%bash
rm -Rf claimed
git clone https://github.com/IBM/claimed.git
cd claimed

Cloning into 'claimed'...

## Start the ETL process

### Task A: Pull the data


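Pull the data from the remote GitHub repository and convert it to CSV. Open the notebook *input-hmp.ipynb* in the folder *claimed/component-library/input* and run each cell from top to bottom, or run the whole notebook non-interactively with `ipython`, passing its parameters as `name=value` arguments.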

After a while, you should see a folder called *data.csv* in the *data* directory (set with the `data_dir` parameter). Note that Apache Spark always writes a folder containing individual files, one per partition, rather than a single file. This is not a problem, because Spark's readers accept a folder path and treat all the part files inside it as one dataset, just as if the folder were a single file.
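Conceptually, this input step flattens the HMP files into one table. The following plain-Python sketch shows that transformation (it is not the component's actual Spark code), assuming the dataset layout of one folder per activity, where each text file holds one accelerometer reading per line as three space-separated integers. The column names `x`, `y`, `z`, `source`, and `class`, the skipping of `*_MODEL` folders, and the helper `hmp_to_csv` are assumptions chosen for illustration.

```python
import csv
import os


def hmp_to_csv(dataset_dir, out_path):
    """Flatten HMP-style accelerometer files into one CSV with columns
    x, y, z, source, class. Returns the number of data rows written."""
    rows_written = 0
    with open(out_path, "w", newline="") as out:
        writer = csv.writer(out)
        writer.writerow(["x", "y", "z", "source", "class"])
        for category in sorted(os.listdir(dataset_dir)):
            category_dir = os.path.join(dataset_dir, category)
            # Assumption: one folder per activity; skip stray files and any
            # *_MODEL folders, which do not hold raw recordings.
            if not os.path.isdir(category_dir) or category.endswith("_MODEL"):
                continue
            for file_name in sorted(os.listdir(category_dir)):
                with open(os.path.join(category_dir, file_name)) as f:
                    for line in f:
                        parts = line.split()
                        if len(parts) != 3:
                            continue  # ignore blank or malformed lines
                        x, y, z = (int(p) for p in parts)
                        # The file name and folder name become label columns.
                        writer.writerow([x, y, z, file_name, category])
                        rows_written += 1
    return rows_written
```

Keeping the source file and activity name on every row means the flat table can later be grouped back into recordings or used as labelled training data.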


In [2]:
!ipython ./claimed/component-library/input/input-hmp.ipynb data_dir=./data/ sample=0.01
Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)
22/11/18 10:12:43 WARN Utils: Your hostname, Adrians-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.4 instead (on interface en0)
22/11/18 10:12:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/11/18 10:12:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Cloning into 'HMP_Dataset'...
remote: Enumerating objects: 865, done.
remote: Total 865 (delta 0), reused 0 (delta 0), pack-reused 865
Receiving objects: 100% (865/865), 1010.96 KiB | 1.58 MiB/s, done.
Skipping: Accelerometer-2011-05-30-20-59-04-liedown_bed-f1.txt
Skipping
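The *data.csv* result of this step is a folder, not a file. The sketch below reads such a folder the way Spark does, as one logical dataset, assuming Spark's usual `part-*.csv` file naming and that each part file starts with its own header row (as happens when the CSV is written with `header=True`). `read_spark_csv_folder` is a hypothetical helper.

```python
import csv
import glob
import os


def read_spark_csv_folder(folder):
    """Yield data rows from every part file in a Spark-style CSV output folder,
    skipping each part file's header, as if the folder were one CSV file."""
    # Spark writes one part-*.csv file per partition, plus marker files such
    # as _SUCCESS; only the part files hold data.
    for part in sorted(glob.glob(os.path.join(folder, "part-*.csv"))):
        with open(part, newline="") as f:
            reader = csv.reader(f)
            next(reader, None)  # drop this part's header (no-op if empty)
            yield from reader
```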

### Convert from CSV to Parquet

CSV is neither a memory-efficient nor a storage-efficient file format, and the same holds for JSON. That is why binary formats like [Avro](https://en.wikipedia.org/wiki/Apache_Avro), [Parquet](https://en.wikipedia.org/wiki/Apache_Parquet), [ORC](https://en.wikipedia.org/wiki/Apache_ORC) and [RCFile](https://en.wikipedia.org/wiki/RCFile) emerged; Parquet, ORC and RCFile additionally store data column by column, which compresses well and lets queries read only the columns they need. Convert the CSV data to Parquet before uploading it to Cloud Object Storage.
Again, [CLAIMED](https://github.com/IBM/claimed/tree/master/component-library) provides a component, implemented as a Jupyter notebook, that does exactly that. Explore the notebook **component-library/transform/spark-csv-to-parquet.ipynb**.
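To make the storage argument concrete, here is a toy comparison, not a Parquet implementation: it stores rows of small non-negative integers (like accelerometer readings) once as CSV text and once column by column as packed bytes compressed with `zlib`. The one-byte-per-value packing is an assumption that only holds for values from 0 to 255. Real columnar formats such as Parquet use more elaborate per-column encodings (for example dictionary and run-length encoding) plus codecs such as Snappy, but the intuition is the same: values within one column are similar and compress well together.

```python
import struct
import zlib


def csv_size(rows):
    """Bytes needed to store integer rows as comma-separated text."""
    return len("".join(",".join(map(str, r)) + "\n" for r in rows).encode())


def columnar_size(rows):
    """Bytes needed when each column is stored separately as unsigned bytes
    and compressed on its own (a toy stand-in for a columnar format)."""
    total = 0
    for column in zip(*rows):
        # "B" = unsigned char: assumes every value fits in 0..255.
        packed = struct.pack(f"{len(column)}B", *column)
        total += len(zlib.compress(packed))
    return total
```

On repetitive, sensor-like data such as `[(i % 64, 32, (i * 7) % 64) for i in range(10000)]`, the columnar layout comes out far smaller than the CSV text.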


In [None]:
!ipython ./claimed/component-library/transform/spark-csv-to-parquet.ipynb data_dir=./data/

### Condense the Parquet file

The "spark-condense-parquet.ipynb" component
in the *./claimed/component-library/transform/* folder condense the parquet file.
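Parquet files cannot simply be concatenated byte by byte, because each file ends with its own metadata footer, so condensing Parquet is normally done by Spark itself, for example with `df.coalesce(1).write.parquet(path)` in PySpark (which still produces a folder, but with a single part file inside). Whether the CLAIMED component does exactly that is an assumption. For a row-oriented text format such as a Spark CSV output folder, the same idea can be sketched in plain Python: merge all part files into one file and keep a single header. `condense_csv_parts` is a hypothetical helper.

```python
import glob
import os
import shutil


def condense_csv_parts(folder, out_path):
    """Merge the part-*.csv files of a Spark output folder into one CSV file
    with a single header row. Returns the number of part files merged."""
    parts = sorted(glob.glob(os.path.join(folder, "part-*.csv")))
    header_written = False
    with open(out_path, "w", newline="") as out:
        for part in parts:
            with open(part, newline="") as f:
                header = f.readline()  # every part file repeats the header
                if header and not header_written:
                    out.write(header)
                    header_written = True
                shutil.copyfileobj(f, out)  # copy the remaining data rows
    return len(parts)
```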


In [7]:
!ipython ./claimed/component-library/transform/spark-condense-parquet.ipynb data_dir=./data/
Starting installation...
Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)
22/11/04 20:10:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

### Upload file to Cloud Object Storage

Upload the condensed Parquet file to Cloud Object Storage, where it is stored at low cost and made available to others.

<span style="color:red">Please note: The endpoint you need to set is not the "endpoints" from the service credentials section but from Task D Step 11</span>


In [9]:
%%bash
# Replace the placeholders with your own credentials.
export access_key_id='access_key_id=<your_access_key_id>'
export secret_access_key='secret_access_key=<your_secret_access_key>'
# Use the endpoint from Task D Step 11, not the "endpoints" URL from the
# service credentials, e.g. https://s3.us-south.cloud-object-storage.appdomain.cloud
export endpoint='endpoint=<your_cos_endpoint>'
export bucket_name='bucket_name=cloud-object-storage-00-cos-standard-rbw'
export source_file='source_file=data_condensed.parquet'
export destination_file='destination_file=data.parquet'
export data_dir='data_dir=./data/'
ipython ./claimed/component-library/output/upload-to-cos.ipynb "$access_key_id" "$secret_access_key" "$endpoint" "$bucket_name" "$source_file" "$destination_file" "$data_dir"

---------------------------------------------------------------------------
PartialCredentialsError                   Traceback (most recent call last)
/resources/labs/BD0231EN/claimed/component-library/output/upload-to-cos.ipynb in <module>
----> 1 s3.put(data_dir + source_file, bucket_name + '/' + destination_file)

~/conda/envs/python/lib/python3.7/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
    109     def wrapper(*args, **kwargs):
    110         self = obj or args[0

CalledProcessError: Command 'b"export access_key_id='access_key_id=<redacted>'\n#export secret_access_key='secret_access_key=<redacted>'\nexport endpoint='endpoint=https://control.cloud-object-storage.cloud.ibm.com/v2/endpoints'\nexport bucket_name='bucket_name=cloud-object-storage-00-cos-standard-rbw'\nexport source_file='source_file=data_condensed.parquet'\nexport destination_file='destination_file=data.parquet'\nexport data_dir='data_dir=./data/'\nipython ./claimed/component-library/output/upload-to-cos.ipynb $access_key_id $secret_access_key $endpoint $bucket_name $source_file $destination_file $data_dir\n"' returned non-zero exit status 1.

This output comes from a run in which the `secret_access_key` export was commented out (note the `#export` in the command above), so the component received only part of the credentials and the upload failed with `PartialCredentialsError`. Set all seven parameters, including the correct endpoint, before running the cell.