In [2]:
%load_ext autoreload
%autoreload 2

import findspark
import pyspark
import pyspark.sql.functions as sqlFunctions
# import matplotlib.pyplot as plt
import config
import summary
import features

from operator import add
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler, StandardScaler
from etl import CriteoDataSets, CriteoData

# %matplotlib inline

In [3]:
# Load the Criteo data sets and pick the working frames. With config.DEBUG on,
# both names point at the debug data set; otherwise `train` is the full
# training set and `train_3m` is bound to data.train_5m (note the name/size
# mismatch between the variable and the attribute it reads).
data = CriteoDataSets(config.SPARK_CONTEXT, config.SPARK_SQL_CONTEXT)
if config.DEBUG:
    train = data.debug
    train_3m = data.debug
else:
    train = data.train
    train_3m = data.train_5m

In [4]:
# Histogram of the label column for the selected training set.
# The recorded output looks like a (bin edges, counts) pair — TODO confirm
# against summary.label_histogram.
summary.label_histogram(train)

([0, 1, 2], [77337, 22663])

In [5]:
def pretty_print_bin(bins, index):
    """Format one histogram bin as a "lower - upper" range label.

    Args:
        bins: Sequence of numeric bin edges; bin `index` spans
            bins[index] to bins[index + 1].
        index: Zero-based position of the bin to format.

    Returns:
        A string with the lower edge shown to three decimals and the upper
        edge rounded to a whole number, both with thousands separators,
        e.g. "55.600 - 111".

    Raises:
        IndexError: If `index + 1` is past the end of `bins`.
    """
    # Read the `index` argument rather than a global `i`: the function must
    # not depend on the name of the caller's loop variable.
    return "{:0,.3f} - {:0,.0f}".format(bins[index], bins[index + 1])


# Print every integer column's histogram: a "Column N:" header followed by
# one right-aligned "range: count" line per bin.
for col_num, (bins, counts) in summary.int_columns_histograms_iter(train):
    print("Column %d:\n" % col_num)
    for i, count in enumerate(counts):
        bin_label = pretty_print_bin(bins, i)
        count_label = "{:,}".format(count)
        print("%20s: %s\n" % (bin_label, count_label))
    print("")

Column 2:

          0.000 - 56: 55,236

        55.600 - 111: 295

       111.200 - 167: 34

       166.800 - 222: 10

       222.400 - 278: 5

       278.000 - 334: 3

       333.600 - 389: 2

       389.200 - 445: 0

       444.800 - 500: 1

       500.400 - 556: 1


Column 3:

      -2.000 - 1,850: 98,530

   1,850.400 - 3,703: 1,397

   3,702.800 - 5,555: 52

   5,555.200 - 7,408: 12

   7,407.600 - 9,260: 7

  9,260.000 - 11,112: 0

 11,112.400 - 12,965: 1

 12,964.800 - 14,817: 0

 14,817.200 - 16,670: 0

 16,669.600 - 18,522: 1


Column 4:

       0.000 - 6,554: 80,863

  6,553.500 - 13,107: 11

 13,107.000 - 19,660: 1

 19,660.500 - 26,214: 20

 26,214.000 - 32,768: 0

 32,767.500 - 39,321: 0

 39,321.000 - 45,874: 0

 45,874.500 - 52,428: 0

 52,428.000 - 58,982: 1

 58,981.500 - 65,535: 2


Column 5:

          0.000 - 42: 79,210

         41.700 - 83: 1,178

        83.400 - 125: 44

       125.100 - 167: 8

       166.800 - 208: 12

       208.500 - 250: 6

       250.200 

In [6]:
# Materialise the (column name, counts) pairs yielded by the summary helper
# into a dict keyed by column name.
categorical_counts = dict(summary.cat_column_counts_iter(train))

In [None]:
# Debug only: display the value counts of each categorical column.
if config.DEBUG:
    for col_name in train.categorical_column_names:
        categorical_counts[col_name].show()

In [7]:
# Debug only: distinct-value count for every column of the training frame,
# keyed by column name.
if config.DEBUG:
    column_distinct_counts = {}
    for col_name in train.df.columns:
        column_distinct_counts[col_name] = summary.column_distinct_count(train, col_name)

