# How to run ETL
> This guide shows how to run ETL. There are 2 steps:

1. Prepare the config
2. Pass the config to ETLPipeline

## 🌌 1. Prepare the config

In [1]:
import os
from pathlib import Path
from dataverse.config import Config 
from omegaconf import OmegaConf

#### Option 1: When you have cloned the Dataverse repository
- This method loads the config files from the cloned Dataverse repository directory.
- If you haven't cloned the repository, follow Option 2.

In [2]:
# E = Extract, T = Transform, L = Load
main_path = Path(os.path.abspath('../..'))
E_path = main_path / "./dataverse/config/etl/sample/data_ingestion___sampling.yaml"
T_path = main_path / "./dataverse/config/etl/sample/data_preprocess___dedup.yaml"
L_path = main_path / "./dataverse/config/etl/sample/data_save___hf_obj.yaml"

E_config = Config.load(E_path)
T_config = Config.load(T_path)
L_config = Config.load(L_path)

#### Option 2: When you haven't cloned the Dataverse repository
- With this method, we define the E, T, and L configs directly in the Python session instead of loading them from files.
- These configs are identical to the files loaded in Option 1.

In [3]:
E_config = OmegaConf.create({
    'spark': { 
        'appname': 'dataverse_etl_sample',
        'driver': {'memory': '16g'},
    },
    'etl': [
        { 
          'name': 'data_ingestion___test___generate_fake_ufl', 
        },
        {
          'name': 'utils___sampling___random',
          'args': {'sample_n_or_frac': 0.1}
        },
        {
          'name': 'data_save___parquet___ufl2parquet',
          'args': {'save_path': "./sample/sample_ufl.parquet"}
        },
      ]
  })

T_config = OmegaConf.create({
    'spark': { 
        'appname': 'dataverse_etl_sample',
        'driver': {'memory': '16g'},
    },
    'etl': [
        { 
          'name': 'data_ingestion___parquet___pq2raw', 
          'args': {'path': "./sample/sample_ufl.parquet"}
        },
        {
          'name': 'deduplication___minhash___lsh_jaccard',
        },
        {
          'name': 'data_save___parquet___ufl2parquet',
          'args': {'save_path': "./sample/preprocess_ufl.parquet"}
        },
      ]
  })

L_config = OmegaConf.create({
    'spark': { 
        'appname': 'dataverse_etl_sample',
        'driver': {'memory': '16g'},
    },
    'etl': [
        { 
          'name': 'data_ingestion___parquet___pq2raw', 
          'args': {'path': './sample/preprocess_ufl.parquet'}
        },
        {
          'name': 'data_save___huggingface___ufl2hf_obj',
        },
      ]
  })
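The three configs above repeat the same `spark` block. Below is a minimal sketch of one way to factor that out. `SPARK_DEFAULTS` and `make_config` are hypothetical names used only for illustration; they are not part of Dataverse. The sketch builds plain dictionaries, on the assumption that the result can be passed to `OmegaConf.create` exactly as in the cell above.

```python
from copy import deepcopy

# Shared Spark settings, copied from the configs above.
SPARK_DEFAULTS = {
    'appname': 'dataverse_etl_sample',
    'driver': {'memory': '16g'},
}


def make_config(steps):
    """Hypothetical helper: wrap a list of ETL steps with the shared spark block."""
    # deepcopy keeps each config independent of the shared defaults.
    return {'spark': deepcopy(SPARK_DEFAULTS), 'etl': deepcopy(steps)}


# The Extract steps, copied from E_config above.
E_dict = make_config([
    {'name': 'data_ingestion___test___generate_fake_ufl'},
    {'name': 'utils___sampling___random', 'args': {'sample_n_or_frac': 0.1}},
    {'name': 'data_save___parquet___ufl2parquet',
     'args': {'save_path': './sample/sample_ufl.parquet'}},
])

# Print the step names in execution order.
for step in E_dict['etl']:
    print(step['name'])
```

The same helper could be called with the Transform and Load step lists, so a change to the Spark settings only has to be made in one place.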

### 🌠 Extract Config

- generate fake UFL data
- sample 10% of the data to reduce the dataset size
- save to parquet `./sample/sample_ufl.parquet`

In [4]:
print(OmegaConf.to_yaml(E_config))

spark:
  appname: dataverse_etl_sample
  driver:
    memory: 16g
etl:
- name: data_ingestion___test___generate_fake_ufl
- name: utils___sampling___random
  args:
    sample_n_or_frac: 0.1
- name: data_save___parquet___ufl2parquet
  args:
    save_path: ./sample/sample_ufl.parquet



### 🌠 Transform Config

- load parquet `./sample/sample_ufl.parquet`
- deduplicate on the `text` column using 15-gram MinHash Jaccard similarity
- save to parquet `./sample/preprocess_ufl.parquet`
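To make the deduplication criterion concrete, here is a small sketch of exact n-gram Jaccard similarity, the quantity that MinHash approximates. It is not Dataverse's implementation: the helper names are hypothetical, character n-grams are an assumption (the step may tokenize differently), and `n=5` is used instead of 15 only so the short example strings produce several n-grams.

```python
def char_ngrams(text, n):
    """Return the set of character n-grams in text (the whole text if it is shorter than n)."""
    if len(text) < n:
        return {text}
    return {text[i:i + n] for i in range(len(text) - n + 1)}


def jaccard(a, b):
    """Jaccard similarity of two sets: |intersection| / |union|."""
    if not a and not b:
        return 1.0
    return len(a & b) / len(a | b)


doc_a = "the quick brown fox"
doc_b = "the quick brown fix"

# Near-duplicates share most of their n-grams, so the score is close to 1.
similarity = jaccard(char_ngrams(doc_a, 5), char_ngrams(doc_b, 5))
print(f"{similarity:.2f}")
```

Documents whose similarity exceeds some threshold would be treated as duplicates; MinHash with LSH is a way to find such pairs without comparing every document against every other.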

In [5]:
print(OmegaConf.to_yaml(T_config))

spark:
  appname: dataverse_etl_sample
  driver:
    memory: 16g
etl:
- name: data_ingestion___parquet___pq2raw
  args:
    path: ./sample/sample_ufl.parquet
- name: deduplication___minhash___lsh_jaccard
- name: data_save___parquet___ufl2parquet
  args:
    save_path: ./sample/preprocess_ufl.parquet



### 🌠 Load Config

- load parquet `./sample/preprocess_ufl.parquet`
- convert to a Hugging Face dataset and return the object

In [6]:
print(OmegaConf.to_yaml(L_config))

spark:
  appname: dataverse_etl_sample
  driver:
    memory: 16g
etl:
- name: data_ingestion___parquet___pq2raw
  args:
    path: ./sample/preprocess_ufl.parquet
- name: data_save___huggingface___ufl2hf_obj
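
The three stages are connected only through the parquet files: each stage reads what the previous one saved. The sketch below checks that hand-off before anything is run. `stage_io` is a hypothetical helper, and the configs are reduced to plain dictionaries holding just the `etl` lists copied from above.

```python
def stage_io(config):
    """Return (input path, output path) for a stage; None where the step takes no such arg."""
    steps = config['etl']
    source = steps[0].get('args', {}).get('path')
    target = steps[-1].get('args', {}).get('save_path')
    return source, target


extract = {'etl': [
    {'name': 'data_ingestion___test___generate_fake_ufl'},
    {'name': 'utils___sampling___random', 'args': {'sample_n_or_frac': 0.1}},
    {'name': 'data_save___parquet___ufl2parquet',
     'args': {'save_path': './sample/sample_ufl.parquet'}},
]}
transform = {'etl': [
    {'name': 'data_ingestion___parquet___pq2raw',
     'args': {'path': './sample/sample_ufl.parquet'}},
    {'name': 'deduplication___minhash___lsh_jaccard'},
    {'name': 'data_save___parquet___ufl2parquet',
     'args': {'save_path': './sample/preprocess_ufl.parquet'}},
]}
load = {'etl': [
    {'name': 'data_ingestion___parquet___pq2raw',
     'args': {'path': './sample/preprocess_ufl.parquet'}},
    {'name': 'data_save___huggingface___ufl2hf_obj'},
]}

# Each stage must read the file that the previous stage wrote.
stages = [extract, transform, load]
for previous, current in zip(stages, stages[1:]):
    written = stage_io(previous)[1]
    read = stage_io(current)[0]
    if written != read:
        raise ValueError(f"stage reads {read!r} but previous stage wrote {written!r}")
```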



## 🌌 2. Pass the config to ETLPipeline

In [7]:
from dataverse.etl import ETLPipeline

etl_pipeline = ETLPipeline()

An error occurred (ExpiredToken) when calling the GetCallerIdentity operation: The security token included in the request is expired


In [8]:
# raw -> ufl
etl_pipeline.run(E_config)

# ufl -> dedup -> ufl
etl_pipeline.run(T_config)

# ufl -> hf_obj
spark, dataset = etl_pipeline.run(L_config)

[ No AWS Credentials Found] - Failed to set spark conf for S3


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/15 22:10:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/15 22:10:33 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


[ No AWS Credentials Found] - Failed to set spark conf for S3


24/04/15 22:10:38 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
                                                                                

[ No AWS Credentials Found] - Failed to set spark conf for S3


24/04/15 22:10:45 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
                                                                                

In [9]:
dataset

Dataset({
    features: ['id', 'meta', 'name', 'text'],
    num_rows: 14
})

In [10]:
dataset[0]

{'id': 'a3715cee-e252-4360-9a15-93a3fcc832fb',
 'meta': '{"name": "Caitlin Hughes", "age": 55, "address": "517 Cassandra Mountains\\nJamesberg, NM 13313", "job": "Orthoptist"}',
 'name': 'test_fake_ufl',
 'text': 'Necessary miss set choice car hour. Only man interest affect. Cover black protect successful president court memory.'}
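
In the record above, `meta` is a JSON-encoded string rather than a nested object. A short sketch of decoding it with the standard library follows; the record is copied from the output above so the snippet runs without the pipeline.

```python
import json

# First record, copied from the output of dataset[0] above.
record = {
    'id': 'a3715cee-e252-4360-9a15-93a3fcc832fb',
    'meta': '{"name": "Caitlin Hughes", "age": 55, "address": "517 Cassandra Mountains\\nJamesberg, NM 13313", "job": "Orthoptist"}',
    'name': 'test_fake_ufl',
    'text': 'Necessary miss set choice car hour. Only man interest affect. Cover black protect successful president court memory.',
}

# Decode the JSON string into a regular dictionary.
meta = json.loads(record['meta'])
print(meta['name'], meta['age'], meta['job'])  # Caitlin Hughes 55 Orthoptist
```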