# Project - Divvy Bike Database with Microsoft Azure
![1671528760953.png](attachment:06e44686-d6a0-42fb-a8ee-7d2d2b3f3cc1.png)

### Overview

### Data Details

### Data Schemas Design

## 1. Load the data into the PostgreSQL server on Microsoft Azure

1. Creating the connection to the server

In [1]:
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

########################################
# Create the connection info           #
########################################
import os
from getpass import getpass

host = "postgres-database-flex.postgres.database.azure.com"
user = "admin_user"
# Never store credentials in the notebook: read the password from the
# DIVVY_PG_PASSWORD environment variable, or prompt for it interactively.
password = os.environ.get("DIVVY_PG_PASSWORD") or getpass("Postgres password: ")

# First connect to the "divvy-database-postgres" database; this session is
# only used to (re)create the working database below.
sslmode = "require"
dbname = "divvy-database-postgres"
# Keyword arguments avoid the quoting problems of a hand-built
# "key=value" string when the password contains spaces or quotes.
conn = psycopg2.connect(host=host, user=user, dbname=dbname, password=password, sslmode=sslmode)
# DROP/CREATE DATABASE cannot run inside a transaction block, so every
# statement on this connection is executed in autocommit mode.
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
print("Connection established")

cursor = conn.cursor()
cursor.execute('DROP DATABASE IF EXISTS divvy_database_postgres')
cursor.execute("CREATE DATABASE divvy_database_postgres")
# Clean up initial connection (autocommit: there is nothing left to commit)
cursor.close()
conn.close()

# Reconnect to the new DB; `conn` and `cursor` are reused by the cells below
dbname = "divvy_database_postgres"
conn = psycopg2.connect(host=host, user=user, dbname=dbname, password=password, sslmode=sslmode)
print("Connection established")
cursor = conn.cursor()


Connection established
Connection established


2. Creating some helper functions to load the data

In [2]:
# Helper functions
# Drop the tables and recreate the table
def drop_recreate(c, tablename, create):
    """Drop ``tablename`` if it exists, then execute the ``create`` statement.

    c: cursor-like object exposing ``execute``.
    tablename: table name, interpolated verbatim into the DROP statement.
    create: SQL text executed as-is right after the drop.
    """
    drop_statement = f"DROP TABLE IF EXISTS {tablename};"
    for statement in (drop_statement, create):
        c.execute(statement)
    print(f"Finished creating table {tablename}")

# Fast load the data from CSV files into the postgres database
def populate_table(c, filename, tablename):
    """Bulk-load a comma-separated file into ``tablename`` with COPY.

    c: cursor used for the load; the transaction is committed or rolled
       back through ``c.connection``, so no global cursor/connection is
       needed.
    filename: path of the file to load; empty fields are loaded as NULL.
    tablename: destination table.

    A failed load is reported on stdout and rolled back; the cursor is left
    open so the caller can keep using it for the next table. Errors raised
    while opening the file propagate to the caller.

    NOTE(review): ``copy_from`` is called with ``sep=","`` only, so quoted
    fields that contain commas are presumably not supported - confirm
    against the input files.
    """
    # The context manager closes the file even when the load fails.
    with open(filename, 'r') as f:
        try:
            ## use copy to bulk load the file into the specified table
            c.copy_from(f, tablename, sep=",", null="")
            c.connection.commit()
        except Exception as error:  # also covers psycopg2.DatabaseError
            print("Error: %s" % error)
            c.connection.rollback()
            # Do not claim success after a failed load.
            return
    print("Finished populating {0}".format(tablename))


3. Creating the corresponding tables

In [3]:
# One (table name, CSV path, CREATE statement) entry per source table,
# listed in the order they are created and loaded: rider, payment, station, trip.
table_specs = [
    (
        "rider",
        './data/riders.csv',
        "CREATE TABLE rider (rider_id INTEGER PRIMARY KEY, first VARCHAR(50), last VARCHAR(50), address VARCHAR(100), birthday DATE, account_start_date DATE, account_end_date DATE, is_member BOOLEAN);",
    ),
    (
        "payment",
        './data/payments.csv',
        "CREATE TABLE payment (payment_id INTEGER PRIMARY KEY, date DATE, amount MONEY, rider_id INTEGER);",
    ),
    (
        "station",
        './data/stations.csv',
        "CREATE TABLE station (station_id VARCHAR(50) PRIMARY KEY, name VARCHAR(75), latitude FLOAT, longitude FLOAT);",
    ),
    (
        "trip",
        './data/trips.csv',
        "CREATE TABLE trip (trip_id VARCHAR(50) PRIMARY KEY, rideable_type VARCHAR(75), start_at TIMESTAMP, ended_at TIMESTAMP, start_station_id VARCHAR(50), end_station_id VARCHAR(50), rider_id INTEGER);",
    ),
]

