In [None]:
!pip install pyspark



In [None]:
# --- standard library ---
import collections
import json
import time

# --- third-party: numerical stack ---
import numpy as np
import pandas as pd
from scipy.spatial import distance

# --- third-party: Spark ---
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover
from pyspark.ml.feature import StringIndexer
from pyspark.mllib.linalg import *  # NOTE(review): wildcard import; prefer explicit names
from pyspark.mllib.linalg import SparseVector
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# --- Colab: mount Google Drive so paths under /content/drive are readable ---
from google.colab import drive

drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Start a local Spark context (8 worker threads) and a SparkSession, then load car.data.
begin = time.time()
conf = SparkConf().setMaster("local[8]").setAppName("Exercise")
sc = SparkContext.getOrCreate(conf=conf)
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType, StringType, FloatType
from pyspark.sql.functions import concat, lit
from pyspark.sql.functions import monotonically_increasing_id
# builder.getOrCreate() reuses the SparkContext created above rather than
# constructing a SparkSession object directly.
sql_sc = SparkSession.builder.getOrCreate()

CAR_DATA_PATH = "/content/drive/MyDrive/BIGDATA/BT/car.data"

# load data
# All attributes are categorical labels, so every column is read as a string.
# NOTE(review): if the file has more columns than these six (e.g. a class label),
# Spark's default PERMISSIVE CSV mode drops the extra tokens — confirm this is intended.
schema = StructType([
    StructField("buying", StringType(), True),
    StructField("maint", StringType(), True),
    StructField("doors", StringType(), True),
    StructField("persons", StringType(), True),
    StructField("lug_boot", StringType(), True),
    StructField("safety", StringType(), True),
])

# car.data has no header line. With header=true, Spark consumed the first data record
# as column names: the persons=2 / lug_boot=small / safety=low row went missing,
# while the corresponding persons=4 row was present.
cars_df = (
    sql_sc.read
    .schema(schema)
    .option("delimiter", ",")
    .option("header", "false")
    .csv(CAR_DATA_PATH)
)

# Attach a unique row id. monotonically_increasing_id() values are unique and
# increasing, but are not guaranteed to be consecutive across partitions.
cars_df = cars_df.withColumn("id", monotonically_increasing_id())
cars_df.show()

+------+-----+-----+-------+--------+------+---+
|buying|maint|doors|persons|lug_boot|safety| id|
+------+-----+-----+-------+--------+------+---+
| vhigh|vhigh|    2|      2|   small|   med|  0|
| vhigh|vhigh|    2|      2|   small|  high|  1|
| vhigh|vhigh|    2|      2|     med|   low|  2|
| vhigh|vhigh|    2|      2|     med|   med|  3|
| vhigh|vhigh|    2|      2|     med|  high|  4|
| vhigh|vhigh|    2|      2|     big|   low|  5|
| vhigh|vhigh|    2|      2|     big|   med|  6|
| vhigh|vhigh|    2|      2|     big|  high|  7|
| vhigh|vhigh|    2|      4|   small|   low|  8|
| vhigh|vhigh|    2|      4|   small|   med|  9|
| vhigh|vhigh|    2|      4|   small|  high| 10|
| vhigh|vhigh|    2|      4|     med|   low| 11|
| vhigh|vhigh|    2|      4|     med|   med| 12|
| vhigh|vhigh|    2|      4|     med|  high| 13|
| vhigh|vhigh|    2|      4|     big|   low| 14|
| vhigh|vhigh|    2|      4|     big|   med| 15|
| vhigh|vhigh|    2|      4|     big|  high| 16|
| vhigh|vhigh|    2|

In [None]:
# Encode every categorical attribute as a numeric column `<col>_index`.
# - The synthetic `id` column is excluded up front. The old code excluded a
#   non-existent 'date' column, then indexed `id` and dropped `id_index` afterwards.
# - Columns are taken in schema order. Iterating a Python set of strings gives a
#   per-process order, which reshuffled the index columns (and the feature layout)
#   between runs.
# NOTE(review): label indices follow StringIndexer's ordering (frequency-based by
# default), not the attributes' natural order (low < med < high). Euclidean distance
# on these indices is therefore only a rough similarity measure.
categorical_columns = [column for column in cars_df.columns if column != "id"]
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index").fit(cars_df)
    for column in categorical_columns
]
# The stages are already fitted models; the pipeline just chains their transforms.
pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(cars_df).transform(cars_df)
df_r.show()

