In [0]:
df = spark.read.option("header", True) \
               .option("delimiter", "\t") \
               .csv("dbfs:/FileStore/shared_uploads/pekamwar.s@northeastern.edu/Seattle_Pet_Licenses_new__1_.tsv")


# Preview the data
df.show(5)

+------------------+--------------+------------+-------+-----------------+------------------+--------+
|License_Issue_Date|License_Number|Animals_Name|Species|    Primary_Breed|   Secondary_Breed|ZIP_Code|
+------------------+--------------+------------+-------+-----------------+------------------+--------+
|  December 18 2015|       S107948|         Zen|    Cat|Domestic Longhair|               Mix|   98117|
|      June 14 2016|       S116503|       Misty|    Cat|         Siberian|              null|   98117|
|    August 04 2016|       S119301|        Lyra|    Cat|              Mix|              null|   98121|
|    August 10 2019|       S133113|      Spider|    Cat|           LaPerm|              null|   98115|
|  November 20 2020|         77412|       Gemma|    Cat|          Siamese|American Shorthair|   98126|
+------------------+--------------+------------+-------+-----------------+------------------+--------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import to_date

# Convert License_Issue_Date to a valid date format
df = df.withColumn("License_Issue_Date", to_date("License_Issue_Date", "MMMM dd yyyy"))

# Verify the conversion
df.select("License_Issue_Date").show(5, truncate=False)

# Print schema to confirm data type change
df.printSchema()

+------------------+
|License_Issue_Date|
+------------------+
|2015-12-18        |
|2016-06-14        |
|2016-08-04        |
|2019-08-10        |
|2020-11-20        |
+------------------+
only showing top 5 rows

root
 |-- License_Issue_Date: date (nullable = true)
 |-- License_Number: string (nullable = true)
 |-- Animals_Name: string (nullable = true)
 |-- Species: string (nullable = true)
 |-- Primary_Breed: string (nullable = true)
 |-- Secondary_Breed: string (nullable = true)
 |-- ZIP_Code: string (nullable = true)



In [0]:
from pyspark.sql.functions import col

# Convert ZIP_Code and License_Number to integer data type
df = df.withColumn("ZIP_Code", col("ZIP_Code").cast("int")) \
       .withColumn("License_Number", col("License_Number").cast("int"))

# Display the schema to confirm the changes
df.printSchema()

# Show a few rows to verify the data
df.show()

root
 |-- License_Issue_Date: date (nullable = true)
 |-- License_Number: integer (nullable = true)
 |-- Animals_Name: string (nullable = true)
 |-- Species: string (nullable = true)
 |-- Primary_Breed: string (nullable = true)
 |-- Secondary_Breed: string (nullable = true)
 |-- ZIP_Code: integer (nullable = true)

+------------------+--------------+-------------+-------+------------------+------------------+--------+
|License_Issue_Date|License_Number| Animals_Name|Species|     Primary_Breed|   Secondary_Breed|ZIP_Code|
+------------------+--------------+-------------+-------+------------------+------------------+--------+
|        2015-12-18|          null|          Zen|    Cat| Domestic Longhair|               Mix|   98117|
|        2016-06-14|          null|        Misty|    Cat|          Siberian|              null|   98117|
|        2016-08-04|          null|         Lyra|    Cat|               Mix|              null|   98121|
|        2019-08-10|          null|       Spider|    

In [0]:
from pyspark.sql.functions import col, lit, monotonically_increasing_id, current_date, year, month, dayofweek, date_format

In [0]:
Breed_Dim = df.select("Species", "Primary_Breed", "Secondary_Breed") \
    .dropDuplicates() \
    .dropna() \
    .withColumn("Breed_SK", monotonically_increasing_id() + 1) \
    .withColumn("DI_LOAD_Dt", current_date()) \
    .select("Breed_SK", "Species", "Primary_Breed", "Secondary_Breed", "DI_LOAD_Dt")

In [0]:
Breed_Dim.show()  # Shows first 20 rows
Breed_Dim.printSchema()

