In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('clustering_spark').getOrCreate()


## Read Hotels Data

In [2]:
hotels_data = spark.read.csv("hotels_data.csv", header = True, inferSchema = True)

## Converts String to Dates 

In [3]:
from pyspark.sql.functions import udf, col
from datetime import datetime
from pyspark.sql.types import DateType, IntegerType

# Converts string to date
def str_to_date(str):    
    return datetime.strptime(str, '%m/%d/%Y %H:%M')

# convert a regular function to pyspark function
udf_strToDate = udf(str_to_date, DateType())

# convert "checking_date" and "snapshot_date" to date types
hotels_data_with_dates = hotels_data.withColumn("checkin_date", udf_strToDate(col("Checkin Date")))
hotels_data_with_dates = hotels_data_with_dates.withColumn("snapshot_date", udf_strToDate(col("Snapshot Date")))

## Get 150 common hotels

In [14]:
from pyspark.sql.functions import desc

# group by hotel name and count, take the first 150 hotels with the biggest count 
count_by_hotel_names = hotels_data_with_dates.groupBy('Hotel Name').count().sort(desc('count')).limit(150)

# get a list of the first 150 hotel names 
first_150_hotel_names = count_by_hotel_names.toPandas()['Hotel Name'].tolist()

# filter the most common 150 hotels from hotels_data_with_dates
hotels_150_data = hotels_data_with_dates.filter(col('Hotel Name').isin(first_150_hotel_names))

first_150_hotel_names

