# ETL Using Apache Spark

**Estimated time needed:** 20 minutes

The purpose of this lab is to show you how to pull data from a source system, transform it, and store it in a target system.

## Objectives

After completing this lab, you will be able to:

1.  Pull data from the HMP dataset.
2.  Create a Spark Dataframe from the raw data.
3.  Write the Dataframe to CSV.
4.  Convert the CSV data to PARQUET.
5.  Condense PARQUET to a single file.
6.  Upload the Parquet file to Cloud Object Store.

Don’t worry: we provide a set of sample notebooks that you can modify and chain together. They come from CLAIMED, the Component Library for AI, Machine Learning, ETL, and Data Science, an open-source library available in the [CLAIMED repo](https://github.com/IBM/claimed/tree/master/component-library).

You’ll use [Elyra – a JupyterLab extension](https://elyra.readthedocs.io/en/stable/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork26766988-2021-01-01) for editing the notebooks; and, if you like you can use the [pipeline editor](https://elyra.readthedocs.io/en/latest/getting_started/overview.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork26766988-2021-01-01#ai-pipelines-visual-editor) of Elyra to visually join them into a data pipeline.

Elyra is the foundation of the [IBM Watson Studio Pipelines](https://medium.com/ibm-data-ai/automating-the-ai-lifecycle-with-ibm-watson-studio-orchestration-flow-4450f1d725d6?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork26766988-2021-01-01) tool which can be used in the cloud. Feel free to give it a shot as well to experience how a business user would do the job.

We’ll use the HMP dataset. The dataset is publicly available [here](https://github.com/wchill/HMP_Dataset).

In previous-generation big data systems, HDFS was the core data store. Nowadays, S3-compatible Cloud Object Storage (COS) is the de facto standard across clouds and is also gaining traction in local data centers (Ceph, MinIO).

So let’s get started with the lab!

## Exercise 1 : Import the CLAIMED library to JupyterLab

1.  In a separate browser tab, please open the CLAIMED component library: [GitHub Link](https://github.com/IBM/claimed/tree/master/component-library)
2.  Don’t hesitate to give us a star (1) :), then please click the Fork button. (2)

(Forking gives you your own copy to work with. You can then replace the link in the next cell with the link to your fork.)

![Elyra Github repo](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/labs/images/fp4.png)

Now let's clone (download) the library:


In [1]:
%%bash
rm -Rf claimed
git clone https://github.com/IBM/claimed.git
cd claimed

Cloning into 'claimed'...
Updating files: 100% (432/432), done.


Congratulations, you’ve successfully imported the component library.

## Exercise 2 : Explore the CLAIMED component library

Now it’s time to familiarize yourself a bit with some components (Jupyter Notebooks) in the component library.

1.  In JupyterLab, please go to the file explorer (1), double-click the folder “claimed” and then double-click the folder “component-library” (2)

![jupyterlab exploration](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/labs/images/fp7.png)

2.  Please open the folder called “transform” (1) and open the notebook called spark-csv-to-parquet (2). The prefix “spark” indicates that the notebook uses Apache Spark to perform its task. From the name you can deduce that this component transforms a file from “csv” to “parquet” format. Each notebook starts with a title and a description of what it is supposed to do, followed by commands to install library dependencies. Then, the set of parameters the notebook accepts is listed (3), followed by the code that pulls those parameters from the environment (4). Finally, the actual task is implemented in source code (5).

![jupyterlab exploration](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/labs/images/fp8.png)

3.  Please explore the other components in the library and look at how they are implemented. They serve as cookie-cutters, or examples, for an abundance of daily data science tasks and hopefully you can learn from them.
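The parameter handling described in step 2 can be pictured with a small sketch. It is not the CLAIMED implementation: the function name `read_parameters` and the default values are assumptions made for illustration. The sketch reads each parameter from the environment, falls back to a default, and lets `name=value` command-line arguments (the form used by the cells later in this lab) override both.

```python
import os
import sys


def read_parameters(defaults, argv=None, environ=None):
    """Resolve parameters from defaults, environment and name=value arguments."""
    argv = sys.argv[1:] if argv is None else argv
    environ = os.environ if environ is None else environ

    # Start from the defaults, then let environment variables override them.
    params = {name: environ.get(name, default) for name, default in defaults.items()}

    # Finally, name=value arguments take precedence over everything else.
    for arg in argv:
        name, sep, value = arg.partition("=")
        if sep and name in params:
            params[name] = value
    return params


# Hypothetical defaults; the real notebooks define their own.
example = read_parameters(
    {"data_dir": "../../data/", "sample": "1.0"},
    argv=["data_dir=./data/", "sample=0.01"],
    environ={},
)
print(example)
```

Arguments whose name is not a declared parameter are ignored in this sketch, which keeps unrelated command-line options from leaking into the configuration.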


## Exercise 3 : Start the ETL process

### Task A : Pull the data


Now we need to pull data from a remote GitHub repository and convert it to CSV.
You can open the notebook called *input-hmp.ipynb* in the folder *claimed/component-library/input* and run
each cell from top to bottom, one by one. Alternatively, the cell below does the same job.
If you run the code by executing the cell below, please open the notebook in parallel and
investigate how it is implemented.

Please note that running the notebook from the cell lets us pass a parameter called *sample*, which reduces the data size and processing
time. Sampling is common on non-big-data systems. In Apache Spark it is usually not necessary
because Spark scales out to large data volumes, so sampling is done here only to save
time during the lab.
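To make the effect of the *sample* parameter concrete, here is a minimal sketch in plain Python. It assumes that *sample* is a fraction between 0 and 1 and that each row is kept independently with that probability. The function `sample_rows` and the seed are illustrative and are not taken from the component.

```python
import random


def sample_rows(rows, fraction, seed=42):
    """Keep each row independently with probability `fraction`."""
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be between 0 and 1")
    rng = random.Random(seed)  # fixed seed so the sketch is reproducible
    return [row for row in rows if rng.random() < fraction]


rows = list(range(100_000))
subset = sample_rows(rows, fraction=0.01)
# Roughly 1% of the rows survive; the exact count depends on the seed.
print(len(subset), "of", len(rows), "rows kept")
```

In PySpark the comparable operation is `DataFrame.sample`, which also takes a fraction and an optional seed.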

Note: A cell which creates a lot of output can be switched into scrolling mode. Just right-click into the output canvas (1) and click on “Enable Scrolling for Outputs” (2)

![creating a pipeline](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/labs/images/fp10.png)

After the cell below has run, which can take quite some time, you should see a folder called “data.csv” in the “data” directory. Please note that Apache Spark always creates folders containing individual files, one per partition. This is not a problem because Spark treats a folder as if it were a single file. (The only way to get a single file is to repartition the data frame to one partition, for example with `df = df.repartition(1)`, and then extract or rename the file inside the folder.)


In [None]:
!ipython ./claimed/component-library/input/input-hmp.ipynb data_dir=./data/ sample=0.01
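The folder layout described above can be imitated without Spark. The sketch below is only an illustration in plain Python: it writes rows into one `part-` file per partition inside a folder named `data.csv` and then reads the folder back as a single data set. The file naming and the round-robin partitioning are simplifying assumptions, not Spark's actual behavior.

```python
import csv
import tempfile
from pathlib import Path


def write_partitioned(rows, folder, num_partitions):
    """Write rows into `num_partitions` part files inside `folder`."""
    folder = Path(folder)
    folder.mkdir(parents=True, exist_ok=True)
    for index in range(num_partitions):
        part = folder / f"part-{index:05d}.csv"
        with part.open("w", newline="") as handle:
            # Round-robin assignment of rows to partitions (an assumption).
            csv.writer(handle).writerows(rows[index::num_partitions])
    # Marker file, similar in spirit to the status files Spark writes.
    (folder / "_SUCCESS").touch()


def read_folder(folder):
    """Read every part file in `folder` as if the folder were one file."""
    rows = []
    for part in sorted(Path(folder).glob("part-*")):
        with part.open(newline="") as handle:
            rows.extend(csv.reader(handle))
    return rows


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "data.csv"
    data = [[str(i), str(i * i)] for i in range(10)]
    write_partitioned(data, target, num_partitions=3)
    part_files = sorted(p.name for p in target.glob("part-*"))
    restored = read_folder(target)

print(part_files)
print(len(restored), "rows read back")
```

The rows come back in a different order than they were written, which mirrors the fact that a partitioned data set has no guaranteed global ordering unless you sort it.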

### Task B : Convert from CSV to PARQUET

CSV is not a memory- or storage-efficient file format, and the same holds for JSON. For that reason,
formats like [Avro](https://en.wikipedia.org/wiki/Apache_Avro?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork26766988-2021-01-01), [Parquet](https://en.wikipedia.org/wiki/Apache_Parquet?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork26766988-2021-01-01), [ORC](https://en.wikipedia.org/wiki/Apache_ORC?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork26766988-2021-01-01) or [RCFile](https://en.wikipedia.org/wiki/RCFile?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork26766988-2021-01-01) emerged. Therefore we convert the CSV data to PARQUET before uploading it to Cloud Object Store.
Again, [CLAIMED](https://github.com/IBM/claimed/tree/master/component-library) provides a component, in the form of a Jupyter notebook,
that does exactly that. Please explore the notebook under *claimed/component-library/transform/spark-csv-to-parquet.ipynb*. You can either run the notebook directly or execute the following cell.


In [7]:
!ipython ./claimed/component-library/transform/spark-csv-to-parquet.ipynb data_dir=./data/

### Task C: Condense parquet file

As mentioned before, Apache Spark creates folders instead of files. Inside the folder, besides some status information, you will see one sub-file per partition. This is handy for parallel processing, but sometimes we
want all data to be self-contained in a single file. The "spark-condense-parquet.ipynb" component
in the *./claimed/component-library/transform/* folder does this job. Let's execute it using the cell below, but please
also open the notebook to see what it is doing. Alternatively, you can run the notebook cell by cell.


In [None]:
!ipython ./claimed/component-library/transform/spark-condense-parquet.ipynb data_dir=./data/
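One way to end up with a single file is sketched below in plain Python. It assumes the data has already been written with a single partition (for example after `df.repartition(1)`), so the output folder contains exactly one `part-` file. The helper name `extract_single_file` is hypothetical, and this is not necessarily how the CLAIMED component works.

```python
import shutil
import tempfile
from pathlib import Path


def extract_single_file(folder, target):
    """Copy the only part file in a Spark-style output folder to `target`."""
    parts = sorted(Path(folder).glob("part-*"))
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part file, found {len(parts)}")
    shutil.copyfile(parts[0], target)
    return Path(target)


with tempfile.TemporaryDirectory() as tmp:
    # Build a stand-in for a single-partition output folder.
    folder = Path(tmp) / "data.parquet"
    folder.mkdir()
    (folder / "_SUCCESS").touch()
    (folder / "part-00000-example.snappy.parquet").write_bytes(b"stand-in bytes")

    result = extract_single_file(folder, Path(tmp) / "data_condensed.parquet")
    result_is_file = result.is_file()
    result_bytes = result.read_bytes()

print(result_is_file)
```

The check on the number of part files is deliberate: silently picking one of several part files would drop data.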

### Task D : Obtain access to Cloud Object Store

1.  Please create an account on [IBM Cloud](https://cloud.ibm.com/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork26766988-2021-01-01). It is free, doesn't expire, and no credit card is required. You also get 25 GB of storage on Cloud Object Store (COS) for free. IBM COS is compatible with S3, so what you learn here will work on all S3-compliant storage systems in different clouds and also in your local data center.

2.  Once you have created and verified your account, you can create an S3-compliant Object Storage service by clicking the following [link](https://cloud.ibm.com/objectstorage/create?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork26766988-2021-01-01). Please select the Lite plan, as it includes 25 GB of free storage.

![Create COS](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/labs/images/ibm_cos_create.png)

3.  You can always go back to the service created by accessing the [IBM Cloud Service Resource List](https://cloud.ibm.com/resources?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork26766988-2021-01-01). Just look for an entry in the table under the "Storage" category and click on it.

![Resource List](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/labs/images/ibm_resource_list_cos.png)

4.  Now please click "Create Bucket"

![Create Bucket](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/labs/images/ibm_cos_create_bucket.png)

5.  Select the "Quickly get started" option by clicking the arrow

![Create Bucket\_2](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/labs/images/ibm_cos_create_bucket\_2.png)

6.  Click on "next" twice, then click "View Bucket Configuration"

7.  Here, please note the bucket name, and the location, then click "Service Credentials"

![Create SC](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/labs/images/ibm_cos_location_sc.png)

8.  Click on "New Credential", then under "Advanced Options", please ensure that "Include HMAC Credential" is active. Then click "Add"

![hmac](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/labs/images/hmac.png)

9.  Please open the newly created credentials, and in the "cos_hmac_keys" section, please note down "access_key_id" and "secret_access_key"

![sc](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/labs/images/sc.png)

10. Click on "Endpoints", then select the location you noted in step 7. That way the correct public endpoint is displayed. Please note it down for later use.

![endpoint](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/labs/images/endpoint.png)
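If you copy the credentials as JSON, the two values from step 9 can also be pulled out programmatically. The sketch below assumes the JSON contains a `cos_hmac_keys` object with `access_key_id` and `secret_access_key` fields, as named in step 9. The helper name `extract_hmac_keys` is hypothetical and all values shown are made-up placeholders.

```python
import json


def extract_hmac_keys(credentials_json):
    """Return (access_key_id, secret_access_key) from a service credential."""
    credentials = json.loads(credentials_json)
    try:
        hmac_keys = credentials["cos_hmac_keys"]
    except KeyError:
        raise ValueError(
            'no "cos_hmac_keys" section found; was "Include HMAC Credential" enabled?'
        ) from None
    return hmac_keys["access_key_id"], hmac_keys["secret_access_key"]


# Placeholder credential, not a real one; other fields are omitted.
sample_credentials = """
{
  "cos_hmac_keys": {
    "access_key_id": "<your_access_key_id>",
    "secret_access_key": "<your_secret_access_key>"
  }
}
"""
access_key_id, secret_access_key = extract_hmac_keys(sample_credentials)
print(access_key_id)
```

Treat the real values like passwords: keep them out of notebooks you share or commit.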

Congratulations! You've obtained all the information you need to store your data in Cloud Object Storage.


### Task E: Upload file to Cloud Object Storage

Now it's time to use the information you gathered in Task D to fill in the variables in the next cell.
Then please run the cell. This will upload the Parquet file you've created to Cloud Object Storage, where it is
stored at low cost and can be made available to others.

<span style="color:red">Please note: The endpoint you need to set is not taken from the "endpoints" entry in the service credentials section. It is the public endpoint from Task D, Step 10.</span>


> Note: After executing the code below, you may get an error like this:

```
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
aiobotocore 1.4.1 requires botocore<1.20.107,>=1.20.106, but you have botocore 1.10.62 which is incompatible.
```

Kindly ignore the error and proceed to **Task F**. Your file will still be uploaded.


In [10]:
%%bash
export access_key_id='access_key_id=<your_access_key_id_goes_here>'
export secret_access_key='secret_access_key=<your_secret_access_key_goes_here>'
export endpoint='endpoint=https://<your_endpoint_goes_here>'
export bucket_name='bucket_name=<your_cos_bucket_name_goes_here>'
export source_file='source_file=data_condensed.parquet'
export destination_file='destination_file=data.parquet'
export data_dir='data_dir=./data/'
ipython ./claimed/component-library/output/upload-to-cos.ipynb $access_key_id $secret_access_key $endpoint $bucket_name $source_file $destination_file $data_dir
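The cell above passes every setting to the notebook as a `name=value` argument. The same command line can be assembled from a dictionary, which makes it easier to spot a value that was never filled in. This is a sketch only: `build_upload_command` is a hypothetical helper, and treating any value that contains `<` as an unfilled placeholder is an assumption of this example.

```python
def build_upload_command(notebook, params):
    """Return the argument list for running a component with name=value parameters."""
    missing = [name for name, value in params.items() if not value or "<" in value]
    if missing:
        raise ValueError("please fill in: " + ", ".join(missing))

    # The cell above uses a full URL; add the scheme if only a host name was noted.
    params = dict(params)
    if not params["endpoint"].startswith(("http://", "https://")):
        params["endpoint"] = "https://" + params["endpoint"]

    return ["ipython", notebook] + [f"{name}={value}" for name, value in params.items()]


command = build_upload_command(
    "./claimed/component-library/output/upload-to-cos.ipynb",
    {
        "access_key_id": "example-key-id",      # placeholder value
        "secret_access_key": "example-secret",  # placeholder value
        "endpoint": "s3.us-south.cloud-object-storage.appdomain.cloud",
        "bucket_name": "example-bucket",        # placeholder value
        "source_file": "data_condensed.parquet",
        "destination_file": "data.parquet",
        "data_dir": "./data/",
    },
)
print(" ".join(command))
```

The resulting list could be handed to `subprocess.run(command, check=True)` instead of typing the command into a `%%bash` cell.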


### Task F: Double-check if the file arrived at its destination

To make sure everything worked as designed (strictly speaking an unnecessary step if you haven't received any errors),
we open the IBM Cloud user interface. Please open the bucket contents as you've learned in Task D and check that
the file arrived as shown below.

![cos_contents](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/labs/images/check_cos.png)

Congratulations, this concludes this lab.


### Task G: (Optional) Pull another file and upload it as well to COS

To make things easier let's first flush the data dir:


In [None]:
!rm -Rf ./data/*

Now it's time for you to pull some other data set. You can choose anything you like, but you can also use
the "input-pardata.ipynb" notebook in the folder "./claimed/component-library/input". This component pulls the JFK
weather data set as data.csv. From there, please convert this CSV file to PARQUET, condense it, and upload
it to Object Storage as "jfk_data.parquet". You can either open the notebooks directly, run them inline in this
notebook below or even create a pipeline using the [Elyra Pipeline editor](https://youtu.be/0SWRMnhIZMA?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork26766988-2021-01-01&t=1810).

We have provided an inline solution for you below in case you get stuck.


In [None]:
!ipython ./claimed/component-library/input/input-pardata.ipynb data_dir=./data/

In [None]:
!ipython ./claimed/component-library/transform/spark-csv-to-parquet.ipynb data_dir=./data/

In [None]:
!ipython ./claimed/component-library/transform/spark-condense-parquet.ipynb data_dir=./data/

In [None]:
%%bash
export access_key_id='access_key_id=<your_access_key_id_goes_here>'
export secret_access_key='secret_access_key=<your_secret_access_key_goes_here>'
export endpoint='endpoint=https://<your_endpoint_goes_here>'
export bucket_name='bucket_name=<your_cos_bucket_name_goes_here>'
export source_file='source_file=data_condensed.parquet'
export destination_file='destination_file=jfk_data.parquet'
export data_dir='data_dir=./data/'
ipython ./claimed/component-library/output/upload-to-cos.ipynb $access_key_id $secret_access_key $endpoint $bucket_name $source_file $destination_file $data_dir

## Author(s)

[Romeo Kienzler](https://ibm.com/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork26766988-2021-01-01)

[Karthik Muthuraman](https://ibm.com/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork26766988-2021-01-01)

## Changelog

| Date       | Version | Changed by         | Change Description       |
| ---------- | ------- | ------------------ | ------------------------ |
| 2021-05-12 | 0.1     | Romeo Kienzler     | Initial version created  |
| 2021-06-22 | 1.0     | Karthik Muthuraman | First editorial pass     |
| 2021-05-12 | 1.1     | Romeo Kienzler     | Adjusted for API changes |
| 2021-05-26 | 1.2     | Romeo Kienzler     | Moved to jupyter lab     |
| 2021-05-26 | 1.3     | Romeo Kienzler     | Add optional task        |
| 2021-12-06 | 1.4     | Lakshmi Holla      | Added the warning note   |

## <h3 align="center"> © IBM Corporation 2021. All rights reserved. </h3>