+--------+-------+--------------------+--------------------+----------+
|Breed_SK|Species|       Primary_Breed|     Secondary_Breed|DI_LOAD_Dt|
+--------+-------+--------------------+--------------------+----------+
|       1|    Cat|            Balinese|                 Mix|2025-04-05|
|       2|    Dog| Retriever, Labrador|               Hound|2025-04-05|
|       3|    Dog|         Poodle, Toy|            Shepherd|2025-04-05|
|       4|    Dog|   Poodle, Miniature|Dachshund, Miniat...|2025-04-05|
|       5|    Dog|      Great Pyrenees|     German Shepherd|2025-04-05|
|       6|    Dog|Chihuahua, Short ...|     Chinese Crested|2025-04-05|
|       7|    Dog|             Whippet|Terrier, American...|2025-04-05|
|       8|    Dog|   Poodle, Miniature|      Terrier, Cairn|2025-04-05|
|       9|    Dog|   Doberman Pinscher|      Collie, Smooth|2025-04-05|
|      10|    Dog|Australian Cattle...|        Basset Hound|2025-04-05|
|      11|    Dog| Australian Shepherd|Dachshund, Standa...|2025

In [0]:
Species_Dim = df.select("Species").dropDuplicates().na.drop() \
    .withColumn("Species_SK", monotonically_increasing_id() + 1) \
    .withColumn("DI_LOAD_Dt", current_date()) \
    .select("Species_SK", "Species", "DI_LOAD_Dt")

In [0]:
Species_Dim.show()  # Shows first 20.printSchema()

+----------+-------+----------+
|Species_SK|Species|DI_LOAD_Dt|
+----------+-------+----------+
|         1|    Pig|2025-04-05|
|         2|    Cat|2025-04-05|
|         3|   Goat|2025-04-05|
|         4|    Dog|2025-04-05|
+----------+-------+----------+



In [0]:
Species_Dim.printSchema()

root
 |-- Species_SK: long (nullable = false)
 |-- Species: string (nullable = true)
 |-- DI_LOAD_Dt: date (nullable = false)



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, date_format, current_date, expr
from pyspark.sql.types import DateType
import datetime

# Define the start and end dates for the range
start_date = datetime.date(2015, 1, 1)  # Starting date
end_date = datetime.date(2025, 12, 31)  # Ending date

# Generate a list of dates between start_date and end_date
date_list = [(start_date + datetime.timedelta(days=x)) for x in range((end_date - start_date).days + 1)]

# Create a DataFrame from the date list
spark = SparkSession.builder.getOrCreate()
date_df = spark.createDataFrame(date_list, DateType()).toDF("Date")
date_df.show(5)  # Display the first few rows

+----------+
|      Date|
+----------+
|2015-01-01|
|2015-01-02|
|2015-01-03|
|2015-01-04|
|2015-01-05|
+----------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, date_format

# Add attributes to create the Date Dimension Table
Date_Dim = date_df \
    .withColumn("Date_SK", monotonically_increasing_id() + 1) \
    .withColumn("Year", year("Date")) \
    .withColumn("Month_Name", date_format("Date", "MMMM")) \
    .withColumn("Day_Name", date_format("Date", "EEEE")) \
    .withColumn("DI_LOAD_Dt", current_date()) \
    .select("Date_SK", "Year", "Month_Name", "Day_Name", "DI_LOAD_Dt")

# Display the Date Dimension Table
Date_Dim.show(10)

+-------+----+----------+---------+----------+
|Date_SK|Year|Month_Name| Day_Name|DI_LOAD_Dt|
+-------+----+----------+---------+----------+
|      1|2015|   January| Thursday|2025-04-05|
|      2|2015|   January|   Friday|2025-04-05|
|      3|2015|   January| Saturday|2025-04-05|
|      4|2015|   January|   Sunday|2025-04-05|
|      5|2015|   January|   Monday|2025-04-05|
|      6|2015|   January|  Tuesday|2025-04-05|
|      7|2015|   January|Wednesday|2025-04-05|
|      8|2015|   January| Thursday|2025-04-05|
|      9|2015|   January|   Friday|2025-04-05|
|     10|2015|   January| Saturday|2025-04-05|
+-------+----+----------+---------+----------+
only showing top 10 rows



