## Get the data from the Databricks datasets

In [0]:
%fs ls "/databricks-datasets/"

path,name,size,modificationTime
dbfs:/databricks-datasets/COVID/,COVID/,0,1732274950282
dbfs:/databricks-datasets/README.md,README.md,976,1561418533000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,1732274950282
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1561418605000
dbfs:/databricks-datasets/adult/,adult/,0,1732274950282
dbfs:/databricks-datasets/airlines/,airlines/,0,1732274950282
dbfs:/databricks-datasets/amazon/,amazon/,0,1732274950282
dbfs:/databricks-datasets/asa/,asa/,0,1732274950282
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0,1732274950282
dbfs:/databricks-datasets/bikeSharing/,bikeSharing/,0,1732274950282


In [0]:
# List the entries under the sample weather dataset directory and render
# the listing (path, name, size, modificationTime) as a table.
files = dbutils.fs.ls("/databricks-datasets/weather/")
display(files)


path,name,size,modificationTime
dbfs:/databricks-datasets/weather/README.weather_history.md,README.weather_history.md,750,1561431420000
dbfs:/databricks-datasets/weather/high_temps,high_temps,19180,1561431420000
dbfs:/databricks-datasets/weather/low_temps,low_temps,19180,1561431420000


In [0]:
%python
# List available datasets in the /databricks-datasets directory
dbutils.fs.ls("/databricks-datasets")

# Read the specified file from the dataset with header
data_high = spark.read.option("header", "true").csv("/databricks-datasets/weather/high_temps")

# Display the data
display(data_high)

date,temp
2015-01-01,42
2015-01-02,42
2015-01-03,41
2015-01-04,51
2015-01-05,54
2015-01-06,54
2015-01-07,46
2015-01-08,46
2015-01-09,50
2015-01-10,46


In [0]:
%python
# List available datasets in the /databricks-datasets directory
dbutils.fs.ls("/databricks-datasets")

# Read the specified file from the dataset with header
data_low = spark.read.option("header", "true").csv("/databricks-datasets/weather/low_temps")

# Display the data
display(data_low)

date,temp
2015-01-01,26
2015-01-02,32
2015-01-03,35
2015-01-04,38
2015-01-05,49
2015-01-06,43
2015-01-07,42
2015-01-08,35
2015-01-09,38
2015-01-10,43


## Store the raw data in the Bronze layer

In [0]:
# Inspect the DBFS mount points visible to this notebook. Left as the bare
# last expression so the returned list of MountInfo entries is shown as output.
dbutils.fs.mounts()


[MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/Volumes', source='UnityCatalogVolumes', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType=''),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType=''),
 MountInfo(mountPoint='/Volume', source='DbfsReserved', encryptionType=''),
 MountInfo(mountPoint='/volumes', source='DbfsReserved', encryptionType=''),
 MountInfo(mountPoint='/', source='DatabricksRoot', encryptionType=''),
 MountInfo(mountPoint='/volume', source='DbfsReserved', encryptionType='')]

In [0]:
# CTAS statement that (re)creates the bronze table for low temperatures
# from whatever the `data_low_view` temp view currently exposes.
query = """
CREATE OR REPLACE TABLE low_temps_bronze_table AS
SELECT * FROM data_low_view
"""

# Make the raw low-temperature DataFrame addressable from SQL.
data_low.createOrReplaceTempView("data_low_view")

# Run the statement; kept as the last expression so its result is displayed.
spark.sql(query)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
%python
data_high.createOrReplaceTempView("data_high_view")

query = """
CREATE OR REPLACE TABLE high_temps_bronze_table AS
SELECT * FROM data_high_view
"""

spark.sql(query)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

## Data cleaning and transformation, then storing in the Silver layer

In [0]:
from pyspark.sql.functions import to_date, col

