# ETL Scale-out with EMR
> When you have the budget but not enough machines to process your data, it's time to use EMR.

## 🌌 Set AWS Credentials
> This notebook assumes that you have already set your AWS credentials on your local machine. If not, follow the steps below to set them.

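```bash
# set credentials non-interactively (plain `aws configure` prompts for the same values)
aws configure set aws_access_key_id <your access key>
aws configure set aws_secret_access_key <your secret key>
aws configure set region <your region>
# only needed when you use temporary (STS) credentials
aws configure set aws_session_token <your session token>
```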
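The CLI stores the access keys (and the session token, if set) in an INI-style file, by default `~/.aws/credentials`, under a `[default]` profile; the region goes to `~/.aws/config`. As a quick local sanity check, the sketch below reads that file with `configparser`. It is not part of Dataverse, `has_static_credentials` is a hypothetical helper name, and it only inspects the file, so it cannot tell whether the keys are still accepted by AWS.

```python
import configparser
from pathlib import Path

# Keys that `aws configure` writes to the credentials file for a profile
REQUIRED_KEYS = ("aws_access_key_id", "aws_secret_access_key")


def has_static_credentials(path=None, profile="default"):
    """Hypothetical helper: True if `profile` has both access keys set.

    Only the local file is inspected; no request is sent to AWS.
    """
    path = Path(path) if path else Path.home() / ".aws" / "credentials"
    # interpolation=None so characters like '%' in secrets are read verbatim
    parser = configparser.ConfigParser(interpolation=None)
    # read() silently skips missing files and returns the files it parsed
    if not parser.read(path):
        return False
    if not parser.has_section(profile):
        return False
    return all(parser.get(profile, key, fallback="").strip() for key in REQUIRED_KEYS)
```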

In [1]:
from dataverse.utils.api import aws_check_credentials 

# check aws credentials
# NOTE: `True` means credentials are valid
if aws_check_credentials():
    print("AWS credentials are valid")
else:
    raise Exception("AWS credentials are invalid")

AWS credentials are valid


## 🌌 Set up Temporary Data & Environment
> You don't need to prepare any data here. We will create temporary data and a temporary environment for you.

### 🌠 Create a Temporary Folder Locally & on AWS S3

In [2]:
import tempfile
import uuid

from dataverse.utils.api import aws_s3_create_bucket


# create a temporary local folder and a uniquely named S3 bucket
tmp_folder = tempfile.TemporaryDirectory()
tmp_bucket = uuid.uuid4().hex

aws_s3_create_bucket(bucket=tmp_bucket)
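The bucket name is `uuid.uuid4().hex`: 32 lowercase hexadecimal characters, which fits S3's bucket naming rules and is very unlikely to collide with an existing bucket. The sketch below checks only a subset of those rules (3 to 63 characters; lowercase letters, digits, dots, and hyphens; starts and ends with a letter or digit; no adjacent dots; not shaped like an IPv4 address). `is_valid_bucket_name` is a hypothetical helper for illustration, not a complete validator.

```python
import re
import uuid

# Hypothetical helper; covers only a subset of the S3 bucket naming rules
_BUCKET_RE = re.compile(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")
_IP_RE = re.compile(r"\d{1,3}(\.\d{1,3}){3}")


def is_valid_bucket_name(name: str) -> bool:
    """Check a subset of S3 bucket naming rules (not exhaustive)."""
    if not _BUCKET_RE.fullmatch(name):  # length, charset, first/last char
        return False
    if ".." in name:  # adjacent periods are not allowed
        return False
    if _IP_RE.fullmatch(name):  # must not look like an IPv4 address
        return False
    return True


print(is_valid_bucket_name(uuid.uuid4().hex))  # True
```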

### 🌠 Create Temporary Data Locally and Upload It to AWS S3
> The data intentionally contains duplicate lines.

In [3]:
import os
import pandas as pd
from dataverse.utils.api import aws_s3_upload


# create sample data and upload it to s3
# NOTE: the file is written in parquet format; only its name ends in `.json`
sample_path = os.path.join(tmp_folder.name, 'duplicate.json')

# create ufl data with lines duplicated across records
ufl = [
    {'text': "random text\nduplication"},
    {'text': "fixed text\nduplication"},
    {'text': "fixed text\nduplication\nDUPLICATION"},
]
df = pd.DataFrame(ufl)
df.to_parquet(sample_path)

aws_s3_upload(
    bucket=tmp_bucket,
    key='duplicate.json',
    local_path=sample_path,
)
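Although the file is named `duplicate.json`, `df.to_parquet` writes Parquet bytes; the extension does not decide the format, which is consistent with the config later reading it through a parquet ingestion step. Unencrypted Parquet files start and end with the 4-byte magic `PAR1`, so a check like the hypothetical `looks_like_parquet` below can tell the two formats apart without pandas. This is a heuristic sketch, not a full format validation.

```python
import os

PARQUET_MAGIC = b"PAR1"


def looks_like_parquet(path: str) -> bool:
    """Hypothetical helper: True if the file starts and ends with b'PAR1'."""
    # header magic + 4-byte footer length + footer magic is the bare minimum
    if os.path.getsize(path) < 12:
        return False
    with open(path, "rb") as f:
        head = f.read(4)
        f.seek(-4, os.SEEK_END)
        tail = f.read(4)
    return head == PARQUET_MAGIC and tail == PARQUET_MAGIC
```

Calling it on `sample_path` should report Parquet regardless of the `.json` name.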

### 🌠 Temporary Dynamic ETL
> This shows how you can register your own ETL process on the fly.

In [4]:
%%writefile dynamic_etl.py
from dataverse.etl import register_etl
from pyspark.rdd import RDD
from pyspark.sql import DataFrame


@register_etl
def test___add___one(spark, data, subset='text', *args, **kwargs):
    """Append '1' to the `subset` field of every record."""
    # convert a Spark DataFrame into an RDD of plain dicts
    if isinstance(data, DataFrame):
        data = data.rdd
        data = data.map(lambda row: row.asDict())

    def _add_one(row):
        row[subset] = row[subset] + '1'
        return row

    data = data.map(_add_one)

    return data

Writing dynamic_etl.py
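Every step name used in this notebook has three parts separated by `___` (for example `data_ingestion___parquet___pq2ufl` and `test___add___one`), and the config refers to steps only by that name. The sketch below is a hypothetical, minimal registry that mimics the idea behind `@register_etl`; `ETL_REGISTRY`, `register_etl_sketch`, and `demo___add___one` are illustrative names, not Dataverse internals. The decorator stores the function under its own name, so a pipeline can later look it up from a string.

```python
from typing import Callable, Dict

# Hypothetical registry: maps an ETL step name to the function implementing it
ETL_REGISTRY: Dict[str, Callable] = {}


def register_etl_sketch(func: Callable) -> Callable:
    """Hypothetical decorator: register `func` under its three-part name."""
    parts = func.__name__.split("___")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected 'a___b___c', got {func.__name__!r}")
    ETL_REGISTRY[func.__name__] = func
    return func  # unchanged, so it can still be called directly


@register_etl_sketch
def demo___add___one(spark, data, subset="text", *args, **kwargs):
    # Same idea as `test___add___one`, but on a list of dicts instead of an RDD
    return [{**row, subset: row[subset] + "1"} for row in data]


# Look the step up by name, as a pipeline reading `config.etl` would
step = ETL_REGISTRY["demo___add___one"]
print(step(None, [{"text": "fixed text"}]))  # [{'text': 'fixed text1'}]
```

Because registration happens when the decorator runs, the module holding the function has to be imported before the pipeline resolves the names.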


### 🌠 Create Temporary Config
- load parquet from S3
- exact-deduplicate lines (each `text` is split by newline)
- append `1` to the end of each `text`
- save the result as parquet to S3
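To make the expected result concrete, the sketch below replays the two middle steps (line deduplication and appending `1`) on the same three records in plain Python. It is a stand-in for the Spark implementation with explicit assumptions: lines are compared after `.lower()`, the first occurrence of a line wins, and records left with no lines are dropped. These choices happen to reproduce the result shown later in this notebook, but Dataverse's actual normalization may differ; `exact_line_dedup` and `add_one` are hypothetical names.

```python
def exact_line_dedup(records, key="text"):
    """Hypothetical sketch: drop every line already seen in any earlier line."""
    seen = set()
    result = []
    for record in records:
        kept = []
        for line in record[key].split("\n"):
            normalized = line.lower()  # assumption: case-insensitive comparison
            if normalized in seen:
                continue
            seen.add(normalized)
            kept.append(line)
        if kept:  # records whose lines were all duplicates disappear
            result.append({**record, key: "\n".join(kept)})
    return result


def add_one(records, key="text"):
    """Hypothetical sketch: append '1' to each record's text."""
    return [{**record, key: record[key] + "1"} for record in records]


ufl = [
    {"text": "random text\nduplication"},
    {"text": "fixed text\nduplication"},
    {"text": "fixed text\nduplication\nDUPLICATION"},
]
for row in add_one(exact_line_dedup(ufl)):
    print(repr(row["text"]))
# 'random text\nduplication1'
# 'fixed text1'
```

In Spark the "first occurrence" depends on partition order; the config uses `repartition: 1`, which keeps this small example deterministic.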

In [5]:
from dataverse.config import Config
from omegaconf import OmegaConf

load_path = f"s3a://{tmp_bucket}/duplicate.json"
save_path = f"s3a://{tmp_bucket}/deduplicate.parquet"

config = Config.default()
config.etl.append({
    'name': 'data_ingestion___parquet___pq2ufl',
    'args': {
        'path': load_path,
        'repartition': 1
    }}
)
config.etl.append({'name': 'deduplication___common_crawl___exact_line'})
config.etl.append({'name': 'test___add___one'})
config.etl.append({
    'name': 'data_load___parquet___ufl2parquet',
    'args': {'save_path': save_path}})

print(OmegaConf.to_yaml(config))

Detected Dataverse Bucket: dataverse-dv42-d853ea88-c87d-486f-b3b5-d780203bc262
spark:
  master: local[10]
  appname: default
  driver:
    memory: 8G
    maxResultSize: 2G
  executor:
    memory: 1G
  local:
    dir: /root/.cache/dataverse/tmp
  ui:
    port: 4040
etl:
- name: data_ingestion___parquet___pq2ufl
  args:
    path: s3a://581f4bedcaf24703b248e73d4ecefabd/duplicate.json
    repartition: 1
- name: deduplication___common_crawl___exact_line
- name: test___add___one
- name: data_load___parquet___ufl2parquet
  args:
    save_path: s3a://581f4bedcaf24703b248e73d4ecefabd/deduplicate.parquet



## 🌌 ETLPipeline with `Local`
> We will test our ETL pipeline on the local machine first.

### 🌠 Import `dynamic_etl.py` to add custom ETL

In [5]:
# import the module before running the pipeline so `@register_etl` registers `test___add___one`
import dynamic_etl

Detected Dataverse Bucket: dataverse-dv42-d853ea88-c87d-486f-b3b5-d780203bc262


### 🌠 Run the ETL Pipeline on the Local Machine
> As specified in the config, we will:

- load data from S3
- exact-deduplicate lines (each `text` is split by newline)
- append `1` to the end of each `text`
- save the result as parquet to S3

In [7]:
from dataverse.etl import ETLPipeline

etl_pipeline = ETLPipeline()
spark, data = etl_pipeline.run(config=config)

spark conf is set with [ temporary ] S3 credentials
:: loading settings :: url = jar:file:/data/project/private/ducky/anaconda3/envs/llm/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-bbfd8d7f-e9d3-48d9-b3a0-6ac07189c03d;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.592 in central
:: resolution report :: resolve 128ms :: artifacts dl 4ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.592 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	:: evicted modules:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 by [com.amazonaws#aws-java-sdk-bundle;1.12.592] in [default]

### 🌠 Download Data from S3 and Check the Result

In [8]:
# aws s3 path
print(save_path)

s3a://581f4bedcaf24703b248e73d4ecefabd/deduplicate.parquet


In [9]:
from dataverse.utils.api import aws_s3_path_parse
from dataverse.utils.api import aws_s3_download


bucket, key = aws_s3_path_parse(save_path)
aws_s3_download(
    bucket=bucket,
    key=key,
    local_path=os.path.join(tmp_folder.name, 'deduplicate.parquet'),
)
pd.read_parquet(os.path.join(tmp_folder.name, 'deduplicate.parquet'))

|   | text |
|---|---|
| 0 | random text\nduplication1 |
| 1 | fixed text1 |
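`aws_s3_path_parse` splits an `s3a://bucket/key` URI into its bucket and key. A rough standard-library equivalent is sketched below; `parse_s3_uri` is a hypothetical name, and Dataverse's own handling of edge cases may differ.

```python
from urllib.parse import urlparse


def parse_s3_uri(uri: str):
    """Hypothetical helper: split 's3://', 's3a://' or 's3n://' URIs into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme not in ("s3", "s3a", "s3n"):
        raise ValueError(f"not an S3 URI: {uri!r}")
    # netloc is the bucket; the path starts with '/', which is not part of the key
    return parsed.netloc, parsed.path.lstrip("/")


print(parse_s3_uri("s3a://581f4bedcaf24703b248e73d4ecefabd/deduplicate.parquet"))
# ('581f4bedcaf24703b248e73d4ecefabd', 'deduplicate.parquet')
```

Keys containing `?` or `#` would be split off as query or fragment by `urlparse`, so this sketch is only suitable for plain keys like the ones in this notebook.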


### 🌠 Remove the Result Locally and on AWS S3

In [10]:
import shutil
from dataverse.utils.api import aws_s3_delete

# remove saved deduplicate.parquet
shutil.rmtree(os.path.join(tmp_folder.name, 'deduplicate.parquet'))
aws_s3_delete(bucket=tmp_bucket, key='deduplicate.parquet')
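`shutil.rmtree` is needed here because Spark writes `deduplicate.parquet` as a directory of part files rather than a single file, and the downloaded copy keeps that layout. When a path might be either a file or a directory, a small helper can handle both; `remove_path` below is a hypothetical sketch, not a Dataverse function.

```python
import os
import shutil


def remove_path(path: str) -> bool:
    """Hypothetical helper: remove a file or directory tree; False if nothing existed."""
    if os.path.isdir(path) and not os.path.islink(path):
        shutil.rmtree(path)  # e.g. a Spark output directory with part files
        return True
    if os.path.lexists(path):
        os.remove(path)  # a single file or a symlink
        return True
    return False
```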

## 🌌 ETLPipeline with `EMR`
> Works well? Let's scale out with EMR!


### 🌠 Run the ETL Pipeline on EMR
> Add `emr=True` to the ETL pipeline. That's all! Dataverse manages the EMR cluster for you.


- Set `verbose=True` to see the EMR cluster logs.
- With `emr=True`, the second return value is no longer `data` but the `config`, as updated by the Dataverse EMR manager.

```python
# before - local
spark, data = etl_pipeline.run(config=config)

# after - EMR
spark, config = etl_pipeline.run(config=config, emr=True)
```

In [11]:
from dataverse.etl import ETLPipeline

etl_pipeline = ETLPipeline()
spark, config = etl_pipeline.run(config=config, emr=True)

Default instance type is [ c5.xlarge ]
 vCPU: 4
 Memory: 8192
 Price: 0.088100

[ Dataverse ] step status: COMPLETED. Done.
DependencyViolation occured when terminating EMR cluster. Retrying one more time
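Since EMR costs money, a back-of-the-envelope estimate before scaling out can help. The sketch below takes the printed `Price` as a per-instance hourly rate in USD; that unit is an assumption (the log does not state it), and real EMR billing may add an EMR fee on top of EC2 plus storage and transfer costs. It also lets you count the config's `idle_timeout` (presumably seconds) as a worst case for a cluster that is not terminated after the job. `estimate_emr_cost` is a hypothetical helper.

```python
def estimate_emr_cost(hourly_price: float, instance_count: int,
                      job_seconds: float, idle_timeout_seconds: float = 0.0) -> float:
    """Hypothetical rough estimate: instances x hourly price x billed hours.

    Assumes every instance has the same hourly price; ignores billing
    minimums, EMR fees, storage, and data transfer.
    """
    billed_hours = (job_seconds + idle_timeout_seconds) / 3600
    return hourly_price * instance_count * billed_hours


# e.g. 1 master + 5 core nodes at the printed c5.xlarge price, a 20-minute job,
# plus a full 3600-second idle timeout in the worst case
print(round(estimate_emr_cost(0.0881, 6, 20 * 60, 3600), 4))  # 0.7048
```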


In [None]:
from omegaconf import OmegaConf

print(OmegaConf.to_yaml(config))

### 🌠 Download Data from S3 and Check the Result

In [15]:
from dataverse.utils.api import aws_s3_path_parse
from dataverse.utils.api import aws_s3_download


bucket, key = aws_s3_path_parse(save_path)
aws_s3_download(
    bucket=bucket,
    key=key,
    local_path=os.path.join(tmp_folder.name, 'deduplicate.parquet'),
)
pd.read_parquet(os.path.join(tmp_folder.name, 'deduplicate.parquet'))

|   | text |
|---|---|
| 0 | random text\nduplication1 |
| 1 | fixed text1 |


### 🌠 Remove the Result Locally and on AWS S3

In [16]:
import shutil
from dataverse.utils.api import aws_s3_delete

# remove saved deduplicate.parquet
shutil.rmtree(os.path.join(tmp_folder.name, 'deduplicate.parquet'))
aws_s3_delete(bucket=tmp_bucket, key='deduplicate.parquet')

## 🌌 Set `EMR` custom config
> Want to customize your EMR cluster? Let's do it!

```python
from dataverse.config import Config

# if you already have an EMR cluster, point the config at it by its id
config = Config.default(emr=True)
config.emr.id = 'j-XXXXXXXXXXXXX'  # your EMR cluster id
```
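Because the config is nested, overrides like the one above can also be collected in a plain dict and merged into the defaults. OmegaConf provides `OmegaConf.merge` for this; the dependency-free `deep_merge` below is a hypothetical sketch of the same idea, and `defaults` is an abbreviated, illustrative excerpt of the `emr` section printed further down. Nested dicts are merged key by key, and any other override value replaces the default.

```python
import copy


def deep_merge(base: dict, override: dict) -> dict:
    """Hypothetical helper: return a new dict with `override` merged into `base`."""
    merged = copy.deepcopy(base)  # never mutate the defaults
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


# Abbreviated excerpt of the printed `emr` defaults (illustrative only)
defaults = {"emr": {"id": None, "release": "emr-6.15.0", "idle_timeout": 3600,
                    "core_instance": {"type": None},
                    "task_instance": {"type": None, "count": 0}}}
overrides = {"emr": {"id": "j-XXXXXXXXXXXXX", "core_instance": {"count": 5}}}

merged = deep_merge(defaults, overrides)
print(merged["emr"]["core_instance"])  # {'type': None, 'count': 5}
print(merged["emr"]["idle_timeout"])   # 3600
```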

In [6]:
from dataverse.config import Config
from omegaconf import OmegaConf

load_path = f"s3a://{tmp_bucket}/duplicate.json"
save_path = f"s3a://{tmp_bucket}/deduplicate.parquet"

# TODO: add `emr=True` to get the emr config
# =========================================
config = Config.default(emr=True)
# =========================================

config.etl.append({
    'name': 'data_ingestion___parquet___pq2ufl',
    'args': {
        'path': load_path,
        'repartition': 1
    }}
)
config.etl.append({'name': 'deduplication___common_crawl___exact_line'})
config.etl.append({'name': 'test___add___one'})
config.etl.append({
    'name': 'data_load___parquet___ufl2parquet',
    'args': {'save_path': save_path}})

# TODO: customize the EMR config, e.g. the number of core instances
# =========================================
config.emr.core_instance.count = 5

# TODO: there are more config options for emr
#       check `dataverse.config.Config.default`
# =========================================

print(OmegaConf.to_yaml(config))

spark:
  master: local[10]
  appname: default
  driver:
    memory: 8G
    maxResultSize: 2G
  executor:
    memory: 1G
  local:
    dir: /root/.cache/dataverse/tmp
  ui:
    port: 4040
etl:
- name: data_ingestion___parquet___pq2ufl
  args:
    path: s3a://576768809f8a4181b034ef7921613d41/duplicate.json
    repartition: 1
- name: deduplication___common_crawl___exact_line
- name: test___add___one
- name: data_load___parquet___ufl2parquet
  args:
    save_path: s3a://576768809f8a4181b034ef7921613d41/deduplicate.parquet
emr:
  id: null
  working_dir: null
  name: dataverse_emr
  release: emr-6.15.0
  idle_timeout: 3600
  master_instance:
    type: null
  core_instance:
    type: null
    count: 5
  task_instance:
    type: null
    count: 0
  auto_generated: null
  role:
    ec2:
      name: null
      policy_arns: null
    emr:
      name: null
      policy_arns: null
  instance_profile:
    name: null
    ec2_role: null
  vpc:
    id: null
  subnet:
    id: null
    public_id: null
    

In [None]:
from dataverse.etl import ETLPipeline

etl_pipeline = ETLPipeline()
spark, config = etl_pipeline.run(config=config, emr=True)

### 🌠 Download Data from S3 and Check the Result

In [8]:
from dataverse.utils.api import aws_s3_path_parse
from dataverse.utils.api import aws_s3_download


bucket, key = aws_s3_path_parse(save_path)
aws_s3_download(
    bucket=bucket,
    key=key,
    local_path=os.path.join(tmp_folder.name, 'deduplicate.parquet'),
)
pd.read_parquet(os.path.join(tmp_folder.name, 'deduplicate.parquet'))

|   | text |
|---|---|
| 0 | random text\nduplication1 |
| 1 | fixed text1 |


## 🌌 Remove Temporary Data & Environment
> It's time to clean up.

In [9]:
from dataverse.utils.api import aws_s3_delete
from dataverse.utils.api import aws_s3_delete_bucket

!rm dynamic_etl.py

# remove temp folder
tmp_folder.cleanup()

# remove temp bucket: empty it first, since S3 only deletes empty buckets
aws_s3_delete(bucket=tmp_bucket, key='')
aws_s3_delete_bucket(bucket=tmp_bucket)
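If a cell fails midway, the manual cleanup above never runs. For the local part, `tempfile.TemporaryDirectory` also works as a context manager that removes the directory even when an exception is raised. The sketch below shows only that standard-library pattern; wrapping the S3 bucket deletion in a similar `try`/`finally` is left as an exercise and is not shown.

```python
import os
import tempfile

# The directory and its contents are removed when the block exits,
# whether it exits normally or through an exception.
with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = os.path.join(tmp_dir, "duplicate.json")
    with open(sample_path, "w") as f:
        f.write("placeholder")
    print(os.path.exists(sample_path))  # True

print(os.path.exists(tmp_dir))  # False
```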