## Introduction
This Jupyter notebook is designed to help you understand the fundamental concepts of data engineering and apply them in practice by building a dataset that you can use for your hackathon project.

Data engineering is the process of designing, building, testing, and maintaining the systems and infrastructure required to collect, store, process, and analyze large volumes of data. It involves a range of skills and technologies, including database design, data modeling, data integration, data processing, data warehousing, and data visualization.

In this workshop, we will cover some of the key concepts of data engineering at a basic level, including data collection, data cleaning, data integration, data storage, data processing and data visualization.

Data engineering is becoming increasingly important in software development as applications and SaaS products become more data-driven. Developing solutions with a data-first mindset is a key skill for any software engineer.

## Step 1 - Analysis of the provided data

To begin the process, we first analyze the data provided by the client, which comes as an Excel workbook. This analysis involves understanding the data types present and the overall distribution of the data. We also examine the quality of the data, which includes assessing its completeness and its consistency with the business rules.

In [None]:
import pandas as pd

# Read work order template provided - datasets/ville_gatineau_data.xlsx
work_order_template = pd.read_excel('/home/jovyan/work/datasets/ville_gatineau_data.xlsx', sheet_name='Bon de travail1')

# Read delivery order template provided - datasets/ville_gatineau_data.xlsx
delivery_order_template = pd.read_excel('/home/jovyan/work/datasets/ville_gatineau_data.xlsx', sheet_name='Feuille de livraison1')

work_order_template

### Results

Upon examining the data, we observe that it is only partially complete and is inconsistent. Before we can use it for analyses and to develop the features needed for the hackathon, a few issues must be addressed.

1. The addresses provided in the dataset are not real addresses
2. The addresses lack the geolocation information (latitude, longitude, geometry) needed for itinerary use cases
3. Several columns contain missing values (NaN)
4. There are unnamed columns at the end of the dataset that contain no data
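
The checks behind observations 3 and 4 can be sketched in plain Python. This is a minimal illustration on made-up rows, not the client workbook: the sample values and the `profile_columns` helper are hypothetical.

```python
# Hypothetical sample rows standing in for the work order sheet.
sample_rows = [
    {"ID": "1", "STREET": "RUE A", "NOTE": "", "Unnamed: 36": ""},
    {"ID": "2", "STREET": "", "NOTE": "COUR", "Unnamed: 36": ""},
    {"ID": "3", "STREET": "RUE C", "NOTE": "", "Unnamed: 36": ""},
]

def profile_columns(rows):
    """Return, per column, how many values are missing (None or blank)."""
    missing = {}
    for row in rows:
        for column, value in row.items():
            is_missing = value is None or str(value).strip() == ""
            missing[column] = missing.get(column, 0) + int(is_missing)
    return missing

missing_counts = profile_columns(sample_rows)

# Columns where every row is missing carry no information and can be dropped.
empty_columns = [c for c, n in missing_counts.items() if n == len(sample_rows)]

print(missing_counts)
print(empty_columns)
```

A column that is missing in every row, like the trailing unnamed column here, is a candidate for removal; a column that is missing in only some rows needs a decision on how to fill or filter it.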

## Step 2 - Data collection
Instead of relying on the provided data, we will collect our own data from public sources.

Data collection involves gathering data from various sources such as databases, APIs, web scraping, or user input. The data can be structured or unstructured, and may be stored in various formats such as CSV, JSON, or XML. Generally, data collection is automated using scripts or programs that can be run periodically to collect new data.
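
As a rough sketch of what a collection script does, the example below parses two differently formatted payloads into one common record shape. The payloads are hypothetical in-memory strings; a real script would fetch them from a file, an API, or a download.

```python
import csv
import io
import json

# Hypothetical payloads standing in for two different sources.
csv_payload = "id,name\n1,Parc A\n2,Aréna B\n"
json_payload = '[{"id": 3, "name": "Bibliothèque C"}]'

def records_from_csv(text):
    """Parse CSV text into a list of dictionaries keyed by the header row."""
    return list(csv.DictReader(io.StringIO(text)))

def records_from_json(text):
    """Parse a JSON array of objects into a list of dictionaries."""
    return json.loads(text)

# Normalize both sources to the same shape: string id, string name.
collected_records = [
    {"id": str(r["id"]), "name": r["name"]}
    for r in records_from_csv(csv_payload) + records_from_json(json_payload)
]
print(collected_records)
```

Normalizing at collection time keeps later steps independent of where each record came from.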

In the upcoming step, we aim to source data from public sources to address the previously identified issues. Acquiring high-quality data will help us in developing better features and conducting thorough analyses. For this purpose, we will obtain data from the Open Data section of the Ville de Gatineau website.

1. Addresses - Public locations within the city of Gatineau
2. Sectors - Administrative divisions of the city of Gatineau
3. Districts - Electoral districts of the city of Gatineau

In [None]:

import pandas as pd

# Read lieux publics data from Gatineau open data portal without downloading
# https://www.gatineau.ca/upload/donneesouvertes/LIEU_PUBLIC.csv
addresses = pd.read_csv('https://www.gatineau.ca/upload/donneesouvertes/LIEU_PUBLIC.csv', sep=',')

# Read sectors data from Gatineau open data portal without downloading
# https://www.gatineau.ca/upload/donneesouvertes/DECOUPAGE_ADMINISTRATIF.csv
sectors = pd.read_csv('https://www.gatineau.ca/upload/donneesouvertes/DECOUPAGE_ADMINISTRATIF.csv', sep=',')

# Read districts data from Gatineau open data portal without downloading
# https://www.gatineau.ca/upload/donneesouvertes/DISTRICT_ELECTORAUX_A_VENIR_2021.csv
districts = pd.read_csv('https://www.gatineau.ca/upload/donneesouvertes/DISTRICT_ELECTORAUX_A_VENIR_2021.csv', sep=',')

# Read mock customer data from the local datasets folder
customers1 = pd.read_csv('/home/jovyan/work/datasets/mock_customers1.csv', sep=',')
customers2 = pd.read_csv('/home/jovyan/work/datasets/mock_customers2.csv', sep=',')

In [None]:
addresses.head()

In [None]:
sectors.head()

In [None]:
districts.head()

In [None]:
customers = pd.concat([customers1, customers2], ignore_index=True)
customers

## Step 3 - Data cleaning
Now that we have real data to work with, we can begin the process of cleaning the data.

Data cleaning involves removing invalid or irrelevant data, handling missing values, formatting data, and ensuring data accuracy. Some of the techniques used in data cleaning include data profiling, data validation, and data standardization.
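
The three ideas above (handling missing values, validation, and standardization) can be sketched on a handful of strings. This is a minimal plain-Python illustration; the sample addresses and the `clean_addresses` helper are hypothetical and are not the cleaning applied later in this notebook.

```python
import re

# Hypothetical raw values; None and blank strings represent missing data.
raw_addresses = [
    "  25 RUE LAURIER ",
    "25 Rue Laurier",
    None,
    "",
    "PARC DU LAC",
    "144 BOUL. DE L'HOPITAL",
]

def clean_addresses(values):
    """Standardize, validate, and de-duplicate address strings."""
    seen = set()
    cleaned = []
    for value in values:
        if value is None:
            continue  # missing value
        # Standardization: trim, collapse inner whitespace, upper-case.
        address = re.sub(r"\s+", " ", value.strip()).upper()
        # Validation: keep only addresses that start with a street number.
        if not re.match(r"^\d+\s", address):
            continue
        if address in seen:
            continue  # duplicate after standardization
        seen.add(address)
        cleaned.append(address)
    return cleaned

cleaned_addresses = clean_addresses(raw_addresses)
print(cleaned_addresses)
```

Standardizing before de-duplicating matters: the first two sample values only collapse into one record once case and whitespace have been normalized.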


**Addresses - Selecting & Profiling**

In [None]:
from ydata_profiling import ProfileReport

# Select the relevant columns and rename them to match the work order template
addresses = addresses[['CODEID', 'TYPE', 'ADR_COMPLE', 'GEOM']]
addresses.columns = ['ADDRESS_ID', 'TYPE_NAME', 'FULL_ADDRESS', 'GEOM']

# Execute ydata_profiling to gather intelligence about the data
profile = ProfileReport(addresses, title="Profiling Report")
profile.to_notebook_iframe()

**Addresses - Cleaning**

In [None]:
# Keep only rows whose address is present, non-empty, and starts with a street number
# (each condition is parenthesized because & binds more tightly than !=)
addresses = addresses[
    addresses['FULL_ADDRESS'].notna() &
    (addresses['FULL_ADDRESS'].str.strip() != '') &
    addresses['FULL_ADDRESS'].str.match(r'^\d+', na=False)
]

# Identify non-unique/duplicate rows based on FULL_ADDRESS column
dups = addresses[addresses.duplicated(subset=['FULL_ADDRESS'], keep=False)].sort_values(by=['FULL_ADDRESS'])

if len(dups):
    print(f'Non-unique addresses identified: {len(dups)}')

# Remove non-unique/duplicate addresses
addresses = addresses.drop_duplicates(subset=['FULL_ADDRESS'], keep='first')

# Execute ydata_profiling to gather intelligence about the cleaned data
profile = ProfileReport(addresses, title="Addresses Profiling Report")
profile.to_notebook_iframe()

**Customers - Selecting & Profiling**

In [None]:
from ydata_profiling import ProfileReport

# Select the relevant columns and rename them to match the work order template
customers = customers[['first_name', 'last_name', 'email', 'phonenumber']]
customers.columns = ['FIRSTNAME', 'LASTNAME', 'EMAIL', 'PHONENUMBER']

# Execute ydata_profiling to gather intelligence about the data
profile = ProfileReport(customers, title="Customers Profiling Report")
profile.to_notebook_iframe()

**Sectors - Selecting & Profiling**

In [None]:
# Select the relevant columns and rename them to match the work order template
sectors = sectors[['CODEID', 'NOM', 'GEOM']]
sectors.columns = ['SECTOR_ID', 'SECTOR_NAME', 'GEOM']

# Identify non-unique/duplicate rows based on SECTOR_NAME column
dups = sectors[sectors.duplicated(subset=['SECTOR_NAME'], keep=False)].sort_values(by=['SECTOR_NAME'])
if len(dups):
    print(f'Non-unique sectors identified: {len(dups)}')

# Remove non-unique/duplicate sectors
sectors = sectors.drop_duplicates(subset=['SECTOR_NAME'], keep='first')

# Execute ydata_profiling to gather intelligence about the cleaned data
profile = ProfileReport(sectors.drop(columns=['GEOM']), title="Profiling Report")
profile.to_notebook_iframe()

**Districts - Selecting & Profiling**

In [None]:
# Districts - select the columns we need and rename to readable names
districts = districts[['CODEID', 'DISTRICT', 'GEOM']]
districts.columns = ['DISTRICT_ID', 'DISTRICT_NAME', 'GEOM']

# Identify non-unique/duplicate districts based on DISTRICT_NAME column
dups = districts[districts.duplicated(subset=['DISTRICT_NAME'], keep=False)].sort_values(by=['DISTRICT_NAME'])
if len(dups):
    print(f'Non-unique districts identified: {len(dups)}')

# Remove non-unique/duplicate districts
districts = districts.drop_duplicates(subset=['DISTRICT_NAME'], keep='first')

profile = ProfileReport(districts.drop(columns=['GEOM']), title="Profiling Report")
profile.to_notebook_iframe()

districts.head()

**Converting to Geopandas**

In this step, we convert the dataframes to geopandas dataframes. This will allow us to perform spatial operations on the dataframes. 

However, the conversion fails for the sectors because their GEOM values are not valid WKT: each polygon is written with a single pair of parentheses, `POLYGON (x y, ...)`, whereas WKT expects the ring in its own pair, `POLYGON ((x y, ...))`.

We use a simple string replace to fix the issue.
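
To make the string replace concrete, here is a minimal sketch on a made-up value. The input string is hypothetical and only assumes that the source writes a single-ring polygon with one pair of parentheses; the `fix_polygon_wkt` helper is illustrative.

```python
def fix_polygon_wkt(text):
    """Wrap the coordinate list of a single-ring polygon in a second pair of parentheses."""
    return text.replace("POLYGON (", "POLYGON ((").replace(")", "))")

broken_wkt = "POLYGON (0 0, 1 0, 1 1, 0 0)"
fixed_wkt = fix_polygon_wkt(broken_wkt)
print(fixed_wkt)
```

The replace is not idempotent: applying it to a value that is already valid WKT adds extra parentheses, so it should only be run once on values known to be in the broken form.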

In [None]:
import geopandas as gpd
from shapely import wkt

# Create a geopandas dataframe from the addresses dataframe
addresses_gdf = gpd.GeoDataFrame(addresses, geometry=gpd.GeoSeries(addresses['GEOM'].apply(wkt.loads)))
addresses_gdf = addresses_gdf.drop(columns=['GEOM'])

# Create a geopandas dataframe from the sectors dataframe and convert the GEOM column to a valid POLYGON
sectors['GEOM'] = sectors['GEOM'].apply(lambda x: x.replace('POLYGON (', 'POLYGON ((').replace(')', '))'))
sectors_gdf = gpd.GeoDataFrame(sectors, geometry=gpd.GeoSeries(sectors['GEOM'].apply(wkt.loads)))
sectors_gdf = sectors_gdf.drop(columns=['GEOM'])

# Create a sectors dataframe without the geometry columns for application use
# (errors='ignore' because, depending on the pandas/geopandas versions, the original dataframe may or may not have gained a 'geometry' column)
sectors = sectors.drop(columns=['GEOM', 'geometry'], errors='ignore')

# Create a geopandas dataframe from the districts dataframe
districts_gdf = gpd.GeoDataFrame(districts, geometry=gpd.GeoSeries(districts['GEOM'].apply(wkt.loads)))
districts_gdf = districts_gdf.drop(columns=['GEOM'])

# Create a districts dataframe without the geometry columns for application use
districts = districts.drop(columns=['GEOM', 'geometry'], errors='ignore')

print('Geopandas conversion completed')


## Step 5 - Data integration

Data integration involves combining data from multiple sources and integrating them into a single dataset. This is typically done using tools such as ETL (extract, transform, load) or ELT (extract, load, transform) processes. Data integration is a critical step in the data engineering process as it allows us to combine data from multiple sources into a single dataset that can be used for analysis and reporting.
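
In this notebook the integration is a spatial join: each address point is matched to the polygon that contains it. The sketch below shows the idea in plain Python with a ray-casting point-in-polygon test. The square "districts" and the points are hypothetical, and this is an illustration of the concept rather than how geopandas implements it.

```python
def point_in_polygon(point, polygon):
    """Ray-casting test: True if point (x, y) lies inside polygon (list of (x, y))."""
    x, y = point
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Does this edge straddle the horizontal line through the point?
        if (y1 > y) != (y2 > y):
            # x coordinate where the edge crosses that line
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside

# Hypothetical data: two square districts and three address points.
sample_districts = {
    "West": [(0, 0), (1, 0), (1, 1), (0, 1)],
    "East": [(1, 0), (2, 0), (2, 1), (1, 1)],
}
sample_points = {"A1": (0.5, 0.5), "A2": (1.5, 0.25), "A3": (5.0, 5.0)}

# Left join: every address is kept; unmatched ones get None.
sample_join = {
    address_id: next(
        (name for name, ring in sample_districts.items() if point_in_polygon(point, ring)),
        None,
    )
    for address_id, point in sample_points.items()
}
print(sample_join)
```

As with a left join on keys, the address that falls outside every polygon is kept with an empty district rather than dropped.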

In [None]:
# Perform a spatial join between the addresses and districts dataframes based on the geometry columns
join1 = gpd.sjoin(addresses_gdf, districts_gdf, how='left', predicate='within')
join1 = join1[['ADDRESS_ID', 'FULL_ADDRESS', 'DISTRICT_NAME', 'geometry']]

# Perform a spatial join between addresses and sectors dataframes based on the geometry columns
join2 = gpd.sjoin(addresses_gdf, sectors_gdf, how='left', predicate='within')
join2 = join2[['ADDRESS_ID', 'FULL_ADDRESS', 'SECTOR_NAME', 'geometry']]

# Merge the join1 and join2 dataframes based on the geometry column
joined = addresses_gdf.merge(join1.merge(join2, on='geometry'), on='geometry')
joined = joined[['ADDRESS_ID', 'FULL_ADDRESS', 'SECTOR_NAME', 'DISTRICT_NAME', 'geometry']]

# Execute ydata_profiling to gather intelligence about the joined data
profile = ProfileReport(joined.drop(columns=['geometry']), title="Profiling Report")
profile.to_notebook_iframe()

## Step 6 - Creating a mock dataset
Data transformation involves converting the data into a format suitable for analysis (Dimensions and Facts), including normalization, aggregation, or discretization. Some common data transformation techniques include pivot tables, joins, and aggregations. Generally, performant tools like Spark and dbt will allow transformation at scale.

Generally, transformations are performed on a dataset that is stored in a data warehouse. However, for the purposes of this workshop, we will perform the transformations using our pandas dataframes.

In this step, we mimic the process of data transformation by creating a mock dataset. This dataset will be used for developing the features needed for the hackathon.
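
For readers new to aggregations and pivots, here is a small plain-Python sketch of both. The orders and district labels are illustrative only and are not taken from the dataset.

```python
from collections import Counter

# Hypothetical work orders reduced to the two fields being aggregated.
sample_orders = [
    {"district": "Aylmer", "status": "En attente"},
    {"district": "Aylmer", "status": "Complété"},
    {"district": "Hull", "status": "En attente"},
    {"district": "Aylmer", "status": "En attente"},
]

# Aggregation: count orders per (district, status) pair.
pair_counts = Counter((o["district"], o["status"]) for o in sample_orders)

# Pivot: one row per district, one column per status.
status_columns = sorted({o["status"] for o in sample_orders})
status_pivot = {
    district: {status: pair_counts[(district, status)] for status in status_columns}
    for district in sorted({o["district"] for o in sample_orders})
}
print(status_pivot)
```

The same reshaping is what a pivot table or a `GROUP BY` with conditional sums produces at larger scale.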

In [None]:
import random
pd.set_option('display.max_columns', None)

# List the supported bins for the application (coming from the work order template)
supported_bins = [
    'BAC BLEU - 360',
    'BAC BRUN - 45',
    'BAC BRUN - 80',
    'BAC BRUN - 120',
    'BAC BRUN - 240',
    'BAC GRIS - 120',
    'BAC GRIS - 240',
    'BAC GRIS - 360',
]

# Create a copy of the work_order_template dataframe and remove all rows
orders = work_order_template.copy()
orders = orders.iloc[0:0]

# Remove unwanted columns Unnamed: 36,Unnamed: 37
orders = orders.drop(columns=['Unnamed: 36', 'Unnamed: 37'])

# Create a work order for each address in the joined dataframe
orders['ID']                    = joined['FULL_ADDRESS'].apply(lambda x: random.randint(5330000000, 5520000000))
orders['STREET_NUMBER']         = joined['FULL_ADDRESS'].str.extract(r'^(\d+)')
orders['STREET']                = joined['FULL_ADDRESS'].str.extract(r'^\d+\s(.*)')
orders['SECTOR']                = joined['SECTOR_NAME']
orders['CITY']                  = 'Gatineau'
orders['DISTRICT']              = joined['DISTRICT_NAME']
orders['NOTE']                  = orders['NOTE'].apply(lambda x: ['RUE', 'COUR'][random.randint(0, 1)])
orders['CLIENT_TYPE']           = 'AUTRES'
orders['BILLING_TYPE']          = orders['BILLING_TYPE'].apply(lambda x: ['RESIDENTIEL', 'MUNICIPAL'][random.randint(0, 1)])
orders['REQUEST_NUMBER']        = orders['REQUEST_NUMBER'].apply(lambda x: f'F{random.randint(400000, 500000)}')
orders['REQUEST']               = 'Réparation'
orders['BIN']                   = orders['BIN'].apply(lambda x: random.choice(supported_bins))
orders['PART']                  = orders['PART'].apply(lambda x: ['BODY', 'COUVERCLE', 'ROUE'][random.randint(0, 2)])
orders['STATUS']                = orders['PART'].apply(lambda x: ['En attente', 'En progrès', 'Complété'][random.randint(0, 2)])
orders['EXCEPTION']             = None
orders['NOTE_EXCEPTION']        = None
orders['STATUS_QUARANTAINE']    = None
orders['geometry']              = joined['geometry']

orders

In the previous section, we created mock values for the rows coming from the addresses dataset. The values are based on the data in the work orders template.

In the next section, we apply some business logic to mock the dates and times of the work orders while keeping an interesting distribution of data. This is done by randomly selecting a date from the provided ranges according to the following rules:

1. A work order that is completed (`Complété`) must have been opened between 90 and 240 days before today
2. A work order that is completed must have been completed between 14 and 45 days after its opening date
3. A work order that is in progress (`En progrès`) must have been opened between 0 and 60 days before today
4. A work order that is waiting (`En attente`) must have been opened between 0 and 60 days before today
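
The rules above can be sketched with the standard library alone. This is a minimal illustration, not the notebook's implementation: the `mock_dates` helper, the fixed reference date, and the seed are all assumptions made so that the example is reproducible.

```python
import random
from datetime import date, timedelta

def mock_dates(status, today, rng):
    """Return (opened, closed) dates for one work order; closed is None unless completed."""
    if status == "Complété":
        opened = today - timedelta(days=rng.randint(90, 240))
        closed = opened + timedelta(days=rng.randint(14, 45))
        return opened, closed
    # 'En attente' and 'En progrès' share the same opening window.
    return today - timedelta(days=rng.randint(0, 60)), None

demo_rng = random.Random(42)      # seeded so the sketch is reproducible
demo_today = date(2023, 6, 1)     # fixed reference date, chosen arbitrarily

demo_samples = [
    mock_dates(status, demo_today, demo_rng)
    for status in ["Complété", "En attente", "En progrès"] * 100
]

# Check every generated pair against the rules.
for opened, closed in demo_samples:
    age = (demo_today - opened).days
    if closed is not None:
        assert 90 <= age <= 240 and 14 <= (closed - opened).days <= 45
    else:
        assert 0 <= age <= 60
print("all sampled dates satisfy the rules")
```

Because a completed order is opened at least 90 days ago and closed at most 45 days later, its closing date always falls in the past. Note that `random.Random.randint` includes both bounds.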

In [None]:
import numpy as np
from datetime import datetime, timedelta

# Define date ranges for each status
date_ranges = {'En attente': 60, 'En progrès': 60, 'Complété': 240}
last_3_months_range = 90  # days

# Define current date and date format
now = datetime.now()
date_format = '%m/%d/%Y'

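# Define a function to generate random dates within a given range
# (np.random.randint excludes the upper bound)
def random_order_date(status):
    range_days = date_ranges[status]
    # Completed orders were opened at least 90 days ago; other statuses can be opened as recently as today
    min_days = last_3_months_range if status == 'Complété' else 0
    return (now - timedelta(days=np.random.randint(min_days, range_days))).strftime(date_format)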

def random_closed_date(order_date):
    return (datetime.strptime(order_date, date_format) + timedelta(days=np.random.randint(14, 45))).strftime(date_format)

# Generate ORDER_DATE and ORDER_TIME columns
orders['ORDER_DATE'] = [random_order_date(status) for status in orders['STATUS']]
orders['ORDER_TIME'] = '12:00:00 AM'

# Generate CLOSED_DATE and CLOSED_TIME columns
orders['CLOSED_DATE'] = orders.apply(lambda x: random_closed_date(x['ORDER_DATE']) if x['STATUS'] == 'Complété' else None, axis=1)
orders['CLOSED_TIME'] = orders['STATUS'].apply(lambda x: '12:00:00 AM' if x == 'Complété' else None)

# Create a GeoDataFrame and drop the geometry column from the original DataFrame
orders_gdf = gpd.GeoDataFrame(orders, geometry=gpd.GeoSeries(orders['geometry']))
orders.drop(columns=['geometry'], inplace=True)

orders

## Step 7 - Data storage

Data storage involves storing the data in a database, data warehouse, or data lake. Some popular databases include MySQL, PostgreSQL, and MongoDB, while popular data warehouses include Amazon Redshift and Google BigQuery.

A new type of solution that is gaining massive popularity is the data cloud. Data clouds are data warehouses that are hosted on the cloud and are accessible via a REST API. Some popular data clouds include Snowflake and Databricks.

Without diving too deep into analytical database modeling, we will use the dataframe structure to store data in a database and in an exportable data format (CSV).

We will use two types of tables:

1. Raw - The raw data is stored as is in the database (from the dataframes)
2. Modeled - The data is transformed and stored in a database with proper data types and relationships
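
The raw and modeled split can be tried without a database server. The sketch below uses an in-memory SQLite database as a stand-in for PostgreSQL; the table names and rows are hypothetical.

```python
import sqlite3

demo_conn = sqlite3.connect(":memory:")

# Raw table: values loaded as-is, everything stored as text.
demo_conn.execute('CREATE TABLE sectors_demo_raw ("SECTOR_ID" TEXT, "SECTOR_NAME" TEXT)')
demo_conn.executemany(
    "INSERT INTO sectors_demo_raw VALUES (?, ?)",
    [("1", "Secteur A"), ("2", "Secteur B")],
)

# Modeled table: typed columns and lower-case names derived from the raw table.
demo_conn.execute(
    """
    CREATE TABLE sectors_demo AS
    SELECT CAST("SECTOR_ID" AS INTEGER) AS sector_id,
           "SECTOR_NAME" AS sector_name
    FROM sectors_demo_raw
    """
)

modeled_rows = demo_conn.execute(
    "SELECT sector_id, sector_name FROM sectors_demo ORDER BY sector_id"
).fetchall()
print(modeled_rows)
demo_conn.close()
```

Keeping the raw table untouched means the modeled table can be rebuilt at any time if the casting rules change.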

In [None]:
import psycopg2
from sqlalchemy import create_engine

# Create CSV files for each dataframe
orders.to_csv('/home/jovyan/work/orders.csv', index=False)
customers.to_csv('/home/jovyan/work/customers.csv', index=False)
districts.to_csv('/home/jovyan/work/districts.csv', index=False)
sectors.to_csv('/home/jovyan/work/sectors.csv', index=False)
print('Dataframes written to CSV files')

# Create a connection to the postgres database
engine = create_engine('postgresql://postgres:postgres@postgres:5432/postgres')

# Write the dataframes to the database
orders.to_sql('orders_raw', engine, if_exists='replace', index=False)
customers.to_sql('customers_raw', engine, if_exists='replace', index=False)
districts.to_sql('districts_raw', engine, if_exists='replace', index=False)
sectors.to_sql('sectors_raw', engine, if_exists='replace', index=False)
print('Dataframes written to database')

In [None]:
# Create a write connection
connection = psycopg2.connect(
    host="postgres",
    user="postgres",
    password="postgres",
    database="postgres"
)
cursor = connection.cursor()
print('Connection to database established')

In [None]:
# Create a simple SQL transformation to create the sectors table
sql = '''
    DROP TABLE IF EXISTS sectors;
    CREATE TABLE sectors AS
    SELECT
        cast("SECTOR_ID" as integer) as sector_id,
        "SECTOR_NAME" as sector_name
    FROM sectors_raw;
'''

# Execute the SQL transformation and commit the changes to the database
cursor.execute(sql)
connection.commit()
print('SQL transformation executed')

In [None]:
# Create a simple SQL transformation to create the districts table
sql = '''
    DROP TABLE IF EXISTS districts;
    CREATE TABLE districts AS
    SELECT
        cast("DISTRICT_ID" as integer) as district_id,
        "DISTRICT_NAME" as district_name
    FROM districts_raw;
'''

# Execute the SQL transformation and commit the changes to the database
cursor.execute(sql)
connection.commit()
print('SQL transformation executed')

In [None]:
# Create a simple SQL transformation to create the customers table
sql = '''
    DROP TABLE IF EXISTS customers;
    CREATE TABLE customers AS
    SELECT
        cast("FIRSTNAME" as varchar) as firstname,
        cast("LASTNAME" as varchar) as lastname,
        cast("EMAIL" as varchar) as email,
        cast("PHONENUMBER" as varchar) as phone_number
    FROM customers_raw;
'''

# Execute the SQL transformation and commit the changes to the database
cursor.execute(sql)
connection.commit()
print('SQL transformation executed')

In [None]:
# Create a simple SQL transformation to create the orders table
sql = '''
    DROP VIEW IF EXISTS delivery_summary;
    DROP TABLE IF EXISTS orders;
    CREATE TABLE orders AS
    SELECT
        cast("ID" as bigint) as order_id,
        cast("STREET_NUMBER" as integer) as street_number,
        cast("STREET" as varchar) as street,
        cast("SECTOR" as varchar) as sector,
        cast("CITY" as varchar) as city,
        cast("DISTRICT" as varchar) as district,
        cast("NOTE" as varchar) as note,
        cast("CLIENT_TYPE" as varchar) as client_type,
        cast("BILLING_TYPE" as varchar) as billing_type,
        cast("REQUEST_NUMBER" as varchar) as request_number,
        cast("REQUEST" as varchar) as request,
        cast("BIN" as varchar) as bin,
        cast("PART" as varchar) as part,
        cast("STATUS" as varchar) as status,
        cast("EXCEPTION" as varchar) as exception,
        cast("NOTE_EXCEPTION" as varchar) as note_exception,
        cast("STATUS_QUARANTAINE" as varchar) as status_quarantine,
        cast("ORDER_DATE" as date) as order_date,
        cast("ORDER_TIME" as time) as order_time,
        cast("CLOSED_DATE" as date) as closed_date,
        cast("CLOSED_TIME" as time) as closed_time
    FROM orders_raw;
'''

# Execute the SQL transformation and commit the changes to the database
cursor.execute(sql)
connection.commit()
print('SQL transformation executed')

To mock the delivery template provided, we will create a database view that will allow us to query the data in the same way as the template.

Views are a powerful tool: a view is a virtual table defined by a query, so its rows are computed from the source tables when the view is read instead of being stored in a table of their own.

When data is changed in the source tables, the view will automatically reflect the changes.

This is a great feature for analytical purposes like reporting, dashboards and summarizing data from other tables.
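
The behaviour described above can be observed with an in-memory SQLite database, used here as a stand-in for PostgreSQL. The table, view, and rows are hypothetical.

```python
import sqlite3

demo_db = sqlite3.connect(":memory:")
demo_db.execute("CREATE TABLE demo_orders (order_id INTEGER, status TEXT)")
demo_db.executemany(
    "INSERT INTO demo_orders VALUES (?, ?)",
    [(1, "En attente"), (2, "Complété")],
)

# The view stores only the query, not its result.
demo_db.execute(
    """
    CREATE VIEW demo_status_summary AS
    SELECT status, COUNT(*) AS order_count
    FROM demo_orders
    GROUP BY status
    """
)

def read_summary():
    """Read the view and return {status: order_count}."""
    rows = demo_db.execute("SELECT status, order_count FROM demo_status_summary")
    return dict(rows.fetchall())

summary_before = read_summary()
demo_db.execute("INSERT INTO demo_orders VALUES (3, 'En attente')")
summary_after = read_summary()  # the same view now reflects the new row

print(summary_before)
print(summary_after)
demo_db.close()
```

No refresh step is needed between the two reads, which is what makes views convenient for dashboards and reports.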

In [None]:
# Create a view that summarizes the waiting work orders by district, address, part, and bin type
sql = """
    DROP VIEW IF EXISTS delivery_summary;
    CREATE VIEW delivery_summary AS
    SELECT 
        district,
        street_number,
        street,
        part,
        SUM(CASE WHEN bin = 'BAC BLEU - 360' THEN 1 ELSE 0 END) AS "BAC BLEU - 360",
        SUM(CASE WHEN bin = 'BAC BRUN - 45' THEN 1 ELSE 0 END) AS "BAC BRUN - 45",
        SUM(CASE WHEN bin = 'BAC BRUN - 80' THEN 1 ELSE 0 END) AS "BAC BRUN - 80",
        SUM(CASE WHEN bin = 'BAC BRUN - 120' THEN 1 ELSE 0 END) AS "BAC BRUN - 120",
        SUM(CASE WHEN bin = 'BAC BRUN - 240' THEN 1 ELSE 0 END) AS "BAC BRUN - 240",
        SUM(CASE WHEN bin = 'BAC GRIS - 120' THEN 1 ELSE 0 END) AS "BAC GRIS - 120",
        SUM(CASE WHEN bin = 'BAC GRIS - 240' THEN 1 ELSE 0 END) AS "BAC GRIS - 240",
        SUM(CASE WHEN bin = 'BAC GRIS - 360' THEN 1 ELSE 0 END) AS "BAC GRIS - 360"
    FROM orders
    WHERE status = 'En attente'
    GROUP BY district, street_number, street, part;
"""

# Execute the SQL transformation and commit the changes to the database
cursor.execute(sql)
connection.commit()

connection.close()
print('SQL statement executed')

## Step 8 - Data processing & analysis

Data processing involves performing data analysis operations such as filtering, sorting, and calculating descriptive statistics. This is typically done using programming languages such as Python or R, or using specialized tools such as Apache Spark.
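
As a minimal sketch of these three operations, the example below filters, sorts, and summarizes a few made-up completion times using only the standard library. The order ids and values are hypothetical.

```python
import statistics

# Hypothetical completion times in days, keyed by order id.
days_by_order = {101: 14, 102: 30, 103: 21, 104: 45, 105: 18}

# Filtering: keep orders that took more than 20 days.
slow_orders = {oid: d for oid, d in days_by_order.items() if d > 20}

# Sorting: slowest first.
slowest_first = sorted(slow_orders.items(), key=lambda item: item[1], reverse=True)

# Descriptive statistics over all orders.
all_days = list(days_by_order.values())
mean_days = statistics.mean(all_days)
median_days = statistics.median(all_days)

print(slowest_first)
print(mean_days, median_days)
```

The mean sits above the median here because one long-running order pulls it up, which is the kind of skew descriptive statistics help reveal.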


**Folium - Visualizing the distribution of data on a map**

In [None]:
import folium

# Create a folium map centered on the mean of the address coordinates
map = folium.Map(location=[joined.geometry.centroid.y.mean(), joined.geometry.centroid.x.mean()], zoom_start=10, tiles='cartodbpositron')

# Add districts to the map including a tooltip
tooltip = folium.features.GeoJsonTooltip(fields=['DISTRICT_NAME'], aliases=['District: '])
folium.GeoJson(districts_gdf[['geometry', 'DISTRICT_NAME']].to_json(), tooltip=tooltip, name='geojson').add_to(map)

# Add addresses to the map including a tooltip
addresses_tooltip = folium.features.GeoJsonTooltip(fields=['SECTOR_NAME', 'DISTRICT_NAME', 'FULL_ADDRESS'], aliases=['Sector: ', 'District: ', 'Address: '])
folium.GeoJson(joined[['geometry', 'SECTOR_NAME', 'DISTRICT_NAME', 'FULL_ADDRESS']].to_json(), tooltip=addresses_tooltip, name='geojson').add_to(map)


# Add a marker to the map at the coordinates of the Ville de Gatineau building
folium.Marker([45.4684952,-75.7634862], popup='Marker', icon=folium.Icon(color='red', icon='info-sign')).add_to(map)

map

**AgglomerativeClustering - Hierarchical clustering**

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import AgglomerativeClustering
from scipy.spatial.distance import pdist, squareform

n_clusters = 20

# Calculate the pairwise distances between the data points
orders_gdf['Latitude'] = orders_gdf['geometry'].y
orders_gdf['Longitude'] = orders_gdf['geometry'].x

distances = squareform(pdist(orders_gdf[['Latitude', 'Longitude']]))
 
# Perform hierarchical clustering
model = AgglomerativeClustering(n_clusters=n_clusters, metric='precomputed', linkage='complete')
labels = model.fit_predict(distances)

orders_gdf['AC_Cluster'] = labels

# Create a polygon for each cluster
cluster_polygons = []
for cluster in range(n_clusters):
    cluster_polygons.append(orders_gdf[orders_gdf['AC_Cluster'] == cluster].unary_union.convex_hull)

# Create a GeoDataFrame from the list of polygons and keep AC_Cluster property
ac_clusters_gdf = gpd.GeoDataFrame({'AC_Cluster': range(n_clusters)}, geometry=cluster_polygons)

# Scatter plot of the clusters
%matplotlib inline
plt.figure(figsize=(10, 10))
plt.scatter(orders_gdf['Longitude'], orders_gdf['Latitude'], c=orders_gdf['AC_Cluster'])
plt.show()

**DBSCAN - Density-based clustering**

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN

# Define the eps and min_samples parameters
eps = 0.005
min_samples = 5

# Create latitude and longitude columns for orders_gdf
orders_gdf['Latitude'] = orders_gdf['geometry'].y
orders_gdf['Longitude'] = orders_gdf['geometry'].x

# Convert the geodataframe to a numpy array of coordinates
coords = orders_gdf[['Latitude', 'Longitude']].to_numpy()

# Apply DBSCAN clustering to the coordinates
dbscan = DBSCAN(eps=eps, min_samples=min_samples).fit(coords)

# Add the cluster labels to the geodataframe
orders_gdf['DB_Cluster'] = dbscan.labels_

# Plot all the points with a different color for each cluster (noise points are labeled -1)
%matplotlib inline
plt.figure(figsize=(10, 10))
plt.scatter(orders_gdf['Longitude'], orders_gdf['Latitude'], c=orders_gdf['DB_Cluster'])
plt.show()

**KMeans - Centroid-based clustering**

In [None]:
from sklearn.cluster import KMeans
import geopandas as gpd
from shapely.geometry import Point, Polygon

# Number of clusters
k = 20

# Perform KMeans clustering on the location data
orders_gdf['Latitude'] = orders_gdf['geometry'].y
orders_gdf['Longitude'] = orders_gdf['geometry'].x
kmeans = KMeans(n_clusters=k, random_state=0, n_init='auto').fit(orders_gdf[['Latitude', 'Longitude']])

# Add the cluster labels to the dataframe
orders_gdf['K_Cluster'] = kmeans.labels_

# Create a GeoDataFrame from the dataframe with location data
gdf = gpd.GeoDataFrame(orders_gdf, geometry=gpd.points_from_xy(orders_gdf.Longitude, orders_gdf.Latitude))

# Group the GeoDataFrame by the cluster labels and create polygons around the clusters using convex hull method
polygons = gdf.groupby('K_Cluster')['geometry'].apply(lambda x: Polygon(x.unary_union.convex_hull.exterior.coords))

# Create a GeoDataFrame from the polygons
poly_gdf = gpd.GeoDataFrame(polygons, geometry='geometry')

# Plot the polygons with matplotlib
%matplotlib inline
poly_gdf.plot(figsize=(10, 10))
plt.show()

**Calculation - Average wait time per cluster**

In [None]:
# Calculate the average time between ORDER_DATE and CLOSED_DATE for each cluster
completion_gdf = orders_gdf[['ORDER_DATE', 'CLOSED_DATE', 'AC_Cluster']]

# Filter out the rows where the CLOSED_DATE is null; copy so the assignments below act on an independent dataframe
completion_gdf = completion_gdf[completion_gdf['CLOSED_DATE'].notnull()].copy()

completion_gdf['ORDER_DATE'] = pd.to_datetime(completion_gdf['ORDER_DATE'])
completion_gdf['CLOSED_DATE'] = pd.to_datetime(completion_gdf['CLOSED_DATE'])

completion_gdf['TIME_TO_COMPLETE'] = completion_gdf['CLOSED_DATE'] - completion_gdf['ORDER_DATE']
completion_gdf['TIME_TO_COMPLETE'] = completion_gdf['TIME_TO_COMPLETE'].dt.days

# Calculate the average time to complete for each cluster and create a gdf of cluster_label and avg_time_to_complete
avg_time_to_complete = round(completion_gdf.groupby('AC_Cluster')['TIME_TO_COMPLETE'].mean())
avg_time_to_complete_gdf = gpd.GeoDataFrame({'AC_Cluster': avg_time_to_complete.index, 'AVG_TIME_TO_COMPLETE': avg_time_to_complete.values})

# Merge the average time to complete gdf with the cluster polygons gdf
time_estimate_polygons = ac_clusters_gdf.merge(avg_time_to_complete_gdf, on='AC_Cluster')

# Plot the polygons on a map using GeoPandas
%matplotlib inline
ax = time_estimate_polygons.plot(column='AVG_TIME_TO_COMPLETE', cmap='OrRd', alpha=0.5, edgecolor='black')
gdf.plot(ax=ax, markersize=10, color='red')


**Visualization - Average wait time per cluster**

In [None]:
import folium

# Create a folium map centered on the mean of the cluster polygon centroids
map = folium.Map(location=[time_estimate_polygons.geometry.centroid.y.mean(), time_estimate_polygons.geometry.centroid.x.mean()], zoom_start=10, tiles='cartodbpositron')

# Add the cluster polygons to the map with a tooltip showing the average completion time
tooltip = folium.features.GeoJsonTooltip(fields=['AVG_TIME_TO_COMPLETE'], aliases=['Avg. completion (days): '])
folium.GeoJson(time_estimate_polygons[['geometry', 'AVG_TIME_TO_COMPLETE']].to_json(), tooltip=tooltip, name='geojson').add_to(map)

map

## Step 9 - Data visualization

Data visualization involves creating charts, graphs, or other visualizations to communicate the insights obtained from the data. Some popular data visualization tools include Tableau, Power BI, and ggplot.

While generating visualizations is a critical yet relatively easy step in the data science process, it is not the focus of this workshop. However, we will generate a few performance indicators that bring value to business application users.

Here are some queries/models that could be stored in a database view for dynamic querying. 

1. Upcoming work orders - Work orders that are waiting (`En attente`)
2. Incomplete work orders (10-day slices) - Number of waiting work orders grouped by age in slices of 10 days
3. Average wait time by district - Average number of days between opening and closing for completed work orders, grouped by sector and district
4. Completed work orders by date - Number of completed work orders grouped by order date
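
The 10-day slicing in the second query relies on integer division. Here is the same arithmetic sketched in plain Python; the `slice_bounds` helper and the sample ages are hypothetical.

```python
from collections import Counter

def slice_bounds(days_waiting, width=10):
    """Return the inclusive (start, end) bounds of the slice containing days_waiting."""
    start = (days_waiting // width) * width
    return start, start + width - 1

# Hypothetical ages, in days, of work orders that are still waiting.
waiting_days = [0, 3, 9, 10, 27, 59]

slice_counts = Counter(slice_bounds(d) for d in waiting_days)
for (start, end), count in sorted(slice_counts.items()):
    print(f"{start:>2}-{end:<2} days: {count}")
```

Slices with no orders simply do not appear in the counts; a report that needs empty slices would have to generate the full list of bounds first.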

In [None]:
from sqlalchemy import create_engine, text
# Create a new database connection
engine = create_engine('postgresql://postgres:postgres@postgres:5432/postgres')
connection = engine.connect()

print('Database connection created')

In [None]:
# Execute the query and return the results as a pandas dataframe
upcoming = pd.read_sql_query(text('''
    SELECT * FROM orders
    WHERE STATUS = 'En attente'
'''), con=connection)

# Render the dataframe
upcoming

In [None]:
# Execute the query and return the results as a pandas dataframe
slices = pd.read_sql_query(text('''
    WITH slices as (
        WITH RECURSIVE num_seq(n, max_val) AS (
        SELECT 0, (
            SELECT
                MAX(DATE_PART('day', NOW() - order_date))
            FROM orders
            WHERE status = 'En attente'
        )
        UNION ALL
        SELECT n + 1, max_val FROM num_seq WHERE n < max_val
        )
        SELECT (n / 10) * 10 as slice_start, ((n / 10) + 1) * 10 - 1 as slice_end
        FROM num_seq
        GROUP BY 1, 2
    ),

    _orders as (
        SELECT
            order_id,
            DATE_PART('day', NOW() - order_date)::int as "days_to_complete"
        FROM orders
        WHERE status = 'En attente'
    )

    select
        COUNT(*),
        slice_start,
        slice_end
    FROM _orders
    LEFT JOIN slices on
        days_to_complete >= slice_start and days_to_complete <= slice_end
    GROUP BY 2, 3
    ORDER BY slice_start
'''), con=connection)

# Render the dataframe
slices

In [None]:
# Execute the query and return the results as a pandas dataframe
avg_time_to_complete = pd.read_sql_query(text('''
    SELECT 
        sector,
        district, 
        round(AVG(DATE_PART('day', closed_date::timestamp - order_date::timestamp)::int), 2) as avg_wait_time
    FROM orders
    WHERE status = 'Complété'
    GROUP BY 1, 2
    ORDER BY avg_wait_time desc
'''), con=connection)

# Render the dataframe
avg_time_to_complete

In [None]:
# Execute the query and return the results as a pandas dataframe
completed_work_orders_per_day = pd.read_sql_query(text('''
    SELECT
        DATE(order_date) as order_date,
        COUNT(*) as completed_work_orders
    FROM orders
    WHERE status = 'Complété'
    GROUP BY 1
    ORDER BY 1
'''), con=connection)

# Render the dataframe
completed_work_orders_per_day

## Next steps

Now that you have learned more about the data engineering process, you can apply your new knowledge to the hackathon. Here are some tips to help you get started:

1. Ensure you have realistic data to work with. If the data you are given is incomplete or fictitious, source and collect data from public sources.
2. Source, collect or create a deep dataset. The more data you have, the better your features and interfaces will be and the easier it will be to develop features.
3. Ensure you understand the business problem the application is trying to solve. This will help you in identifying the features that will be most useful to the business.
4. Try to go beyond the expected features of your application. Integrating data features that are not expected will help you stand out from the competition. Keep in mind that managers and business users will appreciate having an understanding of their business at a glance.