+------+-----+-----+-------+--------+------+---+------------+------------+-------------+-----------+--------------+-----------+
|buying|maint|doors|persons|lug_boot|safety| id|safety_index|buying_index|persons_index|maint_index|lug_boot_index|doors_index|
+------+-----+-----+-------+--------+------+---+------------+------------+-------------+-----------+--------------+-----------+
| vhigh|vhigh|    2|      2|   small|   med|  0|         1.0|         3.0|          2.0|        3.0|           2.0|        3.0|
| vhigh|vhigh|    2|      2|   small|  high|  1|         0.0|         3.0|          2.0|        3.0|           2.0|        3.0|
| vhigh|vhigh|    2|      2|     med|   low|  2|         2.0|         3.0|          2.0|        3.0|           1.0|        3.0|
| vhigh|vhigh|    2|      2|     med|   med|  3|         1.0|         3.0|          2.0|        3.0|           1.0|        3.0|
| vhigh|vhigh|    2|      2|     med|  high|  4|         0.0|         3.0|          2.0|        3.0|    

In [None]:
from pyspark.ml.feature import VectorAssembler

# Assemble the `<col>_index` columns into a single `features` vector.
# Columns are selected by suffix rather than by position (df_r.columns[7:]). That way,
# adding or reordering columns upstream cannot silently pull non-feature columns
# such as `id` into the vector.
feature_names = [column for column in df_r.columns if column.endswith("_index")]
assert feature_names, "no *_index feature columns found in df_r"
print(feature_names)
assembler = VectorAssembler(inputCols=feature_names, outputCol="features")
transformed_data = assembler.transform(df_r)

transformed_data.show()

['safety_index', 'buying_index', 'persons_index', 'maint_index', 'lug_boot_index', 'doors_index']
+------+-----+-----+-------+--------+------+---+------------+------------+-------------+-----------+--------------+-----------+--------------------+
|buying|maint|doors|persons|lug_boot|safety| id|safety_index|buying_index|persons_index|maint_index|lug_boot_index|doors_index|            features|
+------+-----+-----+-------+--------+------+---+------------+------------+-------------+-----------+--------------+-----------+--------------------+
| vhigh|vhigh|    2|      2|   small|   med|  0|         1.0|         3.0|          2.0|        3.0|           2.0|        3.0|[1.0,3.0,2.0,3.0,...|
| vhigh|vhigh|    2|      2|   small|  high|  1|         0.0|         3.0|          2.0|        3.0|           2.0|        3.0|[0.0,3.0,2.0,3.0,...|
| vhigh|vhigh|    2|      2|     med|   low|  2|         2.0|         3.0|          2.0|        3.0|           1.0|        3.0|[2.0,3.0,2.0,3.0,...|
| vhigh|

In [None]:
import pyspark.sql.functions as F
from scipy.spatial import distance

# Query point: the feature vector of the first row. Every row, including the query
# row itself (distance 0), gets its Euclidean distance to this point.
input_row = transformed_data.select("features").take(1)[0][0]
# Densify explicitly with toArray() instead of handing ml Vector objects to scipy.
# The assembled rows may be sparse or dense vectors. The query is converted once,
# outside the UDF.
query_vector = input_row.toArray()
distance_udf = F.udf(
    lambda features: float(distance.euclidean(features.toArray(), query_vector)),
    FloatType(),
)
transformed_data = transformed_data.withColumn(
    "euclidean_distances", distance_udf(F.col("features"))
)
transformed_data.show()

# All distances in ascending order, as a plain Python list.
b = transformed_data.rdd.map(lambda x: x["euclidean_distances"])
result = b.sortBy(lambda x: x, ascending=True).collect()
print("result:")
print(result[:10])

# The ten nearest rows themselves. The bare distances above cannot tell you *which*
# cars are the neighbours; ties are broken by id for a stable listing.
(
    transformed_data
    .orderBy("euclidean_distances", "id")
    .select("id", "buying", "maint", "doors", "persons", "lug_boot", "safety",
            "euclidean_distances")
    .show(10)
)

+------+-----+-----+-------+--------+------+---+------------+------------+-------------+-----------+--------------+-----------+--------------------+-------------------+
|buying|maint|doors|persons|lug_boot|safety| id|safety_index|buying_index|persons_index|maint_index|lug_boot_index|doors_index|            features|euclidean_distances|
+------+-----+-----+-------+--------+------+---+------------+------------+-------------+-----------+--------------+-----------+--------------------+-------------------+
| vhigh|vhigh|    2|      2|   small|   med|  0|         1.0|         3.0|          2.0|        3.0|           2.0|        3.0|[1.0,3.0,2.0,3.0,...|                0.0|
| vhigh|vhigh|    2|      2|   small|  high|  1|         0.0|         3.0|          2.0|        3.0|           2.0|        3.0|[0.0,3.0,2.0,3.0,...|                1.0|
| vhigh|vhigh|    2|      2|     med|   low|  2|         2.0|         3.0|          2.0|        3.0|           1.0|        3.0|[2.0,3.0,2.0,3.0,...|       