In [3]:
## Set Python - Spark environment.
import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

In [4]:
## Create SparkContext, SparkSession
from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark import SparkContext
sc = SparkContext()

# warehouse_location points to the default location for managed databases and tables
warehouse_location = 'hdfs:///apps/hive/warehouse/'

spark = SparkSession \
    .builder \
    .appName("Spark Machine Learning Example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

In [5]:
from numpy import array
from math import sqrt

In [6]:
data = sc.textFile("/user/atul/uber/HistData/FlumeData.*")

In [5]:
data.count()

3296451

In [7]:
data.take(2)

[u'"11/1/2016 0:00:00",40.7293,-73.992,"B02512"',
 u'"11/1/2016 0:01:00",40.7131,-74.0097,"B02512"']

In [8]:
def convertDataFloat(line):
    return array([float(line[1]),float(line[2])])

In [9]:
fea_data = data.map(lambda data:data.split(','))

In [10]:
fea_data.take(2)

[[u'"11/1/2016 0:00:00"', u'40.7293', u'-73.992', u'"B02512"'],
 [u'"11/1/2016 0:01:00"', u'40.7131', u'-74.0097', u'"B02512"']]

In [13]:
fea_data1 = fea_data.map(lambda x:(float(x[1]),float(x[2])))

In [14]:
fea_data1.take(3)

[(40.7293, -73.992), (40.7131, -74.0097), (40.3461, -74.661)]

In [11]:
parsedData = fea_data.map(lambda line : convertDataFloat(line))

In [12]:
parsedData.take(2)

[array([ 40.7293, -73.992 ]), array([ 40.7131, -74.0097])]

In [16]:
type(parsedData)

pyspark.rdd.PipelinedRDD

In [15]:
from pyspark.mllib.clustering import KMeans, KMeansModel

clusters = KMeans.train(parsedData,8, 
                        maxIterations=10, 
                        initializationMode="random")

In [17]:
clusters.centers

[array([ 40.69829832, -74.20328151]),
 array([ 40.75894782, -73.97751224]),
 array([ 40.79134724, -73.86140043]),
 array([ 40.78763797, -73.9537063 ]),
 array([ 40.68691456, -73.95937887]),
 array([ 40.66554691, -73.7592567 ]),
 array([ 40.71643625, -74.00120514]),
 array([ 40.74090457, -73.99602132])]

In [18]:
def wsssError(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

In [19]:
WSSSE = parsedData.map(lambda point: wsssError(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

Within Set Sum of Squared Error = 63776.0076986


In [20]:
clusters.predict(array([40.6988701 , -74.20341933]))

0

In [1]:
sqrt(sum((array([40.7204,-74.0047]) - array([ 40.71743048, -74.002436  ])) ** 2))

In [14]:
#40.7204 - 40.71743048

In [15]:
#-74.0047 - -74.002436

In [23]:
clusters.save(sc, "/user/atul/Uber/kmeanModel")