# Recreate each table, then bulk-load its CSV file.
for table, filename, create in table_specs:
    drop_recreate(cursor, table, create)
    populate_table(cursor, filename, table)

# Clean up
conn.commit()
cursor.close()
conn.close()

print("All done!")

Finished creating table rider
Finished populating rider
Finished creating table payment
Finished populating payment
Finished creating table station
Finished populating station
Finished creating table trip
Finished populating trip
All done!


#### After creating the tables, check that they were successfully created on the Postgres server
![Screenshot 2025-04-07 at 9.18.57 PM.png](attachment:7e3160c4-3e96-4fcc-9436-d32f39291499.png)

## 2. Extract the data from the Postgres Server into the Blob Storage
![Screenshot 2025-04-07 at 9.33.56 PM.png](attachment:30d2b551-db7e-49ab-8f3b-7315c7039c2b.png)

## 3. Load the data from Blob Storage into the Staging Table

In [13]:
!pip install pyodbc

Collecting pyodbc
  Downloading pyodbc-5.2.0-cp313-cp313-macosx_11_0_arm64.whl.metadata (2.7 kB)
Downloading pyodbc-5.2.0-cp313-cp313-macosx_11_0_arm64.whl (72 kB)
Installing collected packages: pyodbc
Successfully installed pyodbc-5.2.0


In [109]:
## creating the connection to the Synapse workspace
import os
from getpass import getpass

import pyodbc
## test the package: list the ODBC drivers installed on this machine
print(pyodbc.drivers())

server = "divvy-database-synapse-ondemand.sql.azuresynapse.net"
database = "sql_divvy_database"
username = "sqladminuser"
# Never store credentials in the notebook: read the password from the
# SYNAPSE_SQL_PASSWORD environment variable, or prompt for it interactively.
password = os.environ.get("SYNAPSE_SQL_PASSWORD") or getpass("Synapse SQL password: ")
driver = "{ODBC Driver 17 for SQL Server}"

# Encrypted connection with server certificate validation and a 30 s timeout.
conn = pyodbc.connect(
    f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;"
)
cursor = conn.cursor()

['ODBC Driver 17 for SQL Server']


#### Run the T-SQL script

In [110]:
# T-SQL statements that expose payment.csv as the external table
# dbo.staging_payment.
sql_script = [
    # 1) Define the comma-delimited text file format, unless one with this name already exists.
    """
    IF NOT EXISTS (SELECT * FROM sys.external_file_formats WHERE name = 'SynapseDelimitedTextFormat') 
	CREATE EXTERNAL FILE FORMAT [SynapseDelimitedTextFormat] 
	WITH ( FORMAT_TYPE = DELIMITEDTEXT ,
	       FORMAT_OPTIONS (
			 FIELD_TERMINATOR = ',',
			 USE_TYPE_DEFAULT = FALSE
			))
    """,
    # 2) Register the abfss:// storage location as an external data source, unless it already exists.
    """
    IF NOT EXISTS (SELECT * FROM sys.external_data_sources WHERE name = 'gen2filesystem_haoranwang_dfs_core_windows_net') 
	CREATE EXTERNAL DATA SOURCE [gen2filesystem_haoranwang_dfs_core_windows_net] 
	WITH (
		LOCATION = 'abfss://gen2filesystem@haoranwang.dfs.core.windows.net' 
	)
    """,
    # 3) Create the staging table over payment.csv. There is no existence
    #    guard here, so re-running this statement presumably fails once the
    #    table exists - confirm before re-executing the cell.
    """
    CREATE EXTERNAL TABLE dbo.staging_payment (
	[PaymentId] bigint,
	[Date] datetime2(0),
	[Amount] float,
	[RiderId] bigint
	)
	WITH (
	LOCATION = 'payment.csv',
	DATA_SOURCE = [gen2filesystem_haoranwang_dfs_core_windows_net],
	FILE_FORMAT = [SynapseDelimitedTextFormat]
	)
    """]