In [8]:
# Report the number of distinct values of each categorical feature
# (debug only, since column_distinct_counts is only built in debug mode).
if config.DEBUG:
    for col_name in train.categorical_column_names:
        distinct_total = column_distinct_counts.get(col_name)
        count = "{:,}".format(distinct_total)
        print("Column %s: %s distinct values" % (col_name, count))

Column C1: 541 distinct values
Column C2: 497 distinct values
Column C3: 43,870 distinct values
Column C4: 25,184 distinct values
Column C5: 145 distinct values
Column C6: 12 distinct values
Column C7: 7,623 distinct values
Column C8: 257 distinct values
Column C9: 3 distinct values
Column C10: 10,997 distinct values
Column C11: 3,799 distinct values
Column C12: 41,312 distinct values
Column C13: 2,796 distinct values
Column C14: 26 distinct values
Column C15: 5,238 distinct values
Column C16: 34,617 distinct values
Column C17: 10 distinct values
Column C18: 2,548 distinct values
Column C19: 1,303 distinct values
Column C20: 4 distinct values
Column C21: 38,618 distinct values
Column C22: 11 distinct values
Column C23: 14 distinct values
Column C24: 12,335 distinct values
Column C25: 51 distinct values
Column C26: 9,527 distinct values


In [9]:
# Collect the (column name, stats) pairs yielded by the summary helper into a
# dict keyed by column name.
integer_column_stats = dict(summary.integer_column_stats_iter(train))

In [10]:
def pretty_print_stats(stats):
    """Render a mapping of statistic name -> number as tab-indented lines.

    Each entry becomes "\\t<name>: <value>", with the value shown to three
    decimals and thousands separators; entries are joined with newlines in
    the mapping's iteration order. An empty mapping yields an empty string.
    """
    formatted_lines = []
    for key, val in stats.items():
        formatted_lines.append("\t{}: {:0,.3f}".format(key, val))
    return "\n".join(formatted_lines)

# Debug only: print the formatted statistics of each integer column.
if config.DEBUG:
    for col_name in train.integer_column_names:
        formatted_stats = pretty_print_stats(integer_column_stats[col_name])
        print("%s: %s\n" % (col_name, formatted_stats))

I1: 	skewness: 13.045
	kurtosis: 373.759
	stddev: 10.451
	mean: 3.769

I2: 	skewness: 7.264
	kurtosis: 100.708
	stddev: 401.523
	mean: 112.864

I3: 	skewness: 74.676
	kurtosis: 7,426.228
	stddev: 538.819
	mean: 40.745

I4: 	skewness: 5.891
	kurtosis: 115.569
	stddev: 10.836
	mean: 8.280

I5: 	skewness: 9.146
	kurtosis: 115.043
	stddev: 65,797.898
	mean: 17,592.599

I6: 	skewness: 11.409
	kurtosis: 251.298
	stddev: 371.776
	mean: 139.685

I7: 	skewness: 42.413
	kurtosis: 4,233.619
	stddev: 65.460
	mean: 15.222

I8: 	skewness: 68.854
	kurtosis: 5,577.523
	stddev: 46.542
	mean: 13.575

I9: 	skewness: 9.010
	kurtosis: 163.166
	stddev: 286.416
	mean: 125.295

I10: 	skewness: 1.125
	kurtosis: 2.495
	stddev: 0.677
	mean: 0.620

I11: 	skewness: 6.399
	kurtosis: 66.442
	stddev: 4.630
	mean: 2.400

I12: 	skewness: 42.705
	kurtosis: 3,362.307
	stddev: 5.328
	mean: 0.938

I13: 	skewness: 67.003
	kurtosis: 5,899.185
	stddev: 52.045
	mean: 11.608



In [7]:
# Total number of rows in the selected training set, printed with thousands
# separators.
row_count = summary.row_count(train)
formatted_row_count = "{:,}".format(row_count)
print("Row count: %s" % formatted_row_count)

Row count: 100,000


In [33]:
from pyspark.sql import Row
from pyspark.sql.types import DoubleType

# Exploratory check on a single categorical column: turn its value counts
# into fractions of the total row count.
col_name_test = "C2"

