In [None]:
# download training data from https://www.kaggle.com/c/avazu-ctr-prediction/data
 
# upload the data to s3
# !aws s3 cp --profile ml_user ~/SageMaker/mastering-ml-on-aws/chapter4/train.gz s3://mastering-ml-aws/chapter4/data/train.gz

In [1]:
# S3 URI of the gzipped training CSV that the next cell loads.
s3_train_path = (
    's3://mastering-ml-aws/chapter4/data/train.gz'
)

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1548514458454_0001,pyspark,idle,,,✔


SparkSession available as 'spark'.


In [4]:
# Load the raw CSV from S3: the first line is the header and column types are inferred.
ctr_df = spark.read.csv(
    s3_train_path,
    header=True,
    inferSchema=True,
)

VBox()

In [9]:
# Redistribute the data over 100 partitions and mark it for caching.
# NOTE(review): ctr_df is rebound to itself, so re-running this cell repartitions again.
ctr_df = (
    ctr_df
    .repartition(100)
    .cache()
)

VBox()

In [15]:
# Persist the loaded data as Parquet; a later cell reloads it from this same location.
ctr_df.write.parquet(
    's3://mastering-ml-aws/chapter4/parquet/'
)

VBox()

In [6]:
# Build the summary-statistics DataFrame for ctr_df (displayed in the next cell).
summary_df = ctr_df.describe()

VBox()

In [7]:
# Print the summary table produced by describe().
summary_df.show()

VBox()

+-------+--------------------+-------------------+--------------------+------------------+-------------------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+
|summary|                  id|              click|                hour|                C1|         banner_pos| site_id|site_domain|site_category|  app_id|app_domain|app_category|device_id|device_ip|device_model|       device_type|  device_conn_type|               C14|              C15|               C16|               C17|               C18|               C19|               C20|              C21|
+-------+--------------------+-------------------+--------------------+------------------+-------------------+--------+-----------+-------------+--------+----------+------------+---------+---------+--

In [1]:
# Fresh session: reload the dataset from the Parquet copy written earlier.
ctr_df = spark.read.parquet(
    's3://mastering-ml-aws/chapter4/parquet/'
)

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1549485056937_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import ChiSqSelector

def categorical_one_hot_encoding_stages(columns):
    """Build the pipeline stages that one-hot encode categorical columns.

    For every name in ``columns`` a ``StringIndexer`` writes ``<name>_index``
    (using ``handleInvalid='keep'``) and a ``OneHotEncoder`` turns that index
    column into ``<name>_encoded``.

    Returns a list with all indexers first, followed by all encoders, each
    group in the order of ``columns``.
    """
    def make_indexer(name):
        return StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid='keep')

    def make_encoder(name):
        return OneHotEncoder(inputCol=name + "_index", outputCol=name + "_encoded")

    index_stages = list(map(make_indexer, columns))
    encode_stages = list(map(make_encoder, columns))
    return index_stages + encode_stages

def categorical_encoding_stages(columns):
    """Build index-only encoding stages (no one-hot step).

    Returns one ``StringIndexer`` per name in ``columns``; each reads the
    column and writes its index to ``<name>_encoded`` with
    ``handleInvalid='keep'``.
    """
    stages = []
    for name in columns:
        stages.append(
            StringIndexer(inputCol=name, outputCol=name + "_encoded", handleInvalid='keep'))
    return stages

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

# Categorical inputs that are indexed and one-hot encoded.
# device_id, device_ip and device_model are currently left out
# (presumably because of their cardinality — TODO confirm).
categorical_columns = [
    'C1', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21',
    'site_id', 'site_domain', 'site_category',
    'app_id', 'app_domain', 'app_category',
]
# Columns handed to the assembler as-is, without indexing or encoding.
numerical_columns = ['banner_pos', 'device_type', 'device_conn_type']
# Assembler inputs: every "<column>_encoded" output followed by the numerical columns.
encoded_columns = [name + '_encoded' for name in categorical_columns]
encoded_columns = encoded_columns + numerical_columns

categorical_stages = categorical_one_hot_encoding_stages(categorical_columns)
vector_assembler = VectorAssembler(
    inputCols=encoded_columns,
    outputCol="features",
)
# Chi-squared selection keeps the top 100 entries of "features" with respect to the click label.
selector = ChiSqSelector(
    numTopFeatures=100,
    featuresCol="features",
    outputCol="selected_features",
    labelCol="click",
)
decision_tree = DecisionTreeClassifier(
    labelCol="click",
    featuresCol="selected_features",
)

# Stage order: indexers/encoders -> assembler -> feature selector -> classifier.
encoding_pipeline = Pipeline(
    stages=categorical_stages + [vector_assembler, selector, decision_tree]
)


VBox()

In [7]:
# 80/20 random split with a fixed seed.
train_df, test_df = ctr_df.randomSplit(
    [0.8, 0.2],
    seed=17,
)


VBox()

In [8]:
# Redistribute the training split over 200 partitions and mark it for caching.
# NOTE(review): train_df is rebound to itself, so re-running this cell repartitions again.
train_df = (
    train_df
    .repartition(200)
    .cache()
)


VBox()

In [9]:
# Fit every stage of encoding_pipeline (encoders, assembler, selector, decision tree) on the training split.
pipeline_model = encoding_pipeline.fit(train_df)


VBox()

In [11]:
# Apply the fitted pipeline to the training split and store the result as Parquet.
train_transformed = pipeline_model.transform(train_df)
train_transformed.write.parquet(
    "s3://mastering-ml-aws/chapter4/chi-training-vector-sampled/"
)


VBox()

