In [1]:
from pyspark import SparkConf, SparkContext

def getSparkContext(master="local", app_name="Logistic Regression", executor_memory="1g"):
    """
    Gets (or reuses) the Spark Context for this notebook.

    Uses SparkContext.getOrCreate so that re-running the cell that calls this
    function returns the already-running context instead of failing because a
    second SparkContext cannot be created. If a context already exists it is
    returned unchanged and the configuration built here is NOT applied to it.

    :param master: Spark master URL (default "local": run on this machine).
    :param app_name: application name reported by Spark.
    :param executor_memory: value for the "spark.executor.memory" setting.
    :return: the active SparkContext.
    """
    conf = (SparkConf()
            .setMaster(master)
            .setAppName(app_name)
            .set("spark.executor.memory", executor_memory))
    # NOTE(review): in local mode the executor runs inside the driver process, so
    # spark.executor.memory likely has no effect here; driver memory (which must be
    # set before the JVM starts) is what bounds the job. Confirm for your setup.
    return SparkContext.getOrCreate(conf=conf)

In [2]:
# Obtain the SparkContext that the HiveContext in the next cell is built on.
sc = getSparkContext()

In [3]:
from pyspark.sql import HiveContext
from pyspark.sql import Row
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics
from math import sqrt
import matplotlib.pyplot as plt
from pylab import *
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array

%matplotlib inline

# SQL access to the Hive tables, built on the notebook's SparkContext.
hiveContext = HiveContext(sc)

# Each hiveContext.sql(...) call below returns a Spark DataFrame (SDF).

# Unit reference data: model, serial, inception period and location.
hvt_SDF = hiveContext.sql(
    'SELECT model, serial, inceptionperiod, location FROM hvt')

# Operating history; joined to the reference data on `serialo`.
hvt_operating_SDF = hiveContext.sql(
    'SELECT serialo, period FROM hvt_operatingo')

# Failure events; joined to the reference data on `serialf`.
# Sample row kept from the original notes (presumably from a failure extract with
# more columns than are selected here):
#Row(period=1964, serial=1520, model=1, repaircost=11447.252, totalcost=18947.252)
hvt_failure_SDF = hiveContext.sql(
    'SELECT period, serialf FROM hvt_failuref')

In [4]:
# Number of operating-history rows (1000000 in the recorded run).
hvt_operating_SDF.count()

1000000L

In [5]:
# Number of failure rows (31954 in the recorded run).
hvt_failure_SDF.count()

31954L

In [6]:
# Number of unit reference rows (10000 in the recorded run).
hvt_SDF.count()

10000L

In [7]:
# Failures recorded before 2010 (`where` is an alias of `filter`).
# NOTE(review): hvt_failure_SDF1 is not referenced by any later cell shown here;
# the model below is trained on all of hvt_failure_SDF.
hvt_failure_SDF1 = hvt_failure_SDF.where(hvt_failure_SDF["period"] < 2010)
hvt_failure_SDF1.count()

12527L

In [8]:
# Join hvt reference data with the operating history.
# Inner join: every resulting row later becomes an "operating" observation
# (label 1.0). With "left_outer", a unit with no operating records would still
# appear, with a null `period`, and its age feature would be null/NaN.
hvt_SDF_ops = hvt_SDF.join(hvt_operating_SDF, hvt_operating_SDF.serialo == hvt_SDF.serial, "inner")

In [9]:
# Sanity check: the recorded result (1000000) matches the operating table's row count.
hvt_SDF_ops.count()

1000000L

In [10]:
# Keep the unit attributes and derive the unit's age at each operating period.
# Column order here is the feature order used by the model:
# [model, serial, location, inceptionperiod, age].
hvt_SDF_ops_age = hvt_SDF_ops.select(
    hvt_SDF_ops.model,
    hvt_SDF_ops.serial,
    hvt_SDF_ops.location,
    hvt_SDF_ops.inceptionperiod,
    # Named explicitly instead of Spark's auto-generated expression name.
    (hvt_SDF_ops.period - hvt_SDF_ops.inceptionperiod).alias("age"),
)

In [11]:
# A projection only, so the count should equal hvt_SDF_ops (1000000 recorded).
hvt_SDF_ops_age.count()

1000000L

In [12]:
# Join hvt reference data with the failure data.
# Inner join: every resulting row later becomes a "failure" observation
# (label 0.0). With "left_outer", every unit without a failure would also be kept,
# with a null `period` -- i.e. counted as a failure that has no failure date.
hvt_SDF_fail = hvt_SDF.join(hvt_failure_SDF, hvt_failure_SDF.serialf == hvt_SDF.serial, "inner")

In [13]:
# Sanity check: the recorded result (31954) matches the failure table's row count.
hvt_SDF_fail.count()

31954L

In [14]:
# Convert from failure period to the unit's age at failure.
# Column order must match the operating features:
# [model, serial, location, inceptionperiod, age].
hvt_SDF_fail_age = hvt_SDF_fail.select(
    hvt_SDF_fail.model,
    hvt_SDF_fail.serial,
    hvt_SDF_fail.location,
    hvt_SDF_fail.inceptionperiod,
    # Named explicitly instead of Spark's auto-generated expression name.
    (hvt_SDF_fail.period - hvt_SDF_fail.inceptionperiod).alias("age"),
)

In [15]:
# A projection only, so the count should equal hvt_SDF_fail (31954 recorded).
hvt_SDF_fail_age.count()

31954L

