In [8]:
# Reload edited modules automatically before each cell executes (mode 2 = all modules)
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [9]:
# Substep parameters for interactive runs.
# During a job run this cell is replaced with the parameters read from the
# json file in the params subfolder.
substep_params = {
    "seed": 42,          # random_state for both train/test splits in preproc
    "test_size1": 0.3,   # fraction of rows held out from training
    "test_size2": 0.5,   # fraction of the held-out rows that becomes validation
}

In [10]:
# load pipeline and step parameters - do not edit
from sinara.substep import get_pipeline_params, get_step_params
# pprint=True also displays each parameter dict, as seen in the cell output below
pipeline_params = get_pipeline_params(pprint=True)
step_params = get_step_params(pprint=True)

**Pipeline params:**


{'X': 'something',
 'env_name': 'user',
 'pipeline_name': 'pipeline',
 'zone_name': 'zone'}




**Step params:**


{'Y': 'something_else'}




In [11]:
#3 define substep interface
from sinara.substep import NotebookSubstep, ENV_NAME, PIPELINE_NAME, ZONE_NAME, STEP_NAME, RUN_ID, ENTITY_NAME, ENTITY_PATH, SUBSTEP_NAME

# The six split artifacts this substep produces (written in the "write outputs" cell).
output_entity_names = ["X_train", "X_test", "X_val", "y_train", "y_test", "y_val"]

substep = NotebookSubstep(pipeline_params, step_params, substep_params)

substep.interface(
    # Single input: the dataset produced by the data_load step.
    inputs=[{STEP_NAME: "data_load", ENTITY_NAME: "california_dataset"}],
    outputs=[{ENTITY_NAME: entity_name} for entity_name in output_entity_names],
)

substep.print_interface_info()

# NOTE(review): by its name, presumably ends execution here when the notebook
# is run only to visualize the pipeline — confirm against sinara docs.
substep.exit_in_visualize_mode()

**STEP NAME:**


'data_prep'




**INPUTS:**


[{'user.pipeline.zone.data_load.california_dataset': '/data/home/jovyan/pipeline/zone/data_load/run-25-01-15-070529/california_dataset'}]




**OUTPUTS:**


[{'user.pipeline.zone.data_prep.X_train': '/data/home/jovyan/pipeline/zone/data_prep/run-25-01-15-074559/X_train'},
 {'user.pipeline.zone.data_prep.X_test': '/data/home/jovyan/pipeline/zone/data_prep/run-25-01-15-074559/X_test'},
 {'user.pipeline.zone.data_prep.X_val': '/data/home/jovyan/pipeline/zone/data_prep/run-25-01-15-074559/X_val'},
 {'user.pipeline.zone.data_prep.y_train': '/data/home/jovyan/pipeline/zone/data_prep/run-25-01-15-074559/y_train'},
 {'user.pipeline.zone.data_prep.y_test': '/data/home/jovyan/pipeline/zone/data_prep/run-25-01-15-074559/y_test'},
 {'user.pipeline.zone.data_prep.y_val': '/data/home/jovyan/pipeline/zone/data_prep/run-25-01-15-074559/y_val'}]




In [12]:
#4 run spark
from sinara.spark import SinaraSpark

# NOTE(review): the meaning of the 0 argument is not visible here — confirm against SinaraSpark.run_session
spark = SinaraSpark.run_session(0)
# Presumably shows the Spark UI address for this session
SinaraSpark.ui_url()

Session is run


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/15 07:46:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [13]:
#5 read inputs
prev_step_inputs = substep.inputs(step_name="data_load")
california_path = prev_step_inputs.california_dataset
# Load the parquet dataset with Spark, then collect it to the driver as pandas.
df_california = spark.read.parquet(california_path).toPandas()
# Every column except the 'target' label is treated as a feature.
features = list(df_california.columns)
features.remove('target')