In [10]:
# Persist the fitted pipeline model to S3.
pipeline_model.save("s3://mastering-ml-aws/chapter4/pipeline-sampled")


VBox()

In [12]:
# Apply the fitted pipeline to the held-out split and store the result as Parquet.
test_transformed = pipeline_model.transform(test_df)
test_transformed.write.parquet(
    "s3://mastering-ml-aws/chapter4/chi-test-vector-sampled/"
)


VBox()

In [14]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the fitted pipeline on the held-out split with area under the ROC curve.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="click")
# The metric override is an argument of evaluate(). It used to be passed to
# transform(), whose optional param map overrides the transformer's own
# params rather than the evaluator's. areaUnderROC is the documented default
# metric, so the score itself should not change.
# NOTE(review): the recorded output of this cell is ~0.49, i.e. no better than
# a random ranking — worth investigating.
evaluator.evaluate(
    pipeline_model.transform(test_df),
    {evaluator.metricName: "areaUnderROC"},
)


VBox()

0.49268198159721616

In [18]:
# Apply the fitted pipeline to the training split.
# NOTE(review): this repeats the transform performed in an earlier cell.
train_transformed = pipeline_model.transform(train_df)


VBox()

In [19]:
# Export the label plus the assembler input columns as CSV.
# NOTE(review): the "_encoded" columns come from OneHotEncoder in the pipeline
# definition; if they are vector-typed the CSV writer will likely reject them — confirm.
(
    train_transformed
    .select(["click"] + encoded_columns)
    .write
    .csv('s3://mastering-ml-aws/chapter4/training-vector/')
)


VBox()

In [20]:
# Transform the held-out split, then export the label plus the assembler input
# columns as CSV from 5 partitions.
# NOTE(review): the "_encoded" columns come from OneHotEncoder in the pipeline
# definition; if they are vector-typed the CSV writer will likely reject them — confirm.
test_transformed = pipeline_model.transform(test_df)
(
    test_transformed
    .select(["click"] + encoded_columns)
    .repartition(5)
    .write
    .csv('s3://mastering-ml-aws/chapter4/test-vector/')
)


VBox()

In [1]:
# Fresh session: reload the transformed training data written as Parquet earlier.
training_df_with_features = spark.read.parquet(
    's3://mastering-ml-aws/chapter4/chi-training-vector-sampled/'
)



VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1549632706913_0005,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [5]:
# Preview the label next to the selected-feature vector for the first 5 rows.
(
    training_df_with_features
    .select("click", "selected_features")
    .show(5)
)


VBox()

+-----+-----------------+
|click|selected_features|
+-----+-----------------+
|    0|      (100,[],[])|
|    0| (100,[13],[1.0])|
|    1| (100,[82],[1.0])|
|    1| (100,[18],[1.0])|
|    0|  (100,[4],[1.0])|
+-----+-----------------+
only showing top 5 rows

In [9]:
# Preview the label next to the selected-feature vector for the first 5 rows.
# NOTE(review): identical to the previous cell; one of the two can probably go.
(
    training_df_with_features
    .select("click", "selected_features")
    .show(5)
)


VBox()

+-----+-----------------+
|click|selected_features|
+-----+-----------------+
|    0|      (100,[],[])|
|    0| (100,[13],[1.0])|
|    1| (100,[82],[1.0])|
|    1| (100,[18],[1.0])|
|    0|  (100,[4],[1.0])|
+-----+-----------------+
only showing top 5 rows

In [36]:
def deconstruct_vector(row):
    """Flatten a row into a tuple: the click label followed by each feature value.

    ``row['selected_features']`` is converted with ``toArray().tolist()`` and
    ``row['click']`` is placed in front of the resulting values.
    """
    feature_values = row['selected_features'].toArray().tolist()
    flattened = [row['click']]
    flattened.extend(feature_values)
    return tuple(flattened)

# Flatten each (click, selected_features) row into plain columns and write
# the result as header-less CSV.
df_for_csv = (
    training_df_with_features
    .select("click", "selected_features")
    .rdd
    .map(deconstruct_vector)
    .toDF()
)
df_for_csv.write.csv(
    's3://mastering-ml-aws/chapter4/training-vector-csv/',
    header=False,
)


VBox()

In [37]:
# Reload the transformed test data, flatten label + features into plain
# columns, and write the result as header-less CSV.
test_df_with_features = spark.read.parquet(
    's3://mastering-ml-aws/chapter4/chi-test-vector-sampled/'
)
df_for_csv = (
    test_df_with_features
    .select("click", "selected_features")
    .rdd
    .map(deconstruct_vector)
    .toDF()
)
df_for_csv.write.csv(
    's3://mastering-ml-aws/chapter4/test-vector-csv/',
    header=False,
)


VBox()

In [4]:
def deconstruct_vector_no_label(row):
    """Flatten a row's feature vector into a tuple of values, without the label.

    ``row['selected_features']`` is converted with ``toArray().tolist()`` and
    returned as a tuple.
    """
    dense_values = row['selected_features'].toArray()
    return tuple(value for value in dense_values.tolist())

# Reload the transformed test data, flatten only the feature vector (no label)
# into plain columns, repartition to 100, and write header-less CSV.
test_df_with_features = spark.read.parquet(
    's3://mastering-ml-aws/chapter4/chi-test-vector-sampled/'
)
df_for_csv = (
    test_df_with_features
    .select("selected_features")
    .rdd
    .map(deconstruct_vector_no_label)
    .toDF()
    .repartition(100)
)
df_for_csv.write.csv(
    's3://mastering-ml-aws/chapter4/test-vector-csv-no-label/',
    header=False,
)


VBox()