# Data Science Project - Task 4

### Spark with Clustering (task 3 in spark)

In [461]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.window import Window
import pyspark.sql.functions as func

In [462]:
# Reuse an already-running session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName('clustering_in_spark')
    .getOrCreate()
)

#### Read data from csv

In [463]:
# Load the raw hotel snapshots: the first row holds the column names and
# Spark infers each column's type from the data.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('hotels_data.csv')
)

In [464]:
df.printSchema()

root
 |-- Snapshot ID: integer (nullable = true)
 |-- Snapshot Date: string (nullable = true)
 |-- Checkin Date: string (nullable = true)
 |-- Days: integer (nullable = true)
 |-- Original Price: integer (nullable = true)
 |-- Discount Price: integer (nullable = true)
 |-- Discount Code: integer (nullable = true)
 |-- Available Rooms: integer (nullable = true)
 |-- Hotel Name: string (nullable = true)
 |-- Hotel Stars: integer (nullable = true)



## 4.a

In [465]:
from pyspark.sql.functions import udf, col
from datetime import datetime
from pyspark.sql.types import DateType, IntegerType
from pyspark.sql.functions import desc

#### Converts string to date - function

In [466]:
def str_date(str):
    """Parse a ``'%m/%d/%Y %H:%M'`` timestamp string into a ``datetime``.

    Args:
        str: Text such as ``'11/11/2015 0:00'``, or ``None`` for a missing cell.

    Returns:
        The parsed ``datetime``, or ``None`` when the input is ``None``.

    Raises:
        ValueError: If the text does not match the expected format.
    """
    # The date columns are nullable; without this guard a single empty cell
    # would make strptime raise TypeError and fail the whole UDF job.
    if str is None:
        return None
    return datetime.strptime(str, '%m/%d/%Y %H:%M')

#### Convert function to pyspark function

In [467]:
# Wrap the Python parser as a Spark UDF that produces a DateType column.
toDate = udf(f=str_date, returnType=DateType())

#### Converts date columns to dates

In [468]:
# Parse both string timestamp columns into new date-typed columns;
# the original string columns are kept alongside them.
df_dates = (
    df
    .withColumn("checkin_date", toDate(col("Checkin Date")))
    .withColumn("snapshot_date", toDate(col("Snapshot Date")))
)

In [469]:
df_dates.printSchema()

root
 |-- Snapshot ID: integer (nullable = true)
 |-- Snapshot Date: string (nullable = true)
 |-- Checkin Date: string (nullable = true)
 |-- Days: integer (nullable = true)
 |-- Original Price: integer (nullable = true)
 |-- Discount Price: integer (nullable = true)
 |-- Discount Code: integer (nullable = true)
 |-- Available Rooms: integer (nullable = true)
 |-- Hotel Name: string (nullable = true)
 |-- Hotel Stars: integer (nullable = true)
 |-- checkin_date: date (nullable = true)
 |-- snapshot_date: date (nullable = true)



#### Get the 150 hotels with maximum records

In [470]:
# Count the records per hotel, sort by that count (largest first) and keep the top 150.
hotel_names = (
    df_dates
    .groupBy('Hotel Name')
    .count()
    .orderBy(col('count').desc())
    .limit(150)
)

#### Get list of the first (max) 150 hotels

In [471]:
# Bring the 150 hotel names to the driver as a plain Python list (sorted order preserved).
maxHotel_names = [hotel_row['Hotel Name'] for hotel_row in hotel_names.collect()]

In [472]:
maxHotel_names