# Run the statements in order; a failing statement is reported on stdout and
# the loop carries on with the next one.
for statement in sql_script:
    try:
        cursor.execute(statement)
        # A populated description means the statement returned a result set.
        if cursor.description:
            for record in cursor.fetchall():
                print(record)
        conn.commit()
    except Exception as exc:
        print(f"Error running statement:\n{statement}\n{exc}\n")


![Screenshot 2025-04-08 at 4.49.29 PM.png](attachment:e28bc53f-0f9e-4ecd-89ab-46488e2c20d4.png)
**Repeat the above procedure to create the remaining three staging tables**

In [112]:
# T-SQL statements that expose trip.csv as the external table
# dbo.staging_trip.
sql_script = [
    # 1) Define the comma-delimited text file format, unless one with this name already exists.
    """
    IF NOT EXISTS (SELECT * FROM sys.external_file_formats WHERE name = 'SynapseDelimitedTextFormat') 
	CREATE EXTERNAL FILE FORMAT [SynapseDelimitedTextFormat] 
	WITH ( FORMAT_TYPE = DELIMITEDTEXT ,
	       FORMAT_OPTIONS (
			 FIELD_TERMINATOR = ',',
			 USE_TYPE_DEFAULT = FALSE
			));
    """,
    # 2) Register the abfss:// storage location as an external data source, unless it already exists.
    """
    IF NOT EXISTS (SELECT * FROM sys.external_data_sources WHERE name = 'gen2filesystem_haoranwang_dfs_core_windows_net') 
	CREATE EXTERNAL DATA SOURCE [gen2filesystem_haoranwang_dfs_core_windows_net] 
	WITH (
		LOCATION = 'abfss://gen2filesystem@haoranwang.dfs.core.windows.net' 
	);
    """,
    # 3) Create the staging table over trip.csv. There is no existence guard
    #    here, so re-running this statement presumably fails once the table
    #    exists - confirm before re-executing the cell.
    """
    CREATE EXTERNAL TABLE dbo.staging_trip (
	[trip_id] nvarchar(500),
	[rideable_type] nvarchar(500),
	[started_at] datetime2(0),
	[ended_at] datetime2(0),
	[start_station_id] nvarchar(500),
	[end_station_id] nvarchar(500),
	[member_id] bigint
	)
	WITH (
	LOCATION = 'trip.csv',
	DATA_SOURCE = [gen2filesystem_haoranwang_dfs_core_windows_net],
	FILE_FORMAT = [SynapseDelimitedTextFormat]
	)
    """]

# Run the statements in order; a failing statement is reported on stdout and
# the loop carries on with the next one.
for statement in sql_script:
    try:
        cursor.execute(statement)
        # A populated description means the statement returned a result set.
        if cursor.description:
            for record in cursor.fetchall():
                print(record)
        conn.commit()
    except Exception as exc:
        print(f"Error running statement:\n{statement}\n{exc}\n")

Select the top 10 entries from the trip table to check the result:
![Screenshot 2025-04-08 at 5.11.09 PM.png](attachment:5548f648-fd1e-4d10-83c3-01caf17dc8b7.png)

In [107]:
# T-SQL statements that expose rider.csv as the external table
# dbo.staging_rider.
sql_script = [
    # 1) Define the comma-delimited text file format, unless one with this name already exists.
    """
    IF NOT EXISTS (SELECT * FROM sys.external_file_formats WHERE name = 'SynapseDelimitedTextFormat') 
	CREATE EXTERNAL FILE FORMAT [SynapseDelimitedTextFormat] 
	WITH ( FORMAT_TYPE = DELIMITEDTEXT ,
	       FORMAT_OPTIONS (
			 FIELD_TERMINATOR = ',',
			 USE_TYPE_DEFAULT = FALSE
			));
    """,
    # 2) Register the abfss:// storage location as an external data source, unless it already exists.
    """
    IF NOT EXISTS (SELECT * FROM sys.external_data_sources WHERE name = 'gen2filesystem_haoranwang_dfs_core_windows_net') 
	CREATE EXTERNAL DATA SOURCE [gen2filesystem_haoranwang_dfs_core_windows_net] 
	WITH (
		LOCATION = 'abfss://gen2filesystem@haoranwang.dfs.core.windows.net' 
	);
    """,
    # 3) Create the staging table over rider.csv. There is no existence guard
    #    here, so re-running this statement presumably fails once the table
    #    exists - confirm before re-executing the cell.
    """
    CREATE EXTERNAL TABLE dbo.staging_rider (
	[rider_id] bigint,
	[first_name] nvarchar(500),
	[last_name] nvarchar(500),
	[address] nvarchar(500),
	[birthday] datetime2(0),
	[start_date] datetime2(0),
	[end_date] datetime2(0),
	[Member] bit
	)
	WITH (
	LOCATION = 'rider.csv',
	DATA_SOURCE = [gen2filesystem_haoranwang_dfs_core_windows_net],
	FILE_FORMAT = [SynapseDelimitedTextFormat]
	);
    """
]

