In [None]:
##### ANOVOS - Data Ingest
The following notebook shows the "data ingest" related functions supported by the ANOVOS package and how they can be invoked.
* [Read Dataset](#Read-Dataset)
* [Select Columns](#Select-Columns)
* [Delete Columns](#Delete-Columns)
* [Rename Columns](#Rename-Columns)
* [Recast Columns](#Recast-Columns)
* [Concatenate Datasets](#Concatenate-Datasets)
* [Join Datasets](#Join-Datasets)
* [Write Datasets and export feature definitions](#Write-Datasets-and-export-feature-definitions)

**Setting Spark Session**

In [2]:
# The star import is what brings the shared Spark handles into this notebook:
# `sc` is used right below, and `spark` is presumably the session handed to
# read_dataset later on (neither is defined anywhere else in the notebook).
from anovos.shared.spark import *
import warnings

# Keep cell output readable: Spark logs errors only, Python warnings are hidden.
sc.setLogLevel("ERROR")
warnings.filterwarnings("ignore")

**Input/Output Path**

In [2]:
# Input locations of the income dataset (all relative to the notebook folder).
inputPath = "../data/income_dataset/csv"
inputPath_parq = "../data/income_dataset/parquet"
inputPath_join = "../data/income_dataset/join"

# Directory that receives the datasets written by this notebook.
outputPath = "../output/income_dataset/"

# Read Dataset

- The API specification of the function **read_dataset** can be found <a href="https://docs.anovos.ai/api/data_ingest/data_ingest.html">here</a>
- Currently supported file types: csv, parquet, avro

In [3]:
from anovos.data_ingest.data_ingest import read_dataset

In [4]:
# Load the income CSV into a Spark DataFrame. The reader options are passed as
# strings: the first row is the header, fields are comma separated, and the
# column types are inferred from the data.
df = read_dataset(
    spark,
    file_path=inputPath,
    file_type="csv",
    file_configs={
        "header": "True",
        "delimiter": ",",
        "inferSchema": "True",
    },
)

# Preview five rows. The limit is applied in Spark *before* converting to
# pandas, so the full dataset is not collected to the driver just to display
# a handful of rows.
df.limit(5).toPandas()

Unnamed: 0,ifa,age,workclass,fnlwgt,logfnl,empty,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,income,dt_1,dt_2
0,1a,,State-gov,77516.0,4.889391,,Bachelors,13.0,Never-married,Adm-clerical,Not-in-family,White,Male,2174.0,0.0,40.0,UnitedStates,<=50K,1/8/16 5:59,1/16/16 5:59
1,2a,,Self-emp-not-inc,83311.0,4.920702,,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,UnitedStates,<=50K,1/8/16 21:09,1/12/16 21:09
2,3a,38.0,Private,215646.0,5.333741,,HS-grad,9.0,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,UnitedStates,<=50K,3/8/16 2:21,3/20/16 2:21
3,4a,53.0,Private,234721.0,5.370552,,11th,7.0,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,UnitedStates,<=50K,3/8/16 6:31,3/14/16 6:31
4,5a,,Private,338409.0,5.529442,,Bachelors,13.0,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0.0,0.0,40.0,Cuba,<=50K,3/8/16 9:45,3/10/16 9:45


# Write Datasets and export feature definitions

A description of the feature store related configuration can be found <a href="https://docs.anovos.ai/using-anovos/feature_store.html">here</a>
- The API specification of the function **generate_feature_description** can be found <a href="https://docs.anovos.ai/api/feature_store/feast_exporter.html">here</a> <br>
- Limitations:
    - `repartition` in the file output configuration must be set to 1
    - incremental updates are not possible
       

In [None]:
from anovos.feature_store import feast_exporter

In [None]:
# Example 1 - add timestamp columns to df
#
# Feast export settings, kept as a single nested mapping.
# In a real project this would be read from a yml file.
write_feast_features = {
    "entity": {
        "name": "income",
        "id_col": "ifa",
        "description": "write_feast_features",
    },
    "file_source": {
        "owner": "test@owner.com",
        "description": "data source description",
        "timestamp_col": "event_time",
        "create_timestamp_col": "create_time_col",
    },
    "feature_view": {
        "name": "income_view",
        "ttl_in_seconds": 3600000,
        "owner": "view@owner.com",
        "create_timestamps": True,
    },
    "file_path": "../feast_repo",
    "service_name": "income_feature_service",
}

# Per-section names; each one refers to the very same dict object that is
# stored inside write_feast_features.
entity_config = write_feast_features["entity"]
file_source_config = write_feast_features["file_source"]
feature_view_config = write_feast_features["feature_view"]

# Pass the file-source settings to the exporter and keep the DataFrame it
# returns under the same name, so the following cells operate on it.
df = feast_exporter.add_timestamp_columns(df, file_source_config)

In [32]:
from anovos.data_ingest.data_ingest import write_dataset

In [37]:
# Persist df as parquet under outputPath, replacing any earlier output.
# repartition is 1 because the limitations listed for this section require it.
write_dataset(
    df,
    outputPath,
    "parquet",
    {"repartition": 1, "mode": "overwrite"},
)

In [None]:
# Example 1 - write feast feature configuration into feast repository
import glob
import os

# The write_dataset cell stored the parquet output directly under outputPath,
# so that is where the part file has to be looked up. (The previous version
# referenced an undefined `write_main` mapping and a "final_dataset"
# sub-folder that this notebook never creates.)
path = os.path.join(outputPath, "part*")
part_files = sorted(glob.glob(path))  # sorted: deterministic pick if several match
if not part_files:
    # Fail with an actionable message instead of a bare IndexError.
    raise FileNotFoundError(
        f"No part file matching {path!r}; run the write_dataset cell first."
    )
filename = part_files[0]

# Hand the column types, the feast settings and the data file to the exporter.
feast_exporter.generate_feature_description(df.dtypes, write_feast_features, filename)