In [16]:
# Turn every row into a dense feature vector (one value per selected column).
# `.rdd.map` rather than `DataFrame.map`: on Spark 1.x they are equivalent, but
# DataFrame.map no longer returns an RDD from Spark 2.0 on.
hvt_SDF_ops_age_vector = hvt_SDF_ops_age.rdd.map(lambda row: Vectors.dense(row)).cache()
hvt_SDF_fail_age_vector = hvt_SDF_fail_age.rdd.map(lambda row: Vectors.dense(row)).cache()


In [17]:
# Class labels: 1.0 for operating-history rows, 0.0 for failure rows.
hvt_SDF_ops_age_vector_labeledPoint = hvt_SDF_ops_age_vector.map(
    lambda features: LabeledPoint(1.0, features))
hvt_SDF_fail_age_vector_labeledPoint = hvt_SDF_fail_age_vector.map(
    lambda features: LabeledPoint(0.0, features))

In [42]:
# One training set: all operating rows plus all failure rows
# (1000000 + 31954 = 1031954 in the recorded run).
hvt_SDF_labeledPoint = hvt_SDF_ops_age_vector_labeledPoint.union(
    hvt_SDF_fail_age_vector_labeledPoint)
hvt_SDF_labeledPoint.count()

1031954

In [19]:
# Build and train the model.
# NOTE(review): the features are unscaled and include the `serial` identifier;
# SGD is scale-sensitive and an ID is not a meaningful predictor -- revisit.
parsedData = hvt_SDF_labeledPoint
model = LogisticRegressionWithSGD.train(parsedData)

# Evaluate on the training data itself (no hold-out split).
# Caveat: the recorded error, 0.0309645584978, equals 31954 / 1031954 -- the share of
# failure rows. That is consistent with the model predicting 1.0 for every row, so
# compare against this majority-class baseline before trusting the number.
# NOTE: labelsAndPreds is not cached; every later action recomputes it using
# whatever threshold `model` has at that moment.
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
# Index the (label, prediction) pair: `lambda (v, p): ...` tuple-parameter
# unpacking is Python 2-only syntax (removed by PEP 3113).
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))


Training Error = 0.0309645584978


In [41]:
# One (label, prediction) pair per training row (1031954 recorded).
labelsAndPreds.count()

1031954

In [43]:
# Use the model to score one hand-built example as a raw probability.
# Feature order matches training: [model, serial, location, inceptionperiod, age].
# Reference rows kept from the original notes:
#model.predict([1.0, 31.0, 5.0, 1935.0, 100.0])
#2012	2754	2	42904.893	117904.89
#2	2754	1994	5	1500	-8	5	0	30	3	100
previous_threshold = model.threshold
model.clearThreshold()
example_probability = model.predict([2.0, 2754.0, 5.0, 1994.0, 19.0])
# Restore the threshold: labelsAndPreds closes over `model` and is recomputed by
# later cells, which would otherwise emit probabilities instead of 0/1 predictions.
model.setThreshold(previous_threshold)
example_probability

1.0

In [36]:
# Wrap each (label, prediction) pair in a Row so Spark can infer the schema
# (label: double, preds: double). Row is used instead of a dict because
# schema inference from dicts is deprecated in PySpark.
labelsAndPreds_with_schema = labelsAndPreds.map(
    lambda v: Row(label=float(v[0]), preds=float(v[1])))

labelsAndPreds_with_schema.first()

{'label': 1.0, 'preds': 1.0}

In [37]:
# Turn the (label, preds) pairs into a DataFrame and show its schema.
data_frame_table = hiveContext.createDataFrame(labelsAndPreds_with_schema)
# Function-call form works on both Python 2 and Python 3.
print(data_frame_table)
# (A bare data_frame_table.count() here only triggered an extra Spark job whose
# result was never displayed, so it was dropped.)
data_frame_table.printSchema()

DataFrame[label: double, preds: double]
root
 |-- label: double (nullable = true)
 |-- preds: double (nullable = true)



In [39]:
# Persist the predictions as a Hive table.
# The temp table gets its own name; it previously shared the name of the permanent
# table created from it, making source and target ambiguous.
data_frame_table.registerTempTable("hvt_labelsAndPreds_tmp")
# CREATE TABLE ... AS fails if the table already exists, so drop any earlier copy
# to keep this cell re-runnable. NOTE: this replaces previously saved results.
hiveContext.sql('DROP TABLE IF EXISTS hvt_labelsAndPreds')
hiveContext.sql('create table hvt_labelsAndPreds as select label, preds from hvt_labelsAndPreds_tmp')

DataFrame[]

In [45]:
# Write the raw (label, prediction) pairs to HDFS as text.
# NOTE(review): the path is hard-coded to the sandbox host, and saveAsTextFile is
# expected to fail if the target already exists. Values reflect `model`'s threshold
# at the time this cell runs, since labelsAndPreds is recomputed here.
LABELS_AND_PREDS_HDFS_PATH = "hdfs://sandbox.hortonworks.com:8020/user/hue/file.txt"
labelsAndPreds.saveAsTextFile(LABELS_AND_PREDS_HDFS_PATH)

In [None]:
from math import isnan

from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint

# Label each feature vector 1.0 if any component is NaN, else 0.0.
# `NaN in v` could never match: NaN compares unequal to everything (including
# itself), so that test was always False and every point got label 0.
# NOTE(review): `feature_matrix_vectors` is not defined anywhere in this notebook,
# and this cell was never executed (In [None]); define it before running.
# NOTE(review): .collect() pulls every LabeledPoint back to the driver.
feature_matrix_labeledPoint = feature_matrix_vectors.map(
    lambda v: LabeledPoint(1.0 if any(isnan(x) for x in v) else 0.0, v)).collect()