# Run the statements in order; a failing statement is reported on stdout and
# the loop carries on with the next one.
for statement in sql_script:
    try:
        cursor.execute(statement)
        # A populated description means the statement returned a result set.
        if cursor.description:
            for record in cursor.fetchall():
                print(record)
        conn.commit()
    except Exception as exc:
        print(f"Error running statement:\n{statement}\n{exc}\n")


Select the top 10 rows to check the result:
![Screenshot 2025-04-08 at 5.03.15 PM.png](attachment:1edad22f-1082-4d5e-93a7-96f451aae486.png)

In [111]:
# T-SQL statements that expose station.csv as the external table
# dbo.staging_station.
sql_script = [
    # 1) Define the comma-delimited text file format, unless one with this name already exists.
    """
    IF NOT EXISTS (SELECT * FROM sys.external_file_formats WHERE name = 'SynapseDelimitedTextFormat') 
	CREATE EXTERNAL FILE FORMAT [SynapseDelimitedTextFormat] 
	WITH ( FORMAT_TYPE = DELIMITEDTEXT ,
	       FORMAT_OPTIONS (
			 FIELD_TERMINATOR = ',',
			 USE_TYPE_DEFAULT = FALSE
			));
    """,
    # 2) Register the abfss:// storage location as an external data source, unless it already exists.
    """
    IF NOT EXISTS (SELECT * FROM sys.external_data_sources WHERE name = 'gen2filesystem_haoranwang_dfs_core_windows_net') 
	CREATE EXTERNAL DATA SOURCE [gen2filesystem_haoranwang_dfs_core_windows_net] 
	WITH (
		LOCATION = 'abfss://gen2filesystem@haoranwang.dfs.core.windows.net' 
	);
    """,
    # 3) Create the staging table over station.csv. There is no existence
    #    guard here, so re-running this statement presumably fails once the
    #    table exists - confirm before re-executing the cell.
    #    NOTE(review): the column is spelled [longtitude]; the dim_station
    #    query later in this notebook selects it under the same spelling, so
    #    rename both together if it is ever corrected.
    """
    CREATE EXTERNAL TABLE dbo.staging_station (
	[StationId] nvarchar(500),
	[name] nvarchar(500),
	[latitude] float,
	[longtitude] float
	)
	WITH (
	LOCATION = 'station.csv',
	DATA_SOURCE = [gen2filesystem_haoranwang_dfs_core_windows_net],
	FILE_FORMAT = [SynapseDelimitedTextFormat]
	);
    """
]

# Run the statements in order; a failing statement is reported on stdout and
# the loop carries on with the next one.
for statement in sql_script:
    try:
        cursor.execute(statement)
        # A populated description means the statement returned a result set.
        if cursor.description:
            for record in cursor.fetchall():
                print(record)
        conn.commit()
    except Exception as exc:
        print(f"Error running statement:\n{statement}\n{exc}\n")



In [4]:
## Clean up the connection: close the cursor first, then the connection.
cursor.close()
conn.close()

![Screenshot 2025-04-08 at 5.07.48 PM.png](attachment:19871c43-4661-4445-a263-fd74aa4ac773.png)

**All the data has been successfully loaded into Blob Storage. We will now use the staging tables in the Gen2 Data Lake to transform the data schemas into star schemas, completing the TRANSFORM step.**

## 4. Transform the staging tables into the star schema illustrated above to facilitate the OLAP process.

### The following T-SQL queries were run in the Synapse workspace

In [None]:
-- Create the CETAS tables: the remodelled star-schema tables

-- Define the comma-delimited text file format unless it already exists.
IF NOT EXISTS (
    SELECT *
    FROM sys.external_file_formats
    WHERE name = 'SynapseDelimitedTextFormat'
)
    CREATE EXTERNAL FILE FORMAT [SynapseDelimitedTextFormat]
    WITH (
        FORMAT_TYPE = DELIMITEDTEXT,
        FORMAT_OPTIONS (
            FIELD_TERMINATOR = ',',
            USE_TYPE_DEFAULT = FALSE
        )
    );
