In [1]:
# Load the source table and remember its original column names.
SOURCE_TABLE = "d5"
dataset = spark.table(SOURCE_TABLE)
cols = list(dataset.columns)

In [2]:
# Preview the table exactly as loaded, before any cleaning.
display(dataset)

In [3]:
# item_description is not used by the model; remove it before building features.
DROPPED_COLUMNS = ["item_description"]
dataset = dataset.drop(*DROPPED_COLUMNS)
display(dataset)

In [4]:
# Check column names and types after dropping item_description
# (the numeric columns are cast to double further below).
dataset.printSchema()

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# String columns to be indexed and one-hot encoded.
categoricalColumns = ["brand_name", "category_name", "name"]

# Pipeline stages in execution order. Nothing is computed here; every stage
# is fitted and applied together when the Pipeline is fit later on.
stages = []
for column in categoricalColumns:
    index_column = column + "Index"
    # Map each distinct string to a numeric index.
    stages.append(StringIndexer(inputCol=column, outputCol=index_column))
    # Expand that index into a binary SparseVector.
    stages.append(OneHotEncoder(inputCol=index_column, outputCol=column + "classVec"))

In [6]:
from pyspark.sql.types import DoubleType
# Cast every numeric column to double so VectorAssembler can consume it.
for numeric_column in ["price", "shipping", "item_condition_id", "train_id"]:
    dataset = dataset.withColumn(numeric_column, dataset[numeric_column].cast("double"))

dataset.printSchema()

In [7]:
# Derive the class label from price: StringIndexer assigns one index per
# distinct price value.
# NOTE(review): this turns a continuous target into one class per distinct
# price; a regressor on "price" may be what is actually intended — confirm.
label_stringIdx = StringIndexer(outputCol="label", inputCol="price")
stages.append(label_stringIdx)

In [8]:
# Numeric features combined with the one-hot vectors into "features".
# - "price" is excluded: the label is indexed from price, so feeding it
#   back in as a feature leaks the target into the model.
# - "train_id" is excluded: it is a row identifier, not a property of the item.
numericCols = ["item_condition_id", "shipping"]
# A list comprehension (not map(...) + list) works on both Python 2 and 3.
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [9]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)

# Keep relevant columns
selectedcols = ["label", "features"] + cols
#dataset = dataset.select(selectedcols)
display(dataset)

In [10]:
### Randomly split data into training and test sets. set seed for reproducibility
# Randomly split the data 70/30 into training and test sets.
# A fixed seed keeps the split reproducible.
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
# Single-argument print(...) behaves the same on Python 2 and Python 3.
print(trainingData.count())
print(testData.count())

In [11]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
# Baseline logistic regression on the assembled feature vector,
# limited to 10 optimizer iterations.
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)

# Fit on the training split only; the test split is held out for evaluation.
lrModel = lr.fit(trainingData)

In [12]:
# Score the held-out test split with the fitted model.
# lrModel.transform() uses the 'features' column as model input; the other
# columns of testData (e.g. price, item_condition_id) are carried through.
predictions = lrModel.transform(testData)

In [13]:
# Inspect the columns the model appended (e.g. rawPrediction, probability, prediction).
predictions.printSchema()

In [14]:
# Compare label vs. prediction, together with the class probabilities.
preview_columns = ["label", "prediction", "probability", "price", "item_condition_id"]
selected = predictions.select(*preview_columns)
display(selected)

In [15]:
# Label vs. prediction alongside the item category.
preview_columns = ["label", "prediction", "price", "category_name"]
selected = predictions.select(*preview_columns)
display(selected)

In [16]:
# Label vs. prediction alongside the shipping flag.
preview_columns = ["label", "prediction", "price", "shipping"]
selected = predictions.select(*preview_columns)
display(selected)

In [17]:
# Label vs. prediction alongside the item condition.
preview_columns = ["label", "prediction", "price", "item_condition_id"]
selected = predictions.select(*preview_columns)
display(selected)

In [18]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluate the model.
# "label" is a StringIndexer index over distinct prices, so there is one class
# per distinct price (presumably far more than two). BinaryClassificationEvaluator
# is only meaningful for a two-class label, so a multiclass evaluator is used.
# The same evaluator also drives model selection in the CrossValidator below.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator.evaluate(predictions)

In [19]:
# Show which metric the evaluator above reports.
evaluator.getMetricName()

In [20]:
# List every LogisticRegression parameter with its documentation and current value.
# Single-argument print(...) works on both Python 2 and Python 3.
print(lr.explainParams())

In [21]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
# Hyper-parameter values searched by cross-validation (3 x 3 x 3 = 27 combinations).
reg_params = [0.01, 0.5, 2.0]
elastic_net_params = [0.0, 0.5, 1.0]
max_iters = [1, 5, 10]

grid_builder = ParamGridBuilder()
grid_builder.addGrid(lr.regParam, reg_params)
grid_builder.addGrid(lr.elasticNetParam, elastic_net_params)
grid_builder.addGrid(lr.maxIter, max_iters)
paramGrid = grid_builder.build()

In [22]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)

In [23]:
# Score the held-out test split with the cross-validated model, to measure
# performance (with the evaluator's metric) on data not used for tuning.
predictions = cvModel.transform(testData)

In [24]:
# cvModel.transform() uses the best model found by cross-validation.
# Score its test-set predictions with the same evaluator used for tuning.
evaluator.evaluate(predictions)

In [25]:
# Intercept(s) of the best cross-validated model. interceptVector is used
# because a many-class model has one intercept per class.
# str.format keeps this a single print argument, so the output is identical
# on Python 2 and Python 3.
print('Model Intercept: {}'.format(cvModel.bestModel.interceptVector))