cc = categorical_counts[col_name_test]
# Cast the count to double first — presumably so the division below is not
# integer division under Python 2.
cc_as_double = cc.select([cc[col_name_test], cc["count"].cast(DoubleType())])
cc_as_double.map(lambda r: Row(count=r["count"] / row_count)).take(100)

[Row(count=0.12895),
 Row(count=0.06826),
 Row(count=0.04043),
 Row(count=0.03724),
 Row(count=0.03269),
 Row(count=0.03268),
 Row(count=0.02903),
 Row(count=0.02366),
 Row(count=0.01988),
 Row(count=0.01892),
 Row(count=0.01757),
 Row(count=0.01735),
 Row(count=0.01701),
 Row(count=0.01408),
 Row(count=0.01392),
 Row(count=0.01272),
 Row(count=0.01142),
 Row(count=0.01138),
 Row(count=0.01126),
 Row(count=0.01087),
 Row(count=0.00991),
 Row(count=0.00986),
 Row(count=0.00966),
 Row(count=0.00959),
 Row(count=0.00858),
 Row(count=0.0081),
 Row(count=0.00807),
 Row(count=0.00714),
 Row(count=0.00686),
 Row(count=0.0068),
 Row(count=0.00668),
 Row(count=0.00644),
 Row(count=0.00612),
 Row(count=0.00595),
 Row(count=0.00582),
 Row(count=0.00581),
 Row(count=0.00483),
 Row(count=0.00455),
 Row(count=0.00444),
 Row(count=0.00437),
 Row(count=0.00432),
 Row(count=0.00427),
 Row(count=0.00422),
 Row(count=0.00398),
 Row(count=0.00393),
 Row(count=0.00385),
 Row(count=0.00382),
 Row(count=0.00

In [12]:
# Build df_counts from the training set and the per-column categorical counts.
# features.join_column_counts is project-local; presumably it attaches a count
# column per categorical feature — TODO confirm.
df_counts = features.join_column_counts(train, categorical_counts)

In [1]:
# Preview a single row of df_counts.
# NOTE(review): the recorded output for this cell is a NameError and its
# execution count is 1, i.e. it was last run on a fresh kernel before
# df_counts was assigned; re-run it after the cell that builds df_counts.
df_counts.show(1)

NameError: name 'df_counts' is not defined

In [19]:
# df_counts.map(lambda row: row.C1_count / row_count).take(1)
# from pyspark.sql.types import DoubleType
# from pyspark.sql.functions import udf

# def make_rate_udf(denominator):
#     return udf(lambda val: val / denominator)

# rate = make_rate_udf(row_count)

In [18]:
# sqlc = config.SPARK_SQL_CONTEXT
# sqlc.registerFunction("rate", rate, DoubleType())

In [20]:
# df_counts.select(rate(df_counts["C1_count"]).alias('C1_rate')).show()
# df_counts.select("C1_count")

Py4JJavaError: An error occurred while calling o1103.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 611.0 failed 1 times, most recent failure: Lost task 1.0 in stage 611.0 (TID 45200, localhost): java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.lang.String.substring(String.java:1969)
	at java.lang.String.subSequence(String.java:2003)
	at java.util.regex.Matcher.getSubSequence(Matcher.java:1294)
	at java.util.regex.Matcher.group(Matcher.java:541)
	at org.apache.spark.network.util.JavaUtils.parseByteString(JavaUtils.java:222)
	at org.apache.spark.network.util.JavaUtils.byteStringAsBytes(JavaUtils.java:255)
	at org.apache.spark.util.Utils$.byteStringAsBytes(Utils.scala:1001)
	at org.apache.spark.SparkConf.getSizeAsBytes(SparkConf.scala:249)
	at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:160)
	at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1179)
	at org.apache.spark.storage.BlockManager$$anonfun$9.apply(BlockManager.scala:659)
	at org.apache.spark.storage.BlockManager$$anonfun$9.apply(BlockManager.scala:659)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:91)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
	at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
	at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
	at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414)
	at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413)
	at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
	at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)
	at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495)
	at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171)
	at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.lang.String.substring(String.java:1969)
	at java.lang.String.subSequence(String.java:2003)
	at java.util.regex.Matcher.getSubSequence(Matcher.java:1294)
	at java.util.regex.Matcher.group(Matcher.java:541)
	at org.apache.spark.network.util.JavaUtils.parseByteString(JavaUtils.java:222)
	at org.apache.spark.network.util.JavaUtils.byteStringAsBytes(JavaUtils.java:255)
	at org.apache.spark.util.Utils$.byteStringAsBytes(Utils.scala:1001)
	at org.apache.spark.SparkConf.getSizeAsBytes(SparkConf.scala:249)
	at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:160)
	at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1179)
	at org.apache.spark.storage.BlockManager$$anonfun$9.apply(BlockManager.scala:659)
	at org.apache.spark.storage.BlockManager$$anonfun$9.apply(BlockManager.scala:659)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:91)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 32837)