In [0]:
Date_Dim.printSchema()

root
 |-- Date_SK: long (nullable = false)
 |-- Year: integer (nullable = true)
 |-- Month_Name: string (nullable = true)
 |-- Day_Name: string (nullable = true)
 |-- DI_LOAD_Dt: date (nullable = false)



In [0]:
# Get the current notebook path
notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()

# Extract the folder path (excluding the notebook name)
folder_path = notebook_path[:notebook_path.rfind("/")]

print(f"Notebook Path: {notebook_path}")
print(f"Folder Path: {folder_path}")

Notebook Path: /Users/pekamwar.s@northeastern.edu/Untitled Notebook 2025-04-05 14:51:11
Folder Path: /Users/pekamwar.s@northeastern.edu


In [0]:
# Define Delta table save path based on notebook folder
delta_save_path = f"/Users/pekamwar.s@northeastern.edu/Breed_Dim"

# Save Breed_Dim table
Breed_Dim.write.format("delta").mode("overwrite").save(delta_save_path)

print(f"Breed_Dim saved at: {delta_save_path}")

Breed_Dim saved at: /Users/pekamwar.s@northeastern.edu/Breed_Dim


In [0]:
dbutils.fs.ls(delta_save_path)

Out[19]: [FileInfo(path='dbfs:/Users/pekamwar.s@northeastern.edu/Breed_Dim/_delta_log/', name='_delta_log/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/Users/pekamwar.s@northeastern.edu/Breed_Dim/part-00000-2ba9428e-ddb6-4ac5-9b6c-55913e0ab69c-c000.snappy.parquet', name='part-00000-2ba9428e-ddb6-4ac5-9b6c-55913e0ab69c-c000.snappy.parquet', size=28620, modificationTime=1743879859000)]

In [0]:
# Define Delta table save paths based on notebook folder
species_dim_save_path = f"/Users/pekamwar.s@northeastern.edu/Species_Dim"
date_dim_save_path = f"/Users/pekamwar.s@northeastern.edu/Date_Dim"


# Save Species_Dim table
Species_Dim.write.format("delta").mode("overwrite").save(species_dim_save_path)
print(f"Species_Dim saved at: {species_dim_save_path}")

# Save Date_Dim table
Date_Dim.write.format("delta").mode("overwrite").save(date_dim_save_path)
print(f"Date_Dim saved at: {date_dim_save_path}")


Species_Dim saved at: /Users/pekamwar.s@northeastern.edu/Species_Dim
Date_Dim saved at: /Users/pekamwar.s@northeastern.edu/Date_Dim


In [0]:
df_geo = spark.read.option("header", True) \
               .csv("dbfs:/FileStore/shared_uploads/pekamwar.s@northeastern.edu/geo_data__1_.csv")


# Preview the data
df_geo.show(5)

+----------+-------+----------+-------+----------+----------+
|state_fips|  state|state_abbr|zipcode|    county|      city|
+----------+-------+----------+-------+----------+----------+
|         1|Alabama|        AL|  35004| St. Clair|     Acmar|
|         1|Alabama|        AL|  35005| Jefferson|Adamsville|
|         1|Alabama|        AL|  35006| Jefferson|     Adger|
|         1|Alabama|        AL|  35007|    Shelby|  Keystone|
|         1|Alabama|        AL|  35010|Tallapoosa|  New site|
+----------+-------+----------+-------+----------+----------+
only showing top 5 rows



In [0]:
df_geo = df_geo.withColumnRenamed("zipcode", "ZIP_Code")
df_geo.show(5)

+----------+-------+----------+--------+----------+----------+
|state_fips|  state|state_abbr|ZIP_Code|    county|      city|
+----------+-------+----------+--------+----------+----------+
|         1|Alabama|        AL|   35004| St. Clair|     Acmar|
|         1|Alabama|        AL|   35005| Jefferson|Adamsville|
|         1|Alabama|        AL|   35006| Jefferson|     Adger|
|         1|Alabama|        AL|   35007|    Shelby|  Keystone|
|         1|Alabama|        AL|   35010|Tallapoosa|  New site|
+----------+-------+----------+--------+----------+----------+
only showing top 5 rows



