In [46]:
def _read_parquet(path):
    """Return the parquet dataset stored at `path` as a Spark DataFrame."""
    return spark.read.format('parquet').load(path)


# Dimension tables of the model.
area_dim = _read_parquet('Files/Dimension_model/area_dim')
# Collision <-> contribution bridge table.
contribution_collision = _read_parquet('Files/Dimension_model/contribution_collision')
contribution_dim = _read_parquet('Files/Dimension_model/contribution_dim')
date_dim = _read_parquet('Files/Dimension_model/date_dim')
time_dim = _read_parquet('Files/Dimension_model/time_dim')
# Collision <-> vehicle bridge table.
vehicle_collision = _read_parquet('Files/Dimension_model/vehicle_collision')
vehicle_dim = _read_parquet('Files/Dimension_model/vehicle_dim')
# Cleaned staging data that the fact table is built from.
stg_data = _read_parquet('Files/Cleaned_Data')

StatementMeta(, 6b831b08-61ea-4dcb-8338-88b8c3ada4b0, 48, Finished, Available)

In [53]:
# Register each DataFrame as a temp view so the Spark SQL cells can refer to it by name.
views_to_register = (
    ("area_dim", area_dim),
    ("contribution_collision", contribution_collision),
    ("vehicle_collision", vehicle_collision),
    ("contribution_dim", contribution_dim),
    ("date_dim", date_dim),
    ("time_dim", time_dim),
    ("stg_data", stg_data),
)
for view_name, frame in views_to_register:
    frame.createOrReplaceTempView(view_name)

StatementMeta(, 6b831b08-61ea-4dcb-8338-88b8c3ada4b0, 55, Finished, Available)

In [54]:
# Ad-hoc preview: re-read the date dimension parquet folder and render it.
df = spark.read.parquet("Files/Dimension_model/date_dim")
# df is a Spark DataFrame holding the parquet data under "Files/Dimension_model/date_dim"
# (not a fact_collision part file, as an earlier auto-generated note claimed).
display(df)

StatementMeta(, 6b831b08-61ea-4dcb-8338-88b8c3ada4b0, 56, Finished, Available)

SynapseWidget(Synapse.DataFrame, 6fd505f7-5936-45df-9aa3-8074a68da837)

In [56]:
# Sanity check: how many vehicle rows are linked to each collision.
vehicles_per_collision_sql = """
   SELECT COLLISION_ID,count(vehicle_id) from vehicle_collision
   group by COLLISION_ID
"""
spark.sql(vehicles_per_collision_sql).show()

StatementMeta(, 6b831b08-61ea-4dcb-8338-88b8c3ada4b0, 58, Finished, Available)

+------------+-----------------+
|COLLISION_ID|count(vehicle_id)|
+------------+-----------------+
|     3507998|                5|
|     3482075|                3|
|     4658032|                1|
|     3162237|                2|
|     4631621|                2|
|     4184402|                1|
|     4669831|                1|
|     3547498|                2|
|     4077449|                2|
|     4716204|                2|
|     4333424|                1|
|     4690010|                2|
|     3706116|                2|
|     3802732|                2|
|     4091574|                2|
|     4174605|                2|
|     4596643|                2|
|     3445704|                2|
|     3808513|                2|
|     4695578|                2|
+------------+-----------------+
only showing top 20 rows



In [58]:
# Build the collision fact table from the staging data.
#
# Each staging row is matched to:
#   * area_dim  on ZIP CODE + LATITUDE + LONGITUDE + BOROUGH
#   * time_dim  on CRASH TIME
#   * date_dim  on CRASH DATE (date_dim.CRASH_DATE is formatted as MM/dd/yyyy to
#     match the staging column)
#   * a per-collision count of contribution_collision rows (Collision_cnt)
#   * a per-collision count of vehicle_collision rows (Vehicle_cnt)
#
# All joins are inner joins, so a staging row with no match in any of the
# joined inputs is left out of the fact table.
# NOTE(review): plain `=` never matches NULLs, so rows with a NULL ZIP CODE /
# LATITUDE / LONGITUDE / BOROUGH are presumably dropped here — confirm this is intended.
#
# Fix: a comma was missing after `NUMBER OF PEDESTRIANS INJURED`, which made
# Spark SQL treat `NUMBER OF PEDESTRIANS KILLED` as an alias for it. The injured
# count was published under the "killed" name and the real killed column was
# never selected. Both measures are now selected as separate columns.
fact_collision=spark.sql("""

   SELECT s.COLLISION_ID, a.Area_ID, Date_id, time_ID,
           `NUMBER OF PERSONS INJURED`, `NUMBER OF PERSONS KILLED`,
           `NUMBER OF PEDESTRIANS INJURED`, `NUMBER OF PEDESTRIANS KILLED`,
           `NUMBER OF CYCLIST INJURED`, `NUMBER OF CYCLIST KILLED`,
           `NUMBER OF MOTORIST INJURED`, `NUMBER OF MOTORIST KILLED`,
           Collision_cnt, Vehicle_cnt

        FROM stg_data s
        JOIN area_dim a
        ON s.`ZIP CODE` = a.`ZIP CODE`
        AND s.`LATITUDE` = a.`LATITUDE`
        AND s.`LONGITUDE` = a.`LONGITUDE`
        AND s.`BOROUGH` = a.`BOROUGH`

        join time_dim t
        on s.`CRASH TIME`=t.`CRASH TIME`

        join date_dim d
        on date_format(d.CRASH_DATE,"MM/dd/yyyy")=s.`CRASH DATE`

        join (
                SELECT COLLISION_ID,count(contribution_id) Collision_cnt from contribution_collision
                group by COLLISION_ID) c
                on s.COLLISION_ID=c.COLLISION_ID

        join (
                SELECT COLLISION_ID,count(vehicle_id) Vehicle_cnt from vehicle_collision
                group by COLLISION_ID ) v
                on s.COLLISION_ID=v.COLLISION_ID
""")

