In [0]:
# Source file: type first, then its location in the workspace file store.
file_type = "csv"
file_location = "/FileStore/tables/posenet_data-1.csv"

# Reader settings for CSV input.
delimiter = ","
first_row_is_header = "true"
infer_schema = "true"

# The options below only matter for CSV sources; other formats ignore them.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

#display(df)


In [0]:
from math import atan2, degrees
def angle_between(x1, y1, x2, y2, x3, y3):
    """Return the angle in degrees at vertex (x2, y2) between point 1 and point 3.

    The direction of each ray from the vertex is computed as ``atan2(dx, dy)``
    (x-offset first, i.e. measured from the y axis) and normalised into
    [0, 360). The result is the sweep from the first ray to the second,
    also in [0, 360).

    Args:
        x1, y1: coordinates of the first end point.
        x2, y2: coordinates of the vertex the angle is measured at.
        x3, y3: coordinates of the second end point.

    Returns:
        The angle as a float, or ``None`` when any coordinate is ``None``
        (so a missing keypoint yields a null instead of a ``TypeError``).
    """
    # Missing coordinates cannot be subtracted; propagate the null instead.
    if any(value is None for value in (x1, y1, x2, y2, x3, y3)):
        return None

    deg1 = (360 + degrees(atan2(x1 - x2, y1 - y2))) % 360
    deg2 = (360 + degrees(atan2(x3 - x2, y3 - y2))) % 360
    # Wrap the difference so the result is never negative.
    return deg2 - deg1 if deg1 <= deg2 else 360 - (deg1 - deg2)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
# Wrap angle_between as a Spark UDF whose declared return type is double.
angle_udf = udf(f=angle_between, returnType=DoubleType())

In [0]:
# Maps each derived angle column to the six coordinate columns it is computed
# from. Every entry is an (end point, vertex, end point) triple of keypoints,
# expanded to [<kp>_x, <kp>_y] pairs in that order.
angles = {
    label: [joint + "_" + axis for joint in joints for axis in ("x", "y")]
    for label, joints in (
        ("angle1", ("nose", "leftWrist", "rightWrist")),
        ("angle2", ("leftShoulder", "leftElbow", "leftWrist")),
        ("angle3", ("rightWrist", "rightElbow", "rightShoulder")),
        ("angle4", ("rightAnkle", "rightKnee", "rightHip")),
        ("angle5", ("leftAnkle", "leftKnee", "leftHip")),
        ("angle6", ("leftWrist", "leftShoulder", "leftHip")),
        ("angle7", ("rightWrist", "rightShoulder", "rightHip")),
        ("angle8", ("rightAnkle", "nose", "leftAnkle")),
        ("angle9", ("rightWrist", "rightHip", "rightAnkle")),
        ("angle10", ("leftWrist", "leftHip", "leftAnkle")),
        ("angle11", ("leftAnkle", "leftHip", "rightAnkle")),
        ("angle12", ("rightAnkle", "rightHip", "leftAnkle")),
    )
}

In [0]:
# Append one derived column per entry in `angles`, logging each name as it is added.
for angle_name, coord_cols in angles.items():
  print(angle_name)
  x1, y1, x2, y2, x3, y3 = coord_cols
  df = df.withColumn(angle_name, angle_udf(x1, y1, x2, y2, x3, y3))
  

angle1
angle2
angle3
angle4
angle5
angle6
angle7
angle8
angle9
angle10
angle11
angle12


In [0]:
# Print the column names and types of df; when the cells above have run in
# order this includes the derived angle columns.
df.printSchema()

root
 |-- filename: string (nullable = true)
 |-- nose_x: double (nullable = true)
 |-- nose_y: double (nullable = true)
 |-- nose_score: double (nullable = true)
 |-- leftEye_x: double (nullable = true)
 |-- leftEye_y: double (nullable = true)
 |-- leftEye_score: double (nullable = true)
 |-- rightEye_x: double (nullable = true)
 |-- rightEye_y: double (nullable = true)
 |-- rightEye_score: double (nullable = true)
 |-- leftEar_x: double (nullable = true)
 |-- leftEar_y: double (nullable = true)
 |-- leftEar_score: double (nullable = true)
 |-- rightEar_x: double (nullable = true)
 |-- rightEar_y: double (nullable = true)
 |-- rightEar_score: double (nullable = true)
 |-- leftShoulder_x: double (nullable = true)
 |-- leftShoulder_y: double (nullable = true)
 |-- leftShoulder_score: double (nullable = true)
 |-- rightShoulder_x: double (nullable = true)
 |-- rightShoulder_y: double (nullable = true)
 |-- rightShoulder_score: double (nullable = true)
 |-- leftElbow_x: double (nullable =

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

#### Clustering Task 1
# Feature columns for task 1: three derived angles plus three raw y-coordinates.
inputCols = [
    'angle1', 'angle11', 'angle12',
    'leftWrist_y', 'rightWrist_y', 'nose_y',
]
# Pack the selected columns into a single vector column named 'features'.
assemble = VectorAssembler(outputCol='features', inputCols=inputCols)
assembled_data = assemble.transform(df)
#display(assembled_data.select('features'))


In [0]:
### Standardize the data
# Fit a scaler on the assembled features, then write the scaled vectors to 'standardized'.
scale = StandardScaler(outputCol='standardized', inputCol='features')
data_scale = scale.fit(assembled_data)
data_scale_output = data_scale.transform(assembled_data)
#display(data_scale_output.select('standardized'))

In [0]:
### KMeans
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Cluster count and RNG seed for this run. KMeans initialisation is random, so
# the seed is set explicitly to make the fit (and its silhouette score)
# repeatable across runs.
num_clusters = 2
kmeans_seed = 42

silhouette_score = []
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='standardized',
                                metricName='silhouette', distanceMeasure='squaredEuclidean')