In [0]:
%sql
SELECT * 
FROM delta.`/Users/pekamwar.s@northeastern.edu/Breed_Dim`
LIMIT 10;

Breed_SK,Species,Primary_Breed,Secondary_Breed,DI_LOAD_Dt
1,Cat,Balinese,Mix,2025-04-05
2,Dog,"Retriever, Labrador",Hound,2025-04-05
3,Dog,"Poodle, Toy",Shepherd,2025-04-05
4,Dog,"Poodle, Miniature","Dachshund, Miniature Smooth Haired",2025-04-05
5,Dog,Great Pyrenees,German Shepherd,2025-04-05
6,Dog,"Chihuahua, Short Coat",Chinese Crested,2025-04-05
7,Dog,Whippet,"Terrier, American Pit Bull",2025-04-05
8,Dog,"Poodle, Miniature","Terrier, Cairn",2025-04-05
9,Dog,Doberman Pinscher,"Collie, Smooth",2025-04-05
10,Dog,Australian Cattle Dog,Basset Hound,2025-04-05


In [0]:
df_geo.printSchema()

root
 |-- state_fips: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_abbr: string (nullable = true)
 |-- ZIP_Code: integer (nullable = true)
 |-- county: string (nullable = true)
 |-- city: string (nullable = true)



In [0]:
from pyspark.sql.types import IntegerType

# Convert zipcode column to integer type
df_geo = df_geo.withColumn("ZIP_Code", df_geo["ZIP_Code"].cast("int"))


# Verify the schema to confirm the change
df_geo.printSchema()

# Show a few rows to verify the data
df_geo.show()

root
 |-- state_fips: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_abbr: string (nullable = true)
 |-- ZIP_Code: integer (nullable = true)
 |-- county: string (nullable = true)
 |-- city: string (nullable = true)

+----------+-------+----------+--------+----------+------------+
|state_fips|  state|state_abbr|ZIP_Code|    county|        city|
+----------+-------+----------+--------+----------+------------+
|         1|Alabama|        AL|   35004| St. Clair|       Acmar|
|         1|Alabama|        AL|   35005| Jefferson|  Adamsville|
|         1|Alabama|        AL|   35006| Jefferson|       Adger|
|         1|Alabama|        AL|   35007|    Shelby|    Keystone|
|         1|Alabama|        AL|   35010|Tallapoosa|    New site|
|         1|Alabama|        AL|   35014| Talladega|      Alpine|
|         1|Alabama|        AL|   35016|  Marshall|        Arab|
|         1|Alabama|        AL|   35019|   Cullman|   Baileyton|
|         1|Alabama|        AL|   35020| Je

In [0]:
Location_Dim = df_geo.select("State", "City", "ZIP_Code").dropDuplicates().dropna() \
    .withColumn("Location_SK", monotonically_increasing_id() +1) \
    .withColumn("DI_LOAD_Dt", current_date()) \
    .select("Location_SK", "State", "City", "ZIP_Code", "DI_LOAD_Dt")


In [0]:
Location_Dim.show()  # Shows first 20 rows
Location_Dim.printSchema()

+-----------+----------+---------------+--------+----------+
|Location_SK|     State|           City|ZIP_Code|DI_LOAD_Dt|
+-----------+----------+---------------+--------+----------+
|          1|   Alabama|    Mount olive|   35117|2025-04-05|
|          2|   Alabama|      Beaverton|   35544|2025-04-05|
|          3|   Alabama|         Dozier|   36028|2025-04-05|
|          4|   Arizona|        Phoenix|   85019|2025-04-05|
|          5|  Arkansas|          Poyen|   72128|2025-04-05|
|          6|  Arkansas|        Proctor|   72376|2025-04-05|
|          7|  Arkansas|            Bay|   72411|2025-04-05|
|          8|  Arkansas|           Onia|   72663|2025-04-05|
|          9|  Arkansas|    Natural dam|   72948|2025-04-05|
|         10|California|    Los angeles|   90013|2025-04-05|
|         11|California|North hollywood|   91605|2025-04-05|
|         12|California|         Covina|   91723|2025-04-05|
|         13|California|       La verne|   91750|2025-04-05|
|         14|California|

