In [0]:
from  pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

In [0]:
# List the contents of the /FileStore/tables/ folder.
dbutils.fs.ls('/FileStore/tables/')

Out[22]: [FileInfo(path='dbfs:/FileStore/tables/bookings_etl/', name='bookings_etl/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/hotels-1.csv', name='hotels-1.csv', size=16855599, modificationTime=1685177400000),
 FileInfo(path='dbfs:/FileStore/tables/hotels.csv', name='hotels.csv', size=16855599, modificationTime=1685168971000)]

In [0]:
# Load the hotels CSV into a Spark DataFrame.
# The first row supplies the column names, column types are inferred by Spark,
# and fields are comma-separated.
bookings_df = (
    spark.read
    .options(header="true", inferSchema="true", sep=",")
    .csv("/FileStore/tables/hotels.csv")
)

In [0]:
# Number of rows loaded from the CSV.
bookings_df.count()

Out[24]: 119390

In [0]:
# Keep only the bookings whose market_segment is "Offline TA/TO"
# (the tour-operator bookings this notebook works with).
Tour_operator_df = bookings_df.where(col("market_segment") == "Offline TA/TO")

In [0]:
# Build the arrival_date column: join the year, month and day-of-month columns
# into a "<year>-<month>-<day>" string, then parse it with the "yyyy-MMMM-d" pattern.
arrival_date_text = concat(
    col("arrival_date_year").cast("string"),
    lit("-"),
    col("arrival_date_month"),
    lit("-"),
    col("arrival_date_day_of_month").cast("string"),
)
Arrival_date_df = Tour_operator_df.withColumn(
    "arrival_date",
    to_date(arrival_date_text, "yyyy-MMMM-d"),
)


In [0]:
# Build the departure_date column with a Spark SQL expression:
# arrival_date plus the total nights stayed (weekend nights + week nights).
departure_date_sql = "date_add(arrival_date,stays_in_weekend_nights+stays_in_week_nights)"
Departure_date_df = Arrival_date_df.withColumn("departure_date", expr(departure_date_sql))

In [0]:
# Create the with_family_breakfast column: "Yes" when the sum of children and
# babies is greater than 0, otherwise "No".
# children is cast to int before the addition, exactly as before.
# NULLs in either column are treated as 0: in Spark SQL NULL + n is NULL, so without
# the coalesce a booking with a missing children value and babies > 0 would be
# labelled "No".
children_count = coalesce(col("children").cast("int"), lit(0))
babies_count = coalesce(col("babies"), lit(0))
Family_breakfast_df = Departure_date_df.withColumn(
    "with_family_breakfast",
    when(children_count + babies_count > 0, "Yes").otherwise("No"),
)

In [0]:
# Persist the transformed bookings as Parquet at file_path,
# replacing whatever was written there previously.
file_path = "/FileStore/tables/bookings_etl"
Family_breakfast_df.write.parquet(file_path, mode="overwrite")

In [0]:
# Number of rows in the transformed DataFrame that was written out.
Family_breakfast_df.count()

Out[30]: 24219

# Validation

In [0]:
# Validation: list every distinct market_segment value left after the filter.
# Only the tour-operator segment should appear.
Filter_validation_df = Family_breakfast_df.select(col("market_segment")).dropDuplicates()
Filter_validation_df.show(truncate=False)

+--------------+
|market_segment|
+--------------+
|Offline TA/TO |
+--------------+



In [0]:
# Validation: read the stored Parquet output back and count its rows
# to confirm the write succeeded.
df_read = spark.read.format("parquet").load("/FileStore/tables/bookings_etl/")
df_read.count()

Out[32]: 24219

In [0]:
# Fetch 10 random records to spot-check the derived columns.
# The sample gets its own name so df_read keeps referring to the full read-back
# DataFrame and this cell can be rerun safely.
# The shuffle is seeded so reruns are intended to pick the same rows. Spark does not
# strictly guarantee this if the data is partitioned differently between runs.
SAMPLE_SEED = 42
sample_df = df_read.orderBy(rand(seed=SAMPLE_SEED)).limit(10)
df_test = sample_df.select(
    "arrival_date_year",
    "arrival_date_month",
    "arrival_date_day_of_month",
    "arrival_date",
    "stays_in_weekend_nights",
    "stays_in_week_nights",
    "departure_date",
    "children",
    "babies",
    "with_family_breakfast",
)
display(df_test)

arrival_date_year,arrival_date_month,arrival_date_day_of_month,arrival_date,stays_in_weekend_nights,stays_in_week_nights,departure_date,children,babies,with_family_breakfast
2016,November,19,2016-11-19,2,1,2016-11-22,0,0,No
2015,October,2,2015-10-02,1,2,2015-10-05,0,0,No
2016,November,25,2016-11-25,0,2,2016-11-27,0,0,No
2016,September,11,2016-09-11,2,1,2016-09-14,0,0,No
2017,March,16,2017-03-16,1,3,2017-03-20,0,0,No
2016,May,13,2016-05-13,1,2,2016-05-16,0,0,No
2016,February,21,2016-02-21,2,1,2016-02-24,0,0,No
2015,October,4,2015-10-04,2,0,2015-10-06,0,0,No
2016,May,20,2016-05-20,0,2,2016-05-22,0,0,No
2016,September,21,2016-09-21,0,4,2016-09-25,0,0,No