In [14]:
#6 make something to create artifacts
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
def preproc(df, feats, test_size1=None, test_size2=None, seed=None):
    """
    Split a frame into train/test/validation parts and standard-scale the features.

    Rows are split first and the StandardScaler is fitted on the training
    features only, then applied to the test and validation features, so no
    statistics from the held-out rows leak into training.

    Args:
        df (pandas.DataFrame): Input DataFrame; must contain every column in
            ``feats`` plus a ``'target'`` column. Missing values (in all
            columns, including the target) are filled with 0.
        feats (list): List of feature column names.
        test_size1 (float, optional): Fraction of rows held out from training;
            the held-out rows are then divided into test and validation.
            Defaults to ``substep_params['test_size1']``.
        test_size2 (float, optional): Fraction of the held-out rows that go
            to validation; the rest go to test.
            Defaults to ``substep_params['test_size2']``.
        seed (int, optional): ``random_state`` used for both splits.
            Defaults to ``substep_params['seed']``.

    Returns:
        tuple of 6 pandas.DataFrame: X_train, X_test, X_val, y_train, y_test, y_val.
        X frames hold the scaled ``feats`` columns and keep the row index of
        ``df`` (so they stay aligned with the y frames); y frames hold the
        single ``'target'`` column.
    """
    # Fall back to the notebook's substep parameters so existing calls
    # preproc(df, feats) keep their behavior in both interactive and job runs.
    if test_size1 is None:
        test_size1 = substep_params['test_size1']
    if test_size2 is None:
        test_size2 = substep_params['test_size2']
    if seed is None:
        seed = substep_params['seed']

    # fillna returns a new frame, so the caller's df is not modified.
    df_filled = df.fillna(0)
    X = df_filled[feats]
    y = df_filled['target'].to_frame()

    # Split before scaling: fitting the scaler on all rows would leak
    # test/validation statistics into the training features.
    X_train, X_holdout, y_train, y_holdout = train_test_split(
        X, y, test_size=test_size1, random_state=seed)
    X_test, X_val, y_test, y_val = train_test_split(
        X_holdout, y_holdout, test_size=test_size2, random_state=seed)

    scaler = StandardScaler()
    scaler.fit(X_train.to_numpy())

    # Transform every part with the train-fitted scaler, preserving each
    # part's row index so X and y rows remain aligned by label.
    X_train, X_test, X_val = (
        pd.DataFrame(scaler.transform(part.to_numpy()), columns=feats, index=part.index)
        for part in (X_train, X_test, X_val)
    )

    return X_train, X_test, X_val, y_train, y_test, y_val

In [16]:
X_train, X_test, X_val, y_train, y_test, y_val = preproc(df=df_california, feats=features)

# Sanity checks: the three parts partition the input rows, every X part is
# paired with a y part of equal length, and the feature columns are preserved.
assert len(X_train) + len(X_test) + len(X_val) == len(df_california)
assert (len(X_train), len(X_test), len(X_val)) == (len(y_train), len(y_test), len(y_val))
assert list(X_train.columns) == features

In [18]:
# Convert each pandas split into a Spark DataFrame so it can be written as parquet.
# NOTE(review): createDataFrame does not carry the pandas index over as a column,
# so X_* and y_* rows are linked only by row order across separately written
# outputs — confirm that downstream steps read them back in the same order.
X_train_spark, X_test_spark, X_val_spark = (
    spark.createDataFrame(frame) for frame in (X_train, X_test, X_val)
)
y_train_spark, y_test_spark, y_val_spark = (
    spark.createDataFrame(frame) for frame in (y_train, y_test, y_val)
)

In [60]:
#7 write outputs
outputs = substep.outputs()

# Write each split to its registered output path, replacing any existing data.
for entity_name, frame in (
    ("X_train", X_train_spark),
    ("X_test", X_test_spark),
    ("X_val", X_val_spark),
    ("y_train", y_train_spark),
    ("y_test", y_test_spark),
    ("y_val", y_val_spark),
):
    frame.write.parquet(getattr(outputs, entity_name), mode='overwrite')




In [61]:
#8 stop spark
# Shut down the Spark session started with SinaraSpark.run_session in the "run spark" cell.
SinaraSpark.stop_session()