# Fit on the standardized vectors and attach a 'prediction' column to each row.
KMeans_algo = KMeans(featuresCol='standardized', k=num_clusters, seed=kmeans_seed)
KMeans_fit = KMeans_algo.fit(data_scale_output)
output = KMeans_fit.transform(data_scale_output)

# Score the clustering and keep the value in the list for later inspection.
score = evaluator.evaluate(output)
silhouette_score.append(score)

# Report the k that was actually used rather than a separately typed literal.
print("Silhouette Score:", score, "Number of Clusters:", num_clusters)

#display(output)

Silhouette Score: 0.32429472095198947 Number of Clusters: 2


In [0]:
### Clustering Task2
### Change your input
# Feature columns for task 2: one derived angle plus three raw y-coordinates.
inputCols = ['angle1', 'leftWrist_y', 'rightWrist_y', 'nose_y']

# Cluster count and RNG seed for this run. KMeans initialisation is random, so
# the seed is set explicitly to make the fit (and its silhouette score)
# repeatable across runs.
num_clusters = 2
kmeans_seed = 42

# Assemble the selected columns into one vector column, then standardize it.
assemble = VectorAssembler(inputCols=inputCols, outputCol='features')
assembled_data = assemble.transform(df)
scale = StandardScaler(inputCol='features', outputCol='standardized')
data_scale = scale.fit(assembled_data)
data_scale_output = data_scale.transform(assembled_data)

silhouette_score = []
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='standardized',
                                metricName='silhouette', distanceMeasure='squaredEuclidean')

# Fit on the standardized vectors and attach a 'prediction' column to each row.
KMeans_algo = KMeans(featuresCol='standardized', k=num_clusters, seed=kmeans_seed)
KMeans_fit = KMeans_algo.fit(data_scale_output)
output = KMeans_fit.transform(data_scale_output)

# Score the clustering and keep the value in the list for later inspection.
score = evaluator.evaluate(output)
silhouette_score.append(score)

# Report the k that was actually used rather than a separately typed literal.
print("Silhouette Score:", score, "Number of Clusters:", num_clusters)

Silhouette Score: 0.44213495304002204 Number of Clusters: 2


In [0]:
### Clustering Task 3 (change your input cols)
# Task 3 uses all twelve derived angle columns: 'angle1' through 'angle12'.
inputCols = ['angle' + str(n) for n in range(1, 13)]
# Pack them into a single vector column named 'features'.
assemble = VectorAssembler(outputCol='features', inputCols=inputCols)
assembled_data = assemble.transform(df)

In [0]:
# Fit a scaler on the assembled features, then write the scaled vectors to 'standardized'.
scale = StandardScaler(outputCol='standardized', inputCol='features')
data_scale = scale.fit(assembled_data)
data_scale_output = data_scale.transform(assembled_data)

In [0]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# RNG seed shared by every fit in the sweep. KMeans initialisation is random,
# so fixing the seed makes the scores for different k repeatable across runs.
kmeans_seed = 42

silhouette_score = []
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='standardized',
                                metricName='silhouette', distanceMeasure='squaredEuclidean')

# Try k = 2, 3, 4 and record the silhouette score of each clustering in order.
for k in range(2, 5):
    KMeans_algo = KMeans(featuresCol='standardized', k=k, seed=kmeans_seed)
    KMeans_fit = KMeans_algo.fit(data_scale_output)
    output = KMeans_fit.transform(data_scale_output)

    score = evaluator.evaluate(output)
    silhouette_score.append(score)

    print("Silhouette Score:", score, "Number of Clusters:", k)

Silhouette Score: 0.34807979679315687 Number of Clusters: 2
Silhouette Score: 0.21599871586127883 Number of Clusters: 3
Silhouette Score: 0.21325802074198827 Number of Clusters: 4