StatementMeta(, 6b831b08-61ea-4dcb-8338-88b8c3ada4b0, 60, Finished, Available)

In [61]:
# Render the assembled fact_collision DataFrame in the notebook for a visual check.
display(fact_collision)

StatementMeta(, 6b831b08-61ea-4dcb-8338-88b8c3ada4b0, 63, Finished, Available)

SynapseWidget(Synapse.DataFrame, 71ad373a-814c-4c48-91b7-42750cc9dd89)

In [62]:
# Number of rows in the fact table; count() is an action, so this executes the join query.
fact_collision.count()

StatementMeta(, 6b831b08-61ea-4dcb-8338-88b8c3ada4b0, 64, Finished, Available)

2080787

In [63]:
# Persist the fact table as parquet next to the dimensions, replacing any previous run.
fact_parquet_writer = fact_collision.write.mode("overwrite")
fact_parquet_writer.parquet("Files/Dimension_model/fact_collision")

StatementMeta(, 6b831b08-61ea-4dcb-8338-88b8c3ada4b0, 65, Finished, Available)

### Creating Global Tables

In [64]:
from pyspark.sql.functions import col
# Publish the fact as a managed table.
# The source column names contain spaces, so each column is first aliased to a
# name with spaces and dashes replaced by underscores.
renamed_fact_columns = []
for original_name in fact_collision.columns:
    table_safe_name = original_name.replace(' ', '_').replace('-', '_')
    renamed_fact_columns.append(col(original_name).alias(table_safe_name))

fact_collision = fact_collision.select(renamed_fact_columns)

# Write the DataFrame, now carrying the sanitized column names, as a table.
fact_table_writer = fact_collision.write.mode("overwrite")
fact_table_writer.saveAsTable("fact_collision")


StatementMeta(, 6b831b08-61ea-4dcb-8338-88b8c3ada4b0, 66, Finished, Available)

In [65]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def sanitize_column_names(df: DataFrame) -> DataFrame:
    """
    Return a DataFrame whose column names have spaces, dashes and dots replaced by underscores.

    The columns are renamed positionally with ``toDF`` rather than being looked up
    by name. Looking up a name that contains a dot would be parsed as struct-field
    access and fail to resolve, so the '.' replacement could never take effect.

    :param df: DataFrame whose columns should be renamed.
    :return: DataFrame with the same data and column order, with sanitized names.
    """
    sanitized_names = [
        name.replace(' ', '_').replace('-', '_').replace('.', '_')
        for name in df.columns
    ]
    # Positional rename: the original names are never parsed as column expressions.
    return df.toDF(*sanitized_names)

def save_sanitized_tables(table_tuples: list):
    """
    Save each given DataFrame as a table after sanitizing its column names.

    Every DataFrame is passed through ``sanitize_column_names`` and then written
    with ``saveAsTable`` in overwrite mode under the paired table name.

    :param table_tuples: List of tuples (DataFrame, new_table_name)
    """
    for entry in table_tuples:
        frame, target_table = entry
        table_writer = sanitize_column_names(frame).write.mode("overwrite")
        table_writer.saveAsTable(target_table)

# Each model DataFrame is published under a table name equal to its variable name.
frames_by_table = {
    "area_dim": area_dim,
    "contribution_collision": contribution_collision,
    "contribution_dim": contribution_dim,
    "date_dim": date_dim,
    "time_dim": time_dim,
    "fact_collision": fact_collision,
    "vehicle_dim": vehicle_dim,
    "vehicle_collision": vehicle_collision,
}

# (DataFrame, table_name) pairs, in the order the tables are written.
tables_to_save = [(frame, table_name) for table_name, frame in frames_by_table.items()]

save_sanitized_tables(tables_to_save)

StatementMeta(, 6b831b08-61ea-4dcb-8338-88b8c3ada4b0, 67, Finished, Available)