In [1]:
from pyspark import SparkConf, SparkContext

def getSparkContext():
    """
    Gets the Spark Context
    """
    conf = (SparkConf()
         .setMaster("local") # run on local
         .setAppName("Logistic Regression") # Name of App
         .set("spark.executor.memory", "1g")) # Set 1 gig of memory
    sc = SparkContext(conf = conf) 
    return sc

In [2]:
sc = getSparkContext()

In [13]:
from pyspark.sql import HiveContext
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics
from math import sqrt
import matplotlib.pyplot as plt
from pylab import *
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array

%matplotlib inline

hiveContext = HiveContext(sc)

# Constructs a Spark DataFrame (SDF) from the reference asset table in Hive.
hvt_SDF = hiveContext.sql('SELECT model, serial, inceptionperiod, location FROM hvt')

# Constructs a Spark DataFrame (SDF) from the reference asset table in Hive.
hvt_data = hiveContext.sql('SELECT model, serial, location, inceptionperiod, age FROM hvt_data')

# Constructs a Spark DataFrame (SDF) from the asset failure table in Hive.
hvt_failure_SDF = hiveContext.sql('SELECT period, serialf FROM hvt_failuref')

# Constructs a Spark DataFrame (SDF) from the asset operations table in Hive.
hvt_operating_SDF= hiveContext.sql('SELECT serialo, period FROM hvt_operatingo')

In [71]:
test1 = hiveContext.sql("""
    select a.model, a.serial, a.location, b.period - a.inceptionperiod as age, 1.0 as label
    from hvt as a left join hvt_operatingo as b on a.serial = b.serialo
""")
test2 = hiveContext.sql("""
    select a.model, a.serial, a.location, b.period - a.inceptionperiod as age, 1.0 as label
    from hvt as a left join hvt_failuref as b on a.serial = b.serialf
""")
test = test1.unionAll(test2)
test.show()


model serial location age label
1     31     5        0   1.0  
1     31     5        1   1.0  
1     31     5        2   1.0  
1     31     5        3   1.0  
1     31     5        4   1.0  
1     31     5        5   1.0  
1     31     5        6   1.0  
1     31     5        7   1.0  
1     31     5        8   1.0  
1     31     5        9   1.0  
1     31     5        10  1.0  
1     31     5        11  1.0  
1     31     5        12  1.0  
1     31     5        13  1.0  
1     31     5        14  1.0  
1     31     5        15  1.0  
1     31     5        16  1.0  
1     31     5        17  1.0  
1     31     5        18  1.0  
1     31     5        19  1.0  


In [14]:
hvt_operating_SDF.count()

1000000L

In [15]:
hvt_failure_SDF.count()

31954L

In [16]:
hvt_SDF.count()

10000L

In [17]:
# Filter below 2010
hvt_failure_SDF1 = hvt_failure_SDF.filter(hvt_failure_SDF.period < 2010)
hvt_failure_SDF1.count()

12527L

In [18]:
# Join the reference data with the ops data
hvt_SDF_ops = hvt_SDF.join(hvt_operating_SDF, hvt_operating_SDF.serialo == hvt_SDF.serial, "left_outer")

In [19]:
hvt_SDF_ops.count()

1000000L

In [20]:
# Filter the data we need
hvt_SDF_ops_age = hvt_SDF_ops.select(hvt_SDF_ops.model, hvt_SDF_ops.serial, hvt_SDF_ops.location,
                                     hvt_SDF_ops.inceptionperiod, hvt_SDF_ops.period - hvt_SDF_ops.inceptionperiod)

In [21]:
hvt_SDF_ops_age.count()

1000000L

In [22]:
# Join hvt reference data with the failure data
hvt_SDF_fail = hvt_SDF.join(hvt_failure_SDF, hvt_failure_SDF.serialf == hvt_SDF.serial, "left_outer")

In [23]:
hvt_SDF_fail.count()

31954L