GO

-- Register the abfss:// storage location as an external data source unless it already exists.
IF NOT EXISTS (
    SELECT *
    FROM sys.external_data_sources
    WHERE name = 'gen2filesystem_haoranwang_dfs_core_windows_net'
)
    CREATE EXTERNAL DATA SOURCE [gen2filesystem_haoranwang_dfs_core_windows_net]
    WITH (
        LOCATION = 'abfss://gen2filesystem@haoranwang.dfs.core.windows.net'
    );
GO

-- Drop any previous definition so the table can be created again below.
IF OBJECT_ID('dbo.fact_payment') IS NOT NULL
    DROP EXTERNAL TABLE [dbo].[fact_payment];
GO

-- CETAS: build the payment fact table from the payment staging table.
CREATE EXTERNAL TABLE dbo.fact_payment
WITH (
    LOCATION = 'fact_payment',
    DATA_SOURCE = [gen2filesystem_haoranwang_dfs_core_windows_net],
    FILE_FORMAT = [SynapseDelimitedTextFormat]
) AS
SELECT
    [PaymentId],
    [Amount],
    [Date] AS [payment_date],
    [RiderId] AS [account_number]
FROM [dbo].[staging_payment]
GO

-- Preview the first rows of the new table.
SELECT TOP 10 *
FROM dbo.fact_payment;
GO

**Table Demonstration**
![Screenshot 2025-04-09 at 9.38.53 PM.png](attachment:5dd2bc91-bb1f-4690-a246-c2d83d608482.png)

In [None]:
-- Define the comma-delimited text file format unless it already exists.
IF NOT EXISTS (
    SELECT *
    FROM sys.external_file_formats
    WHERE name = 'SynapseDelimitedTextFormat'
)
    CREATE EXTERNAL FILE FORMAT [SynapseDelimitedTextFormat]
    WITH (
        FORMAT_TYPE = DELIMITEDTEXT,
        FORMAT_OPTIONS (
            FIELD_TERMINATOR = ',',
            USE_TYPE_DEFAULT = FALSE
        )
    );
GO

-- Register the abfss:// storage location as an external data source unless it already exists.
IF NOT EXISTS (
    SELECT *
    FROM sys.external_data_sources
    WHERE name = 'gen2filesystem_haoranwang_dfs_core_windows_net'
)
    CREATE EXTERNAL DATA SOURCE [gen2filesystem_haoranwang_dfs_core_windows_net]
    WITH (
        LOCATION = 'abfss://gen2filesystem@haoranwang.dfs.core.windows.net'
    );
GO

-- Drop any previous definition so the table can be created again below.
IF OBJECT_ID('dbo.fact_trip') IS NOT NULL
    DROP EXTERNAL TABLE [dbo].[fact_trip];
GO

-- CETAS: build the trip fact table by joining each trip to its rider.
-- Adds the ride duration in seconds and carries the rider's birthday along.
-- The inner join keeps only trips whose member_id matches a rider_id.
CREATE EXTERNAL TABLE dbo.fact_trip
WITH (
    LOCATION = 'fact_trip',
    DATA_SOURCE = [gen2filesystem_haoranwang_dfs_core_windows_net],
    FILE_FORMAT = [SynapseDelimitedTextFormat]
) AS
SELECT
    [trip_id],
    [started_at],
    [ended_at],
    [member_id],
    [start_station_id],
    [end_station_id],
    DATEDIFF(SECOND, started_at, ended_at) AS [ride_duration_seconds],
    [rideable_type],
    [birthday]
FROM [dbo].[staging_trip] AS T
INNER JOIN [dbo].[staging_rider] AS R
    ON T.[member_id] = R.[rider_id]
GO

-- Preview the first rows of the new table.
SELECT TOP 10 *
FROM [dbo].[fact_trip];
GO

**This is the resulting CETAS for the fact_trip table**
![Screenshot 2025-04-09 at 9.52.17 PM.png](attachment:6d8c4852-3451-4312-9ae1-5c938d23e6cf.png)