In [0]:
Pet_Lic_Fact = df \
    .join(Breed_Dim, ["Species", "Primary_Breed", "Secondary_Breed"], "left") \
    .join(Location_Dim, "ZIP_Code", "left") \
    .withColumn("Pet_Lic_SK", monotonically_increasing_id()+1) \
    .withColumn("DI_LOAD_Dt", current_date()) \
    .select(
        "Pet_Lic_SK",
        "License_Issue_Date",
        "ZIP_Code",
        "Breed_SK",
        "Location_SK",
        "DI_LOAD_Dt"
    )


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1309401348079964>:1[0m
[0;32m----> 1[0m Pet_Lic_Fact [38;5;241m=[39m [43mdf[49m[43m [49m[43m\[49m
[1;32m      2[0m [43m    [49m[38;5;241;43m.[39;49m[43mjoin[49m[43m([49m[43mBreed_Dim[49m[43m,[49m[43m [49m[43m[[49m[38;5;124;43m"[39;49m[38;5;124;43mSpecies[39;49m[38;5;124;43m"[39;49m[43m,[49m[43m [49m[38;5;124;43m"[39;49m[38;5;124;43mPrimary_Breed[39;49m[38;5;124;43m"[39;49m[43m,[49m[43m [49m[38;5;124;43m"[39;49m[38;5;124;43mSecondary_Breed[39;49m[38;5;124;43m"[39;49m[43m][49m[43m,[49m[43m [49m[38;5;124;43m"[39;49m[38;5;124;43mleft[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m [49m[43m\[49m
[1;32m      3[0m [43m    [49m[38;5;241;43m.[39;49m[43mjoin[49m[43m([49m[43mLocation_Dim[49m[43m,[49m[43m [49m[38;5;124;43m"

In [0]:
from pyspark.sql.functions import col, current_date, monotonically_increasing_id

# Add aliases to DataFrames
df_alias = df.alias("fact")
Breed_Dim_alias = Breed_Dim.alias("breed")
Location_Dim_alias = Location_Dim.alias("location")

# Perform the join with aliases
Pet_Lic_Fact = df_alias \
    .join(Breed_Dim_alias, ["Species", "Primary_Breed", "Secondary_Breed"], "left") \
    .join(Location_Dim_alias, ["ZIP_Code"], "left") \
    .withColumn("Pet_Lic_SK", monotonically_increasing_id() + 1) \
    .withColumn("DI_LOAD_Dt", current_date()) \
    .select(
        col("Pet_Lic_SK"),
        col("fact.License_Issue_Date"),
        col("fact.ZIP_Code"),
        col("breed.Breed_SK"),
        col("location.Location_SK")
    )

# Display schema and preview data
Pet_Lic_Fact.printSchema()
Pet_Lic_Fact.show(5, truncate=False)


root
 |-- Pet_Lic_SK: long (nullable = false)
 |-- License_Issue_Date: date (nullable = true)
 |-- ZIP_Code: integer (nullable = true)
 |-- Breed_SK: long (nullable = true)
 |-- Location_SK: long (nullable = true)

+----------+------------------+--------+--------+-----------+
|Pet_Lic_SK|License_Issue_Date|ZIP_Code|Breed_SK|Location_SK|
+----------+------------------+--------+--------+-----------+
|1         |2015-12-18        |98117   |262     |8172       |
|2         |2016-06-14        |98117   |null    |8172       |
|3         |2016-08-04        |98121   |null    |8970       |
|4         |2019-08-10        |98115   |null    |23767      |
|5         |2020-11-20        |98126   |1049    |16409      |
+----------+------------------+--------+--------+-----------+
only showing top 5 rows



In [0]:
# Define Delta table save paths based on notebook folder
location_dim_save_path = f"/Users/pekamwar.s@northeastern.edu/Location_Dim"
fact_save_path = f"/Users/pekamwar.s@northeastern.edu/Pet_Lic_Fact"


# Save Species_Dim table
Location_Dim.write.format("delta").mode("overwrite").save(location_dim_save_path)
print(f"Location_Dim saved at: {species_dim_save_path}")

# Save Date_Dim table
Pet_Lic_Fact.write.format("delta").mode("overwrite").save(fact_save_path)
print(f"Pet_Lic_Fact saved at: {date_dim_save_path}")

Location_Dim saved at: /Users/pekamwar.s@northeastern.edu/Species_Dim
Pet_Lic_Fact saved at: /Users/pekamwar.s@northeastern.edu/Date_Dim


In [0]:
%sql
SELECT * 
FROM delta.`/Users/pekamwar.s@northeastern.edu/Breed_Dim`
LIMIT 10;

Breed_SK,Species,Primary_Breed,Secondary_Breed,DI_LOAD_Dt
1,Cat,Balinese,Mix,2025-04-05
2,Dog,"Retriever, Labrador",Hound,2025-04-05
3,Dog,"Poodle, Toy",Shepherd,2025-04-05
4,Dog,"Poodle, Miniature","Dachshund, Miniature Smooth Haired",2025-04-05
5,Dog,Great Pyrenees,German Shepherd,2025-04-05
6,Dog,"Chihuahua, Short Coat",Chinese Crested,2025-04-05
7,Dog,Whippet,"Terrier, American Pit Bull",2025-04-05
8,Dog,"Poodle, Miniature","Terrier, Cairn",2025-04-05
9,Dog,Doberman Pinscher,"Collie, Smooth",2025-04-05
10,Dog,Australian Cattle Dog,Basset Hound,2025-04-05


In [0]:
%sql
SELECT * 
FROM delta.`/Users/pekamwar.s@northeastern.edu/Species_Dim`;

Species_SK,Species,DI_LOAD_Dt
1,Pig,2025-04-05
2,Cat,2025-04-05
3,Goat,2025-04-05
4,Dog,2025-04-05


In [0]:
%sql
SELECT * 
FROM delta.`/Users/pekamwar.s@northeastern.edu/Date_Dim`
LIMIT 10;

Date_SK,Year,Month_Name,Day_Name,DI_LOAD_Dt
1,2015,January,Thursday,2025-04-05
2,2015,January,Friday,2025-04-05
3,2015,January,Saturday,2025-04-05
4,2015,January,Sunday,2025-04-05
5,2015,January,Monday,2025-04-05
6,2015,January,Tuesday,2025-04-05
7,2015,January,Wednesday,2025-04-05
8,2015,January,Thursday,2025-04-05
9,2015,January,Friday,2025-04-05
10,2015,January,Saturday,2025-04-05


In [0]:
%sql
SELECT * 
FROM delta.`/Users/pekamwar.s@northeastern.edu/Location_Dim`
LIMIT 10;

Location_SK,State,City,ZIP_Code,DI_LOAD_Dt
1,Alabama,Mount olive,35117,2025-04-05
2,Alabama,Beaverton,35544,2025-04-05
3,Alabama,Dozier,36028,2025-04-05
4,Arizona,Phoenix,85019,2025-04-05
5,Arkansas,Poyen,72128,2025-04-05
6,Arkansas,Proctor,72376,2025-04-05
7,Arkansas,Bay,72411,2025-04-05
8,Arkansas,Onia,72663,2025-04-05
9,Arkansas,Natural dam,72948,2025-04-05
10,California,Los angeles,90013,2025-04-05


In [0]:
%sql
SELECT * 
FROM delta.`/Users/pekamwar.s@northeastern.edu/Pet_Lic_Fact`
where Breed_SK is NOT NULL
LIMIT 10;

Pet_Lic_SK,License_Issue_Date,ZIP_Code,Breed_SK,Location_SK
1,2015-12-18,98117,262,8172
5,2020-11-20,98126,1049,16409
6,2021-02-03,98136,2746,31093
8,2021-04-09,98117,2746,8172
9,2021-04-28,98125,2824,6615
10,2021-05-07,98125,2259,6615
11,2021-05-28,98104,2746,6280
12,2021-07-19,98105,200,31582
14,2021-07-23,98117,262,8172
15,2021-08-03,98125,2746,6615