----------------------------------------


Traceback (most recent call last):
  File "/usr/lib/python2.7/SocketServer.py", line 295, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python2.7/SocketServer.py", line 321, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python2.7/SocketServer.py", line 334, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python2.7/SocketServer.py", line 649, in __init__
    self.handle()
  File "/home/sean/bin/spark/current/python/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/home/sean/bin/spark/current/python/pyspark/serializers.py", line 545, in read_int
    raise EOFError
EOFError


# TODO

In [9]:
# Extract only the (rounded) mean of each integer column from the stats.
# NOTE(review): integer_column_stats is computed from `train`, while the
# column names iterated here come from `train_3m` — confirm that mixing the
# two data sets is intended.
integer_column_means = dict(
    (col_name, round(integer_column_stats[col_name]["mean"]))
    for col_name in train_3m.integer_column_names
)

In [None]:
# Fill nulls in the integer columns with that column's mean before the
# feature vector is built.
train_filled_df = train_3m.df.fillna(integer_column_means)

# Show the integer columns before and after filling, then the full filled frame.
integer_columns = train_3m.integer_column_names
train_3m.df.select(integer_columns).show(10)
train_filled_df.select(integer_columns).show(10)
train_filled_df.show()

In [None]:
# Assemble the integer columns of target_df_ints into a single "features"
# vector column, then preview the result alongside target_df.
# NOTE(review): target_df_ints, int_column_names and target_df are not assigned
# in any cell visible in this notebook, so this cell fails on a fresh kernel —
# confirm where they should come from (possibly train_filled_df above).
# NOTE(review): target_df_ints is reassigned from itself, so re-running this
# cell feeds the already-transformed frame back into the assembler.
vecAssembler = VectorAssembler(inputCols=target_df_ints.select(int_column_names).columns, outputCol="features")
target_df_ints = vecAssembler.transform(target_df_ints)
target_df_ints.show(5)
target_df.show(5)

# target_df.unionAll(int_featutes_df.select('features'))
# int_featutes_df.head(5)

# target_df.show(5)

# target_df.withColumn('features', vecAssembler.transform(target_df_ints).features)

# int_rows = target_df_ints.map(lambda row: row.asDict().values()).map(Vectors.dense).map(Row)
# int_features_rdd.take(5)

# int_features_df = sqlContext.createDataFrame(int_features_rdd)

# column_scaler_models = {
#     col_name: standard_scale_column(target_df, col_name) 
#     for i, col_name in enumerate(target_df.columns)
#     if is_integer_col_num(i + 1)
# }



In [None]:
def standard_scale_column(df, col_name):
    """Fit a StandardScaler on one column of `df` and return the fitted model.

    The scaler reads `col_name` and is configured to write its output to a
    column named `col_name` followed by "_scaled". Nothing is transformed
    here; callers apply the returned model with `.transform(...)`.
    """
    scaled_col_name = "%s%s" % (col_name, "_scaled")
    return StandardScaler(inputCol=col_name, outputCol=scaled_col_name).fit(df)

In [None]:
# Fit a StandardScaler model on the assembled "features" column of
# target_df_ints; its output column will be "features_scaled".
int_sscaler_model = standard_scale_column(target_df_ints, "features")

In [None]:
# Compare the raw feature vectors with their standard-scaled counterparts.
target_df_ints.select("features").show(5)
scaled_df = int_sscaler_model.transform(target_df_ints)
scaled_df.select("features_scaled").show(5)