In [None]:
-- Define the comma-delimited text file format unless it already exists.
IF NOT EXISTS (
    SELECT *
    FROM sys.external_file_formats
    WHERE name = 'SynapseDelimitedTextFormat'
)
    CREATE EXTERNAL FILE FORMAT [SynapseDelimitedTextFormat]
    WITH (
        FORMAT_TYPE = DELIMITEDTEXT,
        FORMAT_OPTIONS (
            FIELD_TERMINATOR = ',',
            USE_TYPE_DEFAULT = FALSE
        )
    );
GO

-- Register the abfss:// storage location as an external data source unless it already exists.
IF NOT EXISTS (
    SELECT *
    FROM sys.external_data_sources
    WHERE name = 'gen2filesystem_haoranwang_dfs_core_windows_net'
)
    CREATE EXTERNAL DATA SOURCE [gen2filesystem_haoranwang_dfs_core_windows_net]
    WITH (
        LOCATION = 'abfss://gen2filesystem@haoranwang.dfs.core.windows.net'
    );
GO

-- Drop any previous definition so the table can be created again below.
IF OBJECT_ID('dbo.dim_station') IS NOT NULL
    DROP EXTERNAL TABLE [dbo].[dim_station];
GO

-- CETAS: build the station dimension from the station staging table.
-- [longtitude] keeps the spelling used by dbo.staging_station.
CREATE EXTERNAL TABLE dbo.dim_station
WITH (
    LOCATION = 'dim_station',
    DATA_SOURCE = [gen2filesystem_haoranwang_dfs_core_windows_net],
    FILE_FORMAT = [SynapseDelimitedTextFormat]
) AS
SELECT
    [StationId],
    [name],
    [latitude],
    [longtitude]
FROM [dbo].[staging_station]
GO

-- Preview the first rows of the new table.
SELECT TOP 10 *
FROM dbo.dim_station;
GO

**Result for the Dim_station table**
![Screenshot 2025-04-09 at 10.09.06 PM.png](attachment:a70bf991-4334-4231-ba1d-c1cadedbce70.png)

In [None]:
-- Define the comma-delimited text file format unless it already exists.
IF NOT EXISTS (
    SELECT *
    FROM sys.external_file_formats
    WHERE name = 'SynapseDelimitedTextFormat'
)
    CREATE EXTERNAL FILE FORMAT [SynapseDelimitedTextFormat]
    WITH (
        FORMAT_TYPE = DELIMITEDTEXT,
        FORMAT_OPTIONS (
            FIELD_TERMINATOR = ',',
            USE_TYPE_DEFAULT = FALSE
        )
    );
GO

-- Register the abfss:// storage location as an external data source unless it already exists.
IF NOT EXISTS (
    SELECT *
    FROM sys.external_data_sources
    WHERE name = 'gen2filesystem_haoranwang_dfs_core_windows_net'
)
    CREATE EXTERNAL DATA SOURCE [gen2filesystem_haoranwang_dfs_core_windows_net]
    WITH (
        LOCATION = 'abfss://gen2filesystem@haoranwang.dfs.core.windows.net'
    );
GO

-- Drop any previous definition so the table can be created again below.
IF OBJECT_ID('dbo.dim_rider') IS NOT NULL
    DROP EXTERNAL TABLE [dbo].[dim_rider];
GO

-- CETAS: build the rider dimension from the rider staging table.
CREATE EXTERNAL TABLE dbo.dim_rider
WITH (
    LOCATION = 'dim_rider',
    DATA_SOURCE = [gen2filesystem_haoranwang_dfs_core_windows_net],
    FILE_FORMAT = [SynapseDelimitedTextFormat]
) AS
SELECT
    [rider_id],
    [first_name],
    [last_name],
    [address],
    [birthday],
    [start_date],
    [end_date],
    [member]
FROM [dbo].[staging_rider]
GO

-- Preview the first rows of the new table.
SELECT TOP 10 *
FROM dbo.dim_rider;
GO

**Dim_Rider details**
![Screenshot 2025-04-09 at 10.31.57 PM.png](attachment:8675bc70-3b08-44bb-8506-855c0d05ae8f.png)

**Repeat the steps above to create the date table**
![Screenshot 2025-04-10 at 1.45.04 AM.png](attachment:7a072bd6-70b4-4d98-9791-033af5e75153.png)

**The schema remodelling is now complete, with 2 fact tables and 3 dimension tables**
![Screenshot 2025-04-10 at 1.48.44 AM.png](attachment:d5357aba-6cce-4787-b5b4-930c544338ad.png)