# Clean the high-temperature readings in one pass:
#   1. parse `date` using the yyyy-MM-dd pattern,
#   2. cast `temp` to integer,
#   3. drop rows that are exact duplicates of another row.
data_high_cleaned = (
    data_high
    .withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
    .withColumn("temp", col("temp").cast("integer"))
    .dropDuplicates()
)

# Show the cleaned result.
display(data_high_cleaned)

date,temp
2015-03-03,51
2015-03-12,64
2015-04-15,57
2015-08-09,83
2016-06-30,73
2016-09-13,77
2018-09-06,85
2015-04-14,53
2015-07-02,93
2015-11-02,52


In [0]:

# Clean the low-temperature readings in one pass:
#   1. parse `date` using the yyyy-MM-dd pattern,
#   2. cast `temp` to integer,
#   3. drop rows that are exact duplicates of another row.
# `to_date` and `col` come from the import in the preceding cleaning cell.
data_low_cleaned = (
    data_low
    .withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
    .withColumn("temp", col("temp").cast("integer"))
    .dropDuplicates()
)

# Show the cleaned result.
display(data_low_cleaned)

date,temp
2015-12-07,47
2016-02-25,39
2016-07-03,56
2017-01-02,26
2017-12-21,29
2018-04-18,41
2018-09-27,52
2015-03-30,51
2015-07-19,63
2015-10-09,54


In [0]:
# CTAS statement that (re)creates the silver table for low temperatures
# from the `data_low_cleaned_view` temp view.
query = """
CREATE OR REPLACE TABLE low_temps_silver_table AS
SELECT * FROM data_low_cleaned_view
"""

# Make the cleaned low-temperature DataFrame addressable from SQL.
data_low_cleaned.createOrReplaceTempView("data_low_cleaned_view")

# Run the statement; kept as the last expression so its result is displayed.
spark.sql(query)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# CTAS statement that (re)creates the silver table for high temperatures
# from the `data_high_cleaned_view` temp view.
query = """
CREATE OR REPLACE TABLE high_temps_silver_table AS
SELECT * FROM data_high_cleaned_view
"""

# Make the cleaned high-temperature DataFrame addressable from SQL.
data_high_cleaned.createOrReplaceTempView("data_high_cleaned_view")

# Run the statement; kept as the last expression so its result is displayed.
spark.sql(query)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

## Create the fact table

In [0]:
from pyspark.sql.functions import col, year, month, dayofmonth

# Build the fact table from the two cleaned inputs.
# The inner join on `date` keeps only dates that appear in both inputs; the
# high and low readings are then placed side by side and the calendar parts
# of the date are added as separate columns.
fact_temps = (
    data_high_cleaned.alias("high")
    .join(data_low_cleaned.alias("low"), on="date", how="inner")
    .select(
        col("high.date").alias("date"),
        col("high.temp").alias("high_temp"),
        col("low.temp").alias("low_temp"),
    )
    .withColumn("year", year(col("date")))
    .withColumn("month", month(col("date")))
    .withColumn("day", dayofmonth(col("date")))
)

display(fact_temps)

date,high_temp,low_temp,year,month,day
2015-03-03,51,32,2015,3,3
2015-03-12,64,49,2015,3,12
2015-04-15,57,38,2015,4,15
2015-08-09,83,59,2015,8,9
2016-06-30,73,55,2016,6,30
2016-09-13,77,51,2016,9,13
2018-09-06,85,56,2018,9,6
2015-04-14,53,37,2015,4,14
2015-07-02,93,64,2015,7,2
2015-11-02,52,45,2015,11,2


In [0]:
# CTAS statement that (re)creates the gold fact table from the
# `fact_temps_view` temp view.
query = """
CREATE OR REPLACE TABLE gold_fact_temps AS
SELECT * FROM fact_temps_view
"""

# Make the joined fact DataFrame addressable from SQL.
fact_temps.createOrReplaceTempView("fact_temps_view")

# Run the statement; kept as the last expression so its result is displayed.
spark.sql(query)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

## 