# Training with pyspark
In this example, we'll train a PySpark model on a dataset previously logged to Verta, and then log the trained model back to Verta.

## Loading the dataset
We'll load the dataset previously logged to Verta. Only its metadata was logged, so we assume the underlying parquet files are still at their original location. Alternatively, we could have used Verta-managed datasets to create a snapshot of the data.

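```python
from verta import Client

# Connect to the Verta deployment; the email and developer key are read from the environment
client = Client("https://point72.app.verta.ai")
# Fetch the dataset logged earlier and its most recent version
dataset = client.get_or_create_dataset("Census parquet - pyspark example", workspace="p72-mi-data")
dataset_version = dataset.get_latest_version()
```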

```
set email from environment
set developer key from environment
connection successfully established
got existing Dataset: Census parquet - pyspark example
got existing dataset version: 7ec5763db7685e86c16f950815502d8a48be81bc6a5b8b1e8c3c638ca130bf50
```


Then we load the parquet files into a single dataframe.

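```python
import functools
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # reuse the notebook's active session

# Read each parquet path in the dataset version, then union them into one dataframe
# (union() matches columns by position, so every file must share the same column order)
dfs = map(spark.read.parquet, dataset_version.get_content().list_paths())
df = functools.reduce(lambda a, b: a.union(b), dfs)
```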
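`functools.reduce` folds the sequence of dataframes pairwise from the left, so with three files the cell computes `df1.union(df2).union(df3)`. A minimal pure-Python sketch of the same fold, using plain lists as hypothetical stand-ins for the per-file dataframes and list concatenation in place of `union`:

```python
import functools

# Hypothetical stand-ins for three per-file dataframes (rows as tuples)
part_1 = [("a", 1), ("b", 0)]
part_2 = [("c", 1)]
part_3 = [("d", 0), ("e", 1)]

# reduce applies the combiner left to right: (part_1 + part_2) + part_3,
# mirroring part_1.union(part_2).union(part_3) in the Spark cell
combined = functools.reduce(lambda a, b: a + b, [part_1, part_2, part_3])
print(combined)
```

Since `DataFrameReader.parquet` accepts several paths, `spark.read.parquet(*paths)` is an alternative that reads all the files in a single call.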

Next, we convert the dataframe into the format SparkML expects: a single vector column of features and a numeric label column.

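```python
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Pack every column except the target ">50k" into a single "features" vector
assembler = VectorAssembler(
    inputCols=[x for x in df.columns if x != '>50k'],
    outputCol='features')

df = assembler.transform(df)
# SparkML estimators read the target from a column named "label" by default
df = df.withColumn("label", col(">50k"))
df = df.select("features", "label")
df.printSchema()
```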

```
root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)
```
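Conceptually, `VectorAssembler` concatenates the input columns of each row into one feature vector, in the order given by `inputCols`. The sketch below mimics that for rows held as Python dicts; the helper `assemble`, the column names, and the values are made up for illustration. Unlike the real transformer, it only handles scalar numeric columns and always returns a dense list, whereas Spark may store the result as a sparse vector.

```python
from typing import Dict, List, Tuple

LABEL_COL = ">50k"

def assemble(rows: List[Dict[str, float]], label_col: str = LABEL_COL) -> List[Tuple[List[float], int]]:
    """Hypothetical helper: turn each row into (features, label)."""
    if not rows:
        return []
    # Keep the column order of the first row, skipping the target column
    input_cols = [c for c in rows[0] if c != label_col]
    return [([float(row[c]) for c in input_cols], int(row[label_col])) for row in rows]

# Hypothetical census-like rows
rows = [
    {"age": 39, "hours_per_week": 40, ">50k": 0},
    {"age": 52, "hours_per_week": 45, ">50k": 1},
]
for features, label in assemble(rows):
    print(features, label)
```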



## Training the model
We'll just use a simple logistic regression to fit the data.
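For reference, Spark's `LogisticRegression` fits the weights $w$ and intercept $b$ by minimizing the average log loss plus an elastic-net penalty, where `regParam` is $\lambda$ and `elasticNetParam` is $\alpha$ ($\alpha = 1$ is pure L1, $\alpha = 0$ is pure L2). Here labels are $y_i \in \{0, 1\}$, $\sigma(z) = 1/(1+e^{-z})$, and the intercept is not penalized:

$$
\min_{w,\,b}\; -\frac{1}{n}\sum_{i=1}^{n}\Big[y_i \log \sigma(w^\top x_i + b) + (1-y_i)\log\big(1-\sigma(w^\top x_i + b)\big)\Big] + \lambda\Big(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\Big)
$$

With the values used below ($\lambda = 0.3$, $\alpha = 0.8$), the L1 weight is $\lambda\alpha = 0.24$ and the L2 weight is $\lambda(1-\alpha)/2 = 0.03$. By default Spark also standardizes the features before fitting, so the penalty acts on the standardized scale, while the reported coefficients are on the original scale.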

```python
from pyspark.ml.classification import LogisticRegression

hyperparameters = {
    "maxIter": 10,
    "regParam": 0.3,
    "elasticNetParam": 0.8,
}

lr = LogisticRegression(**hyperparameters)
lrModel = lr.fit(df)
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
```

```
Coefficients: (43,[],[])
Intercept: -1.1069221254176718
```
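The printed coefficients `(43,[],[])` are a sparse vector of size 43 with no stored entries, so every coefficient is zero: the mostly-L1 penalty removed all 43 features, and the model predicts the same probability, $\sigma(b)$, for every row. The sketch below evaluates the fitted model's probability by hand from the values printed above; `predict_proba` is a hypothetical helper, not part of the SparkML API.

```python
import math
from typing import Sequence

# Values copied from the output above: 43 coefficients, all zero, and the intercept
coefficients = [0.0] * 43
intercept = -1.1069221254176718

def predict_proba(features: Sequence[float]) -> float:
    """Hypothetical helper: P(label = 1) = sigmoid(w . x + b)."""
    if len(features) != len(coefficients):
        raise ValueError("expected %d features" % len(coefficients))
    margin = sum(w * x for w, x in zip(coefficients, features)) + intercept
    return 1.0 / (1.0 + math.exp(-margin))

# With every coefficient at zero, any input yields sigmoid(intercept)
print(round(predict_proba([1.0] * 43), 4))
print(round(predict_proba([0.0] * 43), 4))
```

Because the intercept is not penalized, an intercept-only fit should put $\sigma(b) \approx 0.248$ close to the fraction of training rows labelled `>50k`. That is fine for demonstrating the logging workflow; lowering `regParam` would let some coefficients become nonzero.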


Then we log the model artifact and its metadata (the hyperparameters and the training dataset version) to Verta.

```python
# Set the project and experiment, then create an experiment run (one model instance)
project = client.set_project(name="Census Income - pyspark example", workspace="p72-mi-data")
experiment = client.set_experiment(name="Logistic regression")
run = client.set_experiment_run()

# Log the hyperparameters
run.log_hyperparameters(hyperparameters)

# Save the model to the "spark_model" folder, overwriting any copy left by a previous
# run of this cell, then log the folder as an artifact (Verta packages it as a zip)
lrModel.write().overwrite().save("spark_model")
run.log_artifact("model", "spark_model")

# Link the dataset version to the run
run.log_dataset_version("training", dataset_version)
```
```
created new Project: Census Income - pyspark example in workspace: p72-mi-data
created new Experiment: Logistic regression
created new ExperimentRun: Run 62604161195224355382
uploading part 1
upload complete (model)
```
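`log_artifact` packages the `spark_model` directory as a zip before uploading it. As a rough local illustration of that packaging step only (not Verta's actual implementation), the sketch below zips a directory with the standard library's `shutil.make_archive` and lists the archive's entries. The `zip_directory` helper and the directory contents are hypothetical stand-ins for what `lrModel.save` writes.

```python
import os
import shutil
import tempfile
import zipfile

def zip_directory(src_dir: str, dest_base: str) -> str:
    """Hypothetical helper: zip the contents of src_dir into dest_base + '.zip'."""
    return shutil.make_archive(dest_base, "zip", root_dir=src_dir)

with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical stand-in for the folder written by lrModel.save("spark_model")
    model_dir = os.path.join(tmp, "spark_model")
    os.makedirs(os.path.join(model_dir, "metadata"))
    os.makedirs(os.path.join(model_dir, "data"))
    with open(os.path.join(model_dir, "metadata", "part-00000"), "w") as f:
        f.write('{"class": "LogisticRegressionModel"}')
    with open(os.path.join(model_dir, "data", "part-00000"), "w") as f:
        f.write("placeholder")

    # Write the archive next to (not inside) the model folder, then list its entries
    archive = zip_directory(model_dir, os.path.join(tmp, "model"))
    with zipfile.ZipFile(archive) as zf:
        names = sorted(zf.namelist())
    print(names)
```

To reuse the model later, the downloaded archive would need to be unzipped back into a directory before calling `LogisticRegressionModel.load` on it.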
