In [2]:
import sys
import os

# Add url to root folder
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))

from pyspark.sql import SparkSession
from delta import *
from etl.extract_data import extract_data
from etl.transform_data import clean_data, format_column_name
from etl.load_data import load_data


Set up Spark + Delta Lake

In [3]:
# Setup Spark + Delta Lake ( define DeltaSparkSessionExtension and DeltaCatalog)
builder = SparkSession.builder.appName("DeltaLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
print("Spark + Delta Lake setup is successful!")

Spark + Delta Lake setup is successful!


Extract and Transform data from a file

In [4]:
# Extract data from a file .csv
df = extract_data(spark, "../data/tourism_dataset.csv")
df_clean = clean_data(df)
df_formatted = format_column_name(df_clean)
new_df = df_formatted
try: 
    new_df.show(5)
except Exception as e:
    print(e)

+----------+---+--------------------+-----------------------+-------------+------------+--------------------+-------------+--------+--------------+--------------------+-----------------------+---------------------+------------+
|Tourist_ID|Age|           Interests|Preferred_Tour_Duration|Accessibility|   Site_Name|       Sites_Visited|Tour_Duration|Route_ID|Tourist_Rating|System_Response_Time|Recommendation_Accuracy|VR_Experience_Quality|Satisfaction|
+----------+---+--------------------+-----------------------+-------------+------------+--------------------+-------------+--------+--------------+--------------------+-----------------------+---------------------+------------+
|         1| 48|['Architecture', ...|                      5|        False|Eiffel Tower|['Eiffel Tower', ...|            7|    1000|           1.6|                3.73|                     97|                  4.5|           3|
|         2| 37|['Cultural', 'Nat...|                      6|        False|   Colosseum|

Save data to a Delta table


In [5]:
# Save data to a Delta table
load_data(new_df, "../data/delta_table")

In [7]:
# Read data from a Delta table
df_delta = spark.read.format("delta").load("../data/delta_table")
# try:
#     df_delta.show(5)
# except Exception as e:
#     print(e)


In [9]:
df_delta.show(5)
df_delta.createOrReplaceTempView("tourisms")

+----------+---+--------------------+-----------------------+-------------+------------+--------------------+-------------+--------+--------------+--------------------+-----------------------+---------------------+------------+
|Tourist_ID|Age|           Interests|Preferred_Tour_Duration|Accessibility|   Site_Name|       Sites_Visited|Tour_Duration|Route_ID|Tourist_Rating|System_Response_Time|Recommendation_Accuracy|VR_Experience_Quality|Satisfaction|
+----------+---+--------------------+-----------------------+-------------+------------+--------------------+-------------+--------+--------------+--------------------+-----------------------+---------------------+------------+
|         1| 48|['Architecture', ...|                      5|        False|Eiffel Tower|['Eiffel Tower', ...|            7|    1000|           1.6|                3.73|                     97|                  4.5|           3|
|         2| 37|['Cultural', 'Nat...|                      6|        False|   Colosseum|

In [10]:
result = spark.sql("""
        SELECT Age, SUM(Preferred_Tour_Duration) AS Count_Tour_Duration
        FROM tourisms
        GROUP BY Age
        ORDER BY Age ASC
    """)
result.show()

+---+-------------------+
|Age|Count_Tour_Duration|
+---+-------------------+
| 18|              518.0|
| 19|              451.0|
| 20|              499.0|
| 21|              466.0|
| 22|              448.0|
| 23|              526.0|
| 24|              486.0|
| 25|              664.0|
| 26|              537.0|
| 27|              602.0|
| 28|              632.0|
| 29|              564.0|
| 30|              520.0|
| 31|              460.0|
| 32|              524.0|
| 33|              557.0|
| 34|              490.0|
| 35|              471.0|
| 36|              505.0|
| 37|              483.0|
+---+-------------------+
only showing top 20 rows