['Newark Liberty International Airport Marriott',
 'Hilton Garden Inn Times Square',
 'Residence Inn Newark Elizabeth Liberty International Airport',
 'Westin New York at Times Square',
 'Loews Regency New York Hotel',
 'Viceroy New York',
 'Four Seasons Hotel New York',
 'Langham Place New York Fifth Avenue',
 'The Carlyle A Rosewood Hotel',
 'DoubleTree by Hilton Metropolitan - New York City',
 'Magnuson Convention Center Hotel',
 'Hilton Garden Inn New York West 35th Street',
 'Hilton Garden Inn New York-Times Square Central',
 'Conrad New York',
 'Wyndham Garden Brooklyn Sunset Park',
 'Hilton Newark Airport',
 'Omni Berkshire Place',
 'Hilton Times Square',
 'Park Hyatt New York',
 'Homewood Suites by Hilton NY Midtown Manhattan Times Square',
 'Grand Hyatt New York',
 'The Plaza Hotel',
 'Quality Inn Woodside',
 'Hyatt Union Square New York',
 'Le Parker Meridien New York',
 'The New York EDITION',
 'W New York - Union Square',
 'Renaissance Newark Airport Hotel',
 'Hampton Inn N

## Filter records by most common 40 checkin dates 

In [17]:
# group by checkin and count, take the first 40 with biggest count
count_by_checkin = hotels_150_data.groupBy('checkin_date').count().sort(desc('count')).limit(40)

count_by_checkin = count_by_checkin.collect()

# get a list of the most common 40 checkin dates
first_40_checkin = [row[0] for row in count_by_checkin]
# first_40_checkin = count_by_checkin.toPandas()['checkin_date'].tolist()

# filter hotels data by the 40 most common dates
hotels_by_40_checkin = hotels_150_data.filter(col('checkin_date').isin(first_40_checkin))

first_40_checkin

[datetime.date(2015, 11, 11),
 datetime.date(2015, 10, 14),
 datetime.date(2015, 11, 4),
 datetime.date(2015, 8, 19),
 datetime.date(2015, 10, 28),
 datetime.date(2015, 10, 21),
 datetime.date(2015, 11, 6),
 datetime.date(2015, 8, 12),
 datetime.date(2015, 11, 5),
 datetime.date(2015, 10, 22),
 datetime.date(2015, 11, 12),
 datetime.date(2015, 10, 29),
 datetime.date(2015, 9, 10),
 datetime.date(2015, 9, 9),
 datetime.date(2015, 11, 18),
 datetime.date(2015, 8, 26),
 datetime.date(2015, 11, 10),
 datetime.date(2015, 11, 13),
 datetime.date(2015, 10, 15),
 datetime.date(2015, 11, 21),
 datetime.date(2015, 9, 30),
 datetime.date(2015, 10, 30),
 datetime.date(2015, 9, 16),
 datetime.date(2015, 9, 17),
 datetime.date(2015, 11, 28),
 datetime.date(2015, 10, 1),
 datetime.date(2015, 11, 26),
 datetime.date(2015, 9, 11),
 datetime.date(2015, 9, 18),
 datetime.date(2015, 10, 16),
 datetime.date(2015, 11, 27),
 datetime.date(2015, 10, 2),
 datetime.date(2015, 10, 7),
 datetime.date(2015, 11, 7)

## Generating synthetic data

In [6]:
#creating unique list for Hotel Name - Checkin Date - Discount code dummy combination generating 
unique_hotels_names = hotels_by_40_checkin.select('Hotel Name').distinct().collect()
unique_hotels_names_list = [(row['Hotel Name']) for row in unique_hotels_names]

unique_checkins =  hotels_by_40_checkin.select("checkin_date").distinct().collect()
unique_checkins_list = [(row['checkin_date']) for row in unique_checkins]

unique_discount_code =  [1,2,3,4]

# Create records with max price for each discount code for each date
synth_data = []
import sys
for x in unique_hotels_names_list:
    for y in unique_checkins_list:
        for z in unique_discount_code:
            synth_data.append([x, y ,z, sys.maxsize])

#Making the schema of synth_data
from pyspark.sql.types import *
cSchema = StructType([StructField("Hotel Name", StringType()),StructField("checkin_date", DateType()),StructField("Discount Code",  IntegerType()),StructField("min(Discount Price)", LongType())])

#Creating dummy df
dummy_df = spark.createDataFrame(synth_data, schema=cSchema)

sliced_df = hotels_by_40_checkin.select('Hotel Name', 'checkin_date','Discount Code', 'Discount Price')

# joining dummy data with grouped data 
hotel_chekin_discountCode = sliced_df.union(dummy_df)

# group by Checkin - Hotel - Discount Code
hotel_chekin_discountCode = hotel_chekin_discountCode.groupBy('Hotel name','checkin_date', 'Discount Code').min('Discount Price')

#replacing sys.max with -1 
hotel_chekin_discountCode = hotel_chekin_discountCode.replace(sys.maxsize, -1)

#sorting date
hotel_chekin_discountCode = hotel_chekin_discountCode.orderBy(['Hotel name','checkin_date','Discount Code'])


## Normalizing data


In [7]:
#split to two groups: one with price of -1, one with greater than -1
groupMinus = hotel_chekin_discountCode.filter(col('min(Discount Price)') == -1)
groupGreater = hotel_chekin_discountCode.filter(col('min(Discount Price)') > -1)

In [8]:
from pyspark.sql import SQLContext
from pyspark.sql.window import Window
import pyspark.sql.functions as func

groupGreater.createOrReplaceTempView("normalization")

#Creating SQLContext for SQL converting
sqlContext = SQLContext(spark)
    
#Preparing data for normalization
dataFrame = sqlContext.table("normalization")
#Partioning data for groups so we can applay function on groups
windowSpec = Window.partitionBy(groupGreater['Hotel name'])  


#Normalize function
diff = func.max(dataFrame['min(Discount Price)']).over(windowSpec) - func.min(dataFrame['min(Discount Price)']).over(windowSpec)
normalize = func.when( diff == 0, 0).otherwise( ( dataFrame['min(Discount Price)'] - func.min(dataFrame['min(Discount Price)']).over(windowSpec) ) / diff * 100 )

# diff = func.max(dataFrame['min(Discount Price)']).over(windowSpec) - func.min(dataFrame['min(Discount Price)']).over(windowSpec) == 0, 0).otherwise( ( func.max(dataFrame['min(Discount Price)']).over(windowSpec) - func.min(dataFrame['min(Discount Price)']).over(windowSpec) ) * 100 )
# normalize =  ((dataFrame['min(Discount Price)'] - func.min(dataFrame['min(Discount Price)']).over(windowSpec)) / diff)

normalized_df = dataFrame.select(
  dataFrame['Hotel Name'],
  dataFrame['checkin_date'],
  dataFrame['Discount Code'],
  normalize.alias("Normal"))

# normalized_df.toPandas().isnull().values.any()


### Joining back -1 values

In [9]:
#Changing column name for same schema
groupMinus = groupMinus.withColumnRenamed('min(Discount Price)','Normal')

#Union all data frames with sorting
normalized_df = normalized_df.union(groupMinus)
normalized_df = normalized_df.orderBy(['Hotel name','checkin_date','Discount Code'])


In [10]:
# normalized_df.dtypes

## Flatten each hotel values to Vector Dense values

In [11]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.ml.linalg import Vectors, VectorUDT

to_vector = udf(lambda a: Vectors.dense(a), VectorUDT())

normal_toList = normalized_df.groupBy('Hotel name').agg(F.collect_list('Normal').alias("features"))
normal_features = normal_toList.withColumn("features",to_vector("features")).drop("Hotel Name")

#normal_toList.select("Normal").flatMap(lambda x : x)

#parsing to df 
#all_hotels_df =  normal_toList.select([normal_toList["Hotel name"]] +  [normal_toList.Normal[i] for i in range(160)])

## Calculating Bisecting K-Means 

In [19]:
from pyspark.ml.clustering import BisectingKMeans

bkm = BisectingKMeans(k=2, minDivisibleClusterSize=1.0)
model = bkm.fit(normal_features)
centers = model.clusterCenters()
print(centers)


# NOTES
# The Bisecting algorithem is an top-down algorithem.
# First, it collect the data to the driver, and on each divide it's split the data into partitions to make it parellelize 
# without dependeny of data.

[array([6.78064200e+00, 5.00971299e+00, 4.74151800e+00, 1.01035078e+00,
       1.53957323e+00, 4.54731496e+00, 2.18055430e+00, 1.51242508e+00,
       2.40490691e+00, 2.34360919e+00, 3.04890332e+00, 7.95632616e-01,
       2.40714876e+00, 3.38832908e+00, 1.09663390e+00, 5.67565267e-01,
       2.02534642e+00, 3.60851283e+00, 2.15370354e+00, 6.21461435e-01,
       2.90935899e+00, 6.05531627e+00, 5.48503269e+00, 7.99441900e-01,
       4.86479988e+00, 1.12691275e+01, 1.06123528e+01, 4.19683575e+00,
       8.75847216e+00, 1.97741475e+01, 9.01797845e+00, 7.76225808e+00,
       6.25786848e+00, 1.70576385e+01, 8.05304841e+00, 2.34412752e+00,
       2.97021209e+00, 7.01486495e+00, 5.98886753e+00, 6.03631016e+00,
       4.21701376e+00, 8.61357780e+00, 5.58422962e+00, 3.16943914e+00,
       6.43481316e+00, 1.15902502e+01, 7.15791863e+00, 3.97121028e+00,
       9.03593481e+00, 8.48587660e+00, 8.65401889e+00, 5.02895988e+00,
       1.09501965e+01, 1.00908663e+01, 1.34504963e+01, 7.38894263e+00,
     

In [20]:
len(centers)

2

## Dendogram

In [None]:
#importing clustering libaries 
from scipy.cluster.hierarchy import dendrogram, linkage 
from matplotlib import pyplot as plt
from scipy import cluster
shc = cluster.hierarchy

# back to pandas
vector = all_hotels_df.toPandas()

print (vector[vector["Hotel name"] == "Bentley Hotel"])

#preproccesing data for clustering
labels = vector.values[:,0]
data = vector.values[:,1:160]
plt.figure(figsize=(20, 10))  
plt.title("Clustering Hotels")  

# "ward" - minimizes the variance between clusters, that means that each two clusters were combine if their variance is alike 
Z = shc.linkage(data, method='ward')
dend = shc.dendrogram(Z, labels=labels) 

## Hotel to Cluster

In [None]:
from sklearn.cluster import AgglomerativeClustering
import pandas as pd

#running the algorithem again in a diffrent way
cluster = AgglomerativeClustering(n_clusters=3, affinity='euclidean', linkage='ward')  
clusters = cluster.fit_predict(vector.values[:,1:160])  

hotels = pd.DataFrame.from_records(vector.values)

hotels["cluster"] = clusters
hotels = hotels[[0,"cluster"]]
hotels.sort_values(by=["cluster"],ascending=True,inplace=True)

hotels["Count"] = hotels.groupby("cluster")[0].transform("count")
# hotels[hotels[0] == "NOMO SOHO"]
hotels