In [28]:
# Filter the data we need
hvt_SDF_fail_age = hvt_SDF_fail.select(hvt_SDF_fail.model, hvt_SDF_fail.serial, hvt_SDF_fail.location, 
                                       hvt_SDF_fail.inceptionperiod ,hvt_SDF_fail.period - hvt_SDF_fail.inceptionperiod)

In [29]:
hvt_SDF_fail_age.count()

31954L

In [30]:
#Create a vector matrix of ops and fail dataframes
hvt_SDF_ops_age_vector = hvt_SDF_ops_age.map(lambda x: Vectors.dense(x)).cache()
hvt_SDF_fail_age_vector = hvt_SDF_fail_age.map(lambda x: Vectors.dense(x)).cache()


In [31]:
# Create a vector of labeled points based on the ops and fail vector matrixes 
hvt_SDF_ops_age_vector_labeledPoint = hvt_SDF_ops_age_vector.map(lambda x: LabeledPoint(1.0,x)) 
hvt_SDF_fail_age_vector_labeledPoint = hvt_SDF_fail_age_vector.map(lambda x: LabeledPoint(0.0,x))

In [32]:
#Combine the two labeled points together
hvt_SDF_labeledPoint = hvt_SDF_ops_age_vector_labeledPoint + hvt_SDF_fail_age_vector_labeledPoint
hvt_SDF_labeledPoint.count()

1031954

In [33]:
# Build and train the model
parsedData = hvt_SDF_labeledPoint
model = LogisticRegressionWithSGD.train(parsedData)

# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))


Training Error = 0.0309645584978


In [34]:
labelsAndPreds.count()

1031954

In [38]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
#tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(hvt_SDF_fail_age)

Py4JJavaError: An error occurred while calling o197.transform.
: java.lang.IllegalArgumentException: Field "text" does not exist.
	at org.apache.spark.sql.types.StructType$$anonfun$apply$25.apply(dataTypes.scala:1032)
	at org.apache.spark.sql.types.StructType$$anonfun$apply$25.apply(dataTypes.scala:1032)
	at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
	at scala.collection.AbstractMap.getOrElse(Map.scala:58)
	at org.apache.spark.sql.types.StructType.apply(dataTypes.scala:1031)
	at org.apache.spark.ml.UnaryTransformer.transformSchema(Transformer.scala:90)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:58)
	at org.apache.spark.ml.UnaryTransformer.transform(Transformer.scala:101)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)


In [43]:
# Use the model to predict
model.clearThreshold()
# Test Data
#model.predict([1.0, 31.0, 5.0, 1935.0, 100.0])
#2012	2754	2	42904.893	117904.89
#2	2754	1994	5	1500	-8	5	0	30	3	100
model.predict([2.0, 2754.0, 5.0, 1994.0, 19.0])
hvt_data.

1.0

In [36]:
labelsAndPreds.first()
#schemaString = "coords"
# Convert back to the orginal DF schema - DataFrame[leak_no: bigint, cause: bigint, x_coord: double, y_coord: double, diameter: bigint]
labelsAndPreds_with_schema = labelsAndPreds.map(lambda v: {"label": float(v[0]), "preds": float(v[1])})

labelsAndPreds_with_schema.first()

{'label': 1.0, 'preds': 1.0}

In [37]:
data_frame_table = hiveContext.createDataFrame(labelsAndPreds_with_schema)
print data_frame_table
data_frame_table.count()
data_frame_table.printSchema()

DataFrame[label: double, preds: double]
root
 |-- label: double (nullable = true)
 |-- preds: double (nullable = true)



In [39]:
data_frame_table.registerTempTable("hvt_labelsAndPreds") 
hiveContext.sql('create table hvt_labelsAndPreds as select label, preds from hvt_labelsAndPreds')

DataFrame[]

In [45]:
labelsAndPreds.saveAsTextFile("hdfs://sandbox.hortonworks.com:8020/user/hue/file.txt")

In [None]:
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint

feature_matrix_labeledPoint = feature_matrix_vectors.map(lambda v: LabeledPoint(1.0 if NaN in v else 0, v)).collect()