In [1]:
import findspark
findspark.init()

In [2]:
# Initialize spark sesssion
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("bookings-etl").getOrCreate()

In [3]:
# Import Library
from pyspark.sql.functions import concat, col, lit, to_date, date_add, expr, udf, when
from pyspark.sql.types import StringType, IntegerType

In [4]:
# Reading the file
data = spark.read.option('header','True').csv("hostel.csv",inferSchema=True)

In [5]:
# Find all unique element of column which datatype is of string
for col in data.dtypes:
    if col[1]=='string':
        print(col[0])
        data.select(col[0]).distinct().show()

hotel
+------------+
|       hotel|
+------------+
|  City Hotel|
|Resort Hotel|
+------------+

arrival_date_month
+------------------+
|arrival_date_month|
+------------------+
|              July|
|          November|
|          February|
|           January|
|             March|
|           October|
|               May|
|            August|
|             April|
|              June|
|          December|
|         September|
+------------------+

children
+--------+
|children|
+--------+
|       3|
|       0|
|      NA|
|       1|
|      10|
|       2|
+--------+

meal
+---------+
|     meal|
+---------+
|       SC|
|       FB|
|Undefined|
|       BB|
|       HB|
+---------+

country
+-------+
|country|
+-------+
|    POL|
|    LVA|
|    BRB|
|    ZMB|
|    JAM|
|    BRA|
|    ARM|
|    MOZ|
|    CUB|
|    JOR|
|    FRA|
|    ABW|
|    URY|
|    BOL|
|    GIB|
|    LBY|
|    ETH|
|     CN|
|    ITA|
|    UKR|
+-------+
only showing top 20 rows

market_segment
+--------------+
|market

In [6]:
# Filter Record based on Tour operator as market_segment designation 
filter_df=data.filter(data.market_segment == "Offline TA/TO")

In [7]:
# Create the arrival_date field by combining the arrival_date_year, arrival_date_month and arrival_date_day_of_month columns using column objects

filter_1 = filter_df.withColumn("arrival_date", to_date(concat(filter_df['arrival_date_year'].cast("string"), lit('-'), filter_df['arrival_date_month'], lit('-'), filter_df['arrival_date_day_of_month'].cast("string")),"yyyy-MMMM-d"))


# Create the departure_date field by adding the stays_in_weekend_nights and stays_in_week_nights columns to the arrival_date field using sql expresion
 
filter_2 = filter_1.withColumn("departure_date",expr("date_add(arrival_date,stays_in_weekend_nights+stays_in_week_nights)"))


In [8]:
# Create new column "with_family_breakfast" as 'yes' if sum of children and babies aregreater then 0 else 'No'
filter_3=filter_2.withColumn('with_family_breakfast',when(filter_2.children.cast('int')+filter_2.babies> 0,"Yes")
                             .otherwise("No"))

In [9]:
#Write output in a parquet file format
filter_3.write.parquet("booking-etl/hostel_booking.parquet") 

### Test case:

In [10]:
# Check the unique record in market_segment after applying filter on market_segment
filter_3.select('market_segment').distinct().show()

+--------------+
|market_segment|
+--------------+
| Offline TA/TO|
+--------------+



In [11]:
# Review the arrival_date, departure_date column
filter_3.select('arrival_date_year','arrival_date_month','arrival_date_day_of_month','arrival_date','stays_in_weekend_nights','stays_in_week_nights','departure_date').show(5)

+-----------------+------------------+-------------------------+------------+-----------------------+--------------------+--------------+
|arrival_date_year|arrival_date_month|arrival_date_day_of_month|arrival_date|stays_in_weekend_nights|stays_in_week_nights|departure_date|
+-----------------+------------------+-------------------------+------------+-----------------------+--------------------+--------------+
|             2015|              July|                        1|  2015-07-01|                      0|                   3|    2015-07-04|
|             2015|              July|                        1|  2015-07-01|                      0|                   4|    2015-07-05|
|             2015|              July|                        1|  2015-07-01|                      2|                   5|    2015-07-08|
|             2015|              July|                        1|  2015-07-01|                      2|                   5|    2015-07-08|
|             2015|              J

In [12]:
# Validate the derivation of with_family_breakfast column
filter_3.select('with_family_breakfast','children','babies').distinct().show()

+---------------------+--------+------+
|with_family_breakfast|children|babies|
+---------------------+--------+------+
|                  Yes|       0|     2|
|                  Yes|       1|     0|
|                  Yes|       0|     1|
|                  Yes|       3|     0|
|                   No|       0|     0|
|                  Yes|       2|     0|
|                  Yes|       2|     1|
|                  Yes|       1|     1|
|                  Yes|      10|     0|
+---------------------+--------+------+