['Newark Liberty International Airport Marriott',
 'Hilton Garden Inn Times Square',
 'Residence Inn Newark Elizabeth Liberty International Airport',
 'Westin New York at Times Square',
 'Loews Regency New York Hotel',
 'Viceroy New York',
 'Four Seasons Hotel New York',
 'Langham Place New York Fifth Avenue',
 'The Carlyle A Rosewood Hotel',
 'DoubleTree by Hilton Metropolitan - New York City',
 'Magnuson Convention Center Hotel',
 'Hilton Garden Inn New York West 35th Street',
 'Hilton Garden Inn New York-Times Square Central',
 'Conrad New York',
 'Wyndham Garden Brooklyn Sunset Park',
 'Hilton Newark Airport',
 'Omni Berkshire Place',
 'Hilton Times Square',
 'Park Hyatt New York',
 'Homewood Suites by Hilton NY Midtown Manhattan Times Square',
 'Grand Hyatt New York',
 'The Plaza Hotel',
 'Quality Inn Woodside',
 'Hyatt Union Square New York',
 'Le Parker Meridien New York',
 'The New York EDITION',
 'W New York - Union Square',
 'Renaissance Newark Airport Hotel',
 'Hampton Inn N

#### Adding and filtering the other features

In [473]:
# Keep every column, but only the rows that belong to one of the selected hotels.
hotels_data = df_dates.where(df_dates['Hotel Name'].isin(maxHotel_names))

#### Check

In [474]:
hotels_data

DataFrame[Snapshot ID: int, Snapshot Date: string, Checkin Date: string, Days: int, Original Price: int, Discount Price: int, Discount Code: int, Available Rooms: int, Hotel Name: string, Hotel Stars: int, checkin_date: date, snapshot_date: date]

## 4.b

In [475]:
# Count the records per check-in date, sort by that count (largest first) and keep the top 40.
checkin = (
    hotels_data
    .groupBy('Checkin Date')
    .count()
    .orderBy(col('count').desc())
    .limit(40)
)

In [476]:
checkin

DataFrame[Checkin Date: string, count: bigint]

#### get list of the first (max) 40 checkin dates

In [477]:
# Bring the 40 check-in date strings to the driver as a plain Python list (sorted order preserved).
maxCheckin = [date_row['Checkin Date'] for date_row in checkin.collect()]

In [478]:
maxCheckin

['11/11/2015 0:00',
 '10/14/2015 0:00',
 '11/4/2015 0:00',
 '8/19/2015 0:00',
 '10/28/2015 0:00',
 '10/21/2015 0:00',
 '11/6/2015 0:00',
 '8/12/2015 0:00',
 '11/5/2015 0:00',
 '10/22/2015 0:00',
 '11/12/2015 0:00',
 '10/29/2015 0:00',
 '9/10/2015 0:00',
 '9/9/2015 0:00',
 '11/18/2015 0:00',
 '8/26/2015 0:00',
 '11/10/2015 0:00',
 '11/13/2015 0:00',
 '10/15/2015 0:00',
 '11/21/2015 0:00',
 '9/30/2015 0:00',
 '10/30/2015 0:00',
 '9/16/2015 0:00',
 '9/17/2015 0:00',
 '11/28/2015 0:00',
 '10/1/2015 0:00',
 '11/26/2015 0:00',
 '9/11/2015 0:00',
 '9/18/2015 0:00',
 '10/16/2015 0:00',
 '11/27/2015 0:00',
 '10/2/2015 0:00',
 '10/7/2015 0:00',
 '11/7/2015 0:00',
 '8/28/2015 0:00',
 '8/27/2015 0:00',
 '10/27/2015 0:00',
 '11/3/2015 0:00',
 '8/13/2015 0:00',
 '11/25/2015 0:00']

#### Adding and filtering the other features

In [479]:
# Keep every column, but only the rows whose check-in date is one of the selected 40.
hotelsCheckin = hotels_data.where(hotels_data['Checkin Date'].isin(maxCheckin))

#### Check

In [480]:
hotelsCheckin

DataFrame[Snapshot ID: int, Snapshot Date: string, Checkin Date: string, Days: int, Original Price: int, Discount Price: int, Discount Code: int, Available Rooms: int, Hotel Name: string, Hotel Stars: int, checkin_date: date, snapshot_date: date]

## 4.c

#### For each check-in date we need the price for each of the 4 discount codes. To get this, we build a helper table containing every combination and combine it with the table we already have

In [None]:
# Distinct hotel names, collected to the driver as Row objects.
names = (
    hotelsCheckin
    .select('Hotel Name')
    .dropDuplicates()
    .collect()
)

In [None]:
# Unwrap each Row into its bare hotel-name string.
namesList = [hotel_row['Hotel Name'] for hotel_row in names]

In [None]:
namesList

In [None]:
# Distinct check-in date strings, collected to the driver as Row objects.
checkins = (
    hotelsCheckin
    .select('Checkin Date')
    .dropDuplicates()
    .collect()
)

In [None]:
# Unwrap each Row into its bare check-in date string.
checkinsList = [date_row['Checkin Date'] for date_row in checkins]

In [None]:
checkinsList

In [None]:
# The four discount codes (1 through 4) that every hotel/check-in pair should carry.
uniqueCode = list(range(1, 5))

#### Build the table that will help us insert the 4 codes for each checkin

In [None]:
# One placeholder row per (hotel, check-in date, discount code) combination.
# The price is set to -1 to mark "no real price".
helpTable = [
    [hotel, checkin_text, code, -1]
    for hotel in namesList
    for checkin_text in checkinsList
    for code in uniqueCode
]

In [None]:
# Preview only the first few placeholder rows; the full list holds one row per
# hotel x check-in date x discount code and would flood the notebook output.
helpTable[:5]

#### Making the schema for dataframe

In [None]:
# Column layout of the placeholder table. The check-in date is still text at
# this point, and the price is a long.
dfSchema = (
    StructType()
    .add("Hotel Name", StringType())
    .add("Checkin Date", StringType())
    .add("Discount Code", IntegerType())
    .add("Discount Price", LongType())
)

#### Creating spark dataframe

In [None]:
# Turn the placeholder rows into a Spark DataFrame with the explicit schema.
s_df = spark.createDataFrame(data=helpTable, schema=dfSchema)

In [None]:
# Replace the textual check-in column with its parsed date value (same column name).
s_df = s_df.withColumn("Checkin Date", toDate(s_df["Checkin Date"]))

In [None]:
s_df

#### Select only these columns from the dataframe we already have: Hotel Name, Checkin Date, Discount Code, Discount Price

In [None]:
# Project the four columns in the same order as the placeholder table,
# using the parsed date column for the check-in.
hotels_df = hotelsCheckin.select(
    ['Hotel Name', 'checkin_date', 'Discount Code', 'Discount Price']
)

#### Union the dataframe with the table that we build

In [None]:
# Stack the placeholder rows on top of the real rows. union() matches columns by
# position, so the result keeps the placeholder table's column names.
all_df = s_df.union(other=hotels_df)

In [None]:
all_df

#### Group by to order the details

In [None]:
# Lowest real price per (hotel, check-in date, discount code).
# Every group contains a -1 placeholder row, so a plain min() would return -1
# for every group and discard all real prices. The placeholder is therefore
# masked to null (min() ignores nulls) and -1 is restored only for groups that
# have no real price at all. The alias keeps the column name min() would have
# produced, so the cells below keep working.
all_df = (
    all_df
    .groupBy('Hotel name', 'Checkin Date', 'Discount Code')
    .agg(
        func.coalesce(
            func.min(func.when(col('Discount Price') >= 0, col('Discount Price'))),
            func.lit(-1),
        ).alias('min(Discount Price)')
    )
)

In [None]:
all_df

#### Partition data by hotel name

In [None]:
# Redistribute the rows so that all rows of one hotel land in the same partition.
all_dfparts = all_df.repartition(col("Hotel name"))

## 4.d

#### Split into two groups: rows with price -1, and rows without -1 (all the others)

In [None]:
# Rows still holding the -1 placeholder versus rows holding a price above -1.
groupMinus = all_df.where(all_df['min(Discount Price)'] == -1)
groupWithout = all_df.where(all_df['min(Discount Price)'] > -1)

### Normalization

In [None]:
# Register the priced rows as a temporary view so they can be looked up by name.
groupWithout.createOrReplaceTempView(name="normalization")

#### Creating SQLContext for SQL converting

In [None]:
# SQLContext expects a SparkContext as its first argument, not a SparkSession.
# Pass the session's context and bind the existing session explicitly so the
# temp view registered on `spark` is visible through this context.
sqlContext = SQLContext(spark.sparkContext, sparkSession=spark)

#### Preparing data for normalization

In [None]:
# Fetch the registered temporary view back as a DataFrame.
dataFrame = sqlContext.table(tableName="normalization")

In [None]:
# Partition the rows by hotel so that window aggregates are computed per hotel.
windowSpec = Window.partitionBy('Hotel name')

### Normalize function

In [None]:
# Min-max scale each price to 0-100 within its hotel:
#   (price - hotel_min) / (hotel_max - hotel_min) * 100
price = dataFrame['min(Discount Price)']
hotel_min = func.min(price).over(windowSpec)
hotel_max = func.max(price).over(windowSpec)
price_range = hotel_max - hotel_min

# A hotel whose prices are all equal has a zero range. The plain division would
# then give null (or an error under ANSI mode), and nulls are silently dropped by
# collect_list further down, shifting that hotel's features. Such prices are
# mapped to 0.0 instead.
normalize = (
    func.when(price_range == 0, func.lit(0.0))
    .otherwise((price - hotel_min) / price_range * 100)
)

In [None]:
# Keep the three key columns and add the scaled price as "Normal".
normalized = dataFrame.select(
    'Hotel Name',
    'Checkin Date',
    'Discount Code',
    normalize.alias("Normal"),
)

#### Getting back the values -1

In [None]:
# Rename the price column so the placeholder rows share the schema of the normalized rows.
groupMinus = groupMinus.withColumnRenamed(
    existing='min(Discount Price)',
    new='Normal',
)

#### Union all data frames with sorting

In [None]:
# Append the -1 placeholder rows to the scaled rows, then sort by hotel,
# check-in date and discount code.
normalized = (
    normalized
    .union(groupMinus)
    .orderBy('Hotel name', 'Checkin Date', 'Discount Code')
)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [None]:
# Build one feature list per hotel, ordered by (check-in date, discount code).
# collect_list gives no ordering guarantee after the groupBy shuffle, so an
# earlier orderBy cannot be relied on. The sort keys are collected together with
# the value as a struct, the array is sorted (structs compare field by field),
# and then only the "Normal" field is extracted. Collecting structs also keeps
# rows whose "Normal" is null in place instead of dropping them.
normal_toList = (
    normalized
    .groupBy('Hotel name')
    .agg(
        F.sort_array(
            F.collect_list(F.struct('Checkin Date', 'Discount Code', 'Normal'))
        ).alias('sorted_rows')
    )
    .select('Hotel name', F.col('sorted_rows.Normal').alias('Normal'))
)

In [None]:
# Spread each hotel's feature list into one column per feature.
# The number of features is (distinct check-in dates) x (discount codes), which
# is 160 for 40 dates and 4 codes. It is derived here rather than hard-coded so
# the cell stays correct if either count changes.
feature_count = len(checkinsList) * len(uniqueCode)
all_hotels = normal_toList.select(
    [normal_toList["Hotel name"]]
    + [normal_toList.Normal[i] for i in range(feature_count)]
)

In [None]:
all_hotels